• Rust Tokio 任务取消模式
  • 发布于 2个月前
  • 194 热度
    0 评论
我最近一直在尝试重新学习 Rust。开始时进展有些缓慢,但我觉得我终于开始感受到累积效应在起作用了。也许这只是我的大脑在跟我开玩笑,但现在写 Rust 时我感觉比几周前轻松多了。

我过去曾对 Rust 有所了解,但从未构建过任何“实质性”的东西,而往往最大的学习成果都来源于这些大型项目,因为你会遇到在小规模项目中不会出现的问题。但这并不是说构建小型项目没有用!相反!它们是让你熟悉任何新语言的绝佳方式!只是小型项目通常不会突出显示你在构建它们时不需要的某些语言领域的知识不足。

对我来说,其中一个领域是并发性。Rust 支持异步编程[1],允许你启动异步任务,然后通过运行时执行器在 OS 线程之间调度执行。和 Rust 中的其他所有内容一样,异步编程也必须是内存安全的,所以你需要确保借用检查器(borrow checker)满意。这有时是一个真正的挑战。

在 Rust 领域,更奇怪的是,虽然异步编程是一等公民,但标准库(即 async_std)相当有限,或者至少对于一个像我这样的未经训练的初学者来说是这样。结果人们倾向于使用 tokio[2] crate,它提供了丰富的功能,并有一个非常棒的教程。我对 Rust 异步编程的大部分了解都是通过遵循 tokio 教程[3]学到的。

教程中没有详细介绍的一点是任务取消,这是我在一个项目中需要的功能。具体来说,我需要启动一堆工作任务,并需要一种方法来取消其中的任何一个或全部。我做了一些研究和破解,并想出了一些模式,我想把它们放在我可以轻松访问的地方。

我写这篇博客文章和配套的 GitHub 仓库[4]的目的是记录这些模式,并希望这篇文章能像一份活的文档一样,基于我未来希望学习的新的 tokio 技巧不断得到更新。正如我所说,我目前是一个 Rust 菜鸟,所以这些“模式”可能完全不靠谱——如果你发现它们是这样的,请在文章末尾留下评论,我会很乐意更新它。

Select 和 Channel
这些模式的核心是 tokio 的两个特性(假设你已经熟悉任务的创建):
channel:用于任务间通信
select:用于等待多个异步计算(不一定是任务!)

乍一看,它们与 Go 的 channel 和 select 惊人地相似,但魔鬼藏在细节中。虽然 Go 提供了一个单一的 channel 构造,但其语义(发送/接收)是通过可选的 <-/-> 运算符“调整”的(仅当明确写出时,编译器才会检查——不幸的是,这很容易忘记,而且往往会被许多 Go 程序员省略);tokio 的 channel 看似更复杂一些,但在内存安全和程序韧性方面更为强大。

Tokio 的 channel 创建了两个不同的对象用于任务间通信。你不能使用单个 channel 同时进行接收和发送。这防止了我在 Go 编程中多次遇到的一类问题,比如当你意外地省略了可选的 channel 语义,并向一个只应接收数据的 channel 发送数据时——Go 编译器看不出有什么问题,但你却要花上几个小时调试为什么事情不起作用。

从 Go 程序员的角度来看,另一个有趣的特性是,你可以创建多生产者单消费者 channel(通过 mpsc 缩写表示),这可以防止我在 Go 程序中遇到的另一类问题:多个接收者修改同一个对象,从而导致难以发现的数据竞争。在 Go 中,你可以轻松地将一个指针通过 channel 发送给多个消费者,并承诺它是只读的,或者你永远不会尝试修改它;问题在于,Go 编译器不会强制执行这些规则——这实际上只是一种你希望使用你代码的人遵循的约定/未成文规则。拥有一个可以帮助你强制执行这些规则的编译器非常实用,并且与 Rust 的内存安全原则相契合。实际上,并发类型安全的故事更加有趣,涉及讨论 Send 和 Sync 特性,但本文是关于任务取消模式的,因此我会让你自己去深入研究这些特性。

如果你确实需要将多个值发送给多个消费者,Tokio 已经为你准备好了,不用担心:Tokio 提供了 broadcast channel。

