Make thread pool control similar to osmium

- Allow setting number relative to number of cores
- Default to Cores - 2 threads
- Add env variable OM_POOL_THREADS (lower priority than CLI)
- Rename CLI option to `-t/--threads`

Signed-off-by: Evan Lloyd New-Schmidt <evan@new-schmidt.com>
This commit is contained in:
Evan Lloyd New-Schmidt 2023-08-15 18:06:56 -04:00 committed by Evan Lloyd New-Schmidt
parent c7fe34f3ad
commit d0480e9089

View file

@ -1,11 +1,15 @@
use std::{
env,
fs::File,
io::{stdin, stdout, BufReader, Read, Write},
num::NonZeroUsize,
path::PathBuf,
str::FromStr,
thread::available_parallelism,
time::Instant,
};
use anyhow::Context;
use clap::{CommandFactory, Parser, Subcommand};
#[macro_use]
extern crate log;
@ -32,11 +36,14 @@ enum Cmd {
/// The `.osm.pbf` file to use.
pbf_file: PathBuf,
/// The number of threads to spawn to parse and decompress the pbf file.
/// The number of worker threads to spawn to parse and decompress the pbf file.
///
/// Defaults to the number of cores.
#[arg(short, long)]
procs: Option<NonZeroUsize>,
/// If `THREADS` is <= 0, then the number of cores plus `THREADS` threads will be created.
/// The computed number of threads will never be less than one.
///
/// Defaults to the env variable `OM_POOL_THREADS` if it exists or -2.
#[arg(short, long, allow_hyphen_values = true)]
threads: Option<isize>,
},
/// Apply the same html article simplification used when extracting articles to stdin, and write it to stdout.
@ -77,12 +84,16 @@ fn main() -> anyhow::Result<()> {
get_articles::run(args)
}
Cmd::GetTags { pbf_file, procs } => {
Cmd::GetTags { pbf_file, threads } => {
let threads = get_thread_count(threads)
.context("determining thread count")?
.get();
debug!("Using {threads} worker threads");
rayon::ThreadPoolBuilder::new()
.thread_name(|num| format!("worker{num}"))
.num_threads(procs.map(usize::from).unwrap_or_default())
.build_global()?;
.num_threads(threads)
.build_global()
.context("initializing thread pool")?;
let pbf_file = File::open(pbf_file).map(BufReader::new)?;
get_tags::run(pbf_file)
}
@ -110,6 +121,40 @@ fn main() -> anyhow::Result<()> {
}
}
/// Determine the number of threads to use.
///
/// If `requested` is <= 0, then the number of cores plus `requested` will be created.
/// If `requested` is `None`, the environment variable `OM_POOL_THREADS` is used, otherwise a default of -2.
/// The computed number of threads will never be less than one.
///
/// # Errors
///
/// Returns an error if:
/// - `OM_POOL_THREADS` is set and cannot be parsed into an isize.
/// - [available_parallelism] returns an error.
fn get_thread_count(requested: Option<isize>) -> anyhow::Result<NonZeroUsize> {
let env_value = env::var("OM_POOL_THREADS")
.ok()
.map(|s| isize::from_str(&s))
.transpose()
.context("invalid OM_POOL_THREADS value")?;
let procs = requested.or(env_value).unwrap_or(-2);
let procs: usize = if procs > 0 {
// Explicit thread count.
procs.try_into().unwrap()
} else {
// Relative to cpu count.
available_parallelism()?
.get()
.saturating_sub(procs.abs().try_into().expect("procs.abs() is >= 0"))
};
let procs = NonZeroUsize::new(procs).unwrap_or(NonZeroUsize::new(1).unwrap());
Ok(procs)
}
/// Get the version returned by `git describe`, e.g.:
/// - `v2.0` if a git tag
/// - the commit hash `034ac04` if not a tag