Explorar el Código

Limit Rayon threadpool threads (#5871)

Sagar Dhawan hace 6 años
padre
commit
c1d788880d

+ 11 - 0
Cargo.lock

@@ -3084,6 +3084,7 @@ dependencies = [
  "solana-merkle-tree 0.19.0-pre0",
  "solana-metrics 0.19.0-pre0",
  "solana-netutil 0.19.0-pre0",
+ "solana-rayon-threadlimit 0.19.0-pre0",
  "solana-runtime 0.19.0-pre0",
  "solana-sdk 0.19.0-pre0",
  "solana-stake-api 0.19.0-pre0",
@@ -3332,6 +3333,7 @@ dependencies = [
  "solana-client 0.19.0-pre0",
  "solana-core 0.19.0-pre0",
  "solana-logger 0.19.0-pre0",
+ "solana-rayon-threadlimit 0.19.0-pre0",
  "solana-runtime 0.19.0-pre0",
  "solana-sdk 0.19.0-pre0",
  "solana-stake-api 0.19.0-pre0",
@@ -3444,6 +3446,14 @@ dependencies = [
  "solana-sdk 0.19.0-pre0",
 ]
 
+[[package]]
+name = "solana-rayon-threadlimit"
+version = "0.19.0-pre0"
+dependencies = [
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "solana-replicator"
 version = "0.19.0-pre0"
@@ -3482,6 +3492,7 @@ dependencies = [
  "solana-measure 0.19.0-pre0",
  "solana-metrics 0.19.0-pre0",
  "solana-noop-program 0.19.0-pre0",
+ "solana-rayon-threadlimit 0.19.0-pre0",
  "solana-sdk 0.19.0-pre0",
  "solana-stake-api 0.19.0-pre0",
  "solana-stake-program 0.19.0-pre0",

+ 1 - 0
Cargo.toml

@@ -56,6 +56,7 @@ members = [
     "fixed-buf",
     "vote-signer",
     "cli",
+    "rayon-threadlimit",
 ]
 exclude = [
     "programs/bpf",

+ 1 - 1
ci/buildkite.yml

@@ -11,7 +11,7 @@ steps:
   - wait
   - command: "ci/test-stable-perf.sh"
     name: "stable-perf"
-    timeout_in_minutes: 30
+    timeout_in_minutes: 40
     artifact_paths: "log-*.txt"
     agents:
       - "queue=cuda"

+ 1 - 0
core/Cargo.toml

@@ -75,6 +75,7 @@ tokio-codec = "0.1"
 tokio-fs = "0.1"
 tokio-io = "0.1"
 untrusted = "0.7.0"
+solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" }
 
 # reed-solomon-erasure's simd_c feature fails to build for x86_64-pc-windows-msvc, use pure-rust
 [target.'cfg(windows)'.dependencies]

+ 6 - 5
core/src/blocktree_processor.rs

@@ -2,6 +2,8 @@ use crate::bank_forks::BankForks;
 use crate::blocktree::{Blocktree, SlotMeta};
 use crate::entry::{Entry, EntrySlice};
 use crate::leader_schedule_cache::LeaderScheduleCache;
+use rand::seq::SliceRandom;
+use rand::thread_rng;
 use rayon::prelude::*;
 use rayon::ThreadPool;
 use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug};
@@ -16,16 +18,15 @@ use std::result;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
-use rand::seq::SliceRandom;
-use rand::thread_rng;
-
 pub const NUM_THREADS: u32 = 10;
+use solana_rayon_threadlimit::get_thread_count;
 use std::cell::RefCell;
 
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-                    .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
+                    .num_threads(get_thread_count())
                     .build()
-                    .unwrap()));
+                    .unwrap())
+);
 
 fn first_err(results: &[Result<()>]) -> Result<()> {
     for r in results {

+ 3 - 2
core/src/entry.rs

@@ -14,6 +14,7 @@ use solana_merkle_tree::MerkleTree;
 use solana_metrics::inc_new_counter_warn;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
+use solana_sdk::timing;
 use solana_sdk::transaction::Transaction;
 use std::borrow::Borrow;
 use std::cell::RefCell;
@@ -22,7 +23,7 @@ use std::sync::{Arc, RwLock};
 
 #[cfg(feature = "cuda")]
 use crate::sigverify::poh_verify_many;
-use solana_sdk::timing;
+use solana_rayon_threadlimit::get_thread_count;
 #[cfg(feature = "cuda")]
 use std::sync::Mutex;
 #[cfg(feature = "cuda")]
@@ -32,7 +33,7 @@ use std::time::Instant;
 pub const NUM_THREADS: u32 = 10;
 
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-                    .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
+                    .num_threads(get_thread_count())
                     .build()
                     .unwrap()));
 

+ 4 - 5
core/src/sigverify.rs

@@ -19,17 +19,16 @@ use solana_sdk::signature::Signature;
 use solana_sdk::transaction::Transaction;
 use std::mem::size_of;
 
-#[cfg(feature = "cuda")]
-use std::os::raw::{c_int, c_uint};
-
 #[cfg(feature = "cuda")]
 use core::ffi::c_void;
-
+use solana_rayon_threadlimit::get_thread_count;
+#[cfg(feature = "cuda")]
+use std::os::raw::{c_int, c_uint};
 pub const NUM_THREADS: u32 = 10;
 use std::cell::RefCell;
 
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-                    .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
+                    .num_threads(get_thread_count())
                     .build()
                     .unwrap()));
 

