Browse files

Add thread manager crate to agave (#3890)

* add thread manager base functionality
* add park/unpark metrics collection for tokio runtimes
* support basic core affinity and priority 
* examples of use and demo of benefits

---------

Co-authored-by: Alex Pyattaev <alex.pyattaev@anza.xyz>
Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
Co-authored-by: kirill lykov <lykov.kirill@gmail.com>
Alex Pyattaev 10 months ago
parent
commit
73b8cb7b6c

+ 249 - 44
Cargo.lock

@@ -63,6 +63,18 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "affinity"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b"
+dependencies = [
+ "cfg-if 1.0.0",
+ "errno",
+ "libc",
+ "num_cpus",
+]
+
 [[package]]
 name = "agave-accounts-hash-cache-tool"
 version = "2.2.0"
@@ -94,7 +106,7 @@ dependencies = [
  "clap 2.33.3",
  "flate2",
  "hex",
- "hyper",
+ "hyper 0.14.32",
  "log",
  "serde",
  "serde_derive",
@@ -255,6 +267,29 @@ dependencies = [
  "solana-version",
 ]
 
+[[package]]
+name = "agave-thread-manager"
+version = "2.2.0"
+dependencies = [
+ "affinity",
+ "agave-thread-manager",
+ "anyhow",
+ "axum 0.7.9",
+ "cfg-if 1.0.0",
+ "env_logger",
+ "hyper 0.14.32",
+ "log",
+ "num_cpus",
+ "rayon",
+ "serde",
+ "serde_json",
+ "solana-metrics",
+ "thread-priority",
+ "tokio",
+ "toml 0.8.12",
+ "tower 0.5.2",
+]
+
 [[package]]
 name = "agave-transaction-view"
 version = "2.2.0"
@@ -464,9 +499,9 @@ dependencies = [
 
 [[package]]
 name = "anstyle"
-version = "1.0.1"
+version = "1.0.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd"
+checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
 
 [[package]]
 name = "anyhow"
@@ -810,13 +845,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
 dependencies = [
  "async-trait",
- "axum-core",
+ "axum-core 0.3.4",
  "bitflags 1.3.2",
  "bytes",
  "futures-util",
- "http",
- "http-body",
- "hyper",
+ "http 0.2.12",
+ "http-body 0.4.5",
+ "hyper 0.14.32",
  "itoa",
  "matchit",
  "memchr",
@@ -825,12 +860,46 @@ dependencies = [
  "pin-project-lite",
  "rustversion",
  "serde",
- "sync_wrapper",
- "tower",
+ "sync_wrapper 0.1.2",
+ "tower 0.4.13",
  "tower-layer",
  "tower-service",
 ]
 
+[[package]]
+name = "axum"
+version = "0.7.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
+dependencies = [
+ "async-trait",
+ "axum-core 0.4.5",
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.1",
+ "http-body-util",
+ "hyper 1.5.1",
+ "hyper-util",
+ "itoa",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding 2.3.1",
+ "pin-project-lite",
+ "rustversion",
+ "serde",
+ "serde_json",
+ "serde_path_to_error",
+ "serde_urlencoded",
+ "sync_wrapper 1.0.2",
+ "tokio",
+ "tower 0.5.2",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
 [[package]]
 name = "axum-core"
 version = "0.3.4"
@@ -840,14 +909,35 @@ dependencies = [
  "async-trait",
  "bytes",
  "futures-util",
- "http",
- "http-body",
+ "http 0.2.12",
+ "http-body 0.4.5",
  "mime",
  "rustversion",
  "tower-layer",
  "tower-service",
 ]
 
+[[package]]
+name = "axum-core"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.1",
+ "http-body-util",
+ "mime",
+ "pin-project-lite",
+ "rustversion",
+ "sync_wrapper 1.0.2",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
 [[package]]
 name = "backoff"
 version = "0.4.0"
@@ -1041,7 +1131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b"
 dependencies = [
  "borsh-derive 0.10.3",
- "hashbrown 0.12.3",
+ "hashbrown 0.13.2",
 ]
 
 [[package]]
@@ -2241,13 +2331,13 @@ version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89"
 dependencies = [
- "http",
+ "http 0.2.12",
  "prost",
  "tokio",
  "tokio-stream",
  "tonic",
  "tonic-build",
- "tower",
+ "tower 0.4.13",
  "tower-service",
 ]
 
@@ -2725,7 +2815,7 @@ dependencies = [
  "futures-core",
  "futures-sink",
  "futures-util",
- "http",
+ "http 0.2.12",
  "indexmap 2.7.1",
  "slab",
  "tokio",
@@ -2792,7 +2882,7 @@ dependencies = [
  "bitflags 1.3.2",
  "bytes",
  "headers-core",
- "http",
+ "http 0.2.12",
  "httpdate",
  "mime",
  "sha-1 0.10.0",
@@ -2804,7 +2894,7 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
 dependencies = [
- "http",
+ "http 0.2.12",
 ]
 
 [[package]]
@@ -2894,6 +2984,17 @@ dependencies = [
  "itoa",
 ]
 
+[[package]]
+name = "http"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
 [[package]]
 name = "http-body"
 version = "0.4.5"
@@ -2901,7 +3002,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
 dependencies = [
  "bytes",
- "http",
+ "http 0.2.12",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "http-body"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
+dependencies = [
+ "bytes",
+ "http 1.1.0",
+]
+
+[[package]]
+name = "http-body-util"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.1",
  "pin-project-lite",
 ]
 
@@ -2934,8 +3058,8 @@ dependencies = [
  "futures-core",
  "futures-util",
  "h2",
- "http",
- "http-body",
+ "http 0.2.12",
+ "http-body 0.4.5",
  "httparse",
  "httpdate",
  "itoa",
@@ -2947,6 +3071,25 @@ dependencies = [
  "want",
 ]
 
+[[package]]
+name = "hyper"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.1",
+ "httparse",
+ "httpdate",
+ "itoa",
+ "pin-project-lite",
+ "smallvec",
+ "tokio",
+]
+
 [[package]]
 name = "hyper-proxy"
 version = "0.9.1"
@@ -2956,8 +3099,8 @@ dependencies = [
  "bytes",
  "futures 0.3.31",
  "headers",
- "http",
- "hyper",
+ "http 0.2.12",
+ "hyper 0.14.32",
  "hyper-tls",
  "native-tls",
  "tokio",
@@ -2972,8 +3115,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
 dependencies = [
  "futures-util",
- "http",
- "hyper",
+ "http 0.2.12",
+ "hyper 0.14.32",
  "rustls 0.21.12",
  "tokio",
  "tokio-rustls",
@@ -2985,7 +3128,7 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
 dependencies = [
- "hyper",
+ "hyper 0.14.32",
  "pin-project-lite",
  "tokio",
  "tokio-io-timeout",
@@ -2998,12 +3141,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
 dependencies = [
  "bytes",
- "hyper",
+ "hyper 0.14.32",
  "native-tls",
  "tokio",
  "tokio-native-tls",
 ]
 
+[[package]]
+name = "hyper-util"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.1",
+ "hyper 1.5.1",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+]
+
 [[package]]
 name = "iana-time-zone"
 version = "0.1.46"
@@ -3426,7 +3585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff"
 dependencies = [
  "futures 0.3.31",
- "hyper",
+ "hyper 0.14.32",
  "jsonrpc-core",
  "jsonrpc-server-utils",
  "log",
@@ -5044,9 +5203,9 @@ dependencies = [
  "futures-core",
  "futures-util",
  "h2",
- "http",
- "http-body",
- "hyper",
+ "http 0.2.12",
+ "http-body 0.4.5",
+ "hyper 0.14.32",
  "hyper-rustls",
  "hyper-tls",
  "ipnet",
@@ -5063,7 +5222,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_urlencoded",
- "sync_wrapper",
+ "sync_wrapper 0.1.2",
  "system-configuration",
  "tokio",
  "tokio-native-tls",
@@ -5086,7 +5245,7 @@ checksum = "5a735987236a8e238bf0296c7e351b999c188ccc11477f311b82b55c93984216"
 dependencies = [
  "anyhow",
  "async-trait",
- "http",
+ "http 0.2.12",
  "reqwest",
  "serde",
  "task-local-extensions",
@@ -5510,6 +5669,16 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_path_to_error"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6"
+dependencies = [
+ "itoa",
+ "serde",
+]
+
 [[package]]
 name = "serde_spanned"
 version = "0.6.5"
@@ -9485,8 +9654,8 @@ dependencies = [
  "flate2",
  "futures 0.3.31",
  "goauth",
- "http",
- "hyper",
+ "http 0.2.12",
+ "hyper 0.14.32",
  "hyper-proxy",
  "log",
  "openssl",
@@ -11109,6 +11278,12 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
 
+[[package]]
+name = "sync_wrapper"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
+
 [[package]]
 name = "synstructure"
 version = "0.12.6"
@@ -11369,6 +11544,20 @@ dependencies = [
  "syn 2.0.96",
 ]
 
+[[package]]
+name = "thread-priority"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cfe075d7053dae61ac5413a34ea7d4913b6e6207844fd726bdd858b37ff72bf5"
+dependencies = [
+ "bitflags 2.8.0",
+ "cfg-if 1.0.0",
+ "libc",
+ "log",
+ "rustversion",
+ "winapi 0.3.9",
+]
+
 [[package]]
 name = "thread-scoped"
 version = "1.0.2"
@@ -11668,15 +11857,15 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
 dependencies = [
  "async-stream",
  "async-trait",
- "axum",
+ "axum 0.6.20",
  "base64 0.21.7",
  "bytes",
  "futures-core",
  "futures-util",
  "h2",
- "http",
- "http-body",
- "hyper",
+ "http 0.2.12",
+ "http-body 0.4.5",
+ "hyper 0.14.32",
  "hyper-timeout",
  "percent-encoding 2.3.1",
  "pin-project",
@@ -11685,7 +11874,7 @@ dependencies = [
  "tokio",
  "tokio-rustls",
  "tokio-stream",
- "tower",
+ "tower 0.4.13",
  "tower-layer",
  "tower-service",
  "tracing",
@@ -11724,17 +11913,33 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "tower"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "pin-project-lite",
+ "sync_wrapper 1.0.2",
+ "tokio",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
 [[package]]
 name = "tower-layer"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
 
 [[package]]
 name = "tower-service"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
 
 [[package]]
 name = "tracing"
@@ -11813,7 +12018,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "data-encoding",
- "http",
+ "http 0.2.12",
  "httparse",
  "log",
  "rand 0.8.5",

+ 6 - 0
Cargo.toml

@@ -212,6 +212,7 @@ members = [
     "svm-transaction",
     "test-validator",
     "thin-client",
+    "thread-manager",
     "timings",
     "tls-utils",
     "tokens",
@@ -261,6 +262,7 @@ check-cfg = [
 
 [workspace.dependencies]
 Inflector = "0.11.4"
+axum = "0.7.9"
 agave-banking-stage-ingress-types = { path = "banking-stage-ingress-types", version = "=2.2.0" }
 agave-transaction-view = { path = "transaction-view", version = "=2.2.0" }
 aquamarine = "0.3.3"
@@ -299,6 +301,7 @@ bzip2 = "0.4.4"
 caps = "0.5.5"
 cargo_metadata = "0.15.4"
 cfg_eval = "0.1.2"
+cfg-if = "1.0.0"
 chrono = { version = "0.4.39", default-features = false }
 chrono-humanize = "0.2.3"
 clap = "2.33.1"
@@ -463,6 +466,7 @@ solana-bucket-map = { path = "bucket_map", version = "=2.2.0" }
 solana-builtins = { path = "builtins", version = "=2.2.0" }
 solana-builtins-default-costs = { path = "builtins-default-costs", version = "=2.2.0" }
 agave-cargo-registry = { path = "cargo-registry", version = "=2.2.0" }
+agave-thread-manager = { path = "thread-manager", version = "=2.2.0" }
 solana-clap-utils = { path = "clap-utils", version = "=2.2.0" }
 solana-clap-v3-utils = { path = "clap-v3-utils", version = "=2.2.0" }
 solana-cli = { path = "cli", version = "=2.2.0" }
@@ -660,6 +664,7 @@ tarpc = "0.29.0"
 tempfile = "3.14.0"
 test-case = "3.3.1"
 thiserror = "2.0.11"
+thread-priority = "1.2.0"
 tiny-bip39 = "0.8.2"
 # Update solana-tokio patch below when updating this version
 tokio = "1.43.0"
@@ -670,6 +675,7 @@ tokio-util = "0.7"
 toml = "0.8.12"
 tonic = "0.9.2"
 tonic-build = "0.9.2"
+tower = "0.5.2"
 trees = "0.4.2"
 tungstenite = "0.20.1"
 uriparse = "0.6.4"

+ 38 - 0
thread-manager/Cargo.toml

@@ -0,0 +1,38 @@
+[package]
+name = "agave-thread-manager"
+description = "Thread pool manager for agave"
+
+version = { workspace = true }
+authors = { workspace = true }
+repository = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+edition = { workspace = true }
+
+publish = false
+
+[dependencies]
+anyhow = { workspace = true }
+cfg-if = { workspace = true }
+log = { workspace = true }
+num_cpus = { workspace = true }
+rayon = { workspace = true }
+serde = { workspace = true, features = ["derive"] }
+solana-metrics = { workspace = true }
+thread-priority = { workspace = true }
+tokio = { workspace = true, features = ["time", "rt-multi-thread"] }
+
+[target.'cfg(target_os = "linux")'.dependencies]
+affinity = "0.1.2"
+
+[dev-dependencies]
+agave-thread-manager = { path = ".", features = ["dev-context-only-utils"] }
+axum = { workspace = true }
+env_logger = { workspace = true }
+hyper = { workspace = true, features = ["http1", "client", "stream", "tcp"] }
+serde_json = { workspace = true }
+toml = { workspace = true }
+tower = { workspace = true }
+
+[features]
+dev-context-only-utils = []

+ 69 - 0
thread-manager/README.md

@@ -0,0 +1,69 @@
+# thread-manager
+Balances machine resources across multiple threaded runtimes to optimize performance.
+The goal is to manage thread contention effectively between different parts of the code, ensuring each can benefit from tailored management strategies.
+For example, we may want to have cores 1-4 handling networking via
+Tokio, core 5 handling file IO via Tokio, cores 9-16 allocated to the
+Rayon thread pool, and cores 6-8 available for general use by std::thread.
+This minimizes the contention for CPU caches and the context switches that
+would occur if Rayon were entirely unaware it was running side-by-side with
+Tokio, and each were to spawn as many threads as there are cores.
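+
+Expressed as a thread-manager TOML config, that layout might look roughly like the
+sketch below (pool names such as `net` and `fileio` are made up for illustration;
+the `max` bound of a core range is exclusive):
+```toml
+# Cores 6-8 stay available to plain std::thread users via the process-wide default.
+default_core_allocation.DedicatedCoreSet = { min = 6, max = 9 }
+
+[tokio_configs.net]
+worker_threads = 4
+core_allocation.DedicatedCoreSet = { min = 1, max = 5 }   # cores 1-4
+
+[tokio_configs.fileio]
+worker_threads = 2
+core_allocation.DedicatedCoreSet = { min = 5, max = 6 }   # core 5
+
+[rayon_configs.default]
+worker_threads = 8
+core_allocation.DedicatedCoreSet = { min = 9, max = 17 }  # cores 9-16
+```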
+
+## Thread pool mapping
+By default, the thread manager looks up a pool by its name, e.g. "solGossip".
+Matching is done independently for each type of runtime.
+If no pool with that name is found, it falls back to the "default" thread pool
+of the same type (if one is specified in the config). If no default pool is
+specified either, the lookup fails.
+
+Multiple names can point to the same pool. For example, "solGossipConsume" and
+"solSigverify" can both be mapped to the same Rayon pool named "rayonSigverify".
+This, in principle, allows some degree of runtime sharing between different crates
+in the codebase without having to manually plumb references through.
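+
+A minimal sketch of such a mapping in the TOML config (names are illustrative and
+kept short, since runtime names double as thread-name prefixes):
+```toml
+[rayon_configs.sigverify]
+worker_threads = 4
+
+[rayon_runtime_mapping]
+solGossipConsume = "sigverify"
+solSigverify = "sigverify"
+```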
+
+# Supported threading models
+## Affinity
+All threading models allow setting core affinity, but only on Linux.
+
+For core affinity you can set e.g.
+```toml
+core_allocation.DedicatedCoreSet = { min = 16, max = 64 }
+```
+to pin the pool to cores 16-63 (the `max` bound is exclusive).
+
+## Scheduling policy and priority
+You can configure the thread scheduling policy and priority if desired. Keep in mind that this will likely require
+```bash
+sudo setcap cap_sys_nice+ep <path to binary>
+```
+or root privileges to run the resulting process.
+To see which policies are supported, check [the sources](./src/policy.rs).
+If you use realtime policies, priority values from 1 (lowest) to 99 (highest) are possible.
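+
+For example, a native pool could request a non-default policy and priority as in the
+sketch below (illustrative; `policy` currently accepts "OTHER", "BATCH" or "IDLE", and
+whether a given policy/priority combination is accepted is ultimately up to the OS):
+```toml
+[native_configs.default]
+policy = "BATCH"
+priority = 10
+```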
+
+## Tokio
+You can create multiple Tokio runtimes, each with its own dedicated pool of CPU cores. The number of worker and blocking threads, along with thread priorities for the pool, can be fully customized.
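+
+A rough usage sketch; with `ThreadManagerConfig::default()` the only pool is the one
+named "default", and `anyhow` is used for error handling as elsewhere in this crate:
+```rust
+use agave_thread_manager::{ThreadManager, ThreadManagerConfig};
+
+fn main() -> anyhow::Result<()> {
+    let manager = ThreadManager::new(ThreadManagerConfig::default())?;
+    let runtime = manager.get_tokio("default");
+    // Optionally sample park/unpark based utilization metrics every second.
+    runtime.start_metrics_sampling(std::time::Duration::from_secs(1));
+    // TokioRuntime derefs to tokio::runtime::Runtime.
+    runtime.block_on(async {
+        // async workload goes here
+    });
+    Ok(())
+}
+```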
+
+## Native
+Native threads (`std::thread`) can be spawned from managed pools. They inherit the pool's
+affinity and priority settings, and each pool caps the total number of threads it will spawn.
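+
+A small sketch, continuing from the `manager` built in the Tokio example above:
+```rust
+// get_native panics if neither the requested pool nor a "default" pool exists;
+// try_get_native returns an Option instead.
+let pool = manager.get_native("default");
+let handle = pool
+    .spawn(|| {
+        // Runs with the pool's affinity/priority applied; spawn errors out once
+        // the pool's max_threads limit of running threads is reached.
+        40 + 2
+    })
+    .unwrap();
+assert_eq!(handle.join().unwrap(), 42);
+```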
+
+## Rayon
+Rayon already manages thread pools well enough; all thread-manager adds on top is affinity and
+priority enforcement for Rayon threads. Normally one would only ever have one Rayon pool, but to
+give different workloads different priorities one may want to spawn several.
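+
+Assuming the same `manager` as above and `rayon` available to the caller, a sketch:
+```rust
+use rayon::prelude::*;
+
+// RayonRuntime derefs down to rayon::ThreadPool, so the usual pool API applies.
+let pool = manager.get_rayon("default");
+let sum: u64 = pool.install(|| (0..1_000u64).into_par_iter().sum());
+assert_eq!(sum, 499_500);
+```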
+
+# Limitations
+
+ * Thread pools can only be created at process startup
+ * Once a thread pool is created, its policy can not be modified at runtime
+ * Thread affinity & priority are not supported outside of Linux
+ * Thread priority generally requires kernel-level support and extra capabilities
+
+# TODO:
+
+ * even more tests
+ * better thread priority support
+
+
+# Examples
+ * core_contention_basics demonstrates why core contention is bad, and how thread configs can help
+ * core_contention_sweep sweeps across a range of core counts to show how the benefits scale with core count

+ 135 - 0
thread-manager/examples/common/mod.rs

@@ -0,0 +1,135 @@
+use {
+    axum::{routing::get, Router},
+    hyper::{Body, Request},
+    log::info,
+    std::{
+        future::IntoFuture,
+        net::{IpAddr, Ipv4Addr, SocketAddr},
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc,
+        },
+        time::{Duration, Instant},
+    },
+    tokio::{net::TcpStream, sync::oneshot::Sender, task::JoinSet, time::timeout},
+    tower::ServiceExt,
+};
+// 10 seconds is just enough to get a reasonably accurate measurement under our crude methodology
+const TEST_SECONDS: u64 = 10;
+
+// A simple web server that puts load on tokio to measure thread contention impacts
+pub async fn axum_main(port: u16, ready: Sender<()>) {
+    // basic handler that responds with a static string
+    async fn root() -> &'static str {
+        tokio::time::sleep(Duration::from_millis(1)).await;
+        "Hello, World!"
+    }
+
+    // build our application with a route
+    let app = Router::new().route("/", get(root));
+
+    // run our app with hyper, listening globally on the requested port
+    let listener =
+        tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
+            .await
+            .unwrap();
+    info!("Server on port {port} ready");
+    ready.send(()).unwrap();
+    let timeout = tokio::time::timeout(
+        Duration::from_secs(TEST_SECONDS + 1),
+        axum::serve(listener, app).into_future(),
+    )
+    .await;
+    match timeout {
+        Ok(v) => {
+            v.unwrap();
+        }
+        Err(_) => {
+            info!("Terminating server on port {port}");
+        }
+    }
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct Stats {
+    pub latency_s: f32,
+    pub requests_per_second: f32,
+}
+
+// Generates a bunch of HTTP load on the ports provided. Will spawn `tasks` worth
+// of connections for each of the ports given.
+pub async fn workload_main(ports: &[u16], tasks: usize) -> anyhow::Result<Stats> {
+    struct ControlBlock {
+        start_time: std::time::Instant,
+        requests: AtomicUsize,
+        cumulative_latency_us: AtomicUsize,
+    }
+
+    let control_block = Arc::new(ControlBlock {
+        start_time: std::time::Instant::now(),
+        requests: AtomicUsize::new(0),
+        cumulative_latency_us: AtomicUsize::new(0),
+    });
+
+    async fn connection(port: u16, control_block: Arc<ControlBlock>) -> anyhow::Result<()> {
+        let sa = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
+        let stream = TcpStream::connect(sa).await?;
+
+        let (mut request_sender, connection) = hyper::client::conn::handshake(stream).await?;
+        // spawn a task to poll the connection and drive the HTTP state
+        tokio::spawn(async move {
+            // Technically, this can error, but that only happens when the server is killed
+            let _ = connection.await;
+        });
+
+        let path = "/";
+        while control_block.start_time.elapsed() < Duration::from_secs(TEST_SECONDS) {
+            let req = Request::builder()
+                .uri(path)
+                .method("GET")
+                .body(Body::from(""))?;
+            let start = Instant::now();
+            let res = timeout(Duration::from_millis(100), request_sender.send_request(req)).await;
+            let res = match res {
+                Ok(res) => res?,
+                Err(_) => {
+                    anyhow::bail!("Timeout on request!")
+                }
+            };
+            let _ = res.body();
+            if res.status() != 200 {
+                anyhow::bail!("Got error from server");
+            }
+
+            control_block
+                .cumulative_latency_us
+                .fetch_add(start.elapsed().as_micros() as usize, Ordering::Relaxed);
+            control_block.requests.fetch_add(1, Ordering::Relaxed);
+            // Before sending another request over the same connection,
+            // wait until the request_sender becomes ready again.
+            request_sender.ready().await?;
+        }
+        Ok(())
+    }
+
+    let mut join_set = JoinSet::new();
+    for port in ports {
+        info!("Starting load generation on port {port}");
+        for _t in 0..tasks {
+            join_set.spawn(connection(*port, control_block.clone()));
+        }
+    }
+
+    while let Some(jr) = join_set.join_next().await {
+        jr??;
+    }
+
+    let requests = control_block.requests.load(Ordering::Relaxed);
+    let latency_accumulator_us = control_block.cumulative_latency_us.load(Ordering::Relaxed);
+    Ok(Stats {
+        requests_per_second: requests as f32 / TEST_SECONDS as f32,
+        #[allow(clippy::arithmetic_side_effects)]
+        latency_s: (latency_accumulator_us / requests) as f32 / 1e6,
+    })
+}

+ 64 - 0
thread-manager/examples/core_contention_basics.rs

@@ -0,0 +1,64 @@
+use {
+    agave_thread_manager::*,
+    log::info,
+    std::{io::Read, path::PathBuf, time::Duration},
+    tokio::sync::oneshot,
+};
+
+mod common;
+use common::*;
+
+fn main() -> anyhow::Result<()> {
+    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
+    let experiments = [
+        "examples/core_contention_dedicated_set.toml",
+        "examples/core_contention_contending_set.toml",
+    ];
+
+    for exp in experiments {
+        info!("===================");
+        info!("Running {exp}");
+        let mut conf_file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        conf_file.push(exp);
+        let mut buf = String::new();
+        std::fs::File::open(conf_file)?.read_to_string(&mut buf)?;
+        let cfg: ThreadManagerConfig = toml::from_str(&buf)?;
+
+        let manager = ThreadManager::new(cfg).unwrap();
+        let tokio1 = manager.get_tokio("axum1");
+        tokio1.start_metrics_sampling(Duration::from_secs(1));
+        let tokio2 = manager.get_tokio("axum2");
+        tokio2.start_metrics_sampling(Duration::from_secs(1));
+
+        let workload_runtime = TokioRuntime::new(
+            "LoadGenerator".to_owned(),
+            TokioConfig {
+                core_allocation: CoreAllocation::DedicatedCoreSet { min: 32, max: 64 },
+                ..Default::default()
+            },
+        )?;
+
+        let results = std::thread::scope(|scope| {
+            let (tx1, rx1) = oneshot::channel();
+            let (tx2, rx2) = oneshot::channel();
+
+            scope.spawn(|| {
+                tokio1.tokio.block_on(axum_main(8888, tx1));
+            });
+            scope.spawn(|| {
+                tokio2.tokio.block_on(axum_main(8889, tx2));
+            });
+
+            // Wait for axum servers to start
+            rx1.blocking_recv().unwrap();
+            rx2.blocking_recv().unwrap();
+
+            let join_handle =
+                scope.spawn(|| workload_runtime.block_on(workload_main(&[8888, 8889], 1000)));
+            join_handle.join().expect("Load generator crashed!")
+        });
+        //print out the results of the bench run
+        info!("Results are: {:?}", results);
+    }
+    Ok(())
+}

+ 13 - 0
thread-manager/examples/core_contention_contending_set.toml

@@ -0,0 +1,13 @@
+[native_configs]
+
+[rayon_configs]
+
+[tokio_configs.axum1]
+worker_threads = 8
+max_blocking_threads = 1
+core_allocation.DedicatedCoreSet = { min = 0, max = 8 }
+
+[tokio_configs.axum2]
+worker_threads = 8
+max_blocking_threads = 1
+core_allocation.DedicatedCoreSet = { min = 0, max = 8 }

+ 13 - 0
thread-manager/examples/core_contention_dedicated_set.toml

@@ -0,0 +1,13 @@
+[native_configs]
+
+[rayon_configs]
+
+[tokio_configs.axum1]
+worker_threads = 4
+max_blocking_threads = 1
+core_allocation.DedicatedCoreSet = { min = 0, max = 4 }
+
+[tokio_configs.axum2]
+worker_threads = 4
+max_blocking_threads = 1
+core_allocation.DedicatedCoreSet = { min = 4, max = 8 }

+ 139 - 0
thread-manager/examples/core_contention_sweep.rs

@@ -0,0 +1,139 @@
+use {
+    agave_thread_manager::*,
+    log::info,
+    std::{collections::HashMap, time::Duration},
+    tokio::sync::oneshot,
+};
+
+mod common;
+use common::*;
+
+fn make_config_shared(cc: usize) -> ThreadManagerConfig {
+    let tokio_cfg_1 = TokioConfig {
+        core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc },
+        worker_threads: cc,
+        ..Default::default()
+    };
+    let tokio_cfg_2 = tokio_cfg_1.clone();
+    ThreadManagerConfig {
+        tokio_configs: HashMap::from([
+            ("axum1".into(), tokio_cfg_1),
+            ("axum2".into(), tokio_cfg_2),
+        ]),
+        ..Default::default()
+    }
+}
+fn make_config_dedicated(core_count: usize) -> ThreadManagerConfig {
+    let tokio_cfg_1 = TokioConfig {
+        core_allocation: CoreAllocation::DedicatedCoreSet {
+            min: 0,
+            max: core_count / 2,
+        },
+        worker_threads: core_count / 2,
+        ..Default::default()
+    };
+    let tokio_cfg_2 = TokioConfig {
+        core_allocation: CoreAllocation::DedicatedCoreSet {
+            min: core_count / 2,
+            max: core_count,
+        },
+        worker_threads: core_count / 2,
+        ..Default::default()
+    };
+    ThreadManagerConfig {
+        tokio_configs: HashMap::from([
+            ("axum1".into(), tokio_cfg_1),
+            ("axum2".into(), tokio_cfg_2),
+        ]),
+        ..Default::default()
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+enum Regime {
+    Shared,
+    Dedicated,
+    Single,
+}
+impl Regime {
+    const VALUES: [Self; 3] = [Self::Dedicated, Self::Shared, Self::Single];
+}
+
+#[derive(Debug, Default, serde::Serialize)]
+struct Results {
+    latencies_s: Vec<f32>,
+    requests_per_second: Vec<f32>,
+}
+
+fn main() -> anyhow::Result<()> {
+    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
+    let mut all_results: HashMap<String, Results> = HashMap::new();
+    for regime in Regime::VALUES {
+        let mut results = Results::default();
+        for core_count in [2, 4, 8, 16] {
+            let manager;
+            info!("===================");
+            info!("Running {core_count} cores under {regime:?}");
+            let (tokio1, tokio2) = match regime {
+                Regime::Shared => {
+                    manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
+                    (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
+                }
+                Regime::Dedicated => {
+                    manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap();
+                    (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
+                }
+                Regime::Single => {
+                    manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
+                    (manager.get_tokio("axum1"), manager.get_tokio("axum2"))
+                }
+            };
+
+            let workload_runtime = TokioRuntime::new(
+                "LoadGenerator".to_owned(),
+                TokioConfig {
+                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 32, max: 64 },
+                    ..Default::default()
+                },
+            )?;
+            let measurement = std::thread::scope(|s| {
+                let (tx1, rx1) = oneshot::channel();
+                let (tx2, rx2) = oneshot::channel();
+                s.spawn(|| {
+                    tokio1.start_metrics_sampling(Duration::from_secs(1));
+                    tokio1.tokio.block_on(axum_main(8888, tx1));
+                });
+                let jh = match regime {
+                    Regime::Single => s.spawn(|| {
+                        rx1.blocking_recv().unwrap();
+                        workload_runtime.block_on(workload_main(&[8888, 8888], 3000))
+                    }),
+                    _ => {
+                        s.spawn(|| {
+                            tokio2.start_metrics_sampling(Duration::from_secs(1));
+                            tokio2.tokio.block_on(axum_main(8889, tx2));
+                        });
+                        s.spawn(|| {
+                            rx1.blocking_recv().unwrap();
+                            rx2.blocking_recv().unwrap();
+                            workload_runtime.block_on(workload_main(&[8888, 8889], 3000))
+                        })
+                    }
+                };
+                jh.join().expect("Some of the threads crashed!")
+            })?;
+            info!("Results are: {:?}", measurement);
+            results.latencies_s.push(measurement.latency_s);
+            results
+                .requests_per_second
+                .push(measurement.requests_per_second);
+        }
+        all_results.insert(format!("{regime:?}"), results);
+        std::thread::sleep(Duration::from_secs(3));
+    }
+
+    //print the resulting measurements so they can be e.g. plotted with matplotlib
+    println!("{}", serde_json::to_string_pretty(&all_results)?);
+
+    Ok(())
+}

+ 375 - 0
thread-manager/src/lib.rs

@@ -0,0 +1,375 @@
+use {
+    anyhow::Ok,
+    serde::{Deserialize, Serialize},
+    std::{collections::HashMap, ops::Deref, sync::Arc},
+};
+
+pub mod native_thread_runtime;
+pub mod policy;
+pub mod rayon_runtime;
+pub mod tokio_runtime;
+
+pub use {
+    native_thread_runtime::{JoinHandle, NativeConfig, NativeThreadRuntime},
+    policy::CoreAllocation,
+    rayon_runtime::{RayonConfig, RayonRuntime},
+    tokio_runtime::{TokioConfig, TokioRuntime},
+};
+
+pub const MAX_THREAD_NAME_CHARS: usize = 12;
+
+#[derive(Default, Debug)]
+pub struct ThreadManagerInner {
+    pub tokio_runtimes: HashMap<String, TokioRuntime>,
+    pub tokio_runtime_mapping: HashMap<String, String>,
+
+    pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
+    pub native_runtime_mapping: HashMap<String, String>,
+
+    pub rayon_runtimes: HashMap<String, RayonRuntime>,
+    pub rayon_runtime_mapping: HashMap<String, String>,
+}
+
+impl ThreadManagerInner {
+    /// Populates mappings with copies of config names, overrides as appropriate
+    fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
+        //TODO: this should probably be cleaned up with a macro at some point...
+
+        for name in config.native_configs.keys() {
+            self.native_runtime_mapping
+                .insert(name.clone(), name.clone());
+        }
+        for (k, v) in config.native_runtime_mapping.iter() {
+            self.native_runtime_mapping.insert(k.clone(), v.clone());
+        }
+
+        for name in config.tokio_configs.keys() {
+            self.tokio_runtime_mapping
+                .insert(name.clone(), name.clone());
+        }
+        for (k, v) in config.tokio_runtime_mapping.iter() {
+            self.tokio_runtime_mapping.insert(k.clone(), v.clone());
+        }
+
+        for name in config.rayon_configs.keys() {
+            self.rayon_runtime_mapping
+                .insert(name.clone(), name.clone());
+        }
+        for (k, v) in config.rayon_runtime_mapping.iter() {
+            self.rayon_runtime_mapping.insert(k.clone(), v.clone());
+        }
+    }
+}
+
+#[derive(Default, Debug, Clone)]
+pub struct ThreadManager {
+    inner: Arc<ThreadManagerInner>,
+}
+
+impl Deref for ThreadManager {
+    type Target = ThreadManagerInner;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(default)]
+pub struct ThreadManagerConfig {
+    pub native_configs: HashMap<String, NativeConfig>,
+    pub native_runtime_mapping: HashMap<String, String>,
+
+    pub rayon_configs: HashMap<String, RayonConfig>,
+    pub rayon_runtime_mapping: HashMap<String, String>,
+
+    pub tokio_configs: HashMap<String, TokioConfig>,
+    pub tokio_runtime_mapping: HashMap<String, String>,
+
+    pub default_core_allocation: CoreAllocation,
+}
+
+impl Default for ThreadManagerConfig {
+    fn default() -> Self {
+        Self {
+            native_configs: HashMap::from([("default".to_owned(), NativeConfig::default())]),
+            native_runtime_mapping: HashMap::new(),
+            rayon_configs: HashMap::from([("default".to_owned(), RayonConfig::default())]),
+            rayon_runtime_mapping: HashMap::new(),
+            tokio_configs: HashMap::from([("default".to_owned(), TokioConfig::default())]),
+            tokio_runtime_mapping: HashMap::new(),
+            default_core_allocation: CoreAllocation::OsDefault,
+        }
+    }
+}
+
+impl ThreadManager {
+    /// Looks up a runtime by the given name. If not found, falls back to the name "default". If that also fails, returns None.
+    fn lookup<'a, T>(
+        &'a self,
+        name: &str,
+        mapping: &HashMap<String, String>,
+        runtimes: &'a HashMap<String, T>,
+    ) -> Option<&'a T> {
+        match mapping.get(name) {
+            Some(n) => runtimes.get(n),
+            None => match mapping.get("default") {
+                Some(n) => {
+                    log::warn!("Falling back to default runtime for {name}");
+                    runtimes.get(n)
+                }
+                None => None,
+            },
+        }
+    }
+
+    pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
+        self.lookup(
+            name,
+            &self.native_runtime_mapping,
+            &self.native_thread_runtimes,
+        )
+    }
+    pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
+        if let Some(runtime) = self.try_get_native(name) {
+            runtime
+        } else {
+            panic!("Native thread pool for {name} can not be found!");
+        }
+    }
+
+    pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
+        self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
+    }
+
+    pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
+        if let Some(runtime) = self.try_get_rayon(name) {
+            runtime
+        } else {
+            panic!("Rayon thread pool for {name} can not be found!");
+        }
+    }
+
+    pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
+        self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
+    }
+
+    pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
+        if let Some(runtime) = self.try_get_tokio(name) {
+            runtime
+        } else {
+            panic!("Tokio thread pool for {name} can not be found!");
+        }
+    }
+
+    pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
+        let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
+        crate::policy::set_thread_affinity(&chosen_cores_mask);
+        Ok(chosen_cores_mask)
+    }
+
+    pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
+        let mut core_allocations = HashMap::<String, Vec<usize>>::new();
+        Self::set_process_affinity(&config)?;
+        let mut manager = ThreadManagerInner::default();
+        manager.populate_mappings(&config);
+        for (name, cfg) in config.native_configs.iter() {
+            let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
+            manager.native_thread_runtimes.insert(name.clone(), nrt);
+        }
+        for (name, cfg) in config.rayon_configs.iter() {
+            let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
+            manager.rayon_runtimes.insert(name.clone(), rrt);
+        }
+
+        for (name, cfg) in config.tokio_configs.iter() {
+            let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;
+
+            core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
+            manager.tokio_runtimes.insert(name.clone(), tokiort);
+        }
+        Ok(Self {
+            inner: Arc::new(manager),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {crate::ThreadManagerConfig, std::io::Read};
+    #[cfg(target_os = "linux")]
+    use {
+        crate::{CoreAllocation, NativeConfig, RayonConfig, ThreadManager},
+        std::collections::HashMap,
+    };
+
+    #[test]
+    fn test_config_files() {
+        let experiments = [
+            "examples/core_contention_dedicated_set.toml",
+            "examples/core_contention_contending_set.toml",
+        ];
+
+        for exp in experiments {
+            println!("Loading config {exp}");
+            let mut conffile = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+            conffile.push(exp);
+            let mut buf = String::new();
+            std::fs::File::open(conffile)
+                .unwrap()
+                .read_to_string(&mut buf)
+                .unwrap();
+            let cfg: ThreadManagerConfig = toml::from_str(&buf).unwrap();
+            println!("{:?}", cfg);
+        }
+    }
+    // Nobody runs Agave on Windows, and on macOS we can not set mask affinity without patching the external crate
+    #[cfg(target_os = "linux")]
+    fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
+        let affinity = affinity::get_thread_affinity().unwrap();
+        assert_eq!(affinity, expect_cores, "{}", error_msg);
+    }
+    #[test]
+    #[cfg(target_os = "linux")]
+    #[ignore] //test ignored for now as thread priority requires kernel support and extra permissions
+    fn test_thread_priority() {
+        let priority_high = 10;
+        let priority_default = crate::policy::DEFAULT_PRIORITY;
+        let priority_low = 1;
+        let conf = ThreadManagerConfig {
+            native_configs: HashMap::from([
+                (
+                    "high".to_owned(),
+                    NativeConfig {
+                        priority: priority_high,
+                        ..Default::default()
+                    },
+                ),
+                (
+                    "default".to_owned(),
+                    NativeConfig {
+                        ..Default::default()
+                    },
+                ),
+                (
+                    "low".to_owned(),
+                    NativeConfig {
+                        priority: priority_low,
+                        ..Default::default()
+                    },
+                ),
+            ]),
+            ..Default::default()
+        };
+
+        let manager = ThreadManager::new(conf).unwrap();
+        let high = manager.get_native("high");
+        let low = manager.get_native("low");
+        let default = manager.get_native("default");
+
+        high.spawn(move || {
+            let prio =
+                thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
+            assert_eq!(
+                prio,
+                thread_priority::ThreadPriority::Crossplatform((priority_high).try_into().unwrap())
+            );
+        })
+        .unwrap()
+        .join()
+        .unwrap();
+        low.spawn(move || {
+            let prio =
+                thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
+            assert_eq!(
+                prio,
+                thread_priority::ThreadPriority::Crossplatform((priority_low).try_into().unwrap())
+            );
+        })
+        .unwrap()
+        .join()
+        .unwrap();
+        default
+            .spawn(move || {
+                let prio =
+                    thread_priority::get_thread_priority(thread_priority::thread_native_id())
+                        .unwrap();
+                assert_eq!(
+                    prio,
+                    thread_priority::ThreadPriority::Crossplatform(
+                        (priority_default).try_into().unwrap()
+                    )
+                );
+            })
+            .unwrap()
+            .join()
+            .unwrap();
+    }
+
+    #[cfg(target_os = "linux")]
+    #[test]
+    fn test_process_affinity() {
+        let conf = ThreadManagerConfig {
+            native_configs: HashMap::from([(
+                "pool1".to_owned(),
+                NativeConfig {
+                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
+                    max_threads: 5,
+                    ..Default::default()
+                },
+            )]),
+            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
+            native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),
+            ..Default::default()
+        };
+
+        let manager = ThreadManager::new(conf).unwrap();
+        let runtime = manager.get_native("test");
+
+        let thread1 = runtime
+            .spawn(|| {
+                validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
+            })
+            .unwrap();
+
+        let thread2 = std::thread::spawn(|| {
+            validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");
+
+            let inner_thread = std::thread::spawn(|| {
+                validate_affinity(
+                    &[4, 5, 6, 7],
+                    "Nested thread allocation should still be 4-7",
+                );
+            });
+            inner_thread.join().unwrap();
+        });
+        thread1.join().unwrap();
+        thread2.join().unwrap();
+    }
+
+    #[cfg(target_os = "linux")]
+    #[test]
+    fn test_rayon_affinity() {
+        let conf = ThreadManagerConfig {
+            rayon_configs: HashMap::from([(
+                "test".to_owned(),
+                RayonConfig {
+                    core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
+                    worker_threads: 3,
+                    ..Default::default()
+                },
+            )]),
+            default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
+
+            ..Default::default()
+        };
+
+        let manager = ThreadManager::new(conf).unwrap();
+        let rayon_runtime = manager.get_rayon("test");
+
+        let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
+            println!("Rayon thread {} reporting", ctx.index());
+            validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
+        });
+    }
+}

+ 163 - 0
thread-manager/src/native_thread_runtime.rs

@@ -0,0 +1,163 @@
+use {
+    crate::{
+        policy::{apply_policy, parse_policy, CoreAllocation},
+        MAX_THREAD_NAME_CHARS,
+    },
+    anyhow::bail,
+    log::warn,
+    serde::{Deserialize, Serialize},
+    solana_metrics::datapoint_info,
+    std::{
+        ops::Deref,
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc, Mutex,
+        },
+    },
+};
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(default)]
+pub struct NativeConfig {
+    pub core_allocation: CoreAllocation,
+    pub max_threads: usize,
+    /// Priority in range 0..99
+    pub priority: u8,
+    pub policy: String,
+    pub stack_size_bytes: usize,
+}
+
+impl Default for NativeConfig {
+    fn default() -> Self {
+        Self {
+            core_allocation: CoreAllocation::OsDefault,
+            max_threads: 16,
+            priority: crate::policy::DEFAULT_PRIORITY,
+            policy: "OTHER".to_owned(),
+            stack_size_bytes: 2 * 1024 * 1024,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct NativeThreadRuntimeInner {
+    pub id_count: AtomicUsize,
+    pub running_count: Arc<AtomicUsize>,
+    pub config: NativeConfig,
+    pub name: String,
+}
+
+#[derive(Debug, Clone)]
+pub struct NativeThreadRuntime {
+    inner: Arc<NativeThreadRuntimeInner>,
+}
+
+impl Deref for NativeThreadRuntime {
+    type Target = NativeThreadRuntimeInner;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+pub struct JoinHandle<T> {
+    std_handle: Option<std::thread::JoinHandle<T>>,
+    running_count: Arc<AtomicUsize>,
+}
+
+impl<T> JoinHandle<T> {
+    fn join_inner(&mut self) -> std::thread::Result<T> {
+        match self.std_handle.take() {
+            Some(jh) => {
+                let result = jh.join();
+                let rc = self.running_count.fetch_sub(1, Ordering::Relaxed);
+                datapoint_info!("thread-manager-native", ("threads-running", rc, i64),);
+                result
+            }
+            None => {
+                panic!("Thread already joined");
+            }
+        }
+    }
+
+    pub fn join(mut self) -> std::thread::Result<T> {
+        self.join_inner()
+    }
+
+    pub fn is_finished(&self) -> bool {
+        match self.std_handle {
+            Some(ref jh) => jh.is_finished(),
+            None => true,
+        }
+    }
+}
+
+impl<T> Drop for JoinHandle<T> {
+    fn drop(&mut self) {
+        if self.std_handle.is_some() {
+            warn!("Attempting to drop a JoinHandle of a running thread will leak thread IDs, please join your threads!");
+            self.join_inner().expect("Child thread panicked");
+        }
+    }
+}
+
+impl NativeThreadRuntime {
+    pub fn new(name: String, cfg: NativeConfig) -> Self {
+        debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
+        Self {
+            inner: Arc::new(NativeThreadRuntimeInner {
+                id_count: AtomicUsize::new(0),
+                running_count: Arc::new(AtomicUsize::new(0)),
+                config: cfg,
+                name,
+            }),
+        }
+    }
+
+    pub fn spawn<F, T>(&self, f: F) -> anyhow::Result<JoinHandle<T>>
+    where
+        F: FnOnce() -> T,
+        F: Send + 'static,
+        T: Send + 'static,
+    {
+        let n = self.id_count.fetch_add(1, Ordering::Relaxed);
+        let name = format!("{}-{}", &self.name, n);
+        self.spawn_named(name, f)
+    }
+
+    pub fn spawn_named<F, T>(&self, name: String, f: F) -> anyhow::Result<JoinHandle<T>>
+    where
+        F: FnOnce() -> T,
+        F: Send + 'static,
+        T: Send + 'static,
+    {
+        debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
+        let spawned = self.running_count.load(Ordering::Relaxed);
+        if spawned >= self.config.max_threads {
+            bail!("All allowed threads in this pool are already spawned");
+        }
+
+        let core_alloc = self.config.core_allocation.clone();
+        let priority = self.config.priority;
+        let policy = parse_policy(&self.config.policy);
+        let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
+        let jh = std::thread::Builder::new()
+            .name(name)
+            .stack_size(self.config.stack_size_bytes)
+            .spawn(move || {
+                apply_policy(&core_alloc, policy, priority, &chosen_cores_mask);
+                f()
+            })?;
+        let rc = self.running_count.fetch_add(1, Ordering::Relaxed);
+        datapoint_info!("thread-manager-native", ("threads-running", rc as i64, i64),);
+        Ok(JoinHandle {
+            std_handle: Some(jh),
+            running_count: self.running_count.clone(),
+        })
+    }
+
+    #[cfg(feature = "dev-context-only-utils")]
+    pub fn new_for_tests(name: &str) -> Self {
+        Self::new(name.to_owned(), NativeConfig::default())
+    }
+}

+ 113 - 0
thread-manager/src/policy.rs

@@ -0,0 +1,113 @@
+cfg_if::cfg_if! {
+    if #[cfg(target_os = "linux")]{
+        use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
+    }
+    else{
+        #[derive(Clone, Copy)]
+        pub(crate) struct ThreadSchedulePolicy {}
+    }
+}
+use {
+    serde::{Deserialize, Serialize},
+    std::sync::{Mutex, OnceLock},
+};
+
+static CORE_COUNT: OnceLock<usize> = OnceLock::new();
+
+pub const DEFAULT_PRIORITY: u8 = 0;
+
+#[derive(Default, Debug, Clone, Serialize, Deserialize)]
+pub enum CoreAllocation {
+    ///Use OS default allocation (i.e. do not alter core affinity)
+    #[default]
+    OsDefault,
+    ///Pin each thread to a core in given range. Number of cores should be >= number of threads
+    PinnedCores { min: usize, max: usize },
+    ///Pin the threads to a set of cores
+    DedicatedCoreSet { min: usize, max: usize },
+}
+
+impl CoreAllocation {
+    /// Converts into a vector of core IDs. OsDefault is converted to vector with all core IDs.
+    pub fn as_core_mask_vector(&self) -> Vec<usize> {
+        match *self {
+            CoreAllocation::PinnedCores { min, max } => (min..max).collect(),
+            CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(),
+            CoreAllocation::OsDefault => Vec::from_iter(0..*CORE_COUNT.get_or_init(num_cpus::get)),
+        }
+    }
+}
+cfg_if::cfg_if! {
+    if #[cfg(target_os = "linux")]{
+
+        pub fn set_thread_affinity(cores: &[usize]) {
+            assert!(
+                !cores.is_empty(),
+                "Can not call setaffinity with empty cores mask"
+            );
+            if let Err(e) = affinity::set_thread_affinity(cores) {
+                let thread = std::thread::current();
+                panic!(
+                    "Can not set core affinity {:?} for thread {:?} named {:?}, error {}",
+                    cores,
+                    thread.id(),
+                    thread.name(),
+                    e
+                );
+            }
+        }
+        fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) {
+            if let Err(e) = std::thread::current().set_priority_and_policy(
+                policy,
+                thread_priority::ThreadPriority::Crossplatform((priority).try_into().expect("Priority value outside of OS-supported range")),
+            ) {
+                panic!("Can not set thread priority, OS error {:?}", e);
+            }
+        }
+        pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
+            match policy.to_uppercase().as_ref() {
+                "BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
+                "OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
+                "IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
+                _ => panic!("Could not parse the policy"),
+            }
+        }
+    }
+    else{
+
+        pub fn set_thread_affinity(_cores: &[usize]) {}
+
+        pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
+            ThreadSchedulePolicy {}
+        }
+        fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {}
+    }
+}
+
+///Applies policy to the calling thread
+pub(crate) fn apply_policy(
+    alloc: &CoreAllocation,
+    policy: ThreadSchedulePolicy,
+    priority: u8,
+    chosen_cores_mask: &Mutex<Vec<usize>>,
+) {
+    apply_thread_scheduler_policy(policy, priority);
+    match alloc {
+        CoreAllocation::PinnedCores { min: _, max: _ } => {
+            let mut lg = chosen_cores_mask
+                .lock()
+                .expect("Can not lock core mask mutex");
+            let core = lg
+                .pop()
+                .expect("Not enough cores provided for pinned allocation");
+            set_thread_affinity(&[core]);
+        }
+        CoreAllocation::DedicatedCoreSet { min: _, max: _ } => {
+            let lg = chosen_cores_mask
+                .lock()
+                .expect("Can not lock core mask mutex");
+            set_thread_affinity(&lg);
+        }
+        CoreAllocation::OsDefault => {}
+    }
+}

+ 88 - 0
thread-manager/src/rayon_runtime.rs

@@ -0,0 +1,88 @@
+use {
+    crate::{
+        policy::{apply_policy, parse_policy, CoreAllocation},
+        MAX_THREAD_NAME_CHARS,
+    },
+    anyhow::Ok,
+    serde::{Deserialize, Serialize},
+    std::{
+        ops::Deref,
+        sync::{Arc, Mutex},
+    },
+};
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(default)]
+pub struct RayonConfig {
+    pub worker_threads: usize,
+    /// Priority in range 0..99
+    pub priority: u8,
+    pub policy: String,
+    pub stack_size_bytes: usize,
+    pub core_allocation: CoreAllocation,
+}
+
+impl Default for RayonConfig {
+    fn default() -> Self {
+        Self {
+            core_allocation: CoreAllocation::OsDefault,
+            worker_threads: 16,
+            priority: crate::policy::DEFAULT_PRIORITY,
+            policy: "BATCH".to_owned(),
+            stack_size_bytes: 2 * 1024 * 1024,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct RayonRuntimeInner {
+    pub rayon_pool: rayon::ThreadPool,
+    pub config: RayonConfig,
+}
+impl Deref for RayonRuntimeInner {
+    type Target = rayon::ThreadPool;
+
+    fn deref(&self) -> &Self::Target {
+        &self.rayon_pool
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct RayonRuntime {
+    inner: Arc<RayonRuntimeInner>,
+}
+
+impl Deref for RayonRuntime {
+    type Target = RayonRuntimeInner;
+
+    fn deref(&self) -> &Self::Target {
+        self.inner.deref()
+    }
+}
+
+impl RayonRuntime {
+    pub fn new(name: String, config: RayonConfig) -> anyhow::Result<Self> {
+        debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
+        let core_allocation = config.core_allocation.clone();
+        let chosen_cores_mask = Mutex::new(core_allocation.as_core_mask_vector());
+        let priority = config.priority;
+        let policy = parse_policy(&config.policy);
+        let rayon_pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(config.worker_threads)
+            .thread_name(move |i| format!("{}_{}", &name, i))
+            .stack_size(config.stack_size_bytes)
+            .start_handler(move |_idx| {
+                apply_policy(&core_allocation, policy, priority, &chosen_cores_mask);
+            })
+            .build()?;
+        Ok(Self {
+            inner: Arc::new(RayonRuntimeInner { rayon_pool, config }),
+        })
+    }
+
+    #[cfg(feature = "dev-context-only-utils")]
+    pub fn new_for_tests(name: &str) -> Self {
+        Self::new(name.to_owned(), RayonConfig::default())
+            .expect("Failed to create rayon runtime for tests")
+    }
+}

+ 186 - 0
thread-manager/src/tokio_runtime.rs

@@ -0,0 +1,186 @@
+use {
+    crate::{
+        policy::{apply_policy, parse_policy, CoreAllocation},
+        MAX_THREAD_NAME_CHARS,
+    },
+    serde::{Deserialize, Serialize},
+    solana_metrics::datapoint_info,
+    std::{
+        ops::Deref,
+        sync::{
+            atomic::{AtomicU64, AtomicUsize, Ordering},
+            Arc, Mutex,
+        },
+        time::Duration,
+    },
+    thread_priority::ThreadExt,
+};
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(default)]
+pub struct TokioConfig {
+    ///number of worker threads tokio is allowed to spawn
+    pub worker_threads: usize,
+    ///max number of blocking threads tokio is allowed to spawn
+    pub max_blocking_threads: usize,
+    /// Priority in range 0..99
+    pub priority: u8,
+    pub policy: String,
+    pub stack_size_bytes: usize,
+    pub event_interval: u32,
+    pub core_allocation: CoreAllocation,
+}
+
+impl Default for TokioConfig {
+    fn default() -> Self {
+        Self {
+            core_allocation: CoreAllocation::OsDefault,
+            worker_threads: 8,
+            max_blocking_threads: 1,
+            priority: crate::policy::DEFAULT_PRIORITY,
+            policy: "OTHER".to_owned(),
+            stack_size_bytes: 2 * 1024 * 1024,
+            event_interval: 61,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct TokioRuntime {
+    pub tokio: tokio::runtime::Runtime,
+    pub config: TokioConfig,
+    pub counters: Arc<ThreadCounters>,
+}
+
+impl Deref for TokioRuntime {
+    type Target = tokio::runtime::Runtime;
+
+    fn deref(&self) -> &Self::Target {
+        &self.tokio
+    }
+}
+
+impl TokioRuntime {
+    /// Starts the metrics sampling task on the runtime to monitor
+    /// how many workers are busy doing useful things.
+    pub fn start_metrics_sampling(&self, period: Duration) {
+        let counters = self.counters.clone();
+        self.tokio.spawn(metrics_sampler(counters, period));
+    }
+
+    pub fn new(name: String, cfg: TokioConfig) -> anyhow::Result<Self> {
+        debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
+        let num_workers = if cfg.worker_threads == 0 {
+            num_cpus::get()
+        } else {
+            cfg.worker_threads
+        };
+        let chosen_cores_mask = cfg.core_allocation.as_core_mask_vector();
+
+        let base_name = name.clone();
+        let mut builder = match num_workers {
+            1 => tokio::runtime::Builder::new_current_thread(),
+            _ => {
+                let mut builder = tokio::runtime::Builder::new_multi_thread();
+                builder.worker_threads(num_workers);
+                builder
+            }
+        };
+        let atomic_id: AtomicUsize = AtomicUsize::new(0);
+
+        let counters = Arc::new(ThreadCounters {
+            // No way around the leak: the metrics crate only accepts &'static str
+            namespace: format!("thread-manager-tokio-{}", &base_name).leak(),
+            total_threads_cnt: cfg.worker_threads as u64,
+            active_threads_cnt: AtomicU64::new(
+                (num_workers.wrapping_add(cfg.max_blocking_threads)) as u64,
+            ),
+        });
+        builder
+            .event_interval(cfg.event_interval)
+            .thread_name_fn(move || {
+                let id = atomic_id.fetch_add(1, Ordering::Relaxed);
+                format!("{}-{}", base_name, id)
+            })
+            .on_thread_park({
+                let counters = counters.clone();
+                move || {
+                    counters.on_park();
+                }
+            })
+            .on_thread_unpark({
+                let counters = counters.clone();
+                move || {
+                    counters.on_unpark();
+                }
+            })
+            .thread_stack_size(cfg.stack_size_bytes)
+            .enable_all()
+            .max_blocking_threads(cfg.max_blocking_threads);
+
+        //keep borrow checker happy and move these things into the closure
+        let c = cfg.clone();
+        let chosen_cores_mask = Mutex::new(chosen_cores_mask);
+        builder.on_thread_start(move || {
+            let cur_thread = std::thread::current();
+            let _tid = cur_thread
+                .get_native_id()
+                .expect("Can not get thread id for newly created thread");
+
+            apply_policy(
+                &c.core_allocation,
+                parse_policy(&c.policy),
+                c.priority,
+                &chosen_cores_mask,
+            );
+        });
+        Ok(TokioRuntime {
+            tokio: builder.build()?,
+            config: cfg.clone(),
+            counters,
+        })
+    }
+
+    /// Makes a test runtime with 2 worker threads, only for unit tests
+    #[cfg(feature = "dev-context-only-utils")]
+    pub fn new_for_tests() -> Self {
+        let cfg = TokioConfig {
+            worker_threads: 2,
+            ..Default::default()
+        };
+        TokioRuntime::new("solNetTest".to_owned(), cfg.clone())
+            .expect("Failed to create Tokio runtime for tests")
+    }
+}
+
+/// Internal counters to keep track of worker pool utilization
+#[derive(Debug)]
+pub struct ThreadCounters {
+    pub namespace: &'static str,
+    pub total_threads_cnt: u64,
+    pub active_threads_cnt: AtomicU64,
+}
+
+impl ThreadCounters {
+    pub fn on_park(&self) {
+        self.active_threads_cnt.fetch_sub(1, Ordering::Relaxed);
+    }
+
+    pub fn on_unpark(&self) {
+        self.active_threads_cnt.fetch_add(1, Ordering::Relaxed);
+    }
+}
+
+async fn metrics_sampler(counters: Arc<ThreadCounters>, period: Duration) {
+    let mut interval = tokio::time::interval(period);
+    loop {
+        interval.tick().await;
+        let active = counters.active_threads_cnt.load(Ordering::Relaxed) as i64;
+        let parked = (counters.total_threads_cnt as i64).saturating_sub(active);
+        datapoint_info!(
+            counters.namespace,
+            ("threads_parked", parked, i64),
+            ("threads_active", active, i64),
+        );
+    }
+}