• 深入了解Rust中的线程池
  • 发布于 1个月前
  • 72 热度
    0 评论
在某些情况下,你需要并发地执行许多短期任务。创建和销毁执行这些任务线程的开销可能会抑制程序的性能。解决这个问题的一个办法是建立一个任务池,并在需要时将它们从这个任务池中取出。务池的另一个优点是,可用线程的数量可以根据可用的计算资源进行调整,即处理器内核的数量或内存量。

这些任务的约束之一是它们不是相互依赖的,也就是说,一个任务的结果不依赖于前一个任务的结果,或者下一个任务不应依赖于当前任务的结果。这使任务保持隔离,并且易于存储在池中。

典型的用例包括:
Web服务和api服务:请求通常非常小且生命周期很短,因此非常适合于线程池,实际上许多web服务器都实现了线程池。
批量处理图像、视频文件或音频文件:例如,调整图像大小也是非常适合线程池的小型且定义良好的任务。
数据处理管道:数据处理管道中的每个阶段都可以由线程池处理。如前所述,任务不应该相互依赖,以提高线程池的效率。

使用Rust实现线程池
在这个例子中,我们将构建一个简单的线程池,但这可以很容易地扩展到一个真正的线程池。

在开始之前,需要添加一个库到Cargo.toml文件中:
[dependencies]
fstrings = "0.2.3"
我们将使用这个crate以类似python的方式格式化字符串。
接下来在src/main.rs文件中添加以下几行:
use std::sync::{Arc, Mutex};
use std::thread;
#[macro_use]
extern crate fstrings;
.使用Arc和Mutex来保证线程池线程安全
.使用std::thread可以生成新的线程

定义任务
在main.rs中定义一个WebRequest结构体:
struct WebRequest {
    work: Box<dyn FnOnce(&str) + Send + Sync>,
}

impl WebRequest {
    fn new<F>(f: F) -> Self
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        WebRequest { work: Box::new(f) }
    }
}
在这段代码中,WebRequest包含一个字段work,它是一个Box封装的闭包。为什么要使用Box?因为闭包的大小是动态的,换句话说,它的大小在编译时是未知的,所以我们需要将它存储在像Box这样的堆分配容器中。Send和Sync特性向编译器表明,这个特定的闭包可以安全地跨多个线程发送和访问。

构造函数接受闭包作为它的唯一参数。当然,它必须满足与结构体中字段相同的约束。静态生命周期是必需的,因为闭包可能比定义它的作用域活得更长。

实现线程池
在main.rs中定义一个ThreadPool结构体:
struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    queue: Arc<Mutex<Vec<WebRequest>>>,
}
workers向量,它表示工作线程集合。每个元素都是一个线程的句柄。我们需要持有这个句柄,以便等待线程的完成。queue,这是一个任务队列,每个任务由一个工作线程执行。Arc允许在多个线程之间共享,并且我们可以通过使用Mutex结构体确保线程可以安全的访问队列。

现在我们看一下实现。首先是构造函数:
impl ThreadPool {
    fn new(num_threads: usize) -> ThreadPool {
        let mut workers = Vec::with_capacity(num_threads);
        let queue = Arc::new(Mutex::new(Vec::<WebRequest>::new()));

        for i in 0..num_threads {
            let number = f!("Request {i}");
            let queue_clone = Arc::clone(&queue);
            workers.push(thread::spawn(move || loop {
                let task = queue_clone.lock().unwrap().pop();
                if let Some(task) = task {
                    (task.work)(&number);
                } else {
                    break;
                }
            }));
        }
        ThreadPool { workers, queue }
    }
}
此方法使用指定的线程数初始化池,创建队列。之后,构造函数生成工作线程。这些线程进入一个循环,弹出队列的任务,并执行它们。如果队列恰好为空,则工作线程中断循环。

然后,我们看一下execute()方法:
impl ThreadPool {
    ......

    fn execute<F>(&self, f: F)
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        let task = WebRequest::new(f);
        self.queue.lock().unwrap().push(task);
    }
}
这个方法只是用指定的闭包创建一个新的WebRequest,并将其push到任务队列中。

接下来,我们看一下join()方法:
impl ThreadPool {
    ......

    fn join(self) {
        for worker in self.workers {
            worker.join().unwrap();
        }
    }
}
这是一个阻塞操作,等待线程完成。

测试
使用如下代码测试线程池:
fn main() {
    let pool = ThreadPool::new(6);
    for i in 0..6 {
        pool.execute(move |message| {
            println!("Task: {} prints  {}",i, message);
        });
    }
    pool.join();
}
执行结果如下:
Task: 3 prints  Request 3
Task: 1 prints  Request 3
Task: 5 prints  Request 1
Task: 2 prints  Request 5
Task: 0 prints  Request 3
Task: 4 prints  Request 4
总结
正如你所看到的,这种模式非常灵活,但是使用时,请考虑以下影响性能和资源因素:
1.如果程序销毁线程的速度太慢,或者销毁被阻塞,这可能会使其他线程缺乏资源。
2.如果创建太多线程,创建未使用的线程会浪费资源和时间。
3.如果线程创建时间过长,则会影响性能。
4.销毁太多的线程可能会在以后需要重新创建它们时耗费时间。
总而言之,找到正确的线程数有时可能非常清楚,有时需要使用一些尝试和错误来找到最佳数量。更高级的做法是可以根据需求动态地增加和减少可用线程的数量。
用户评论