• Rust异步编程
  • 发布于 2个月前
  • 445 热度
    0 评论
在这篇文章中将解释一些用于在Rust中实现异步编程的底层结构,同时将其与高级crate功能联系起来。

什么是异步编程?
简而言之,它允许在不等待另一个操作完成的情况下进行多个不同的操作。例如,如果你想做鸡蛋和吐司,你可以先煎鸡蛋,但你不必等到它们煎好再开始烤面包。那么Rust是如何实现异步编程的呢?

例子比讲述更好,所以这里有一个rust异步函数的例子。首先,让我们创建一个新的项目:
cargo new asyncexamples --bin
接下来,在我们的cargo.toml文件中添加以下依赖项:
[dependencies]
futures = "0.3.28"
tokio = {version = "1.27.0",  features = ["full"]}
现在,编写我们的async函数:
// 堆代码 duidaima.com
use tokio::time::{sleep, Duration};

async fn eggs() -> u8 {
    println!("Proceed with expensive computation");
    sleep(Duration::from_millis(1000)).await;
    println!("Expensive computation is finished!");
    20
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Our first Async Function!");
    let x = eggs();
    println!("The END!?");
    Ok(())
}
执行cargo run
Our first Async Function!
The END!?
有两件事引起了我的注意,第一个是为什么没有任何print语句在异步函数运行?其次,你可能会注意到的另一件事是,eggs实际上返回了一些东西,它实现了Future trait,它的输出类型是u8。因为Async实际上只是下面代码的语法糖。
use futures::Future;
use tokio::time::{sleep, Duration};

fn eggs() -> impl Future<Output = u8> {
    println!("Proceed with expensive computation");
    async {
        sleep(Duration::from_millis(1000)).await;
        println!("Expensive computation is finished!");
        20
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Our first Async Function!");
    let x = eggs();
    println!("The END!?");
    Ok(())
}
这显示了由async关键字封装的块,它实现了Future trait类型,这就是要返回的东西!

Rust使用这个特性来表示一个值(在本例中为u8类型),这个值可能现在还不可用,但将来会可用。(因此他们称之为“Future”)。
pub trait Future {
    type Output; // 这是trait返回的类型 
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
现在,Futures trait有一个名为“poll”的函数,它返回一个Poll枚举,可以是:
Poll:Pending; // 表示任务仍在进行或等待中
Poll:Ready(Self::Output); // 表示完成
// 注意Self::Output包含我们的实际值!
poll函数用于表示异步块的状态,如果它仍在进行中,则返回Poll::Pending,否则返回Poll::Ready,并包含该异步块的返回值!Context类型是什么?它的意思是表示“异步任务的上下文”,它被用来提供对&Waker类型对象的访问,然后用于“唤醒”当前任务!

那么我们如何访问函数要返回的值呢?我们使用await,这意味着放弃对当前线程的执行控制,为其他线程的执行腾出空间。
use tokio::time::{sleep, Duration};
// 堆代码 duidaima.com
async fn eggs() -> u8 {
    println!("Proceed with expensive computation");
    sleep(Duration::from_millis(1000)).await;
    println!("Expensive computation is finished!");
    20
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Our first Async Function!");
    let x = eggs().await;
    println!("x = {x}");
    println!("The END!?");
    Ok(())
}
现在我已经在async eggs函数中添加了await,但是为什么我们一开始就需要添加它呢?这是因为,在Rust中,future是懒惰的,为了让它运行,我们必须调用await,稍后会调用poll函数,但是调用这个poll函数的是什么呢?

正如前面提到的,当我们调用.await时,实际上会放弃对当前线程的控制,并将其发送到某个地方。所以,我们显然需要一种方法来管理这些线程,这就是Tokio异步运行时发挥作用的地方。

Tokio有一个全局执行器,它将帮助我们运行这些异步块,并反过来做我们想要它做的计算。在此之前,我简要介绍了Waker是什么,以及他们做了些什么。我想现在是时候介绍这些对象背后的原因了。注意,执行器和唤醒器有两种实现方式。一种方法是使用分别由唤醒者和执行者调用的thread::unpark和thread::park。Thread::park意味着阻止当前线程运行,这样我们就不用等待接收消息,而可以做其他事情。Thread::unpark将被Waker对象调用,告诉执行者它已经等待了足够长的时间。另一种方法你可以使用Executor中的mspc通道和事件队列复制此功能。

假设我们现在把实现Future特性的异步块交给一个执行器。为了让执行器开始运行这个异步块直至完成,必须调用.await。一旦完成,执行器就会调用poll函数,并传入一个Context类型的对象,该对象封装了执行器创建的Waker对象。如果poll函数返回poll::Pending,那么该块还没有完成运行。因此,现在我们需要再次检查,而不是从循环中强制调用轮询函数,实际上可以使用Waker API向执行器发出任务完全完成或部分完成的信号。

我们从哪找这个Future的执行器?我们可以使用像Tokio这样的异步运行时。Tokio有一个运行时,它是一个强大的执行器,可以在单个运行时中管理各种Tokio组件。在下面的示例中,我们将使用一个多线程执行器,它允许运行时并发执行。注意,Tokio还提供单线程运行时!
use tokio::time::{sleep, Duration};

async fn eggs() -> String {
    println!("Lets start cooking some eggs!");
    sleep(Duration::from_millis(1000)).await;
    println!("Eggs are finished!");
    "Eggs!".to_string()
}

fn simple_example() {
    println!("Lets make some eggs!");
    let mut egg = "Raw Eggs".to_string();
    // rt 是执行器
    let rt = tokio::runtime::Builder::new_multi_thread()
                                                .enable_all()
                                                .build()
                                                .unwrap();
    // 在Tokio运行时上运行future直到完成,将其视为异步任务的起点。                                    
    rt.block_on(
        // 实现future的异步块!
        async {
            // 调用.await将给执行器一个绿色信号,让它运行直到完成!
            egg = eggs().await;
        }
    );
    println!("{}", egg);
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    simple_example();
    Ok(())
}
总结一下,我们有一个执行器,这个执行器为每个给定的异步任务创建一个Waker类型对象,该对象被一些上下文类型包装。执行器被赋予这些异步任务,它可以调用poll函数,该函数返回一个描述任务状态的enum。异步任务可以使用Waker类型通知执行者当前任务(部分)完成,所以执行者不会浪费任何时间调用poll函数来检查异步任务的状态。
用户评论