• 如何在Tokio程序中配置CPU内核
  • 发布于 2个月前
  • 209 热度
    0 评论
在一些情况下,应用程序的开发是为了获得极高的性能,因此需要为应用程序选择内核。例如,我们可能希望为某些特定的进程保留某些核心,或者我们可能希望根据服务对服务器的内核进行分组。为了在Tokio中做到这一点,我们将使用core_affinity crate。

使用以下命令创建一个Rust新项目:
cargo new tokio-cores-example
在Cargo.toml文件中配置项目依赖项:
[dependencies]
anyhow = "1.0.86"
core_affinity = "0.8"
tokio = { version = "1", features = [ "full" ] }
获取内核
现在我们实现一个函数来获得我们想要使用的内核。在src/main.rs文件中,写入以下代码:
use core_affinity::CoreId;

/// 获取应用程序使用的CPU内核
/// 如果没有指定范围,它将使用所有可用的内核
pub fn get_cpu_cores(range: Option<&str>) -> anyhow::Result<Vec<CoreId>> {
    let available_cores =
        core_affinity::get_core_ids().ok_or(anyhow::anyhow!("Failed to get available cores"))?;

    // 记录可用的核数
    for core in &available_cores {
        println!("可用内核: {}", core.id);
    }

    match range.map(parse_range_usize) {
        None => Ok(available_cores),
        Some(Err(err)) => Err(err),
        Some(Ok(range)) => {
            let cores = available_cores
                .into_iter()
                .filter(|core| range.contains(&core.id))
                .collect::<Vec<CoreId>>();
            Ok(cores)
        }
    }
}

/// 将一个范围字符串解析为一个usize向量
/// # 堆代码 duidaima.com
/// # 参数
/// - range_str: &str - 要解析的范围字符串
///
/// # 返回
/// - Result<Vec<usize>, anyhow::Error> - 解析范围
///
/// # 例子
/// ```
/// use notpu::utils::parse_range_usize;
///
/// let range = parse_range_usize("0-3").unwrap();
/// assert_eq!(range, vec![0, 1, 2]);
///
/// let range = parse_range_usize("0,1,2,3").unwrap();
/// assert_eq!(range, vec![0, 1, 2, 3]);
/// ```
pub fn parse_range_usize(range_str: &str) -> anyhow::Result<Vec<usize>> {
    // 解析两种格式:0-3或0,1,2,3
    if range_str.contains('-') {
        let mut range = range_str.split('-');
        let start = range
            .next()
            .ok_or_else(|| anyhow::anyhow!("Invalid range"))?;
        let end = range
            .next()
            .ok_or_else(|| anyhow::anyhow!("Invalid range"))?;
        let start = start
            .parse::<usize>()
            .map_err(|_| anyhow::anyhow!("Invalid range"))?;
        let end = end
            .parse::<usize>()
            .map_err(|_| anyhow::anyhow!("Invalid range"))?;

        Ok((start..end).collect::<Vec<usize>>())
    } else {
        let range = range_str
            .split(',')
            .map(|s| {
                s.parse::<usize>()
                    .map_err(|_| anyhow::anyhow!("Invalid range"))
            })
            .collect::<Result<Vec<usize>, _>>()?;
        Ok(range)
    }
}
配置Tokio运行时
让我们使用core_affinity来设置Tokio运行时:
fn main() -> anyhow::Result<()> {
    // 使用CPU内核
    let args = Some("0-7");
    let cpu_cores: Vec<CoreId> = get_cpu_cores(args)?;

    // 让我们构建tokio运行时
    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(cpu_cores.len().max(32))
        .on_thread_start(move || {
            // 这里我们使用core_affinity为worker随机选择一个核心
            use rand::seq::SliceRandom;
            // 选择一个CPU内核来运行工作线程
            let mut rng = rand::thread_rng();
            let core = cpu_cores.choose(&mut rng).unwrap();
            if core_affinity::set_for_current(*core) {
                println!("将工作线程放在 {} 内核上运行", core.id);
            } else {
                println!("将工作线程放在 {} 在内核上失败", core.id);
            }
        })
        .enable_all()
        .build()?;

    // 进入运行时
    let _guard = tokio_runtime.enter();

    // run
    tokio_runtime.block_on(async_main())
}
async fn async_main() -> anyhow::Result<()> {
    Ok(())
}
让我们一步一步来看看我们是如何配置Tokio使用的内核的。实际上,所有神奇的事情都发生在on_thread_start中,它将在每次tokio::task::spawn之后执行。在这里,我们为应用程序配置的内核中选择一个随机内核。此时,一旦我们为这个worker选择了核心,我们使用core_affinity::set_for_current为worker分配一个特定的CPU核心。

注意:on_thread_start中的回调在每次调用tokio::task::spawn时运行。
运行结果:
可用内核: 0
可用内核: 1
可用内核: 2
可用内核: 3
可用内核: 4
可用内核: 5
可用内核: 6
可用内核: 7
将工作线程放在 7 内核上运行
将工作线程放在 2 在内核上失败
将工作线程放在 1 在内核上失败
将工作线程放在 3 在内核上失败
......
总结
这就是如何使用core_affinity在Rust中配置内核以供tokio任务使用。显然,这段代码可以扩展为使用一些上下文根据几个标准(如任务类型等)来选择内核。还可以选择在同步环境应用程序中使用core_affinity,只需在生成线程后调用core_affinity::set_for_current,最后在main()函数中调用。
用户评论