MESSAGE_QUEUE = { "topic1": [message1_1, message1_2, message1_3, ...], "topic2": [message2_1, message1_1, message2_2, ...], "topic3": [message3_1, ...] }Kafka生产者会将消息(任何类型)追加到一个或多个队列中。Kafka消费者随后会订阅这些主题并解析传入的消息。消费者可以根据配置,仅解析新消息、存储在磁盘上的整个队列,或队列中的任何子集消息。Kafka的功能远不止这些,但对于初学者来说,这种基本的理解将有助于理解Kafka在本文后续部分中用于进程间通信的用途。
cargo add rdkafka tokio注意 :rdkafka:0.36.2并且为cmake-build的features,tokio:1.37.0
use rdkafka::config::ClientConfig; use rdkafka::message::{Header, OwnedHeaders}; use rdkafka::producer::{FutureProducer, FutureRecord}; use std::time::Duration; pub struct Producer { future_producer: FutureProducer, topic : String, message_id: usize }Kafka生产者需要三个成员:
一个消息ID,用于在每次运行之间为每个发送的消息分配唯一标识符。
impl Producer { pub fn new(bootstrap_server : &str, message_timeout : &str, topic : &str) -> Producer { // 创建rdkafka的FutureProducer let producer: &FutureProducer = &ClientConfig::new() .set("bootstrap.servers", bootstrap_server) .set("message.timeout.ms", message_timeout) .create() .expect("Producer creation error"); // 创建并返回Producer结构体 Producer { future_producer: producer.clone(), topic: String::from(topic), message_id: 0 } } pub async fn produce(&mut self, message_bytes: &str) -> bool { // 获取发送消息的交付状态(Future),这是将消息发送到指定主题和代理的结果 let delivery_status = self.future_producer.send( FutureRecord::to(self.topic.as_str()) .payload(&format!("Message: {}", message_bytes)) .key(&format!("Key: {}", self.message_id)) .headers(OwnedHeaders::new().insert(Header { key: "header_key", value: Some("header_value"), })), Duration::from_secs(0), ).await; // 递增消息ID self.message_id += 1; // 堆代码 duidaima.com // 返回值取决于交付状态 match delivery_status.err() { None => { true } // 没有错误 => 消息发送成功 Some(err) => { // 如果有错误,打印错误并返回false print!("Error sending message {0} on topic {1}: {2}", message_bytes, self.topic, err.0); false } } } }将创建两个独立的Producer,分别发布到不同的主题。请注意,一个Producer可以发布到不同的主题,但每个主题使用一个Producer可以使代码更简洁。
mod producer; use std::future::Future; use producer::producer::{ Producer }; #[tokio::main] async fn main() { // 定义一些要发送的消息(生产者)——一个模拟的行情信息 let price_updates = vec![ "AAPL 169.58", "TSLA 164.90", "IBM 189.14", "PARA 11.97", "INTC 38.71", "AAPL 169.65", "TSLA 165.14", "IBM 189.34", "PARA 12.01", "INTC 38.89", "AAPL 169.60", "TSLA 162.56", "IBM 189.49", "PARA 12.23", "INTC 39.01", "AAPL 169.50", "TSLA 163.53", "IBM 189.02", "PARA 12.12", "INTC 38.92", "AAPL 169.90", "TSLA 163.92", "IBM 190.21", "PARA 11.99", "INTC 38.24", ]; let trade_updates = vec![ "BUY AAPL 100 170.00", "SELL PARA 453 11.23", "BUY INTC 200 39.55", "BUY TSLA 1000 170.00", "SELL AAPL 200 171.12", ]; // 指定生产者属性... let server : &str = "127.0.0.1:9092"; let timeout : &str = "5000"; // ... 并创建生产者 let mut price_producer = Producer::new(server, timeout, "ticker.topic"); let mut trade_producer = Producer::new(server, timeout, "trade.topic"); for update in price_updates { match price_producer.produce(update).await { true => { println!("价格更新发送成功。"); } false => { println!("发送消息失败"); } } } for trade in trade_updates { match trade_producer.produce(trade).await { true => { println!("交易更新发送成功。"); } false => { println!("发送消息失败"); } } } }在上面的异步主函数中,创建了两个消息列表:一个交易列表(trade_updates)和一个价格列表(price_updates)。还为两个主题(ticker.topic和trade.topic)分别创建了一个生产者,用于依次发送各自列表中的每条消息。在发送消息时,成功或失败的方法会被记录到控制台,给出以下输出。