+ 2 - 1
core/src/window_service.rs

@@ -12,6 +12,7 @@ use crate::streamer::{PacketReceiver, PacketSender};
 use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
 use rayon::ThreadPool;
 use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
+use solana_rayon_threadlimit::get_thread_count;
 use solana_runtime::bank::Bank;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::duration_as_ms;
@@ -205,7 +206,7 @@ impl WindowService {
                 trace!("{}: RECV_WINDOW started", id);
                 let mut now = Instant::now();
                 let thread_pool = rayon::ThreadPoolBuilder::new()
-                    .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
+                    .num_threads(get_thread_count())
                     .build()
                     .unwrap();
                 loop {

+ 1 - 0
local_cluster/Cargo.toml

@@ -22,6 +22,7 @@ solana-storage-program = { path = "../programs/storage_program", version = "0.19
 solana-vote-api = { path = "../programs/vote_api", version = "0.19.0-pre0" }
 symlink = "0.1.0"
 tempfile = "3.1.0"
+solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" }
 
 [dev-dependencies]
 serial_test = "0.2.0"

+ 3 - 0
local_cluster/src/local_cluster.rs

@@ -10,6 +10,7 @@ use solana_core::{
     service::Service,
     validator::{Validator, ValidatorConfig},
 };
+use solana_rayon_threadlimit::set_thread_count;
 use solana_sdk::{
     client::SyncClient,
     clock::DEFAULT_TICKS_PER_SLOT,
@@ -116,6 +117,8 @@ impl LocalCluster {
     }
 
     pub fn new(config: &ClusterConfig) -> Self {
+        set_thread_count(1);
+
         assert_eq!(config.validator_configs.len(), config.node_stakes.len());
         let leader_keypair = Arc::new(Keypair::new());
         let leader_pubkey = leader_keypair.pubkey();

+ 4 - 14
local_cluster/tests/local_cluster.rs

@@ -27,8 +27,6 @@ use tempfile::TempDir;
 
 #[test]
 #[serial]
-#[allow(unused_attributes)]
-#[ignore]
 fn test_ledger_cleanup_service() {
     solana_logger::setup();
     error!("test_ledger_cleanup_service");
@@ -111,9 +109,8 @@ fn test_spend_and_verify_all_nodes_3() {
     );
 }
 
-#[allow(unused_attributes)]
 #[test]
-#[serial]
+#[allow(unused_attributes)]
 #[ignore]
 fn test_spend_and_verify_all_nodes_env_num_nodes() {
     solana_logger::setup();
@@ -132,7 +129,6 @@ fn test_spend_and_verify_all_nodes_env_num_nodes() {
 
 #[allow(unused_attributes)]
 #[test]
-#[serial]
 #[should_panic]
 fn test_fullnode_exit_default_config_should_panic() {
     solana_logger::setup();
@@ -161,10 +157,8 @@ fn test_fullnode_exit_2() {
 }
 
 // Cluster needs a supermajority to remain, so the minimum size for this test is 4
-#[allow(unused_attributes)]
 #[test]
 #[serial]
-#[ignore]
 fn test_leader_failure_4() {
     solana_logger::setup();
     error!("test_leader_failure_4");
@@ -222,14 +216,14 @@ fn test_two_unbalanced_stakes() {
 }
 
 #[test]
-#[ignore]
+#[serial]
 fn test_forwarding() {
     // Set up a cluster where one node is never the leader, so all txs sent to this node
     // will be have to be forwarded in order to be confirmed
     let config = ClusterConfig {
         node_stakes: vec![999_990, 3],
         cluster_lamports: 2_000_000,
-        validator_configs: vec![ValidatorConfig::default(); 3],
+        validator_configs: vec![ValidatorConfig::default(); 2],
         ..ClusterConfig::default()
     };
     let cluster = LocalCluster::new(&config);
@@ -363,7 +357,6 @@ fn test_snapshot_restart_locktower() {
     );
 }
 
-#[allow(unused_attributes)]
 #[test]
 #[serial]
 fn test_snapshots_blocktree_floor() {
@@ -524,15 +517,14 @@ fn test_snapshots_restart_validity() {
     }
 }
 
-#[allow(unused_attributes)]
 #[test]
 #[serial]
-#[ignore]
 fn test_fail_entry_verification_leader() {
     test_faulty_node(BroadcastStageType::FailEntryVerification);
 }
 
 #[test]
+#[allow(unused_attributes)]
 #[ignore]
 fn test_fake_blobs_broadcast_leader() {
     test_faulty_node(BroadcastStageType::BroadcastFakeBlobs);
@@ -591,9 +583,7 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) {
     );
 }
 
-#[allow(unused_attributes)]
 #[test]
-#[serial]
 #[ignore]
 fn test_repairman_catchup() {
     solana_logger::setup();

+ 2 - 0
rayon-threadlimit/.gitignore

@@ -0,0 +1,2 @@
+/target/
+/farf/

+ 13 - 0
rayon-threadlimit/Cargo.toml

@@ -0,0 +1,13 @@
+[package]
+name = "solana-rayon-threadlimit"
+version = "0.19.0-pre0"
+homepage = "https://solana.com/"
+readme = "../README.md"
+repository = "https://github.com/solana-labs/solana"
+authors = ["Solana Maintainers <maintainers@solana.com>"]
+license = "Apache-2.0"
+edition = "2018"
+
+[dependencies]
+lazy_static = "1.4.0"
+sys-info = "0.5.7"

+ 18 - 0
rayon-threadlimit/src/lib.rs

@@ -0,0 +1,18 @@
+#[macro_use]
+extern crate lazy_static;
+
+use std::sync::RwLock;
+
+//TODO remove this hack when rayon fixes itself
+lazy_static! {
+    static ref MAX_RAYON_THREADS: RwLock<usize> =
+        RwLock::new(sys_info::cpu_num().unwrap() as usize);
+}
+
+pub fn get_thread_count() -> usize {
+    *MAX_RAYON_THREADS.read().unwrap()
+}
+
+pub fn set_thread_count(count: usize) {
+    *MAX_RAYON_THREADS.write().unwrap() = count;
+}

+ 1 - 0
runtime/Cargo.toml

@@ -37,6 +37,7 @@ solana-vote-api = { path = "../programs/vote_api", version = "0.19.0-pre0" }
 solana-vote-program = { path = "../programs/vote_program", version = "0.19.0-pre0" }
 sys-info = "0.5.8"
 tempfile = "3.1.0"
+solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.19.0-pre0" }
 
 [lib]
 crate-type = ["lib"]

+ 2 - 2
runtime/src/accounts_db.rs

@@ -30,6 +30,7 @@ use serde::de::{MapAccess, Visitor};
 use serde::ser::{SerializeMap, Serializer};
 use serde::{Deserialize, Serialize};
 use solana_measure::measure::Measure;
+use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::account::Account;
 use solana_sdk::pubkey::Pubkey;
 use std::collections::{HashMap, HashSet};
@@ -39,7 +40,6 @@ use std::path::Path;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
-use sys_info;
 use tempfile::TempDir;
 
 pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
@@ -370,7 +370,7 @@ pub struct AccountsDB {
 
 impl Default for AccountsDB {
     fn default() -> Self {
-        let num_threads = sys_info::cpu_num().unwrap_or(DEFAULT_NUM_THREADS) as usize;
+        let num_threads = get_thread_count();
 
         AccountsDB {
             accounts_index: RwLock::new(AccountsIndex::default()),