• 如何使用Rust 实现TAR文件的异步流式处理
  • 发布于 1个月前
  • 92 热度
    0 评论

异步流对你来说还是个谜吗?如果你自己动手写一个,完全可以掌握它。通过引入 async/await 编程范式,异步编程已经变得对大多数软件开发者来说更加易于上手。代码看起来几乎就像是经典的阻塞式流程,这得益于编译器在幕后构建了一个复杂的状态机。


再深入一层,Rust 使用轮询(polling)、唤醒器(waker)和固定(pinning)。这些组件虽然不使用 async/await 语法,但仍然提供了完整的非阻塞并发能力。代价是没有自动生成的状态机,这可能需要你自己来实现。作为回报,你可以获得性能提升并对工作代码有完全的控制权。一个你需要并且应该处于轮询级别的例子就是异步流。它们完美地融入了 async/await 的世界,同时在后台实现了真正的非阻塞机制,没有显著的开销。


在这个故事中,我们将编写一个用于输出 TAR 归档文件的二进制块流。我们会创建一个复杂的状态机并尝试展示最终的效果。此外,我们还会尝试将它集成到使用 Docker Runtime API 将文件上传到运行中的容器的过程中。

异步流与常规迭代器非常相似。你经常遍历各种集合。例如,你可能会写一个普通的循环,像下面这样:
fn main() {
    let numbers = vec![1, 2, 3, 4, 5];
    // 堆代码 duidaima.com
    for number in numbers.iter() {
        println!("Number: {}", number);
    }
}
代码使用 for 循环来惯用遍历集合。这实际上等同于下面的语法。请注意,这里临时创建的迭代器是一个可变变量。
fn main() {
    let numbers = vec![1, 2, 3, 4, 5];
    let mut iter = numbers.iter();

    while let Some(number) = iter.next() {
        println!("Number: {}", number);
    }
}
因为我们是在遍历一个向量,我们知道这是非常快的。next 函数并没有执行任何 I/O 操作。但如果我们的迭代器提供了一系列数字或网络连接呢?我们只需修改几行代码就可以从 async/await 中获益。
#[tokio::main]
async fn main() {
    let mut numbers = fetch_numbers();

    while let Some(number) = numbers.next().await {
        println!("{}", number);
    }
}
你可以看到它几乎就像一个常规的迭代器,但实际上它是一个异步流。这不是很美妙吗?让我们来看看那些尚未实现的特性是如何展现的。
pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}
你可能会注意到一些相似之处。两者都定义了一个返回的项类型,正好是你在 while 循环中匹配的那种类型。它们都需要实现一个 next 函数来提供下一个要产生的项。你可能会问,到底什么是 TAR 格式,以及它与 Rust 中的异步流有什么关系。TAR 文件格式本质上是一种将一组文件和目录打包成单个文件的方法。这种格式与磁带驱动器的历史紧密相关,磁带驱动器是早期计算机时代的一种数据存储形式。它专门针对磁带存储设计,因此得名“Tape ARchive”。

为了创建一个 tarball,我们需要处理多个文件。对于每个文件,我们需要创建 512 字节的头部,然后附加文件内容,如果需要的话还要加上 512 字节的填充。你看,大量的文件意味着大量的 I/O 操作。这不是邀请我们来玩转异步流吗?

让我们先来看一个为我手头的一个样例 Cargo.toml 文件创建的非常简单的 TAR 头部。正如前面提到的,头部必须恰好是 512 字节,并且编码了大量的关于单个文件的信息。
00000000: 4361 7267 6f2e 746f 6d6c 0000 0000 0000  Cargo.toml......
00000010: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000020: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000030: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000040: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000050: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000060: 0000 0000 3030 3030 3634 3400 3030 3031  ....0000644.0001
00000070: 3735 3000 3030 3031 3735 3000 3030 3030  750.0001750.0000
00000080: 3030 3030 3037 3000 3134 3537 3136 3533  0000070.14571653
00000090: 3534 3000 3031 3233 3435 0020 3000 0000  540.012345. 0...
000000a0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000000b0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000000c0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000000d0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000000e0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000000f0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000100: 0075 7374 6172 2020 0076 7363 6f64 6500  .ustar  .vscode.
00000110: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000120: 0000 0000 0000 0000 0076 7363 6f64 6500  .........vscode.
00000130: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000140: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000150: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000160: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000170: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000180: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00000190: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000001a0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000001b0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000001c0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000001d0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000001e0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
000001f0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
有了文档在手,我们可以试着解码其中编码的信息:
文件名:Cargo.toml
文件权限:0000644,以八进制字符串编码
用户和组:0001750,以八进制字符串编码
文件大小:0000070,以八进制字符串编码 → 56 字节
文件时间戳:14571653540,以八进制字符串编码 → 1709660000

