use reqwest::Client;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let client = Client::new();
    let urls = vec!["http://example.com/data1", "http://example.com/data2"];

    let fetches = futures::stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            async move {
                let resp = client.get(url).send().await?;
                let body = resp.text().await?;
                Ok::<String, reqwest::Error>(body)
            }
        })
        .buffer_unordered(5); // fetch up to 5 URLs concurrently

    fetches
        .for_each(|res| async {
            match res {
                Ok(data) => println!("Fetched data: {}", data),
                Err(e) => eprintln!("Error: {}", e),
            }
        })
        .await;
}
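In practice the fetched bodies are rarely consumed as raw strings; they usually get parsed into typed records before further processing. A minimal sketch using serde_json, assuming a hypothetical JSON payload shape (the Measurement struct and its fields are illustrative only):

use serde::Deserialize;

// Hypothetical record shape; adjust the fields to the real API payload.
#[derive(Debug, Deserialize)]
struct Measurement {
    id: u64,
    value: f64,
}

// Parse one fetched body (a JSON array) into typed records.
fn parse_body(body: &str) -> Result<Vec<Measurement>, serde_json::Error> {
    serde_json::from_str(body)
}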
2. Data Transformation

let data = vec![1, 2, 3, 4, 5];
let processed_data: Vec<_> = data
    .into_iter()
    .filter(|&x| x % 2 == 0)
    .map(|x| x * 2)
    .collect();
println!("Processed data: {:?}", processed_data);
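Real pipelines usually transform records that can be malformed. A sketch of the same filter/map pipeline applied to raw text fields, where parsing can fail (the plain-integer input format is an assumption):

// Parse, then filter and scale; the first malformed record aborts the batch.
fn transform(raw: &[&str]) -> Result<Vec<i32>, std::num::ParseIntError> {
    let parsed: Vec<i32> = raw
        .iter()
        .map(|s| s.trim().parse::<i32>()) // parse each raw field
        .collect::<Result<_, _>>()?;      // propagate the first parse error
    Ok(parsed
        .into_iter()
        .filter(|&x| x % 2 == 0) // keep even values
        .map(|x| x * 2)          // scale them
        .collect())
}

For example, transform(&["1", "2", "3", "4"]) returns Ok(vec![4, 8]), while a non-numeric field returns the parse error instead.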
3. Data Storage

use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;

fn establish_connection() -> SqliteConnection {
    SqliteConnection::establish("test.db").expect("Error connecting to database")
}

fn main() {
    let connection = establish_connection();
    // Insert and query data using Diesel ORM
}
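The placeholder comment above leaves out the actual ORM calls. A minimal sketch of what the insert-and-query step could look like, assuming Diesel 2.x and a hypothetical records table (in a real project the table! block is generated by diesel print-schema):

use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;

// Hypothetical schema definition, for illustration only.
diesel::table! {
    records (id) {
        id -> Integer,
        value -> Text,
    }
}

#[derive(Queryable)]
struct Record {
    id: i32,
    value: String,
}

#[derive(Insertable)]
#[diesel(table_name = records)]
struct NewRecord<'a> {
    value: &'a str,
}

// Diesel 2.x takes the connection by mutable reference.
fn insert_and_query(conn: &mut SqliteConnection) -> QueryResult<Vec<Record>> {
    // Insert one row.
    diesel::insert_into(records::table)
        .values(&NewRecord { value: "hello" })
        .execute(conn)?;

    // Read all rows back.
    records::table.load::<Record>(conn)
}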
4. Data Services

use axum::{routing::get, Json, Router};
use serde::Serialize;
use std::net::SocketAddr;

#[derive(Serialize)]
struct HelloWorld {
    message: String,
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/", get(hello_world));
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn hello_world() -> Json<HelloWorld> {
    Json(HelloWorld {
        message: "Hello, World!".to_string(),
    })
}
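To serve actual data rather than a fixed message, a handler typically deserializes query parameters and returns a JSON payload. A sketch in the same axum 0.6-style API as above; the /data route, DataParams, and DataResponse are illustrative and not part of the original example:

use axum::{extract::Query, Json};
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct DataParams {
    // e.g. GET /data?limit=3
    limit: Option<usize>,
}

#[derive(Serialize)]
struct DataResponse {
    items: Vec<i32>,
}

// Handler: read the `limit` query parameter and return a JSON array.
async fn get_data(Query(params): Query<DataParams>) -> Json<DataResponse> {
    let items: Vec<i32> = (1..=10).take(params.limit.unwrap_or(10)).collect();
    Json(DataResponse { items })
}

// Registered alongside the existing route:
// let app = Router::new()
//     .route("/", get(hello_world))
//     .route("/data", get(get_data));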
5. Data Mining

use polars::prelude::*;

fn main() -> Result<()> {
    // Read the CSV file into an eager DataFrame.
    let df = CsvReader::from_path("path/to/your/data.csv")?
        .infer_schema(None)
        .has_header(true)
        .finish()?;

    // Print the first few rows of the DataFrame
    println!("DataFrame preview:");
    println!("{:?}", df.head(Some(5)));

    // Example: Calculate the mean and sum of a column named "value"
    let mean_value = df
        .clone()
        .lazy()
        .select([col("value").mean().alias("mean_value")])
        .collect()?
        .column("mean_value")?
        .get(0);
    println!("Mean of 'value' column: {:?}", mean_value);

    let sum_value = df
        .clone()
        .lazy()
        .select([col("value").sum().alias("sum_value")])
        .collect()?
        .column("sum_value")?
        .get(0);
    println!("Sum of 'value' column: {:?}", sum_value);

    // Example: Group by a column named "category" and calculate aggregate metrics
    let grouped_df = df
        .clone()
        .lazy()
        .groupby([col("category")])
        .agg([
            col("value").mean().alias("mean_value"),
            col("value").sum().alias("sum_value"),
            col("value").count().alias("count"),
        ])
        .collect()?;
    println!("Grouped DataFrame with aggregate metrics:");
    println!("{:?}", grouped_df);

    Ok(())
}
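The three queries above each start from the eager DataFrame; with the lazy API they can also be folded into a single plan that polars optimizes as a whole. A sketch reusing the same hypothetical "category" and "value" columns, with the same API generation as the snippet above (col, lit, and gt come from the polars prelude):

// One lazy plan: filter, group, and aggregate in a single optimized pass.
let summary = df
    .clone()
    .lazy()
    .filter(col("value").gt(lit(0))) // keep positive values only
    .groupby([col("category")])
    .agg([col("value").mean().alias("mean_value")])
    .collect()?;
println!("Filtered summary: {:?}", summary);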
The Game Changer for Data Engineering: DataFusion

use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Create a new session context
    let ctx = SessionContext::new();

    // Register a CSV file as a table
    ctx.register_csv("example", "path/to/csv/file", CsvReadOptions::new()).await?;

    // Run a SQL query against it
    let df = ctx.sql("SELECT * FROM example WHERE column_name > 100").await?;

    // Collect the results
    let batches: Vec<RecordBatch> = df.collect().await?;

    // Print the results
    for batch in batches {
        println!("{:?}", batch);
    }
    Ok(())
}

Thanks to DataFusion, new libraries such as Ballista are positioned to take over Spark's role in data processing. Ballista is a distributed compute platform written in Rust, designed for high-performance, large-scale data processing. It builds on Apache Arrow for an efficient in-memory columnar data representation and on DataFusion for query execution, letting developers run complex data transformations and analytics in a distributed fashion.
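Both the DataFusion query above and the Ballista example below hand results back as Arrow RecordBatches. A minimal sketch of reading values out of one, assuming the queried column holds 64-bit integers (the concrete array type is an assumption):

use datafusion::arrow::array::{Array, Int64Array};
use datafusion::arrow::record_batch::RecordBatch;

// Inspect one columnar batch: dimensions, schema, and the values of column 0.
fn inspect(batch: &RecordBatch) {
    println!("rows: {}, columns: {}", batch.num_rows(), batch.num_columns());
    println!("schema: {:?}", batch.schema());

    // Downcast the first column to a concrete Arrow array to read its values.
    if let Some(col) = batch.column(0).as_any().downcast_ref::<Int64Array>() {
        for i in 0..col.len() {
            println!("row {}: {}", i, col.value(i));
        }
    }
}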
use ballista::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Create a Ballista context
    let ctx = BallistaContext::local();

    // Register a CSV file as a table
    ctx.register_csv("example", "path/to/your/data.csv", CsvReadOptions::new()).await?;

    // Run a SQL query
    let df = ctx.sql("SELECT * FROM example WHERE some_column > 100").await?;

    // Collect and print the results
    let batches = df.collect().await?;
    for batch in batches {
        println!("{:?}", batch);
    }
    Ok(())
}

Summary