Tokio是Rust中异步编程的运行时,围绕Future的抽象而构建。它利用了Rust的所有权和并发模型的强大功能来并发处理多个任务。它提供了一系列用于编写异步代码的实用程序,包括I/O驱动程序、计时器和异步任务调度器。了解Tokio的高级概念可以进一步帮助我们创建更复杂和高效的应用程序。
首先在我们项目中的Cargo.toml文件中加入依赖库:
[dependencies]
tokio = { version = "1.28.2", features = ["full"] }
tokio-stream = "0.1.14"
Streams(流)
Tokio中的streams是异步值的序列,它们类似于迭代器,但不是阻塞执行。streams可以表示事件序列或异步I/O。例如,streams可以表示从WebSocket连接传入的消息。
下面是一个简单的streams例子:
use tokio_stream::{once, StreamExt};
# 堆代码 duidaima.com
#[tokio::main]
async fn main() {
let mut stream = once(5);
while let Some(v) = stream.next().await {
println!("{}", v);
}
}
这个例子创建了一个只包含一个元素5的streams,然后将其打印出来。
Channels(通道)
Tokio中的channel用于异步任务之间的通信,可以使用它们将数据从一个任务发送到另一个任务。channel可以是有界的,也可以是无界的,有界channel对一次可以容纳多少消息有限制,而无界channel则没有这样的限制。
下面是一个简单的channel例子:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
for i in 0..10 {
if tx.send(i).await.is_err() {
break;
}
}
});
while let Some(v) = rx.recv().await {
println!("{}", v);
}
}
这个例子创建了一个容量为100的channel,然后,它启动一个新任务,将数字从0到9发送到channel中。同时,主任务从channel接收数字并打印出来。
Timeouts(超时)
Tokio提供了设置任务超时的实用程序,可以使用超时来防止任务花费太长时间。如果一个任务在超时前没有完成,它将被取消。
下面是一个超时的例子:
use tokio::time::{sleep, timeout, Duration};
#[tokio::main]
async fn main() {
let result = timeout(Duration::from_secs(5), sleep(Duration::from_secs(10))).await;
match result {
Ok(_) => println!("Task finished in time"),
Err(_) => println!("Task timed out"),
}
}
这个例子创建了一个休眠10秒的任务,并为它设置了5秒的超时。因此,任务将在完成之前被取消,并且程序将打印“task timed out”。
错误处理
Tokio中的错误处理方式与同步Rust中的相同,也使用Result和Option类型处理可能失败的函数。当使用?操作符时,如果表达式为Err,则函数将立即返回并将控制权交还给执行器。
下面是一个错误处理的例子:
use tokio::fs::File;
use tokio::io::AsyncReadExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut file = File::open("foo.txt").await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
println!("{}", contents);
Ok(())
}
这个例子试图打开一个名为“foo.txt”的文件并读取其内容,如果文件无法打开或内容无法读取,该函数将返回一个错误。
简而言之,Tokio是Rust中异步编程的强大工具。它为构建高效的并发应用程序提供了一个健壮的框架,并提供了各种有用的实用程序来处理流、通道、超时和错误。