Преглед изворни кода

feat(scheduler-bindings): add `--enable-scheduler-bindings` (#8850)

OliverNChalk пре 2 недеља
родитељ
комит
dcd98dcb63

+ 1 - 0
CHANGELOG.md

@@ -17,6 +17,7 @@ Release channels have their own copy of this changelog:
 ### RPC
 #### Breaking
 #### Changes
+* Added `--enable-scheduler-bindings` which binds an IPC server at `<ledger-path>/scheduler_bindings.ipc` for external schedulers to connect to.
 ### Validator
 #### Breaking
 #### Deprecations

+ 0 - 1
core/src/lib.rs

@@ -39,7 +39,6 @@ pub mod replay_stage;
 pub mod resource_limits;
 mod result;
 pub mod sample_performance_service;
-#[allow(dead_code)]
 #[cfg(unix)]
 mod scheduler_bindings_server;
 mod shred_fetch_stage;

+ 9 - 0
core/src/tpu.rs

@@ -64,6 +64,7 @@ use {
         collections::HashMap,
         net::{SocketAddr, UdpSocket},
         num::NonZeroUsize,
+        path::PathBuf,
         sync::{atomic::AtomicBool, Arc, RwLock},
         thread::{self, JoinHandle},
         time::Duration,
@@ -164,6 +165,7 @@ impl Tpu {
         _generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
         key_notifiers: Arc<RwLock<KeyUpdaters>>,
         banking_control_receiver: mpsc::Receiver<BankingControlMsg>,
+        scheduler_bindings: Option<(PathBuf, mpsc::Sender<BankingControlMsg>)>,
         cancel: CancellationToken,
     ) -> Self {
         let TpuSockets {
@@ -347,6 +349,13 @@ impl Tpu {
             prioritization_fee_cache.clone(),
         );
 
+        #[cfg(unix)]
+        if let Some((path, banking_control_sender)) = scheduler_bindings {
+            super::scheduler_bindings_server::spawn(&path, banking_control_sender);
+        }
+        #[cfg(not(unix))]
+        assert!(scheduler_bindings.is_none());
+
         let SpawnForwardingStageResult {
             join_handle: forwarding_stage,
             client_updater,

+ 8 - 0
core/src/validator.rs

@@ -368,6 +368,7 @@ pub struct ValidatorConfig {
     pub block_production_num_workers: NonZeroUsize,
     pub block_production_scheduler_config: SchedulerConfig,
     pub enable_block_production_forwarding: bool,
+    pub enable_scheduler_bindings: bool,
     pub generator_config: Option<GeneratorConfig>,
     pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
     pub wen_restart_proto_path: Option<PathBuf>,
@@ -449,6 +450,7 @@ impl ValidatorConfig {
             block_production_scheduler_config: SchedulerConfig::default(),
             // enable forwarding by default for tests
             enable_block_production_forwarding: true,
+            enable_scheduler_bindings: false,
             generator_config: None,
             use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
             wen_restart_proto_path: None,
@@ -1751,6 +1753,12 @@ impl Validator {
             config.generator_config.clone(),
             key_notifiers.clone(),
             banking_control_reciever,
+            config.enable_scheduler_bindings.then(|| {
+                (
+                    ledger_path.join("scheduler_bindings.ipc"),
+                    banking_control_sender.clone(),
+                )
+            }),
             cancel,
         );
 

+ 1 - 0
local-cluster/src/validator_configs.rs

@@ -67,6 +67,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         block_production_num_workers: config.block_production_num_workers,
         block_production_scheduler_config: config.block_production_scheduler_config.clone(),
         enable_block_production_forwarding: config.enable_block_production_forwarding,
+        enable_scheduler_bindings: config.enable_scheduler_bindings,
         generator_config: config.generator_config.clone(),
         use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
         wen_restart_proto_path: config.wen_restart_proto_path.clone(),

+ 3 - 0
multinode-demo/bootstrap-validator.sh

@@ -109,6 +109,9 @@ while [[ -n $1 ]]; do
     elif [[ $1 == --block-production-method ]]; then
       args+=("$1" "$2")
       shift 2
+    elif [[ $1 == --enable-scheduler-bindings ]]; then
+      args+=("$1")
+      shift
     elif [[ $1 == --transaction-structure ]]; then
       args+=("$1" "$2")
       shift 2

+ 3 - 0
multinode-demo/validator.sh

@@ -179,6 +179,9 @@ while [[ -n $1 ]]; do
     elif [[ $1 == --block-production-method ]]; then
       args+=("$1" "$2")
       shift 2
+    elif [[ $1 == --enable-scheduler-bindings ]]; then
+      args+=("$1")
+      shift
     elif [[ $1 == --transaction-structure ]]; then
       args+=("$1" "$2")
       shift 2

+ 1 - 1
scheduling-utils/src/handshake/shared.rs

@@ -1,6 +1,6 @@
 pub const MAX_WORKERS: usize = 64;
 
-pub(crate) const VERSION: u64 = 1;
+pub(crate) const VERSION: u64 = 2;
 pub(crate) const LOGON_SUCCESS: u8 = 0x01;
 pub(crate) const LOGON_FAILURE: u8 = 0x02;
 pub(crate) const MAX_ALLOCATOR_HANDLES: usize = 128;

+ 3 - 0
test-validator/src/lib.rs

@@ -136,6 +136,7 @@ pub struct TestValidatorGenesis {
     pub max_ledger_shreds: Option<u64>,
     pub max_genesis_archive_unpacked_size: Option<u64>,
     pub geyser_plugin_config_files: Option<Vec<PathBuf>>,
+    pub enable_scheduler_bindings: bool,
     deactivate_feature_set: HashSet<Pubkey>,
     compute_unit_limit: Option<u64>,
     pub log_messages_bytes_limit: Option<usize>,
@@ -171,6 +172,7 @@ impl Default for TestValidatorGenesis {
             max_ledger_shreds: Option::<u64>::default(),
             max_genesis_archive_unpacked_size: Option::<u64>::default(),
             geyser_plugin_config_files: Option::<Vec<PathBuf>>::default(),
+            enable_scheduler_bindings: false,
             deactivate_feature_set,
             compute_unit_limit: Option::<u64>::default(),
             log_messages_bytes_limit: Option::<usize>::default(),
@@ -1135,6 +1137,7 @@ impl TestValidator {
             staked_nodes_overrides: config.staked_nodes_overrides.clone(),
             accounts_db_config,
             runtime_config,
+            enable_scheduler_bindings: config.enable_scheduler_bindings,
             ..ValidatorConfig::default_for_test()
         };
         if let Some(ref tower_storage) = config.tower_storage {

+ 1 - 0
validator/src/bin/solana-test-validator.rs

@@ -406,6 +406,7 @@ fn main() {
     genesis.log_messages_bytes_limit = value_t!(matches, "log_messages_bytes_limit", usize).ok();
     genesis.transaction_account_lock_limit =
         value_t!(matches, "transaction_account_lock_limit", usize).ok();
+    genesis.enable_scheduler_bindings = matches.is_present("enable_scheduler_bindings");
 
     let tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
 

+ 6 - 0
validator/src/cli.rs

@@ -866,6 +866,12 @@ pub fn test_app<'a>(version: &'a str, default_args: &'a DefaultTestArgs) -> App<
                 .multiple(true)
                 .help("Specify the configuration file for the Geyser plugin."),
         )
+        .arg(
+            Arg::with_name("enable_scheduler_bindings")
+                .long("enable-scheduler-bindings")
+                .takes_value(false)
+                .help("Enables external processes to connect and manage block production"),
+        )
         .arg(
             Arg::with_name("deactivate_feature")
                 .long("deactivate-feature")

+ 6 - 0
validator/src/commands/run/args.rs

@@ -1287,6 +1287,12 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a,
                  method",
             ),
     )
+    .arg(
+        Arg::with_name("enable_scheduler_bindings")
+            .long("enable-scheduler-bindings")
+            .takes_value(false)
+            .help("Enables external processes to connect and manage block production"),
+    )
     .arg(
         Arg::with_name("unified_scheduler_handler_threads")
             .long("unified-scheduler-handler-threads")

+ 1 - 0
validator/src/commands/run/execute.rs

@@ -637,6 +637,7 @@ pub fn execute(
             ),
         },
         enable_block_production_forwarding: staked_nodes_overrides_path.is_some(),
+        enable_scheduler_bindings: matches.is_present("enable_scheduler_bindings"),
         banking_trace_dir_byte_limit: parse_banking_trace_dir_byte_limit(matches),
         validator_exit: Arc::new(RwLock::new(Exit::default())),
         validator_exit_backpressure: [(