In today's interconnected world, efficient file sharing is a fundamental requirement for many applications, especially those running in distributed environments. In this article, we will explore how to harness the power of Kafka and Rust to build an efficient file-sharing system.
First, create a new project:

```bash
cargo new rust-kafka
```

Before diving into the implementation details, let's briefly discuss rdkafka, the Rust binding for the Kafka C API. rdkafka provides a high-level, idiomatic Rust interface for interacting with Kafka clusters, which makes it an excellent choice for Rust developers integrating Kafka into their applications.
Add the following dependencies to Cargo.toml:

```toml
[dependencies]
lazy_static = "1.4.0"
rdkafka = "0.36.2"
tokio = { version = "1.36.0", features = ["full"] }
```
Producer code (sending files to Kafka)
Create a src/bin/producer.rs file and write the following code in it:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout::Never;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::time::Instant;

const CHUNK_SIZE: usize = 1000 * 1024; // 1000KB buffer size

async fn produce_file_to_kafka(
    file_path: &str,
    topic: &str,
    brokers: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Configure the Kafka producer
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.max.bytes", (1024 * 1024).to_string())
        .create()?;

    // Read the file in chunks and produce each chunk as a message with a Kafka header
    let file = File::open(file_path).await?;
    let mut reader = BufReader::new(file);
    let mut buffer = vec![0u8; CHUNK_SIZE];
    let mut total_bytes_read = 0;
    let mut total_chunks_sent = 0;

    // Use the file name (with extension) as the message key
    let filename_with_ext = match file_path.rsplit_once('/') {
        Some((_, name)) => name,
        None => file_path,
    };

    let now = Instant::now(); // start time

    // Read the file chunk by chunk; the final read returns fewer bytes,
    // so a trailing partial chunk is sent by this same loop
    loop {
        let n = reader.read(&mut buffer).await?;
        if n == 0 {
            break;
        }
        total_bytes_read += n;
        total_chunks_sent += 1;

        // Produce the chunk as a message with a Kafka header; only the bytes
        // actually read are sent
        let record = FutureRecord::to(topic)
            .payload(&buffer[0..n])
            .key(filename_with_ext)
            .headers(OwnedHeaders::new().insert(Header {
                key: "file_name",
                value: Some(filename_with_ext),
            }));

        match producer.send(record, Never).await {
            Ok(_) => println!("File chunk {} sent successfully.", total_chunks_sent),
            Err((kafka_error, _)) => {
                eprintln!("Error sending file chunk: {:?}", kafka_error);
                return Err(Box::new(kafka_error));
            }
        }
        // The buffer is reused on the next iteration; no reset is needed
        // because read() overwrites it and we only send the first n bytes
    }

    let elapsed_time = now.elapsed();
    println!("Running took {} seconds.", elapsed_time.as_secs());
    println!("Total bytes read: {} bytes", total_bytes_read);

    // The file and the producer are closed when they are dropped here
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file_path = "./test.txt"; // your file path
    let topic = "file-sharing-topic";
    let brokers = "localhost:9092";

    produce_file_to_kafka(file_path, topic, brokers).await?;
    println!("File successfully sent to Kafka!");
    Ok(())
}
```

Code analysis:

1. Producer configuration: a FutureProducer is created with the broker address and a message.max.bytes of 1MB, which comfortably fits one 1000KB chunk plus message overhead.
2. Chunked reading: the file is read into a 1000KB buffer; each read of n bytes becomes exactly one Kafka message, so the final, possibly partial, chunk needs no special case.
3. Key and header: the file name (the part of the path after the last /) travels with every chunk, both as the message key and as a file_name header. Keying by file name sends all chunks of a file to the same partition, which preserves their order (see the sketch after this list for an explicit chunk index).
4. Delivery and reporting: each send is awaited, so a chunk is confirmed delivered before the next one is produced; elapsed time and total bytes read are printed at the end.
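The producer relies entirely on Kafka's per-partition ordering. If you wanted consumers to detect a lost or reordered chunk, you could also attach each chunk's position as a header. Below is a minimal sketch; the chunk_record helper and the "chunk_index" header name are illustrative assumptions, not part of the original code:

```rust
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::FutureRecord;

/// Build a record for one file chunk, tagging it with its position in the file.
/// The header name "chunk_index" and its string encoding are arbitrary choices.
fn chunk_record<'a>(
    topic: &'a str,
    file_name: &'a str,
    payload: &'a [u8],
    chunk_index: &'a str,
) -> FutureRecord<'a, str, [u8]> {
    FutureRecord::to(topic)
        .key(file_name)
        .payload(payload)
        .headers(OwnedHeaders::new().insert(Header {
            key: "chunk_index",
            value: Some(chunk_index),
        }))
}
```

On the consumer side, the header could be read back through rdkafka's Headers trait and compared against a per-file counter before appending the chunk.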
Consumer code (receiving files from Kafka)

Create a consumer binary (e.g. src/bin/consumer.rs) with the following code:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;

async fn consume_files_from_kafka(
    topic: &str,
    brokers: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Kafka consumer configuration
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "file-consumer-group")
        .set("bootstrap.servers", brokers)
        .set("auto.offset.reset", "earliest")
        .create()?;

    // Subscribe to the Kafka topic
    consumer.subscribe(&[topic])?;

    // Output directory; create it up front so the file writes below
    // cannot fail just because it is missing
    let output_directory = "output/";
    tokio::fs::create_dir_all(output_directory).await?;

    // Start consuming messages
    loop {
        match consumer.recv().await {
            Ok(msg) => {
                let key = msg.key().map(|k| String::from_utf8_lossy(k));
                // Treat a missing payload as an empty chunk instead of panicking
                let payload = msg.payload().unwrap_or_default();

                if let Some(filename) = key {
                    // Build the output file path
                    let file_path = format!("{}{}", output_directory, filename);

                    // Append the payload to the file; chunks arrive in order
                    // because they share a key (and therefore a partition)
                    let mut file = OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(&file_path)
                        .await?;
                    file.write_all(payload).await?;

                    println!("File saved: {}", file_path);
                } else {
                    println!("Message without a filename key");
                    return Err(Box::new(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        "Message without a filename key",
                    )));
                }
            }
            Err(e) => {
                eprintln!("Error consuming message: {:?}", e);
                return Err(Box::new(e));
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let topic = "file-sharing-topic";
    let brokers = "localhost:9092";

    consume_files_from_kafka(topic, brokers).await?;
    Ok(())
}
```

Code analysis:

1. Consumer configuration: a StreamConsumer is created with a group.id, the broker address, and auto.offset.reset set to earliest, so a fresh consumer group reads the topic from the beginning.
2. Subscription: the consumer subscribes to the file-sharing-topic topic.
3. Output directory: incoming files are written under output/, which is created if it does not exist.
4. Message consumption: for each message, the key carries the file name and the payload carries one chunk of file data.
5. File writing: each payload is appended to the file named by the key; because all chunks of a file share a key (and thus a partition), they arrive in order and the file reassembles correctly.
6. Error handling: any error that occurs during message consumption, file writing, or Kafka communication is handled; errors are printed to the console for debugging.
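The consumer above returns on the first error, which ends the process. If you would rather log failures and keep consuming, a variant along these lines is possible; the consume_resilient name and the log-only behavior are assumptions for illustration, not part of the original code:

```rust
use rdkafka::consumer::StreamConsumer;
use rdkafka::message::Message;

// Keep polling after transient errors instead of propagating them to the caller.
async fn consume_resilient(consumer: &StreamConsumer) {
    loop {
        match consumer.recv().await {
            Ok(msg) => {
                // Illustrative processing step: report the chunk size only.
                let len = msg.payload().map(|p| p.len()).unwrap_or(0);
                println!("Received a message with {} payload bytes", len);
            }
            Err(e) => {
                // Log and continue; a real system might count consecutive
                // failures and give up after a threshold.
                eprintln!("Error consuming message: {:?}", e);
            }
        }
    }
}
```

Which strategy fits depends on the failure: a malformed message is safe to skip, while a broker outage may deserve a bounded retry before shutting down.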
To run the examples locally, you need a Kafka broker. The following docker-compose.yml starts a single-node Kafka cluster with ZooKeeper:

```yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

Bring the cluster up with `docker-compose up -d`, then start the consumer and the producer in separate terminals with `cargo run --bin consumer` and `cargo run --bin producer` (the binary names follow the file names under src/bin/).

Summary

In this article we used rdkafka and tokio to build a simple file-sharing pipeline: a producer that splits a file into chunks and publishes them to a Kafka topic keyed by file name, and a consumer that appends those chunks back together on disk. The same pattern extends naturally to moving large payloads between services in a distributed environment.