一般来说,Tokio 实际上提供了四种类型的 channel:
mpsc:多生产者单消费者,如上所述
oneshot:用于发送和接收单个值;一旦发送,channel 就会关闭
broadcast:如上所述——多个发送者,多个消费者
watch:单生产者,多消费者
正如我所说,这些在 Tokio 教程中都有很好的文档和讨论。我花了些时间来消化这些概念,因为我自然倾向于将它们与它们在 Go 语言中的等价物进行比较,但它们之间有一些微妙的差异。但无论如何,让我们继续探讨利用 select 和 channel的实际取消模式。

以下是我提出的一些模式的不完整列表。对于经验丰富的 Rust 程序员来说,其中一些可能看起来有些牵强,但对我来说它们相当有效,正如我所说,我还是 Rust 的新手,所以我可能做错了什么。接下来是这些模式!

drop JoinHandle 不会取消任务
注意:colorfulchew[5] 在 Reddit 上正确地指出[6],使用 drop 实际上并不会取消任务,但当句柄超出作用域时丢弃它确实会——请参见本示例下方的示例。abcSilverline[7] 提到我误解了官方文档[8],该文档明确指出:

“当 JoinHandle 被丢弃时,它会分离与其关联的任务,这意味着不再有任何指向该任务的句柄,也无法再等待它完成。
每次你在 tokio 中启动(spawn)一个任务时,你都会得到一个 JoinHandle。你可以使用 join 句柄来等待任务完成,但我原以为你可以仅仅通过丢弃它来强制终止任务。下面是一个愚蠢的例子:
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // do some work
        tokio::time::sleep(Duration::from_secs(10)).await;
        println!("Task completed");
    });

    // Cancel the task after 1 second
    time::sleep(Duration::from_millis(100)).await;
    drop(handle);

    println!("Task was cancelled");
}
我也错误地以为我可以这样做:
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    {
        let handle = tokio::spawn(async {
            // do some work
            tokio::time::sleep(Duration::from_secs(10)).await;
            println!("Task completed");
        });

        // Cancel the task after 100ms
        time::sleep(Duration::from_millis(100)).await;
    }

    println!("Task was cancelled");
}
唉,不是的,丢弃句柄并不会取消正在运行的任务!

