Ver Fonte

validator: Organize blockstore related args (#8803)

* init

* --wal-recovery-mode

* --rocksdb-ledger-compression

* --rocksdb-perf-sample-interval

* --rocksdb-compaction-threads

* --rocksdb-flush-threads

* --rocksdb-shred-compaction

* --limit-ledger-size

* impl ThreadArg for RocksdbCompactionThreadsArg

* impl ThreadArg for RocksdbFlushThreadsArg
Yihau Chen há 1 semana atrás
pai
commit
9ebd1e5dc5

+ 0 - 7
validator/src/cli.rs

@@ -246,10 +246,6 @@ pub struct DefaultArgs {
     pub snapshot_archive_format: String,
     pub snapshot_zstd_compression_level: String,
 
-    pub rocksdb_shred_compaction: String,
-    pub rocksdb_ledger_compression: String,
-    pub rocksdb_perf_sample_interval: String,
-
     pub accounts_shrink_optimize_total_space: String,
     pub accounts_shrink_ratio: String,
     pub tpu_connection_pool_size: String,
@@ -301,9 +297,6 @@ impl DefaultArgs {
             snapshot_zstd_compression_level: "1".to_string(), // level 1 is optimized for speed
             contact_debug_interval: "120000".to_string(),
             snapshot_version: SnapshotVersion::default(),
-            rocksdb_shred_compaction: "level".to_string(),
-            rocksdb_ledger_compression: "none".to_string(),
-            rocksdb_perf_sample_interval: "0".to_string(),
             accounts_shrink_optimize_total_space: DEFAULT_ACCOUNTS_SHRINK_OPTIMIZE_TOTAL_SPACE
                 .to_string(),
             accounts_shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string(),

+ 1 - 29
validator/src/cli/thread_args.rs

@@ -19,8 +19,6 @@ pub struct DefaultThreadArgs {
     pub rayon_global_threads: String,
     pub replay_forks_threads: String,
     pub replay_transactions_threads: String,
-    pub rocksdb_compaction_threads: String,
-    pub rocksdb_flush_threads: String,
     pub tpu_transaction_forward_receive_threads: String,
     pub tpu_transaction_receive_threads: String,
     pub tpu_vote_transaction_receive_threads: String,
@@ -44,8 +42,6 @@ impl Default for DefaultThreadArgs {
             replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
             replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
                 .to_string(),
-            rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
-            rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
             tpu_transaction_forward_receive_threads:
                 TpuTransactionForwardReceiveThreadArgs::bounded_default().to_string(),
             tpu_transaction_receive_threads: TpuTransactionReceiveThreads::bounded_default()
@@ -69,8 +65,6 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
         new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
         new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
         new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
-        new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
-        new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
         new_thread_arg::<TpuTransactionForwardReceiveThreadArgs>(
             &defaults.tpu_transaction_forward_receive_threads,
         ),
@@ -84,7 +78,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
     ]
 }
 
-fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
+pub(crate) fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
     Arg::with_name(T::NAME)
         .long(T::LONG_NAME)
         .takes_value(true)
@@ -313,28 +307,6 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
     }
 }
 
-pub struct RocksdbCompactionThreadsArg;
-impl ThreadArg for RocksdbCompactionThreadsArg {
-    const NAME: &'static str = "rocksdb_compaction_threads";
-    const LONG_NAME: &'static str = "rocksdb-compaction-threads";
-    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
-
-    fn default() -> usize {
-        solana_ledger::blockstore::default_num_compaction_threads().get()
-    }
-}
-
-pub struct RocksdbFlushThreadsArg;
-impl ThreadArg for RocksdbFlushThreadsArg {
-    const NAME: &'static str = "rocksdb_flush_threads";
-    const LONG_NAME: &'static str = "rocksdb-flush-threads";
-    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
-
-    fn default() -> usize {
-        solana_ledger::blockstore::default_num_flush_threads().get()
-    }
-}
-
 struct TpuTransactionForwardReceiveThreadArgs;
 impl ThreadArg for TpuTransactionForwardReceiveThreadArgs {
     const NAME: &'static str = "tpu_transaction_forward_receive_threads";

+ 1 - 61
validator/src/commands/run/args.rs

@@ -573,54 +573,6 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a,
             .default_value(default_args.snapshot_version.into())
             .help("Output snapshot version"),
     )
-    .arg(
-        Arg::with_name("limit_ledger_size")
-            .long("limit-ledger-size")
-            .value_name("SHRED_COUNT")
-            .takes_value(true)
-            .min_values(0)
-            .max_values(1)
-            /* .default_value() intentionally not used here! */
-            .help("Keep this amount of shreds in root slots."),
-    )
-    .arg(
-        Arg::with_name("rocksdb_shred_compaction")
-            .long("rocksdb-shred-compaction")
-            .value_name("ROCKSDB_COMPACTION_STYLE")
-            .takes_value(true)
-            .possible_values(&["level"])
-            .default_value(&default_args.rocksdb_shred_compaction)
-            .help(
-                "Controls how RocksDB compacts shreds. *WARNING*: You will lose your Blockstore \
-                 data when you switch between options.",
-            ),
-    )
-    .arg(
-        Arg::with_name("rocksdb_ledger_compression")
-            .hidden(hidden_unless_forced())
-            .long("rocksdb-ledger-compression")
-            .value_name("COMPRESSION_TYPE")
-            .takes_value(true)
-            .possible_values(&["none", "lz4", "snappy", "zlib"])
-            .default_value(&default_args.rocksdb_ledger_compression)
-            .help(
-                "The compression algorithm that is used to compress transaction status data. \
-                 Turning on compression can save ~10% of the ledger size.",
-            ),
-    )
-    .arg(
-        Arg::with_name("rocksdb_perf_sample_interval")
-            .hidden(hidden_unless_forced())
-            .long("rocksdb-perf-sample-interval")
-            .value_name("ROCKS_PERF_SAMPLE_INTERVAL")
-            .takes_value(true)
-            .validator(is_parsable::<usize>)
-            .default_value(&default_args.rocksdb_perf_sample_interval)
-            .help(
-                "Controls how often RocksDB read/write performance samples are collected. Perf \
-                 samples are collected in 1 / ROCKS_PERF_SAMPLE_INTERVAL sampling rate.",
-            ),
-    )
     .arg(
         Arg::with_name("skip_startup_ledger_verification")
             .long("skip-startup-ledger-verification")
@@ -948,19 +900,6 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a,
                  See the zstd manpage for more information.",
             ),
     )
