From d0480e908960c2beb6d2754fecccef3c3a784215 Mon Sep 17 00:00:00 2001
From: Evan Lloyd New-Schmidt
Date: Tue, 15 Aug 2023 18:06:56 -0400
Subject: [PATCH] Make thread pool control similar to osmium

- Allow setting number relative to number of cores
- Default to Cores - 2 threads
- Add env variable OM_POOL_THREADS (lower priority than CLI)
- Rename CLI option to `-t/--threads`

Signed-off-by: Evan Lloyd New-Schmidt
---
 src/main.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 53 insertions(+), 8 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index ee1fa5b..6a26672 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,11 +1,15 @@
 use std::{
+    env,
     fs::File,
     io::{stdin, stdout, BufReader, Read, Write},
     num::NonZeroUsize,
     path::PathBuf,
+    str::FromStr,
+    thread::available_parallelism,
     time::Instant,
 };
 
+use anyhow::Context;
 use clap::{CommandFactory, Parser, Subcommand};
 #[macro_use]
 extern crate log;
@@ -32,11 +36,14 @@ enum Cmd {
         /// The `.osm.pbf` file to use.
         pbf_file: PathBuf,
 
-        /// The number of threads to spawn to parse and decompress the pbf file.
+        /// The number of worker threads to spawn to parse and decompress the pbf file.
         ///
-        /// Defaults to the number of cores.
-        #[arg(short, long)]
-        procs: Option<NonZeroUsize>,
+        /// If `THREADS` is <= 0, then the number of cores plus `THREADS` threads will be created.
+        /// The computed number of threads will never be less than one.
+        ///
+        /// Defaults to the env variable `OM_POOL_THREADS` if it exists or -2.
+        #[arg(short, long, allow_hyphen_values = true)]
+        threads: Option<isize>,
     },
 
     /// Apply the same html article simplification used when extracting articles to stdin, and write it to stdout.
@@ -77,12 +84,16 @@ fn main() -> anyhow::Result<()> {
 
             get_articles::run(args)
         }
-        Cmd::GetTags { pbf_file, procs } => {
+        Cmd::GetTags { pbf_file, threads } => {
+            let threads = get_thread_count(threads)
+                .context("determining thread count")?
+                .get();
+            debug!("Using {threads} worker threads");
             rayon::ThreadPoolBuilder::new()
                 .thread_name(|num| format!("worker{num}"))
-                .num_threads(procs.map(usize::from).unwrap_or_default())
-                .build_global()?;
-
+                .num_threads(threads)
+                .build_global()
+                .context("initializing thread pool")?;
             let pbf_file = File::open(pbf_file).map(BufReader::new)?;
             get_tags::run(pbf_file)
         }
@@ -110,6 +121,40 @@ fn main() -> anyhow::Result<()> {
     }
 }
 
+/// Determine the number of worker threads to use.
+///
+/// A positive `requested` is used as-is. If `requested` is <= 0, it is added to the available
+/// parallelism. If `requested` is `None`, `OM_POOL_THREADS` is used if set, otherwise -2.
+/// The computed number of threads will never be less than one.
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - `OM_POOL_THREADS` is set but is not valid Unicode or cannot be parsed into an isize.
+/// - [available_parallelism] needs to be queried and returns an error.
+fn get_thread_count(requested: Option<isize>) -> anyhow::Result<NonZeroUsize> {
+    let env_value = match env::var("OM_POOL_THREADS") {
+        Ok(s) => Some(isize::from_str(&s).context("invalid OM_POOL_THREADS value")?),
+        Err(env::VarError::NotPresent) => None,
+        Err(e) => return Err(e).context("invalid OM_POOL_THREADS value"),
+    };
+
+    let procs = requested.or(env_value).unwrap_or(-2);
+    let procs = if procs > 0 {
+        // Explicit thread count.
+        procs.unsigned_abs()
+    } else {
+        // Relative to cpu count; `unsigned_abs` cannot overflow, unlike `abs` on `isize::MIN`.
+        available_parallelism()
+            .context("querying available parallelism")?
+            .get()
+            .saturating_sub(procs.unsigned_abs())
+    };
+
+    // Never run with fewer than one worker thread.
+    Ok(NonZeroUsize::new(procs.max(1)).expect("procs.max(1) is >= 1"))
+}
+
 /// Get the version returned by `git describe`, e.g.:
 /// - `v2.0` if a git tag
 /// - the commit hash `034ac04` if not a tag