头部校验和:012345,以八进制字符串编码


除此之外,文件还包含其他可选的数据,我们在生成头部时可以忽略这些数据。现在是时候开始编写基本的 Rust 代码了。我们首先定义几个结构体,并给出一个示例使用场景,但不实际实现。想象一下,你可以创建一个新的 TarArchive 结构体实例,并通过文件名添加几个文件。
enum TarEntry {
    File(String),
}

struct TarArchive {
    entries: Vec<TarEntry>,
}

impl TarArchive {
    fn new() -> Self {
        Self { entries: Vec::new() }
    }

    fn append_file(&mut self, file: String) {
        self.entries.push(TarEntry::File(file));
    }
}
你看到的是,归档只包含了要被归档的条目。现在,我们可以增加将其转换为流的可能性。这将会消耗掉归档内容。
impl TarArchive {
    ...

    fn into_stream(self, buffer_size: usize) -> TarStream {
        TarStream::new(self.entries, buffer_size)
    }
}

struct TarStream {
    buffer_size: usize,
    entries: VecDeque<TarEntry>,
}

impl TarStream {
    fn new(entries: Vec<TarEntry>, buffer_size: usize) -> Self {
        Self {
            buffer_size: buffer_size / 512 * 512,
            entries: entries.into(),
        }
    }
}
我们几乎创建了一个流。唯一缺少的部分是 Stream 特性的实际代码实现。
enum TarChunk {
    Header(String, Box<[u8; 512]>),
    Data(Vec<u8>),
    Padding(usize),
}

enum TarError {}
type TarResult<T> = Result<T, TarError>;

impl Stream for TarStream {
    type Item = TarResult<TarChunk>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        todo!()
    }
特质实现要求定义流中的项是什么。在我们的情况下,我们想要返回一个块,确切地说是三种类型的块。头部将包含文件名和 512 字节的有效载荷。数据变体是已读文件的一个切片,但它也可能包含填充,正如文档所规定的。最后,填充变体——每个 tarball 的末尾总是发送两个空块。有了这样一个草稿,我们就可以编写一个消费者。将发生的事情输出到控制台可能就足够好了。以下代码将为三个文件生成一个 tarball。它不会真正写出 tarball,而只会显示交互式的进度。
#[tokio::main]
async fn main() {
    let mut archive = TarArchive::new();

    archive.append_file("enwiki-20230801-pages-meta-history27.xml-p74198591p74500204".to_owned());
    archive.append_file("lubuntu-22.04.3-desktop-amd64.iso".to_owned());
    archive.append_file("qemu-8.2.1.tar.xz".to_owned());

    let mut stream = archive.into_stream(10 * 1024 * 1024);

    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(TarChunk::Header(path, _)) => println!("\nheader {path}"),
            Ok(TarChunk::Data(_)) => print!("."),
            Ok(TarChunk::Padding(0)) => println!("\npadding 0"),
            Ok(TarChunk::Padding(index)) => println!("padding {index}"),
            Err(error) => println!("error: {:?}", error),
        }

        std::io::stdout().flush().unwrap();
    }
}
在我的机器上,它产生了以下输出:
header enwiki-20230801-pages-meta-history27.xml-p74198591p74500204
.........................................................................
.........................................................................
.........................................................................
.........................................................................
.........................................................................
.........................................................................
.............................................................
header lubuntu-22.04.3-desktop-amd64.iso
.........................................................................
.........................................................................
.........................................................................
............................................................
header qemu-8.2.1.tar.xz
.............
padding 0
padding 1
点的数量代表每个 10MB 的数据块。应用程序仅运行了几秒钟。为了让我们的实现达到这样的效果,我们可以添加一个状态字段到流中,以便知道我们在状态机中的位置。
struct TarStream {
    state: TarState,
    ...
}

