异步流对你来说还是个谜吗?如果你自己动手写一个,完全可以掌握它。通过引入 async/await 编程范式,异步编程已经变得对大多数软件开发者来说更加易于上手。代码看起来几乎就像是经典的阻塞式流程,这得益于编译器在幕后构建了一个复杂的状态机。
再深入一层,Rust 使用轮询(polling)、唤醒器(waker)和固定(pinning)。这些组件虽然不使用 async/await 语法,但仍然提供了完整的非阻塞并发能力。代价是没有自动生成的状态机,这可能需要你自己来实现。作为回报,你可以获得性能提升并对工作代码有完全的控制权。一个你需要并且应该处于轮询级别的例子就是异步流。它们完美地融入了 async/await 的世界,同时在后台实现了真正的非阻塞机制,没有显著的开销。
fn main() { let numbers = vec![1, 2, 3, 4, 5]; // 堆代码 duidaima.com for number in numbers.iter() { println!("Number: {}", number); } }代码使用 for 循环来惯用遍历集合。这实际上等同于下面的语法。请注意,这里临时创建的迭代器是一个可变变量。
fn main() { let numbers = vec![1, 2, 3, 4, 5]; let mut iter = numbers.iter(); while let Some(number) = iter.next() { println!("Number: {}", number); } }因为我们是在遍历一个向量,我们知道这是非常快的。next 函数并没有执行任何 I/O 操作。但如果我们的迭代器提供了一系列数字或网络连接呢?我们只需修改几行代码就可以从 async/await 中获益。
#[tokio::main] async fn main() { let mut numbers = fetch_numbers(); while let Some(number) = numbers.next().await { println!("{}", number); } }你可以看到它几乎就像一个常规的迭代器,但实际上它是一个异步流。这不是很美妙吗?让我们来看看那些尚未实现的特性是如何展现的。
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; } pub trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; }你可能会注意到一些相似之处。两者都定义了一个返回的项类型,正好是你在 while 循环中匹配的那种类型。它们都需要实现一个 next 函数来提供下一个要产生的项。你可能会问,到底什么是 TAR 格式,以及它与 Rust 中的异步流有什么关系。TAR 文件格式本质上是一种将一组文件和目录打包成单个文件的方法。这种格式与磁带驱动器的历史紧密相关,磁带驱动器是早期计算机时代的一种数据存储形式。它专门针对磁带存储设计,因此得名“Tape ARchive”。
00000000: 4361 7267 6f2e 746f 6d6c 0000 0000 0000 Cargo.toml...... 00000010: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000020: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000030: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000040: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000050: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000060: 0000 0000 3030 3030 3634 3400 3030 3031 ....0000644.0001 00000070: 3735 3000 3030 3031 3735 3000 3030 3030 750.0001750.0000 00000080: 3030 3030 3037 3000 3134 3537 3136 3533 0000070.14571653 00000090: 3534 3000 3031 3233 3435 0020 3000 0000 540.012345. 0... 000000a0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000000b0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000000c0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000000d0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000000e0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000000f0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000100: 0075 7374 6172 2020 0076 7363 6f64 6500 .ustar .vscode. 00000110: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000120: 0000 0000 0000 0000 0076 7363 6f64 6500 .........vscode. 00000130: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000140: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000150: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000160: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000170: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000180: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 00000190: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000001a0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000001b0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000001c0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000001d0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000001e0: 0000 0000 0000 0000 0000 0000 0000 0000 ................ 000001f0: 0000 0000 0000 0000 0000 0000 0000 0000 ................有了文档在手,我们可以试着解码其中编码的信息:
头部校验和:012345,以八进制字符串编码
enum TarEntry { File(String), } struct TarArchive { entries: Vec<TarEntry>, } impl TarArchive { fn new() -> Self { Self { entries: Vec::new() } } fn append_file(&mut self, file: String) { self.entries.push(TarEntry::File(file)); } }你看到的是,归档只包含了要被归档的条目。现在,我们可以增加将其转换为流的可能性。这将会消耗掉归档内容。
impl TarArchive { ... fn into_stream(self, buffer_size: usize) -> TarStream { TarStream::new(self.entries, buffer_size) } } struct TarStream { buffer_size: usize, entries: VecDeque<TarEntry>, } impl TarStream { fn new(entries: Vec<TarEntry>, buffer_size: usize) -> Self { Self { buffer_size: buffer_size / 512 * 512, entries: entries.into(), } } }我们几乎创建了一个流。唯一缺少的部分是 Stream 特性的实际代码实现。
enum TarChunk { Header(String, Box<[u8; 512]>), Data(Vec<u8>), Padding(usize), } enum TarError {} type TarResult<T> = Result<T, TarError>; impl Stream for TarStream { type Item = TarResult<TarChunk>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { todo!() }特质实现要求定义流中的项是什么。在我们的情况下,我们想要返回一个块,确切地说是三种类型的块。头部将包含文件名和 512 字节的有效载荷。数据变体是已读文件的一个切片,但它也可能包含填充,正如文档所规定的。最后,填充变体——每个 tarball 的末尾总是发送两个空块。有了这样一个草稿,我们就可以编写一个消费者。将发生的事情输出到控制台可能就足够好了。以下代码将为三个文件生成一个 tarball。它不会真正写出 tarball,而只会显示交互式的进度。
#[tokio::main] async fn main() { let mut archive = TarArchive::new(); archive.append_file("enwiki-20230801-pages-meta-history27.xml-p74198591p74500204".to_owned()); archive.append_file("lubuntu-22.04.3-desktop-amd64.iso".to_owned()); archive.append_file("qemu-8.2.1.tar.xz".to_owned()); let mut stream = archive.into_stream(10 * 1024 * 1024); while let Some(chunk) = stream.next().await { match chunk { Ok(TarChunk::Header(path, _)) => println!("\nheader {path}"), Ok(TarChunk::Data(_)) => print!("."), Ok(TarChunk::Padding(0)) => println!("\npadding 0"), Ok(TarChunk::Padding(index)) => println!("padding {index}"), Err(error) => println!("error: {:?}", error), } std::io::stdout().flush().unwrap(); } }在我的机器上,它产生了以下输出:
header enwiki-20230801-pages-meta-history27.xml-p74198591p74500204 ......................................................................... ......................................................................... ......................................................................... ......................................................................... ......................................................................... ......................................................................... ............................................................. header lubuntu-22.04.3-desktop-amd64.iso ......................................................................... ......................................................................... ......................................................................... ............................................................ header qemu-8.2.1.tar.xz ............. padding 0 padding 1点的数量代表每个 10MB 的数据块。应用程序仅运行了几秒钟。为了让我们的实现达到这样的效果,我们可以添加一个状态字段到流中,以便知道我们在状态机中的位置。
struct TarStream { state: TarState, ... } impl TarStream { pub fn new(entries: Vec<TarEntry>, buffer_size: usize) -> Self { Self { state: TarState::init(), ... } } } impl Stream for TarStream { ... fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let self_mut = self.get_mut(); loop { let mut state = TarState::completed(); mem::swap(&mut state, &mut self_mut.state); let result = match state { TarState::Init(state) => state.poll(cx), TarState::Open(state) => state.poll(cx), TarState::Header(state) => state.poll(cx), TarState::Read(state) => state.poll(cx), TarState::Padding(state) => state.poll(cx), TarState::Completed(state) => state.poll(cx), }; let (state, poll) = match result { TarPollResult::ContinueLooping(state) => (state, None), TarPollResult::ReturnPolling(state, poll) => (state, Some(poll)), TarPollResult::NextEntry() => match self_mut.entries.pop_front() { None => (TarState::padding(), None), Some(entry) => (TarState::open(self_mut.buffer_size, entry), None), }, }; self_mut.state = state; if let Some(poll) = poll { return poll; } } } }你看到的是,每个状态变体都会接收到一个轮询。它返回一个轮询结果,基于这个结果,会决定是继续迭代、返回找到的轮询结果,还是直接跳转到下一个条目。每次轮询后,当前状态都会更新。但是轮询究竟是什么呢?我们会经常用到它。从一个角度来看,当我们实现一个异步流时,我们正在被轮询。从另一个角度看,我们会将轮询委托给其他可轮询的实体。每个轮询函数都有一个共同的特点:它接收一个 Context 并返回一个 Poll 枚举值。
enum TarState { Init(TarStateInit), Open(TarStateOpen), Header(TarStateHeader), Read(TarStateRead), Padding(TarStatePadding), Completed(TarStateCompleted), }这些状态各自具有特殊的意义:
impl TarStateHandler for TarStateInit { fn poll(self, _cx: &mut Context<'_>) -> TarPollResult { TarPollResult::NextEntry() } }你看到的是,初始状态几乎什么都不做。它的主要功能是指示流继续到下一个条目。记住,解析从任何状态返回的结果的过程是由流本身处理的:
let (state, poll) = match result { TarPollResult::ContinueLooping(state) => (state, None), TarPollResult::ReturnPolling(state, poll) => (state, Some(poll)), TarPollResult::NextEntry() => match self_mut.entries.pop_front() { None => (TarState::padding(), None), Some(entry) => (TarState::open(self_mut.buffer_size, entry), None), }, };在这种情况下,“下一个条目”意味着从向量中挑选下一个条目。如果没有找到任何条目,我们就开始填充;否则,我们带着新选择的条目过渡到“打开”状态。
pub async fn open(path: impl AsRef<Path>) -> io::Result<File> { let path = path.as_ref().to_owned(); let std = asyncify(|| StdFile::open(path)).await?; Ok(File::from_std(std)) }这对我们的意义是什么?这意味着存在一些挑战,因为我们需要轮询一个 Future。轮询一个 Future 需要将其存储为一个固定的、装箱的动态对象。
struct TarStateOpen { buffer_size: usize, task: Pin<Box<dyn Future<Output = Result<(String, File), std::io::Error>> + Send>>, } impl TarStateOpen { fn new(buffer_size: usize, entry: TarEntry) -> Self { let task = async move { match entry { TarEntry::File(path) => match File::open(&path).await { Ok(file) => Ok((path, file)), Err(error) => Err(error), }, } }; Self { buffer_size: buffer_size, task: Box::pin(task), } } } impl TarStateHandler for TarStateOpen { fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult { let (path, file) = match self.task.as_mut().poll(cx) { Poll::Pending => return TarState::Open(self).pending(), Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)), Poll::Ready(Ok((path, file))) => (path, file), }; TarStateHeader::new(self.buffer_size, path, file).poll(cx) } }你看到的是,在构造函数中,我们存储了一个指向从异步块返回的 Future 的固定引用。我们不能直接等待它,但我们可以在 poll 函数中稍后轮询它。如果我们得到 Pending,我们就简单地返回相同的 Init 状态,并附带一个 Pending 的流结果。如果我们成功地收到了一个元组,我们就把它传递给下一个状态。这不是很直接吗?
pub async fn metadata(&self) -> io::Result<Metadata> { let std = self.std.clone(); asyncify(move || std.metadata()).await }这意味着我们需要存储这个 Future 以及我们在前一个状态中收到的文件。
struct TarStateHeader { buffer_size: usize, path: String, task: Pin<Box<dyn Future<Output = Result<(File, Metadata), std::io::Error>> + Send>>, } impl TarStateHeader { fn new<'a>(buffer_size: usize, path: String, file: File) -> TarStateHeader { let task = async move { match file.metadata().await { Ok(metadata) => Ok((file, metadata)), Err(error) => Err(error), } }; Self { path: path, task: Box::pin(task), buffer_size: buffer_size, } } } impl TarStateHandler for TarStateHeader { fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult { let (file, metadata) = match self.task.as_mut().poll(cx) { Poll::Pending => return TarState::Header(self).pending(), Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)), Poll::Ready(Ok(metadata)) => metadata, }; let length: u64 = metadata.len(); let header: TarHeader = TarHeader::empty(self.path); match header.write(&metadata) { Ok(chunk) => TarState::read(self.buffer_size, file, length).ready(chunk), Err(error) => TarState::failed(error), } } }元数据将用于填充 TAR 头部结构,而文件将被传递到下一阶段。如同之前描述的状态一样,Pending 的结果意味着我们需要将它连同状态(即 self)一起返回,以便后续恢复。你可能对 TAR 头部是如何构建并转换为块感兴趣的。我有意省略了一些代码,只是为了展示构建头部的基本步骤。
struct TarHeader { path: String, data: Box<[u8; 512]>, } impl TarHeader { ... fn write(mut self, metadata: &Metadata) -> TarResult<TarChunk> { let data = &mut self.data; Self::write_name(data, &self.path)?; Self::write_mode(data, metadata)?; Self::write_uid(data, 0)?; Self::write_gid(data, 0)?; Self::write_size(data, metadata)?; Self::write_mtime(data, metadata)?; Self::write_magic(data)?; Self::write_type_flag(data)?; Self::write_chksum(data)?; Ok(self.into()) } } impl Into<TarChunk> for TarHeader { fn into(self) -> TarChunk { TarChunk::header(self.path, self.data) } }最后是读取状态。它似乎是最重要的状态,因为 CPU 在这里将花费大部分时间。它也将是最常
struct TarStateRead { buffer_size: usize, file: File, left: usize, completed: usize, chunk: TarChunk, offset: usize, } impl TarStateRead { fn new(buffer_size: usize, file: File, length: u64) -> Self { let left = length as usize / 512; let available = buffer_size / 512; let pages = std::cmp::min(available, left); let pages = pages + if length as usize > 0 { 1 } else { 0 }; Self { buffer_size: buffer_size, file: file, left: length as usize, completed: 0, chunk: TarChunk::data(pages), offset: 0, } } fn advance(self, bytes: usize) -> Self { Self { buffer_size: self.buffer_size, file: self.file, left: self.left - bytes, completed: self.completed + bytes, chunk: self.chunk, offset: self.offset + bytes, } } fn next(self) -> (TarChunk, Self) { let left = self.left / 512; let available = self.buffer_size / 512; let pages = std::cmp::min(available, left); let pages = pages + if self.left % 512 > 0 { 1 } else { 0 }; ( self.chunk, Self { buffer_size: self.buffer_size, file: self.file, left: self.left, completed: self.completed, chunk: TarChunk::data(pages), offset: 0, }, ) } }你看到的是,我们需要管理许多更多的字段。状态不仅包含要从中读取的文件,还包括文件长度以及一些表示当前位置、剩余字节数或在当前块中的偏移量的数字。你也可以注意到一个当前块,它可能还没有被完全填满,甚至可能是空的。我们定义了两个有用的函数:advance 和 next。第一个函数帮助我们在当前块中推进偏移量,而后者创建一个新的状态,返回之前填充的块。
impl TarStateHandler for TarStateRead { fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult { let pinned: Pin<&mut File> = Pin::new(&mut self.file); let data = match self.chunk.offset(self.offset) { Err(error) => return TarState::failed(error), Ok(data) => data, }; let mut buffer: ReadBuf<'_> = ReadBuf::new(data); match pinned.poll_read(cx, &mut buffer) { Poll::Pending => return TarState::Read(self).pending(), Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)), _ => (), } let read: usize = buffer.filled().len(); let advanced: TarStateRead = self.advance(read); if advanced.left == 0 { return TarState::init().ready(advanced.chunk); } if advanced.offset == advanced.chunk.len() { let (chunk, state) = advanced.next(); return TarState::from(TarState::Read(state)).ready(chunk); } TarState::from(TarState::Read(advanced)).looping() } }主要流程是类似的。当数据还未准备好时,我们返回待定状态。但当我们收到一些数据时,我们需要做出几个决策。当前块是否已被完全填满?我们是否已经完成了文件的读取?这些问题的答案会影响返回的状态,或者可能影响流,产生一个块。接下来是什么呢?是填充状态,对吧?它是最简单的状态之一。它包括一个用于表示其索引的字段。
struct TarStatePadding { index: usize, } impl TarStatePadding { fn new() -> Self { Self { index: 0 } } fn next(self) -> Self { Self { index: self.index + 1 } } }我们知道我们需要在结尾处发送正好两个这样的填充块。这使得状态处理器相对简单明了。
impl TarStateHandler for TarStatePadding { fn poll(self, _cx: &mut Context<'_>) -> TarPollResult { match self.index { 0 => TarState::Padding(self.next()).ready(TarChunk::padding(0)), index => TarState::completed().ready(TarChunk::padding(index)), } } }一旦我们知道已经发送了两个填充块,我们就需要过渡到完成状态。这很复杂吗?一点也不。它仅仅涉及返回 None 来表示流的结束。
impl TarStateHandler for TarStateCompleted { fn poll(self, _cx: &mut Context<'_>) -> TarPollResult { TarPollResult::ReturnPolling(TarState::completed(), Poll::Ready(None)) } }当我们覆盖了所有状态之后,你可能会感到有些不知所措。所有这些代码实际上可以通过使用 async/await 由编译器自动生成。它会稍微更简洁一些,因为可维护性并不是编译器的首要关注点。我在开头提到过,我们还将尝试使用这个异步流将几个文件上传到创建的 Docker 容器中。在我的上一个故事中,我通过 Unix 套接字构建了一个轻量级的 HTTP 客户端来与 Docker Runtime API 通信。我们将扩展它的功能。
pub struct TarBody { inner: TarStream, } impl TarBody { pub fn from(stream: TarStream) -> Self { Self { inner: stream } } } impl Body for TarBody { type Data = Bytes; type Error = DockerError; fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { let self_mut: &mut TarBody = self.get_mut(); let pointer: &mut TarStream = &mut self_mut.inner; let inner: Pin<&mut TarStream> = Pin::new(pointer); match inner.poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(chunk) => match chunk { None => Poll::Ready(None), Some(Err(error)) => Poll::Ready(Some(DockerError::raise_outgoing_archive_failed(error))), Some(Ok(chunk)) => { let data: Vec<u8> = chunk.into(); let frame: Frame<Bytes> = Frame::data(Bytes::from(data)); Poll::Ready(Some(Ok(frame))) } }, } } }在 33 行代码中,我们神奇地将一个 tar 块转换成了 hyper 所期望的字节帧,并在这个过程中处理了所有可能出现的错误。这个过程如何整合进 HTTP 客户端中呢?请考虑以下代码:
async fn container_upload(&self, id: &str, path: &str, archive: TarArchive) -> DockerResult<ContainerUpload> { let url: String = format!("/v1.42/containers/{id}/archive?path={path}"); let connection: DockerConnection<TarBody> = DockerConnection::open(&self.socket).await?; let stream: TarStream = archive.into_stream(256 * 1024); let data: TarBody = TarBody::from(stream); match connection.put(&url, data).await { Ok(response) => match response.into_bytes().await { Ok(_) => Ok(ContainerUpload::Succeeded), Err(error) => Err(error), }, Err(error) => match error { DockerError::StatusFailed(url, status, response) => match status.as_u16() { 400 => Ok(ContainerUpload::BadParameter(response.into_error().await?)), 403 => Ok(ContainerUpload::PermissionDenied(response.into_error().await?)), 404 => Ok(ContainerUpload::NoSuchContainer(response.into_error().await?)), 500 => Ok(ContainerUpload::ServerError(response.into_error().await?)), _ => Err(DockerError::StatusFailed(url, status, response)), }, error => Err(error), }, } }上传函数接收我们的 tarball 流,将其转换为 tar body 流,并传递给 put 函数。我认为这是一种完美的集成。我们如何知道它按预期工作呢?让我们创建一个容器,上传几个文件,并计算它们的哈希值。然后我们可以将这些哈希值与本地文件的哈希值进行比较,以确认一切按预期工作。