闽公网安备 35020302035485号
use reqwest::Client;
use tokio::stream::StreamExt;
# 堆代码 duidaima.com
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["http://example.com/data1", "http://example.com/data2"];
let fetches = futures::stream::iter(urls)
.map(|url| {
let client = client.clone();
async move {
let resp = client.get(url).send().await?;
let body = resp.text().await?;
Ok(body)
}
})
.buffer_unordered(5);
fetches
.for_each(|res| async {
match res {
Ok(data) => println!("Fetched data: {}", data),
Err(e) => eprintln!("Error: {}", e),
}
})
.await;
}
2. 数据转换
let data = vec![1, 2, 3, 4, 5];
// Keep only the even values from `data`, doubling each survivor, then print them.
let processed_data: Vec<_> = data
    .into_iter()
    .filter_map(|x| (x % 2 == 0).then(|| x * 2))
    .collect();
println!("Processed data: {:?}", processed_data);
3. 数据存储
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
/// Opens a Diesel connection to the SQLite database file `test.db`.
///
/// # Panics
/// Panics with "Error connecting to database" if the connection cannot be established.
fn establish_connection() -> SqliteConnection {
    const DATABASE_PATH: &str = "test.db";
    let connection_result = SqliteConnection::establish(DATABASE_PATH);
    connection_result.expect("Error connecting to database")
}
/// Entry point: opens the database connection; the insert/query logic is left as an exercise.
fn main() {
    let _connection = establish_connection();
    // TODO: insert and query data using the Diesel ORM through `_connection`.
    // NOTE(review): Diesel 2.x query methods take the connection as `&mut`, so bind it
    // with `mut` once queries are added — confirm against the Diesel version in use.
}
4. 数据服务
use axum::{routing::get, Json, Router};
use serde::Serialize;
use std::net::SocketAddr;
/// JSON body returned by the `hello_world` handler.
#[derive(Serialize)]
struct HelloWorld {
    /// Greeting text; serialized under the key `message`.
    message: String,
}
/// Starts an HTTP server on 127.0.0.1:3000 whose only route, `GET /`, is served by
/// `hello_world`.
///
/// # Panics
/// Panics if the server future completes with an error.
#[tokio::main]
async fn main() {
    let app = Router::new().route("/", get(hello_world));

    // IPv4 octets are separate array elements; `[127.0.0.1]` is not valid Rust.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    println!("Listening on http://{}", addr);

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .expect("HTTP server terminated with an error");
}
/// Handler for `GET /`: replies with a JSON greeting.
async fn hello_world() -> Json<HelloWorld> {
    let payload = HelloWorld {
        message: String::from("Hello, World!"),
    };
    Json(payload)
}
5. 数据挖掘
use polars::prelude::*;
use std::fs::File;
use std::io::BufReader;
/// Loads a CSV file into a Polars `DataFrame`, prints a preview, and reports summary
/// statistics for the `value` column, both overall and grouped by `category`.
///
/// # Errors
/// Returns an error if the file cannot be opened or parsed as CSV, or if a query over
/// the `value` / `category` columns fails (for example because a column is missing).
fn main() -> Result<()> {
    // Propagate an I/O failure through `?` instead of panicking; `main` already
    // returns a `Result`.
    let file = File::open("path/to/your/data.csv")?;
    let reader = BufReader::new(file);
    let df = CsvReader::new(reader)
        .infer_schema(None)
        .has_header(true)
        .finish()?;

    // Print the first few rows of the DataFrame.
    println!("DataFrame preview:");
    println!("{:?}", df.head(Some(5)));

    // `DataFrame::lazy` takes `self` by value, so every query except the last one runs on
    // a clone; calling `df.lazy()` more than once would be a use-after-move.
    //
    // Each collected result is bound to a name before reading from it: the value returned
    // by `get` borrows that frame's column, so the frame must outlive the `println!`.
    let mean_df = df
        .clone()
        .lazy()
        .select([col("value").mean().alias("mean_value")])
        .collect()?;
    let mean_value = mean_df.column("mean_value")?.get(0);
    println!("Mean of 'value' column: {:?}", mean_value);

    let sum_df = df
        .clone()
        .lazy()
        .select([col("value").sum().alias("sum_value")])
        .collect()?;
    let sum_value = sum_df.column("sum_value")?.get(0);
    println!("Sum of 'value' column: {:?}", sum_value);

    // Group by "category" and compute per-group aggregates; this is the last use of `df`,
    // so it is consumed directly.
    let grouped_df = df
        .lazy()
        .groupby([col("category")])
        .agg([
            col("value").mean().alias("mean_value"),
            col("value").sum().alias("sum_value"),
            col("value").count().alias("count"),
        ])
        .collect()?;
    println!("Grouped DataFrame with aggregate metrics:");
    println!("{:?}", grouped_df);
    Ok(())
}
数据工程的游戏规则改变者：DataFusion
use datafusion::prelude::*;
use arrow::record_batch::RecordBatch;
/// Registers a CSV file as the table `example`, runs a filtering SQL query through
/// DataFusion, and prints every resulting record batch with `{:?}`.
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Execution context that owns the table registry and runs queries.
    let mut ctx = ExecutionContext::new();

    let table_name = "example";
    ctx.register_csv(table_name, "path/to/csv/file", CsvReadOptions::new())
        .await?;

    let frame = ctx
        .sql("SELECT * FROM example WHERE column_name > 100")
        .await?;
    let results: Vec<RecordBatch> = frame.collect().await?;

    results.iter().for_each(|batch| println!("{:?}", batch));
    Ok(())
}
随着DataFusion的出现，Ballista等新库有望挑战Spark在数据处理领域的地位。Ballista是一个用Rust编写的分布式计算平台，专为高性能、大规模数据处理而设计。它利用Apache Arrow实现高效的内存列式数据表示，利用DataFusion执行查询，使开发人员能够以分布式方式执行复杂的数据转换和分析。
use ballista::prelude::*;
use tokio;
/// Registers a CSV file as the table `example` with Ballista, runs a filtering SQL query,
/// and prints each resulting record batch with `{:?}`.
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = BallistaContext::local();

    let csv_path = "path/to/your/data.csv";
    ctx.register_csv("example", csv_path, CsvReadOptions::new())
        .await?;

    let query = "SELECT * FROM example WHERE some_column > 100";
    let frame = ctx.sql(query).await?;

    for record_batch in frame.collect().await? {
        println!("{:?}", record_batch);
    }
    Ok(())
}
总结