impl TarStream {
    pub fn new(entries: Vec<TarEntry>, buffer_size: usize) -> Self {
        Self {
            state: TarState::init(),
            ...
        }
    }
}

impl Stream for TarStream {
    ...

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let self_mut = self.get_mut();

        loop {
            let mut state = TarState::completed();
            mem::swap(&mut state, &mut self_mut.state);

            let result = match state {
                TarState::Init(state) => state.poll(cx),
                TarState::Open(state) => state.poll(cx),
                TarState::Header(state) => state.poll(cx),
                TarState::Read(state) => state.poll(cx),
                TarState::Padding(state) => state.poll(cx),
                TarState::Completed(state) => state.poll(cx),
            };

            let (state, poll) = match result {
                TarPollResult::ContinueLooping(state) => (state, None),
                TarPollResult::ReturnPolling(state, poll) => (state, Some(poll)),
                TarPollResult::NextEntry() => match self_mut.entries.pop_front() {
                    None => (TarState::padding(), None),
                    Some(entry) => (TarState::open(self_mut.buffer_size, entry), None),
                },
            };

            self_mut.state = state;

            if let Some(poll) = poll {
                return poll;
            }
        }
    }
}
你看到的是,每个状态变体都会接收到一个轮询。它返回一个轮询结果,基于这个结果,会决定是继续迭代、返回找到的轮询结果,还是直接跳转到下一个条目。每次轮询后,当前状态都会更新。但是轮询究竟是什么呢?我们会经常用到它。从一个角度来看,当我们实现一个异步流时,我们正在被轮询。从另一个角度看,我们会将轮询委托给其他可轮询的实体。每个轮询函数都有一个共同的特点:它接收一个 Context 并返回一个 Poll 枚举值。

Context 包含一个唤醒器(waker),当任务被阻塞时,它会被用来恢复任务。在这个故事中,我们不会深入探讨这个话题。更重要的是理解 Poll 枚举。它有两个变体:Pending 和 Ready。

Pending 变体表示轮询过程尚未完成。当你从函数中收到这个变体时,你知道还需要等待更多的时间,但不需要立即再次轮询。如果你收到 Ready,这意味着你已经获得了你所寻找的值。

我希望这个解释有助于你理解之前的列表。你可能还会好奇为什么我们需要有一个状态机。想象一下这种情况:有人调用了你的轮询函数,并且你返回了 Pending。这意味着你承诺在他们可以继续时通知他们。当这种情况发生时,被通知的人会再次调用你的轮询函数,你需要识别出上次你们停留在哪里。你需要负责追踪他们的进度。状态机模式是一种优雅的方式来建模这一点。

定义的状态机用于处理 tarball 的创建,它包含几个作为枚举变体的状态:
enum TarState {
    Init(TarStateInit),
    Open(TarStateOpen),
    Header(TarStateHeader),
    Read(TarStateRead),
    Padding(TarStatePadding),
    Completed(TarStateCompleted),
}
这些状态各自具有特殊的意义:
Init 代表处理单个文件的开始。
Open 保存关于开始打开文件的过程的信息。
Header 保存关于开始读取元数据(如长度和权限)的过程的信息。
Read 跟踪从文件中读取每个块的进度。
Padding 用于指示哪些填充信息已经被输出。
Completed 代表流生成的最终阶段。
状态机的基本流程可以用下图来表示。请记住,在出现任何错误的情况下,我们会直接过渡到 Completed 状态。

让我们首先尝试理解初始状态,作为一个热身练习。
impl TarStateHandler for TarStateInit {
    fn poll(self, _cx: &mut Context<'_>) -> TarPollResult {
        TarPollResult::NextEntry()
    }
}
你看到的是,初始状态几乎什么都不做。它的主要功能是指示流继续到下一个条目。记住,解析从任何状态返回的结果的过程是由流本身处理的:
let (state, poll) = match result {
    TarPollResult::ContinueLooping(state) => (state, None),
    TarPollResult::ReturnPolling(state, poll) => (state, Some(poll)),
    TarPollResult::NextEntry() => match self_mut.entries.pop_front() {
        None => (TarState::padding(), None),
        Some(entry) => (TarState::open(self_mut.buffer_size, entry), None),
    },
};
在这种情况下,“下一个条目”意味着从向量中挑选下一个条目。如果没有找到任何条目,我们就开始填充;否则,我们带着新选择的条目过渡到“打开”状态。