-    .arg(
-        Arg::with_name("wal_recovery_mode")
-            .long("wal-recovery-mode")
-            .value_name("MODE")
-            .takes_value(true)
-            .possible_values(&[
-                "tolerate_corrupted_tail_records",
-                "absolute_consistency",
-                "point_in_time",
-                "skip_any_corrupted_record",
-            ])
-            .help("Mode to recovery the ledger db write ahead log."),
-    )
     .arg(
         Arg::with_name("poh_pinned_cpu_core")
             .hidden(hidden_unless_forced())
@@ -1374,6 +1313,7 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a,
     .args(&rpc_bigtable_config::args())
     .args(&send_transaction_config::args())
     .args(&rpc_bootstrap_config::args())
+    .args(&blockstore_options::args())
 }
 
 fn validators_set(

+ 140 - 3
validator/src/commands/run/args/blockstore_options.rs

@@ -1,16 +1,47 @@
 use {
     crate::{
-        cli::thread_args::{RocksdbCompactionThreadsArg, RocksdbFlushThreadsArg, ThreadArg},
+        cli::thread_args::{new_thread_arg, ThreadArg},
         commands::{FromClapArgMatches, Result},
     },
-    clap::{value_t, ArgMatches},
+    clap::{value_t, Arg, ArgMatches},
+    solana_clap_utils::{hidden_unless_forced, input_validators::is_parsable},
     solana_ledger::blockstore_options::{
         AccessType, BlockstoreCompressionType, BlockstoreOptions, BlockstoreRecoveryMode,
         LedgerColumnOptions,
     },
-    std::num::NonZeroUsize,
+    std::{num::NonZeroUsize, sync::LazyLock},
 };
 
+struct RocksdbCompactionThreadsArg;
+impl ThreadArg for RocksdbCompactionThreadsArg {
+    const NAME: &'static str = "rocksdb_compaction_threads";
+    const LONG_NAME: &'static str = "rocksdb-compaction-threads";
+    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";
+
+    fn default() -> usize {
+        solana_ledger::blockstore::default_num_compaction_threads().get()
+    }
+}
+
+struct RocksdbFlushThreadsArg;
+impl ThreadArg for RocksdbFlushThreadsArg {
+    const NAME: &'static str = "rocksdb_flush_threads";
+    const LONG_NAME: &'static str = "rocksdb-flush-threads";
+    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";
+
+    fn default() -> usize {
+        solana_ledger::blockstore::default_num_flush_threads().get()
+    }
+}
+
+const DEFAULT_ROCKSDB_LEDGER_COMPRESSION: &str = "none";
+const DEFAULT_ROCKSDB_PERF_SAMPLE_INTERVAL: &str = "0";
+static DEFAULT_ROCKSDB_COMPACTION_THREADS: LazyLock<String> =
+    LazyLock::new(|| RocksdbCompactionThreadsArg::default().to_string());
+static DEFAULT_ROCKSDB_FLUSH_THREADS: LazyLock<String> =
+    LazyLock::new(|| RocksdbFlushThreadsArg::default().to_string());
+const DEFAULT_ROCKSDB_SHRED_COMPACTION: &str = "level";
+
 impl FromClapArgMatches for BlockstoreOptions {
     fn from_clap_arg_match(matches: &ArgMatches) -> Result<Self> {
         let recovery_mode = matches
@@ -53,6 +84,64 @@ impl FromClapArgMatches for BlockstoreOptions {
     }
 }
 
+pub(crate) fn args<'a, 'b>() -> Vec<Arg<'a, 'b>> {
+    vec![
+        Arg::with_name("wal_recovery_mode")
+            .long("wal-recovery-mode")
+            .value_name("MODE")
+            .takes_value(true)
+            .possible_values(&[
+                "tolerate_corrupted_tail_records",
+                "absolute_consistency",
+                "point_in_time",
+                "skip_any_corrupted_record",
+            ])
+            .help("Mode to recovery the ledger db write ahead log."),
+        Arg::with_name("rocksdb_ledger_compression")
+            .hidden(hidden_unless_forced())
+            .long("rocksdb-ledger-compression")
+            .value_name("COMPRESSION_TYPE")
+            .takes_value(true)
+            .possible_values(&["none", "lz4", "snappy", "zlib"])
+            .default_value(DEFAULT_ROCKSDB_LEDGER_COMPRESSION)
+            .help(
+                "The compression algorithm that is used to compress transaction status data. \
+                 Turning on compression can save ~10% of the ledger size.",
+            ),
+        Arg::with_name("rocksdb_perf_sample_interval")
+            .hidden(hidden_unless_forced())
+            .long("rocksdb-perf-sample-interval")
+            .value_name("ROCKS_PERF_SAMPLE_INTERVAL")
+            .takes_value(true)
+            .validator(is_parsable::<usize>)
+            .default_value(DEFAULT_ROCKSDB_PERF_SAMPLE_INTERVAL)
+            .help(
+                "Controls how often RocksDB read/write performance samples are collected. Perf \
+                 samples are collected in 1 / ROCKS_PERF_SAMPLE_INTERVAL sampling rate.",
+            ),
+        new_thread_arg::<RocksdbCompactionThreadsArg>(&DEFAULT_ROCKSDB_COMPACTION_THREADS),
+        new_thread_arg::<RocksdbFlushThreadsArg>(&DEFAULT_ROCKSDB_FLUSH_THREADS),
+        Arg::with_name("rocksdb_shred_compaction")
+            .long("rocksdb-shred-compaction")
+            .value_name("ROCKSDB_COMPACTION_STYLE")
+            .takes_value(true)
+            .possible_values(&["level"])
+            .default_value(DEFAULT_ROCKSDB_SHRED_COMPACTION)
+            .help(
+                "Controls how RocksDB compacts shreds. *WARNING*: You will lose your Blockstore \
+                 data when you switch between options.",
+            ),
+        Arg::with_name("limit_ledger_size")
+            .long("limit-ledger-size")
+            .value_name("SHRED_COUNT")
+            .takes_value(true)
+            .min_values(0)
+            .max_values(1)
+            /* .default_value() intentionally not used here! */
+            .help("Keep this amount of shreds in root slots."),
+    ]
+}
+
 #[cfg(test)]
 mod tests {
     use {
@@ -64,6 +153,7 @@ mod tests {
             },
             RunArgs,
         },
+        std::ops::RangeInclusive,
         test_case::test_case,
     };
 
@@ -199,4 +289,51 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_default_rocksdb_ledger_compression_unchanged() {
+        assert_eq!(DEFAULT_ROCKSDB_LEDGER_COMPRESSION, "none");
+    }
+
+    #[test]
+    fn test_default_rocksdb_perf_sample_interval_unchanged() {
+        assert_eq!(DEFAULT_ROCKSDB_PERF_SAMPLE_INTERVAL, "0");
+    }
+
+    #[test]
+    fn test_default_rocksdb_compaction_threads_unchanged() {
+        assert_eq!(
+            *DEFAULT_ROCKSDB_COMPACTION_THREADS,
+            num_cpus::get().to_string(),
+        );
+    }
+
+    #[test]
+    fn test_valid_range_rocksdb_compaction_threads_unchanged() {
+        assert_eq!(
+            RocksdbCompactionThreadsArg::range(),
+            RangeInclusive::new(1, num_cpus::get()),
+        );
+    }
+
+    #[test]
+    fn test_default_rocksdb_flush_threads_unchanged() {
+        assert_eq!(
+            *DEFAULT_ROCKSDB_FLUSH_THREADS,
+            (num_cpus::get() / 4).max(1).to_string()
+        );
+    }
+
+    #[test]
+    fn test_valid_range_rocksdb_flush_threads_unchanged() {
+        assert_eq!(
+            RocksdbFlushThreadsArg::range(),
+            RangeInclusive::new(1, num_cpus::get()),
+        );
+    }
+
+    #[test]
+    fn test_default_rocksdb_shred_compaction_unchanged() {
+        assert_eq!(DEFAULT_ROCKSDB_SHRED_COMPACTION, "level");
+    }
 }