终止任务
这是取消任务的最极端方式,不给清理工作留下任何空间:
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // do some work
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Task completed");
    });

    // Cancel the task after 100 milliseconds
    time::sleep(Duration::from_millis(100)).await;
    handle.abort();
    time::sleep(Duration::from_secs(2)).await;

    println!("Task was cancelled");
}
使用 oneshot
如果你只需要一次性地将取消信号广播给多个任务,那么 oneshot channel 就能很好地满足你的需求。oneshot channel 允许在 channel 上进行一次发送,多个接收者可以监听这次发送。与直接丢弃句柄不同,这种模式允许你的 channel 进行一些清理工作。以下是一个示例:
use tokio::time::Duration;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    let task = tokio::spawn(async move {
        tokio::select! {
            _ = rx => {
                println!("Task is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task completed normally");
            }
        }
        println!("Task is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    let _ = tx.send(());

    // Wait for the tasks to finish
    // NOTE: we could do this instead:
    // let _ = tokio::join!(task);
    let _ = task.await;
}
现在,如果你运行这个程序,你会得到类似这样的结果:
Task is cancelling...
Task is cleaning up
oneshot channel 的局限性在于,你不能使用它来取消多个任务。它实际上是为一次性通知而设计和优化的。所以请注意这一点!

使用 broadcast 取消多个任务
如果你想要取消多个任务,你可以使用 broadcast channel。你可以有多个生产者向 channel 发送数据,同时也有多个消费者从 channel 接收数据。每个消费者都能看到 channel 上发送的每个值。非常方便!

以下是一个简单的示例,展示了如何使用 broadcast channel 来取消多个任务并让它们进行清理:
use tokio::sync::broadcast;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(1);
    let mut rx2 = tx.subscribe();

    let task1 = tokio::spawn(async move {
        tokio::select! {
            _ = rx1.recv() => {
                println!("Task 1 is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task 1 completed normally");
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        tokio::select! {
            _ = rx2.recv() => {
                println!("Task 2 is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task 2 completed normally");
            }
        }
        println!("Task 2 is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    let _ = tx.send(());

    // Wait for the tasks to finish
    let _ = tokio::join!(task1, task2);
}
如果你运行这个程序,你会得到类似这样的结果:
Task 2 is cancelling...
Task 2 is cleaning up
Task 1 is cancelling...
Task 1 is cleaning up
请注意,在你运行时,取消的顺序可能会有所不同,因为任务可能会以不同的顺序被取消!
请注意,取消的顺序可能会因你的情况而异,因为任务可能会以不同的顺序被取消!

如果你只是想从单个任务向多个任务发送取消信号,那么使用 broadcast channel 可能会有点过度,因为它提供了在多个任务之间传递消息的所有机制。

如果你既需要消息传递又需要取消功能,那么 broadcast channel 会很有用。但如果你只需要取消功能,那么有更好的方式(或者说可能不一定是更好的,但开销更少),那就是 watch channel。

使用 watch 取消多个任务
watch channel 是一个单生产者多消费者 channel。同样,watch channel 给了任务在取消后自行清理的机会。但缺点是,消费者只能看到 channel 上发送的最新值——这意味着,如果你的任务在 channel 上发送值之后才启动,它可能会错过这个值,因此不会被取消,所以请注意这一点。以下是一个简单的示例:
use tokio::sync::watch;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = watch::channel(false);
    let mut rx2 = tx.subscribe();

    let task1 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = rx1.changed() => {
                    if *rx1.borrow() {
                        println!("Task 1 is cancelling...");
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 1 completed normally");
                    break;
                }
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = rx2.changed() => {
                    if *rx2.borrow() {
                        println!("Task 2 is cancelling...");
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 2 completed normally");
                    break;
                }
            }
        }
        println!("Task 2 is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    let _ = tx.send(true);

    // Wait for the tasks to finish
    let _ = tokio::join!(task1, task2);
}
这个 channel 被设计用来监视程序中的变化。有点像特定数据的发布/订阅模式:文档提到将配置更改作为标准示例进行监视,但你也可以将其用于取消操作。

请注意,我们在初始化 channel 时发送了 false,并且我们使用 true 来发送取消信号。每个任务都需要检查是否收到了 true,因为它们也可能会收到 false。我们也可以发送一些枚举或字符串,如 Foo::cancel 或 "cancel",但同样,每个任务都需要检查是否是取消信号,如果是,则进行一些清理然后退出。

取消令牌
官方文档在关于优雅关闭[9]的文章中列出了一个名为 CancellationToken 的东西。这在 tokio crate 本身中不可用,而是在相关的 toko_util[10] crate 中。

我没有在任何项目中用过它,因为我试图避免引入比我已经使用的更多的依赖关系,但这是另一个有趣的选择,实际上是针对取消的。
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Create a CancellationToken
    let token = CancellationToken::new();

    let token1 = token.clone();
    let token2 = token.clone();

    let task1 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token1.cancelled() => {
                        println!("Task 1 is cancelling...");
                        break;
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 1 completed normally");
                    break;
                }
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token2.cancelled() => {
                        println!("Task 2 is cancelling...");
                        break;
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 2 completed normally");
                    break;
                }
            }
        }
        println!("Task 2 is cleaning up");
    });

    sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    token.cancel();

    // Wait for the tasks to finish
    let _ = tokio::join!(task1, task2);
}
请注意我们是如何克隆令牌的,以便将其移动到单独的异步任务中。值得一提的是,还有一个叫做 child_token[11] 的东西,用官方文档的话来说:


与克隆的 CancellationToken 不同,取消子令牌不会取消父令牌。

结论
这是我过去几周在学习 tokio 的过程中整理的取消选项列表。正如我所说,这绝不是一份完整的列表,而且很可能还有更多可供选择的选项,我迫切希望了解它们,所以请不要害羞,在评论中告诉我。你可以在 GitHub[12] 上找到这篇博客文章中列出的所有代码示例作为参考。不要害羞,用新的模式或修复现有模式打开一个 PR(Pull Request)吧!
用户评论