“打开”状态稍微复杂一些。打开一个文件是一个可能会被阻塞的操作。然而,似乎有些操作系统并不完全支持这一操作,而 Tokio 人为地使一个标准调用变得某种程度上异步。
pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
    let path = path.as_ref().to_owned();
    let std = asyncify(|| StdFile::open(path)).await?;

    Ok(File::from_std(std))
}
这对我们的意义是什么?这意味着存在一些挑战,因为我们需要轮询一个 Future。轮询一个 Future 需要将其存储为一个固定的、装箱的动态对象。
struct TarStateOpen {
    buffer_size: usize,
    task: Pin<Box<dyn Future<Output = Result<(String, File), std::io::Error>> + Send>>,
}

impl TarStateOpen {
    fn new(buffer_size: usize, entry: TarEntry) -> Self {
        let task = async move {
            match entry {
                TarEntry::File(path) => match File::open(&path).await {
                    Ok(file) => Ok((path, file)),
                    Err(error) => Err(error),
                },
            }
        };

        Self {
            buffer_size: buffer_size,
            task: Box::pin(task),
        }
    }
}

impl TarStateHandler for TarStateOpen {
    fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult {
        let (path, file) = match self.task.as_mut().poll(cx) {
            Poll::Pending => return TarState::Open(self).pending(),
            Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)),
            Poll::Ready(Ok((path, file))) => (path, file),
        };

        TarStateHeader::new(self.buffer_size, path, file).poll(cx)
    }
} 
你看到的是,在构造函数中,我们存储了一个指向从异步块返回的 Future 的固定引用。我们不能直接等待它,但我们可以在 poll 函数中稍后轮询它。如果我们得到 Pending,我们就简单地返回相同的 Init 状态,并附带一个 Pending 的流结果。如果我们成功地收到了一个元组,我们就把它传递给下一个状态。这不是很直接吗?

Header 状态也需要轮询一个 Future,原因相同。获取元数据只能通过阻塞调用实现,Tokio 引入了人工异步化:
pub async fn metadata(&self) -> io::Result<Metadata> {
    let std = self.std.clone();
    asyncify(move || std.metadata()).await
}
这意味着我们需要存储这个 Future 以及我们在前一个状态中收到的文件。
struct TarStateHeader {
    buffer_size: usize,
    path: String,
    task: Pin<Box<dyn Future<Output = Result<(File, Metadata), std::io::Error>> + Send>>,
}

impl TarStateHeader {
    fn new<'a>(buffer_size: usize, path: String, file: File) -> TarStateHeader {
        let task = async move {
            match file.metadata().await {
                Ok(metadata) => Ok((file, metadata)),
                Err(error) => Err(error),
            }
        };

        Self {
            path: path,
            task: Box::pin(task),
            buffer_size: buffer_size,
        }
    }
}

impl TarStateHandler for TarStateHeader {
    fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult {
        let (file, metadata) = match self.task.as_mut().poll(cx) {
            Poll::Pending => return TarState::Header(self).pending(),
            Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)),
            Poll::Ready(Ok(metadata)) => metadata,
        };

        let length: u64 = metadata.len();
        let header: TarHeader = TarHeader::empty(self.path);

        match header.write(&metadata) {
            Ok(chunk) => TarState::read(self.buffer_size, file, length).ready(chunk),
            Err(error) => TarState::failed(error),
        }
    }
}
元数据将用于填充 TAR 头部结构,而文件将被传递到下一阶段。如同之前描述的状态一样,Pending 的结果意味着我们需要将它连同状态(即 self)一起返回,以便后续恢复。你可能对 TAR 头部是如何构建并转换为块感兴趣的。我有意省略了一些代码,只是为了展示构建头部的基本步骤。
struct TarHeader {
    path: String,
    data: Box<[u8; 512]>,
}

impl TarHeader {
    ...

    fn write(mut self, metadata: &Metadata) -> TarResult<TarChunk> {
        let data = &mut self.data;

        Self::write_name(data, &self.path)?;
        Self::write_mode(data, metadata)?;
        Self::write_uid(data, 0)?;
        Self::write_gid(data, 0)?;
        Self::write_size(data, metadata)?;
        Self::write_mtime(data, metadata)?;
        Self::write_magic(data)?;
        Self::write_type_flag(data)?;
        Self::write_chksum(data)?;

        Ok(self.into())
    }
}

