在一些情况下,应用程序的开发是为了获得极高的性能,因此需要为应用程序选择内核。例如,我们可能希望为某些特定的进程保留某些核心,或者我们可能希望根据服务对服务器的内核进行分组。为了在Tokio中做到这一点,我们将使用core_affinity crate。
使用以下命令创建一个Rust新项目:
cargo new tokio-cores-example
在Cargo.toml文件中配置项目依赖项:
[dependencies]
anyhow = "1.0.86"
core_affinity = "0.8"
tokio = { version = "1", features = [ "full" ] }
获取内核
现在我们实现一个函数来获得我们想要使用的内核。在src/main.rs文件中,写入以下代码:
use core_affinity::CoreId;
/// 获取应用程序使用的CPU内核
/// 如果没有指定范围,它将使用所有可用的内核
pub fn get_cpu_cores(range: Option<&str>) -> anyhow::Result<Vec<CoreId>> {
let available_cores =
core_affinity::get_core_ids().ok_or(anyhow::anyhow!("Failed to get available cores"))?;
// 记录可用的核数
for core in &available_cores {
println!("可用内核: {}", core.id);
}
match range.map(parse_range_usize) {
None => Ok(available_cores),
Some(Err(err)) => Err(err),
Some(Ok(range)) => {
let cores = available_cores
.into_iter()
.filter(|core| range.contains(&core.id))
.collect::<Vec<CoreId>>();
Ok(cores)
}
}
}
/// 将一个范围字符串解析为一个usize向量
/// # 堆代码 duidaima.com
/// # 参数
/// - range_str: &str - 要解析的范围字符串
///
/// # 返回
/// - Result<Vec<usize>, anyhow::Error> - 解析范围
///
/// # 例子
/// ```
/// use notpu::utils::parse_range_usize;
///
/// let range = parse_range_usize("0-3").unwrap();
/// assert_eq!(range, vec![0, 1, 2]);
///
/// let range = parse_range_usize("0,1,2,3").unwrap();
/// assert_eq!(range, vec![0, 1, 2, 3]);
/// ```
pub fn parse_range_usize(range_str: &str) -> anyhow::Result<Vec<usize>> {
// 解析两种格式:0-3或0,1,2,3
if range_str.contains('-') {
let mut range = range_str.split('-');
let start = range
.next()
.ok_or_else(|| anyhow::anyhow!("Invalid range"))?;
let end = range
.next()
.ok_or_else(|| anyhow::anyhow!("Invalid range"))?;
let start = start
.parse::<usize>()
.map_err(|_| anyhow::anyhow!("Invalid range"))?;
let end = end
.parse::<usize>()
.map_err(|_| anyhow::anyhow!("Invalid range"))?;
Ok((start..end).collect::<Vec<usize>>())
} else {
let range = range_str
.split(',')
.map(|s| {
s.parse::<usize>()
.map_err(|_| anyhow::anyhow!("Invalid range"))
})
.collect::<Result<Vec<usize>, _>>()?;
Ok(range)
}
}
配置Tokio运行时
让我们使用core_affinity来设置Tokio运行时:
fn main() -> anyhow::Result<()> {
// 使用CPU内核
let args = Some("0-7");
let cpu_cores: Vec<CoreId> = get_cpu_cores(args)?;
// 让我们构建tokio运行时
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(cpu_cores.len().max(32))
.on_thread_start(move || {
// 这里我们使用core_affinity为worker随机选择一个核心
use rand::seq::SliceRandom;
// 选择一个CPU内核来运行工作线程
let mut rng = rand::thread_rng();
let core = cpu_cores.choose(&mut rng).unwrap();
if core_affinity::set_for_current(*core) {
println!("将工作线程放在 {} 内核上运行", core.id);
} else {
println!("将工作线程放在 {} 在内核上失败", core.id);
}
})
.enable_all()
.build()?;
// 进入运行时
let _guard = tokio_runtime.enter();
// run
tokio_runtime.block_on(async_main())
}
async fn async_main() -> anyhow::Result<()> {
Ok(())
}
让我们一步一步来看看我们是如何配置Tokio使用的内核的。实际上,所有神奇的事情都发生在on_thread_start中,它将在每次tokio::task::spawn之后执行。在这里,我们为应用程序配置的内核中选择一个随机内核。此时,一旦我们为这个worker选择了核心,我们使用core_affinity::set_for_current为worker分配一个特定的CPU核心。
注意:on_thread_start中的回调在每次调用tokio::task::spawn时运行。
运行结果:
可用内核: 0
可用内核: 1
可用内核: 2
可用内核: 3
可用内核: 4
可用内核: 5
可用内核: 6
可用内核: 7
将工作线程放在 7 内核上运行
将工作线程放在 2 在内核上失败
将工作线程放在 1 在内核上失败
将工作线程放在 3 在内核上失败
......
总结
这就是如何使用core_affinity在Rust中配置内核以供tokio任务使用。显然,这段代码可以扩展为使用一些上下文根据几个标准(如任务类型等)来选择内核。还可以选择在同步环境应用程序中使用core_affinity,只需在生成线程后调用core_affinity::set_for_current,最后在main()函数中调用。