• Rust中futures::stream::FuturesUnordered的用法
  • 发布于 2个月前
  • 444 热度
    0 评论
  • 黄月英
  • 0 粉丝 47 篇博客
  •   
在这篇文章中,我们将探索futures::stream::FuturesUnordered,这是一个强大而有效的工具,可以同时处理多个异步任务。这个工具允许我们以非阻塞的方式轮询多个Future,自动处理完成的单个任务,并在它们可用时产生结果。

让我们创建一些异步函数来模拟不同的任务,并使用FuturesUnordered来并发地运行它们。首先新建一个Rust项目:
cargo new futuresunordered_example
然后向Cargo.toml文件添加所需的依赖项:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
接下来,创建异步函数:
use std::time::Duration;
use tokio::time::sleep;
// 堆代码 duidaima.com
async fn task_one() -> String {
    sleep(Duration::from_secs(3)).await;
    "Task one completed".to_owned()
}
async fn task_two() -> String {
    sleep(Duration::from_secs(1)).await;
    "Task two completed".to_owned()
}
async fn task_three() -> String {
    sleep(Duration::from_secs(2)).await;
    "Task three completed".to_owned()
}
我们定义了三个异步函数task_one、task_two和task_three,每个函数都使用tokio::time::sleep模拟一个具有不同持续时间的任务。现在,我们将使用FuturesUnordered来并发运行这些任务:
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::pin::Pin;
use std::future::Future;

#[tokio::main]
async fn main() {
    let mut tasks = FuturesUnordered::<Pin<Box<dyn Future<Output = String>>>>::new();
     tasks.push(Box::pin(task_one()));
    tasks.push(Box::pin(task_two()));
    tasks.push(Box::pin(task_three()));
    while let Some(result) = tasks.next().await {
        println!("{}", result);
    }
}
在代码中使用Box::pin创建每个future的固定、堆分配版本。确保future的内存位置在它开始执行时不会改变。

当你运行这个示例时,你将看到任务是并发执行的,并且它们的结果在完成时打印出来。输出的顺序可能不同,因为它取决于每个任务的完成时间:
Task two completed
Task three completed
Task one completed
在本例中,我们定义了三个异步函数task_one、task_two和task_three,每个函数都使用tokio::time::sleep模拟一个具有不同持续时间的任务。我们创建了一个名为tasks的FuturesUnordered实例,并将三个任务推入其中。

FuturesUnordered是如何工作的
futures::stream::FuturesUnordered背后的理论在于它的实现,它结合了高效的任务管理和Rust的异步编程模型。

下面是它的工作原理概述:
1,FuturesUnordered是一个包含一组Future的数据结构。在内部,它维护着两个Future链表——一个用于准备接受轮询的Future,另一个用于尚未准备好接受轮询的Future。

2,当你使用push()将一个Future添加到FuturesUnordered时,它最初被放置在未准备好的Future列表中。然后,future被注册到一个异步运行时中,以便在它准备好进行调度时(即当它准备好进行轮询时)得到通知。

3,当一个Future准备好了,它就从未准备好列表移到了准备好列表。这是通过异步运行时通知与future相关的唤醒器执行的。

4,FuturesUnordered结构本身实现了Stream特征。当你在FuturesUnordered实例上调用next()时,它将轮询就绪列表中的Future。

5,当就绪列表中有可用的future时,next().await调用将解析到已完成的future。如果没有准备好的future,它将异步地等待,直到future准备好。

6,当一个future被轮询并完成时,它将从就绪列表中删除,结果由next().await返回。如果future没有完成,它仍然在准备清单上,与此同时,另一个Future可能会被轮询。这个过程确保FuturesUnordered有效地同时处理多个Future,利用可用资源并避免阻塞。

7,如果一个future被取消或删除,它将从它各自的列表中删除(准备好或未准备好),确保与future相关的任何资源都被释放。

8,只要在就绪或未就绪列表中有Future,FuturesUnordered stream就继续返回值。一旦所有Future都完成或取消,FuturesUnordered stream将返回None,表明stream已经结束。

总结
FuturesUnordered确保没有一个Future被落下,每一个Future都能在时机成熟时成为关注的焦点。我们也分析了FuturesUnordered如何维护两个列表,一个用于准备轮询的Futre,另一个用于仍在等待轮询的Future。利用FuturesUnordered的功能,Rust应用程序可以获得更好的性能,有效地管理并发性,并最大限度地利用资源。
用户评论