impl Into<TarChunk> for TarHeader {
    fn into(self) -> TarChunk {
        TarChunk::header(self.path, self.data)
    }
}
最后是读取状态。它似乎是最重要的状态,因为 CPU 在这里将花费大部分时间。它也将是最常
struct TarStateRead {
    buffer_size: usize,
    file: File,
    left: usize,
    completed: usize,
    chunk: TarChunk,
    offset: usize,
}

impl TarStateRead {
    fn new(buffer_size: usize, file: File, length: u64) -> Self {
        let left = length as usize / 512;
        let available = buffer_size / 512;

        let pages = std::cmp::min(available, left);
        let pages = pages + if length as usize > 0 { 1 } else { 0 };

        Self {
            buffer_size: buffer_size,
            file: file,
            left: length as usize,
            completed: 0,
            chunk: TarChunk::data(pages),
            offset: 0,
        }
    }

    fn advance(self, bytes: usize) -> Self {
        Self {
            buffer_size: self.buffer_size,
            file: self.file,
            left: self.left - bytes,
            completed: self.completed + bytes,
            chunk: self.chunk,
            offset: self.offset + bytes,
        }
    }

    fn next(self) -> (TarChunk, Self) {
        let left = self.left / 512;
        let available = self.buffer_size / 512;

        let pages = std::cmp::min(available, left);
        let pages = pages + if self.left % 512 > 0 { 1 } else { 0 };

        (
            self.chunk,
            Self {
                buffer_size: self.buffer_size,
                file: self.file,
                left: self.left,
                completed: self.completed,
                chunk: TarChunk::data(pages),
                offset: 0,
            },
        )
    }
}
你看到的是,我们需要管理许多更多的字段。状态不仅包含要从中读取的文件,还包括文件长度以及一些表示当前位置、剩余字节数或在当前块中的偏移量的数字。你也可以注意到一个当前块,它可能还没有被完全填满,甚至可能是空的。我们定义了两个有用的函数:advance 和 next。第一个函数帮助我们在当前块中推进偏移量,而后者创建一个新的状态,返回之前填充的块。

因为读取是我们流中最关键的部分,状态处理器也需要几行代码:
impl TarStateHandler for TarStateRead {
    fn poll(mut self, cx: &mut Context<'_>) -> TarPollResult {
        let pinned: Pin<&mut File> = Pin::new(&mut self.file);
        let data = match self.chunk.offset(self.offset) {
            Err(error) => return TarState::failed(error),
            Ok(data) => data,
        };

        let mut buffer: ReadBuf<'_> = ReadBuf::new(data);
        match pinned.poll_read(cx, &mut buffer) {
            Poll::Pending => return TarState::Read(self).pending(),
            Poll::Ready(Err(error)) => return TarState::failed(TarError::IOFailed(error)),
            _ => (),
        }

        let read: usize = buffer.filled().len();
        let advanced: TarStateRead = self.advance(read);

        if advanced.left == 0 {
            return TarState::init().ready(advanced.chunk);
        }

        if advanced.offset == advanced.chunk.len() {
            let (chunk, state) = advanced.next();
            return TarState::from(TarState::Read(state)).ready(chunk);
        }

        TarState::from(TarState::Read(advanced)).looping()
    }
}
主要流程是类似的。当数据还未准备好时,我们返回待定状态。但当我们收到一些数据时,我们需要做出几个决策。当前块是否已被完全填满?我们是否已经完成了文件的读取?这些问题的答案会影响返回的状态,或者可能影响流,产生一个块。接下来是什么呢?是填充状态,对吧?它是最简单的状态之一。它包括一个用于表示其索引的字段。
struct TarStatePadding {
    index: usize,
}

impl TarStatePadding {
    fn new() -> Self {
        Self { index: 0 }
    }

