|
|
@@ -1,27 +1,22 @@
|
|
|
use {
|
|
|
- log::{debug, error, warn},
|
|
|
- std::{
|
|
|
- collections::HashMap,
|
|
|
- ops::Deref,
|
|
|
- sync::{atomic::Ordering, Arc},
|
|
|
- },
|
|
|
+ anyhow::Ok,
|
|
|
+ serde::{Deserialize, Serialize},
|
|
|
+ std::{collections::HashMap, ops::Deref, sync::Arc},
|
|
|
};
|
|
|
|
|
|
-pub mod config;
|
|
|
pub mod native_thread_runtime;
|
|
|
pub mod policy;
|
|
|
pub mod rayon_runtime;
|
|
|
pub mod tokio_runtime;
|
|
|
|
|
|
pub use {
|
|
|
- config::ThreadManagerConfig,
|
|
|
native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
|
|
|
policy::CoreAllocation,
|
|
|
rayon_runtime::{RayonConfig, RayonRuntime},
|
|
|
tokio_runtime::{TokioConfig, TokioRuntime},
|
|
|
};
|
|
|
|
|
|
-pub const MAX_THREAD_NAME_CHARS: usize = 16;
|
|
|
+pub const MAX_THREAD_NAME_CHARS: usize = 12;
|
|
|
|
|
|
#[derive(Default, Debug)]
|
|
|
pub struct ThreadManagerInner {
|
|
|
@@ -79,6 +74,35 @@ impl Deref for ThreadManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
|
+#[serde(default)]
|
|
|
+pub struct ThreadManagerConfig {
|
|
|
+ pub native_configs: HashMap<String, NativeConfig>,
|
|
|
+ pub native_runtime_mapping: HashMap<String, String>,
|
|
|
+
|
|
|
+ pub rayon_configs: HashMap<String, RayonConfig>,
|
|
|
+ pub rayon_runtime_mapping: HashMap<String, String>,
|
|
|
+
|
|
|
+ pub tokio_configs: HashMap<String, TokioConfig>,
|
|
|
+ pub tokio_runtime_mapping: HashMap<String, String>,
|
|
|
+
|
|
|
+ pub default_core_allocation: CoreAllocation,
|
|
|
+}
|
|
|
+
|
|
|
+impl Default for ThreadManagerConfig {
|
|
|
+ fn default() -> Self {
|
|
|
+ Self {
|
|
|
+ native_configs: HashMap::from([("default".to_owned(), NativeConfig::default())]),
|
|
|
+ native_runtime_mapping: HashMap::new(),
|
|
|
+ rayon_configs: HashMap::from([("default".to_owned(), RayonConfig::default())]),
|
|
|
+ rayon_runtime_mapping: HashMap::new(),
|
|
|
+ tokio_configs: HashMap::from([("default".to_owned(), TokioConfig::default())]),
|
|
|
+ tokio_runtime_mapping: HashMap::new(),
|
|
|
+ default_core_allocation: CoreAllocation::OsDefault,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl ThreadManager {
|
|
|
/// Will lookup a runtime by given name. If not found, will try to lookup by name "default". If all fails, returns None.
|
|
|
fn lookup<'a, T>(
|
|
|
@@ -91,7 +115,7 @@ impl ThreadManager {
|
|
|
Some(n) => runtimes.get(n),
|
|
|
None => match mapping.get("default") {
|
|
|
Some(n) => {
|
|
|
- warn!("Falling back to default runtime for {name}");
|
|
|
+ log::warn!("Falling back to default runtime for {name}");
|
|
|
runtimes.get(n)
|
|
|
}
|
|
|
None => None,
|
|
|
@@ -144,11 +168,11 @@ impl ThreadManager {
|
|
|
Ok(chosen_cores_mask)
|
|
|
}
|
|
|
|
|
|
- pub fn new(config: &ThreadManagerConfig) -> anyhow::Result<Self> {
|
|
|
+ pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
|
|
|
let mut core_allocations = HashMap::<String, Vec<usize>>::new();
|
|
|
- Self::set_process_affinity(config)?;
|
|
|
+ Self::set_process_affinity(&config)?;
|
|
|
let mut manager = ThreadManagerInner::default();
|
|
|
- manager.populate_mappings(config);
|
|
|
+ manager.populate_mappings(&config);
|
|
|
for (name, cfg) in config.native_configs.iter() {
|
|
|
let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
|
|
|
manager.native_thread_runtimes.insert(name.clone(), nrt);
|
|
|
@@ -168,31 +192,6 @@ impl ThreadManager {
|
|
|
inner: Arc::new(manager),
|
|
|
})
|
|
|
}
|
|
|
-
|
|
|
- pub fn destroy(self) {
|
|
|
- let Ok(mut inner) = Arc::try_unwrap(self.inner) else {
|
|
|
- error!(
|
|
|
- "References to Thread Manager are still active, clean shutdown may not be possible!"
|
|
|
- );
|
|
|
- return;
|
|
|
- };
|
|
|
-
|
|
|
- for (name, runtime) in inner.tokio_runtimes.drain() {
|
|
|
- let active_cnt = runtime.counters.active_threads_cnt.load(Ordering::SeqCst);
|
|
|
- match active_cnt {
|
|
|
- 0 => debug!("Shutting down Tokio runtime {name}"),
|
|
|
- _ => warn!("Tokio runtime {name} has active workers during shutdown!"),
|
|
|
- }
|
|
|
- runtime.tokio.shutdown_background();
|
|
|
- }
|
|
|
- for (name, runtime) in inner.native_thread_runtimes.drain() {
|
|
|
- let active_cnt = runtime.running_count.load(Ordering::SeqCst);
|
|
|
- match active_cnt {
|
|
|
- 0 => debug!("Shutting down Native thread pool {name}"),
|
|
|
- _ => warn!("Native pool {name} has active threads during shutdown!"),
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
#[cfg(test)]
|
|
|
@@ -263,7 +262,7 @@ mod tests {
|
|
|
..Default::default()
|
|
|
};
|
|
|
|
|
|
- let manager = ThreadManager::new(&conf).unwrap();
|
|
|
+ let manager = ThreadManager::new(conf).unwrap();
|
|
|
let high = manager.get_native("high");
|
|
|
let low = manager.get_native("low");
|
|
|
let default = manager.get_native("default");
|
|
|
@@ -324,7 +323,7 @@ mod tests {
|
|
|
..Default::default()
|
|
|
};
|
|
|
|
|
|
- let manager = ThreadManager::new(&conf).unwrap();
|
|
|
+ let manager = ThreadManager::new(conf).unwrap();
|
|
|
let runtime = manager.get_native("test");
|
|
|
|
|
|
let thread1 = runtime
|
|
|
@@ -365,7 +364,7 @@ mod tests {
|
|
|
..Default::default()
|
|
|
};
|
|
|
|
|
|
- let manager = ThreadManager::new(&conf).unwrap();
|
|
|
+ let manager = ThreadManager::new(conf).unwrap();
|
|
|
let rayon_runtime = manager.get_rayon("test");
|
|
|
|
|
|
let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
|