• 如何在Rust中使用Kafka消息队列
  • 发布于 1个月前
  • 132 热度
    0 评论
Rust是一种相对较新的编程语言,首次出现于2015年。自那时起,它已成为替代在高性能应用中广泛使用的编程语言(如C和C++)的有力竞争者。这主要归功于其基于所有权的资源管理(OBRM)模型,该模型本质上强制用户遵循RAII方法,并且其静态类型和编译特性。

对于想要深入了解Rust及其众多独特特性的读者,可以参考《Rust by Example》和《The Rust Programming Language》,这两本都是由Rust项目贡献者编写和维护的免费电子书。

Kafka
Apache Kafka是一款开源软件,专为分布式事件流设计,注重高吞吐量和高性能应用。Kafka是一个强大的工具,全面介绍它超出了本文的范围。在本文中,我们只需了解Kafka用于在应用程序之间传递消息。它作为一个独立的服务运行,维护着在Kafka中被称为“主题”的队列。

一个简单的理解Kafka的方式是将其视为一个Python字典,该字典将字符串映射到消息。
MESSAGE_QUEUE = {
  "topic1": [message1_1, message1_2, message1_3, ...],
  "topic2": [message2_1, message1_1, message2_2, ...],
  "topic3": [message3_1, ...]
}
Kafka生产者会将消息(任何类型)追加到一个或多个队列中。Kafka消费者随后会订阅这些主题并解析传入的消息。消费者可以根据配置,仅解析新消息、存储在磁盘上的整个队列,或队列中的任何子集消息。Kafka的功能远不止这些,但对于初学者来说,这种基本的理解将有助于理解Kafka在本文后续部分中用于进程间通信的用途。

为了便于本文的讨论,假设读者已经设置了一个Kafka集群,或者可以访问在云端运行的Kafka集群,例如Confluent云。设置Kafka集群非常简单,网上有许多资源可以帮助初学者入门。

Kafka + Rust
许多现代系统需要进程间通信,使用消息队列系统,如Kafka、RabbitMQ、IBM MQ,或由云服务提供商(如Amazon和Google)提供的其他发布/订阅系统。现代语言和技术,如Rust和Kafka,必须被考虑用于需要高性能消息传递和快速处理时间的应用,因为它们具有低延迟和高吞吐量的能力。Kafka生产者 任何消息传递系统的良好起点是创建一个应用程序,将消息放入消息队列,通常称为生产者。在本文中,使用rust-rdkafka来创建生产者和消费者。

首先,必须将rdkafka和tokio依赖项添加到Cargo.toml文件中,可以通过cargo添加依赖项。
cargo add rdkafka tokio
注意 :rdkafka:0.36.2并且为cmake-build的features,tokio:1.37.0
先把Kafka的消息结构体定义一下
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

pub struct Producer {
    future_producer: FutureProducer,
    topic : String,
    message_id: usize
}
Kafka生产者需要三个成员:
FutureProducer:这是一个返回消息发送结果的rdkafka生产者,通过Future来避免用户定义的回调函数。
一个发布消息的主题。这也可以是一个主题列表,只需对发送逻辑进行一些修改。

一个消息ID,用于在每次运行之间为每个发送的消息分配唯一标识符。


生产者还将实现两个函数:一个关联的new函数用于创建结构体的实例,以及一个方法用于将消息放入指定的Kafka主题和服务器。这些方法的实现如下所示,并附有注释以说明其功能。
impl Producer {
    pub fn new(bootstrap_server : &str, message_timeout : &str, topic : &str) -> Producer {
        // 创建rdkafka的FutureProducer
        let producer: &FutureProducer = &ClientConfig::new()
            .set("bootstrap.servers", bootstrap_server)
            .set("message.timeout.ms", message_timeout)
            .create()
            .expect("Producer creation error");

        // 创建并返回Producer结构体
        Producer { future_producer: producer.clone(), topic: String::from(topic), message_id: 0 }
    }

    pub async fn produce(&mut self, message_bytes: &str) -> bool {
        // 获取发送消息的交付状态(Future),这是将消息发送到指定主题和代理的结果
        let delivery_status =
            self.future_producer.send(
                FutureRecord::to(self.topic.as_str())
                    .payload(&format!("Message: {}", message_bytes))
                    .key(&format!("Key: {}", self.message_id))
                    .headers(OwnedHeaders::new().insert(Header {
                        key: "header_key",
                        value: Some("header_value"),
                    })),
                Duration::from_secs(0),
            ).await;

        // 递增消息ID
        self.message_id += 1;
        // 堆代码 duidaima.com
        // 返回值取决于交付状态
        match delivery_status.err() {
            None => { true }    // 没有错误 => 消息发送成功
            Some(err) => {
                // 如果有错误,打印错误并返回false
                print!("Error sending message {0} on topic {1}: {2}", message_bytes, self.topic, err.0);
                false
            }
        }
    }
}
将创建两个独立的Producer,分别发布到不同的主题。请注意,一个Producer可以发布到不同的主题,但每个主题使用一个Producer可以使代码更简洁。
mod producer;

use std::future::Future;
use producer::producer::{ Producer };

#[tokio::main]
async fn main() {
    // 定义一些要发送的消息(生产者)——一个模拟的行情信息
    let price_updates = vec![
        "AAPL 169.58", "TSLA 164.90", "IBM 189.14", "PARA 11.97", "INTC 38.71",
        "AAPL 169.65", "TSLA 165.14", "IBM 189.34", "PARA 12.01", "INTC 38.89",
        "AAPL 169.60", "TSLA 162.56", "IBM 189.49", "PARA 12.23", "INTC 39.01",
        "AAPL 169.50", "TSLA 163.53", "IBM 189.02", "PARA 12.12", "INTC 38.92",
        "AAPL 169.90", "TSLA 163.92", "IBM 190.21", "PARA 11.99", "INTC 38.24",
    ];

    let trade_updates = vec![
        "BUY AAPL 100 170.00",
        "SELL PARA 453 11.23",
        "BUY INTC 200 39.55",
        "BUY TSLA 1000 170.00",
        "SELL AAPL 200 171.12",
    ];

    // 指定生产者属性...
    let server : &str = "127.0.0.1:9092";
    let timeout : &str = "5000";

    // ... 并创建生产者
    let mut price_producer = Producer::new(server, timeout, "ticker.topic");
    let mut trade_producer = Producer::new(server, timeout, "trade.topic");
    for update in price_updates {
        match price_producer.produce(update).await {
            true => { println!("价格更新发送成功。"); }
            false => { println!("发送消息失败"); }
        }
    }

    for trade in trade_updates {
        match trade_producer.produce(trade).await {
            true => { println!("交易更新发送成功。"); }
            false => { println!("发送消息失败"); }
        }
    }
}
在上面的异步主函数中,创建了两个消息列表:一个交易列表(trade_updates)和一个价格列表(price_updates)。还为两个主题(ticker.topic和trade.topic)分别创建了一个生产者,用于依次发送各自列表中的每条消息。在发送消息时,成功或失败的方法会被记录到控制台,给出以下输出。
用户评论