    fn next(self) -> Self {
        Self { index: self.index + 1 }
    }
}
我们知道我们需要在结尾处发送正好两个这样的填充块。这使得状态处理器相对简单明了。
impl TarStateHandler for TarStatePadding {
    fn poll(self, _cx: &mut Context<'_>) -> TarPollResult {
        match self.index {
            0 => TarState::Padding(self.next()).ready(TarChunk::padding(0)),
            index => TarState::completed().ready(TarChunk::padding(index)),
        }
    }
}
一旦我们知道已经发送了两个填充块,我们就需要过渡到完成状态。这很复杂吗?一点也不。它仅仅涉及返回 None 来表示流的结束。
impl TarStateHandler for TarStateCompleted {
    fn poll(self, _cx: &mut Context<'_>) -> TarPollResult {
        TarPollResult::ReturnPolling(TarState::completed(), Poll::Ready(None))
    }
}
当我们覆盖了所有状态之后,你可能会感到有些不知所措。所有这些代码实际上可以通过使用 async/await 由编译器自动生成。它会稍微更简洁一些,因为可维护性并不是编译器的首要关注点。我在开头提到过,我们还将尝试使用这个异步流将几个文件上传到创建的 Docker 容器中。在我的上一个故事中,我通过 Unix 套接字构建了一个轻量级的 HTTP 客户端来与 Docker Runtime API 通信。我们将扩展它的功能。

让我们从将我们的 tarball 流与 hyper 的 body 流集成开始,后者也是一个需要实现的异步流。Hyper 足够智能,不会期望我们在 PUT 请求中传递 8GB 的数据。相反,我们可以提供一个 body 块的流。让我们来写一个:
pub struct TarBody {
    inner: TarStream,
}

impl TarBody {
    pub fn from(stream: TarStream) -> Self {
        Self { inner: stream }
    }
}

impl Body for TarBody {
    type Data = Bytes;
    type Error = DockerError;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let self_mut: &mut TarBody = self.get_mut();
        let pointer: &mut TarStream = &mut self_mut.inner;
        let inner: Pin<&mut TarStream> = Pin::new(pointer);

        match inner.poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(chunk) => match chunk {
                None => Poll::Ready(None),
                Some(Err(error)) => Poll::Ready(Some(DockerError::raise_outgoing_archive_failed(error))),
                Some(Ok(chunk)) => {
                    let data: Vec<u8> = chunk.into();
                    let frame: Frame<Bytes> = Frame::data(Bytes::from(data));

                    Poll::Ready(Some(Ok(frame)))
                }
            },
        }
    }
}
在 33 行代码中,我们神奇地将一个 tar 块转换成了 hyper 所期望的字节帧,并在这个过程中处理了所有可能出现的错误。这个过程如何整合进 HTTP 客户端中呢?请考虑以下代码:
async fn container_upload(&self, id: &str, path: &str, archive: TarArchive) -> DockerResult<ContainerUpload> {
    let url: String = format!("/v1.42/containers/{id}/archive?path={path}");
    let connection: DockerConnection<TarBody> = DockerConnection::open(&self.socket).await?;

    let stream: TarStream = archive.into_stream(256 * 1024);
    let data: TarBody = TarBody::from(stream);

    match connection.put(&url, data).await {
        Ok(response) => match response.into_bytes().await {
            Ok(_) => Ok(ContainerUpload::Succeeded),
            Err(error) => Err(error),
        },
        Err(error) => match error {
            DockerError::StatusFailed(url, status, response) => match status.as_u16() {
                400 => Ok(ContainerUpload::BadParameter(response.into_error().await?)),
                403 => Ok(ContainerUpload::PermissionDenied(response.into_error().await?)),
                404 => Ok(ContainerUpload::NoSuchContainer(response.into_error().await?)),
                500 => Ok(ContainerUpload::ServerError(response.into_error().await?)),
                _ => Err(DockerError::StatusFailed(url, status, response)),
            },
            error => Err(error),
        },
    }
}
上传函数接收我们的 tarball 流,将其转换为 tar body 流,并传递给 put 函数。我认为这是一种完美的集成。我们如何知道它按预期工作呢?让我们创建一个容器,上传几个文件,并计算它们的哈希值。然后我们可以将这些哈希值与本地文件的哈希值进行比较,以确认一切按预期工作。

在这个故事中,我们详细地实现了一个异步流。我们创建了一个相当复杂的状态机。我们不仅在控制台上验证了 tarball 的创建过程,而且还证明了创建的 TAR 文件被 Docker Runtime API 完全识别。
用户评论