In today's interconnected world, efficient file sharing is a fundamental requirement for many applications, especially those running in distributed environments. In this article, we will explore how to combine the strengths of Kafka and Rust to build an efficient file sharing system.
Before diving into the implementation details, let's briefly discuss rdkafka, the Rust binding for the Kafka C library (librdkafka). rdkafka exposes a high-level, idiomatic Rust interface for interacting with Kafka clusters, which makes it an excellent choice for Rust developers integrating Kafka into their applications. Create a new project:
cargo new rust-kafka
Then add the following dependencies to Cargo.toml:
[dependencies]
lazy_static = "1.4.0"
rdkafka = "0.36.2"
tokio = { version = "1.36.0", features = ["full"] }
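As a quick sanity check that the crate compiles and links against the native librdkafka library, you can build a client with a minimal throwaway snippet like the one below (an assumption-laden sketch: it presumes a broker will later run on localhost:9092, and constructing the client does not yet contact the broker):
use rdkafka::config::ClientConfig;
use rdkafka::producer::BaseProducer;

fn main() {
    // Creating a client validates the configuration and the native library
    // linkage; no connection to the broker is attempted at this point.
    let _producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("invalid producer configuration");
    println!("rdkafka producer created successfully");
}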
Producer code (sending a file to Kafka)
Create a file src/bin/producer.rs and add the following code:
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout::Never;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::time::Instant;
async fn produce_file_to_kafka(
    file_path: &str,
    topic: &str,
    brokers: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Configure the Kafka producer
    let producer: FutureProducer = rdkafka::config::ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "30000") // fail sends that are not acknowledged in time
        .set("message.max.bytes", (1024 * 1024).to_string()) // allow messages up to 1 MiB
        .create()?;
    // Read the file in chunks and produce each chunk as a message with Kafka headers
    let file = File::open(file_path).await?;
    let mut reader = BufReader::new(file);
    let mut buffer = vec![0u8; 1000 * 1024]; // 1000 KiB chunk size, below the 1 MiB message.max.bytes limit
    let mut total_bytes_read = 0;
    let mut total_chunks_sent = 0;
    // Use the file name (without directories) as the message key; note that
    // rsplit_once('/') assumes Unix-style separators, while std::path::Path's
    // file_name() would also handle platform-specific paths
    let filename_with_ext = match file_path.rsplit_once('/') {
        Some((_, name)) => name,
        None => file_path,
    };
    let now = Instant::now(); // start time
    // Read the file chunk by chunk
    loop {
        let n = reader.read(&mut buffer).await?;
        if n == 0 {
            break; // end of file
        }
        total_bytes_read += n;
        total_chunks_sent += 1;
        // Produce the chunk as a message, carrying the file name both as the
        // message key and as a Kafka header
        let record = FutureRecord::to(topic)
            .payload(&buffer[0..n]) // send only the bytes actually read
            .key(filename_with_ext)
            .headers(OwnedHeaders::new().insert(Header {
                key: "file_name",
                value: Some(filename_with_ext),
            }));
        match producer.send(record, Never).await {
            Ok(_) => println!("File chunk {} sent successfully.", total_chunks_sent),
            Err((kafka_error, _)) => {
                eprintln!("Error sending file chunk: {:?}", kafka_error);
                return Err(Box::new(kafka_error));
            }
        }
    }
    // No separate handling is needed for a final partial chunk: `read`
    // returns fewer bytes than the buffer size on the last iteration, and
    // the loop above already sends exactly `n` bytes each time.
    let elapsed_time = now.elapsed();
    println!("Running took {} seconds.", elapsed_time.as_secs());
    println!("Total bytes read: {} bytes", total_bytes_read);
    // The reader and producer are dropped automatically when they go out of
    // scope; every send has already been awaited, so nothing is pending.
    Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file_path = "./test.txt"; // path of the file to send
    let topic = "file-sharing-topic";
    let brokers = "localhost:9092";
    produce_file_to_kafka(file_path, topic, brokers).await?;
    println!("File successfully sent to Kafka!");
    Ok(())
}
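A note on ordering: the consumer shown next simply appends chunks in arrival order. That is safe here because every chunk of a file shares the same key, so the default partitioner routes them to the same partition, and Kafka preserves append order within a partition. One caveat: producer retries can reorder in-flight messages unless idempotence is enabled, and an index header makes gaps or reordering detectable on the consumer side. A minimal sketch of such a record builder (a hypothetical chunk_record helper, not used by the code above):
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::FutureRecord;

// Build a record carrying its chunk index in a header so a consumer could
// detect gaps or restore order if chunks arrive interleaved.
fn chunk_record<'a>(
    topic: &'a str,
    filename: &'a str,
    chunk: &'a [u8],
    index: u32,
) -> FutureRecord<'a, str, [u8]> {
    FutureRecord::to(topic)
        .payload(chunk)
        .key(filename)
        .headers(
            OwnedHeaders::new()
                .insert(Header {
                    key: "file_name",
                    value: Some(filename),
                })
                .insert(Header {
                    key: "chunk_index",
                    value: Some(&index.to_be_bytes()[..]),
                }),
        )
}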
Consumer code (receiving files from Kafka)
Create a file src/bin/consumer.rs and add the following code:
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
async fn consume_files_from_kafka(
    topic: &str,
    brokers: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Kafka consumer configuration
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "file-consumer-group")
        .set("bootstrap.servers", brokers)
        .set("auto.offset.reset", "earliest")
        .create()?;
    // Subscribe to the Kafka topic
    consumer.subscribe(&[topic])?;
    // Output directory; create it up front, since OpenOptions does not
    // create missing directories
    let output_directory = "output/";
    tokio::fs::create_dir_all(output_directory).await?;
    // Start consuming messages
    loop {
        match consumer.recv().await {
            Ok(msg) => {
                let key = msg.key().map(|k| String::from_utf8_lossy(k));
                // Tolerate empty payloads instead of panicking on unwrap
                let payload = msg.payload().unwrap_or(&[]);
                if let Some(filename) = key {
                    // Build the output file path from the message key
                    let file_path = format!("{}{}", output_directory, filename);
                    // Append the payload; create(true) creates the file when
                    // the first chunk arrives
                    let mut file = OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(&file_path)
                        .await?;
                    file.write_all(payload).await?;
                    println!("File saved: {}", file_path);
                } else {
                    println!("Message without a filename key");
                    return Err(Box::new(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        "Message without a filename key",
                    )));
                }
            }
            Err(e) => {
                eprintln!("Error consuming message: {:?}", e);
                return Err(Box::new(e));
            }
        }
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let topic = "file-sharing-topic";
    let brokers = "localhost:9092";
    consume_files_from_kafka(topic, brokers).await?;
    Ok(())
}
Code walkthrough, error handling: any error that occurs during message consumption, file writing, or Kafka communication is handled and printed to the console for debugging.
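With a broker running, you can try the system end to end: start the consumer in one terminal with cargo run --bin consumer, then send a file from another terminal with cargo run --bin producer. The consumer reassembles the chunks into output/test.txt. If you don't have a Kafka broker at hand, the following docker-compose.yml starts ZooKeeper and a single broker locally: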
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
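The example assumes the topic file-sharing-topic exists; with the broker default auto.create.topics.enable=true it is created automatically on first use. If you prefer to create it explicitly from Rust, a sketch using rdkafka's admin API (matching the single-broker setup above) could look like this:
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;
    // One partition keeps all chunks strictly ordered; replication factor 1
    // matches the single-broker Compose setup above.
    let topic = NewTopic::new("file-sharing-topic", 1, TopicReplication::Fixed(1));
    let results = admin.create_topics(&[topic], &AdminOptions::new()).await?;
    println!("create_topics result: {:?}", results);
    Ok(())
}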
Summary
In this article, we combined Rust, the rdkafka crate, and Kafka to build a simple file sharing system: a producer that streams a file to a Kafka topic in fixed-size chunks keyed by the file name, and a consumer that appends those chunks back together into a file on disk.