• 手动实现Rust中的Channel,理解同步原语CondVar
  • 发布于 2个月前
  • 108 热度
    0 评论
  • 黄月英
  • 0 粉丝 51 篇博客
  •   
在今天的文章中,我们将通过手动实现Rust中的Channel,来理解同步原语CondVar、Arc和Mutex。我们使用Rust标准库中的VecDeque队列模拟Rust Channel,VecDeque是一个双端队列,作为一个不断增长的环形缓冲区。

数据模型
为了使我们的解决方案更有条理,我们定义了3个结构体:

Transmitter
Transmitter结构体(tx)持有一个store字段,它是由Arc/Mutex封装的VecDeque队列;还有一个emitter字段,它是Condvar类型,用于基于条件的同步。
struct Transmitter<T> {
    store: Arc<Mutex<VecDeque<T>>>,
    emitter: Arc<Condvar>,
}
Receiver
Receiver结构体(rx)与Transmitter结构体相似,也包含一个store字段和一个emitter字段。
struct Receiver<T> {
    store: Arc<Mutex<VecDeque<T>>>,
    emitter: Arc<Condvar>,
}
Channel
Channel结构体包含Transmitter结构体和Receiver结构体类型的字段。
struct Channel<T> {
    tx: Transmitter<T>,
    rx: Receiver<T>,
}
队列(VecDeque)将在通道上为一个或多个线程共享。由于是在多线程场景中,Rc不是线程安全的,所以我们需要一个原子引用计数器Arc,它是线程安全的。因为Arc是一个引用计数器,它的引用是不可变的。对于底层数据的可变性,我们需要使用RefCell实现内部可变性。与Rc相同,RefCell不是线程安全的。对于线程安全的场景,我们需要使用互斥锁Mutex,同步对数据的访问。

Condvar是并发系统中用于同步的原语,它可以让线程“等待”(挂起),直到满足给定的条件。对于阻塞队列,我们基本上需要以下条件(伪代码)。

对于其中一个线程:
queue = some_array
mutex = os_lock
emitter = os_condvar

// 线程被挂起,直到队列中有数据
// 没有CPU消耗
while queue is empty
   emitter.wait(mutex)
end
// 堆代码 duidaima.com
// 其他线程发出了一个信号
data = queue.pop
其他线程:
queue.push(data)
emitter.signal
数据模型的实现
现在,让我们在模拟Channel中实现send和recv(接收)方法。

Transmitter
Transmitter结构体(tx)有一个叫做send的方法,它基本实现以下功能:
1.锁定共享队列(store.lock().unwrap())
2.将数据 推送到队列(push_back(data))
3.发出一个信号(emitter.notify_one)来通知正在队列中等待数据的挂起线程
impl<T> Transmitter<T> {
    fn send(&self, data: T) {
        self.store.lock().unwrap().push_back(data);
        self.emitter.notify_one();
    }
}
Receiver
Receiver结构体(rx)有一个名为recv (receive的缩写)的方法,它基本实现以下功能:
.锁定共享队列(store.lock().unwrap())
.当队列为空时,挂起当前线程,直到满足条件。换句话说,就是线程在操作系统中挂起,因此不消耗CPU (emitter.wait)。
.一旦线程被唤醒,它就可以从队列中弹出数据(store.pop_front())。
impl<T> Receiver<T> {
    fn recv(&self) -> Option<T> {
        let mut store = self.store.lock().unwrap();

        while store.is_empty() {
            store = self.emitter.wait(store).unwrap();
        }

        store.pop_front()
    }

    fn try_recv(&self) -> Option<T> {
        self.store.lock().unwrap().pop_front()
    }
}
此外,Receiver结构体还有一个名为try_recv的方法,该方法不会阻塞线程,也不会使用Condvar条件。

Channel
一旦Transmitter和Receiver已经实现,Channel的实现就很简单了:
impl<T> Channel<T> {
    fn new() -> Self {
        let store = Arc::new(Mutex::new(VecDeque::new()));
        let emitter = Arc::new(Condvar::new());

        Channel {
            tx: Transmitter { store: Arc::clone(&store), emitter: Arc::clone(&emitter) },
            rx: Receiver { store: Arc::clone(&store), emitter: Arc::clone(&emitter) },
        }
    }
}
注意Mutex和Condvar都被封装在Arc(原子引用计数器)中,因为我们必须同时在tx和rx之间共享它们。

测试
在main函数中,实现如下功能:
1.创建一个通道并分别绑定tx和rx
2.tx用于向通道发送数据
3.Rx从通道接收数据
fn main() {
    // 初始化通道
    let channel = Channel::new();
    let (tx, rx) = (channel.tx, channel.rx);

    // 将数据推送到通道
    tx.send("Some job to do: 1");
    tx.send("Another job: 2");

    // 从通道接收数据
    let worker = thread::spawn(move || {
        loop {
            let job = rx.recv(); // 我们也可以使用try_recv

            match job {
                Some(job) => println!("Job: {}", job),
                None => break,
            }
        }
    });

    // 向通道推送更多数据
    tx.send("Yet another job");

    worker.join().unwrap();
}
运行代码,结果如下:
Job: Some job to do: 1
Job: Another job: 2
Job: Yet another job

用户评论