Selaa lähdekoodia

admin-rpc: manage-block-production (#7625)

Andrew Fitzgerald 2 kuukautta sitten
vanhempi
sitoutus
04865dba26

+ 2 - 0
core/src/admin_rpc_post_init.rs

@@ -1,5 +1,6 @@
 use {
     crate::{
+        banking_stage::BankingStage,
         cluster_slots_service::cluster_slots::ClusterSlots,
         repair::{outstanding_requests::OutstandingRequests, serve_repair::ShredRepairType},
     },
@@ -79,4 +80,5 @@ pub struct AdminRpcRequestMetadataPostInit {
     pub outstanding_repair_requests: Arc<RwLock<OutstandingRequests<ShredRepairType>>>,
     pub cluster_slots: Arc<ClusterSlots>,
     pub node: Option<Arc<NodeMultihoming>>,
+    pub banking_stage: Arc<RwLock<Option<BankingStage>>>,
 }

+ 52 - 14
core/src/banking_stage.rs

@@ -404,11 +404,14 @@ impl BankingStage {
             committer,
             log_messages_bytes_limit,
         };
-        let non_vote_thread_hdls = Self::new_central_scheduler(
+        // + 1 for the scheduler thread
+        let mut non_vote_thread_hdls = Vec::with_capacity(num_workers.get() + 1);
+        Self::new_central_scheduler(
+            &mut non_vote_thread_hdls,
             transaction_struct,
             use_greedy_scheduler,
             num_workers,
-            non_vote_context.clone(),
+            &non_vote_context,
         );
 
         Self {
@@ -418,12 +421,48 @@ impl BankingStage {
         }
     }
 
+    pub fn spawn_non_vote_threads(
+        &mut self,
+        transaction_struct: TransactionStructure,
+        block_production_method: BlockProductionMethod,
+        num_workers: NonZeroUsize,
+    ) -> thread::Result<()> {
+        if let Some(context) = self.non_vote_context.as_ref() {
+            info!("Shutting down banking stage non-vote threads");
+            context.non_vote_exit_signal.store(true, Ordering::Relaxed);
+            for bank_thread_hdl in self.non_vote_thread_hdls.drain(..) {
+                bank_thread_hdl.join()?;
+            }
+
+            info!(
+                "Spawning new banking stage non-vote threads with block-production-method: {:?} \
+                transaction-structure: {:?} num-workers: {}",
+                block_production_method, transaction_struct, num_workers
+            );
+            context.non_vote_exit_signal.store(false, Ordering::Relaxed);
+            Self::new_central_scheduler(
+                &mut self.non_vote_thread_hdls,
+                transaction_struct,
+                matches!(
+                    block_production_method,
+                    BlockProductionMethod::CentralSchedulerGreedy
+                ),
+                num_workers,
+                context,
+            )
+        }
+
+        Ok(())
+    }
+
     fn new_central_scheduler(
+        non_vote_thread_hdls: &mut Vec<JoinHandle<()>>,
         transaction_struct: TransactionStructure,
         use_greedy_scheduler: bool,
         num_workers: NonZeroUsize,
-        context: BankingStageNonVoteContext,
-    ) -> Vec<JoinHandle<()>> {
+        context: &BankingStageNonVoteContext,
+    ) {
+        assert!(non_vote_thread_hdls.is_empty());
         match transaction_struct {
             TransactionStructure::Sdk => {
                 let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new(
@@ -431,6 +470,7 @@ impl BankingStage {
                     context.bank_forks.clone(),
                 );
                 Self::spawn_scheduler_and_workers(
+                    non_vote_thread_hdls,
                     receive_and_buffer,
                     use_greedy_scheduler,
                     num_workers,
@@ -443,6 +483,7 @@ impl BankingStage {
                     bank_forks: context.bank_forks.clone(),
                 };
                 Self::spawn_scheduler_and_workers(
+                    non_vote_thread_hdls,
                     receive_and_buffer,
                     use_greedy_scheduler,
                     num_workers,
@@ -453,19 +494,17 @@ impl BankingStage {
     }
 
     fn spawn_scheduler_and_workers<R: ReceiveAndBuffer + Send + Sync + 'static>(
+        non_vote_thread_hdls: &mut Vec<JoinHandle<()>>,
         receive_and_buffer: R,
         use_greedy_scheduler: bool,
         num_workers: NonZeroUsize,
-        context: BankingStageNonVoteContext,
-    ) -> Vec<JoinHandle<()>> {
+        context: &BankingStageNonVoteContext,
+    ) {
         assert!(num_workers <= BankingStage::max_num_workers());
         let num_workers = num_workers.get();
 
         let exit = context.non_vote_exit_signal.clone();
 
-        // + 1 for scheduler thread
-        let mut thread_hdls = Vec::with_capacity(num_workers + 1);
-
         // Create channels for communication between scheduler and workers
         let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
             (0..num_workers).map(|_| unbounded()).unzip();
@@ -491,7 +530,7 @@ impl BankingStage {
             );
 
             worker_metrics.push(consume_worker.metrics_handle());
-            thread_hdls.push(
+            non_vote_thread_hdls.push(
                 Builder::new()
                     .name(format!("solCoWorker{id:02}"))
                     .spawn(move || {
@@ -507,7 +546,8 @@ impl BankingStage {
         macro_rules! spawn_scheduler {
             ($scheduler:ident) => {
                 let exit = exit.clone();
-                thread_hdls.push(
+                let bank_forks = context.bank_forks.clone();
+                non_vote_thread_hdls.push(
                     Builder::new()
                         .name("solBnkTxSched".to_string())
                         .spawn(move || {
@@ -515,7 +555,7 @@ impl BankingStage {
                                 exit,
                                 decision_maker,
                                 receive_and_buffer,
-                                context.bank_forks.clone(),
+                                bank_forks,
                                 $scheduler,
                                 worker_metrics,
                             );
@@ -549,8 +589,6 @@ impl BankingStage {
             );
             spawn_scheduler!(scheduler);
         }
-
-        thread_hdls
     }
 
     fn spawn_vote_worker(

+ 12 - 3
core/src/tpu.rs

@@ -99,7 +99,7 @@ pub struct Tpu {
     fetch_stage: FetchStage,
     sig_verifier: SigVerifier,
     vote_sigverify_stage: SigVerifyStage,
-    banking_stage: BankingStage,
+    banking_stage: Arc<RwLock<Option<BankingStage>>>,
     forwarding_stage: JoinHandle<()>,
     cluster_info_vote_listener: ClusterInfoVoteListener,
     broadcast_stage: BroadcastStage,
@@ -389,7 +389,7 @@ impl Tpu {
             fetch_stage,
             sig_verifier,
             vote_sigverify_stage,
-            banking_stage,
+            banking_stage: Arc::new(RwLock::new(Some(banking_stage))),
             forwarding_stage,
             cluster_info_vote_listener,
             broadcast_stage,
@@ -402,13 +402,22 @@ impl Tpu {
         }
     }
 
+    pub fn banking_stage(&self) -> Arc<RwLock<Option<BankingStage>>> {
+        self.banking_stage.clone()
+    }
+
     pub fn join(self) -> thread::Result<()> {
         let results = vec![
             self.fetch_stage.join(),
             self.sig_verifier.join(),
             self.vote_sigverify_stage.join(),
             self.cluster_info_vote_listener.join(),
-            self.banking_stage.join(),
+            self.banking_stage
+                .write()
+                .unwrap()
+                .take()
+                .expect("banking_stage must be Some")
+                .join(),
             self.forwarding_stage.join(),
             self.staked_nodes_updater_service.join(),
             self.tpu_quic_t.map_or(Ok(()), |t| t.join()),

+ 29 - 2
core/src/validator.rs

@@ -183,8 +183,21 @@ impl BlockVerificationMethod {
     }
 }
 
-#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
+#[derive(
+    Clone,
+    Debug,
+    EnumString,
+    EnumVariantNames,
+    Default,
+    IntoStaticStr,
+    Display,
+    Serialize,
+    Deserialize,
+    PartialEq,
+    Eq,
+)]
 #[strum(serialize_all = "kebab-case")]
+#[serde(rename_all = "kebab-case")]
 pub enum BlockProductionMethod {
     CentralScheduler,
     #[default]
@@ -201,8 +214,21 @@ impl BlockProductionMethod {
     }
 }
 
-#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
+#[derive(
+    Clone,
+    Debug,
+    EnumString,
+    EnumVariantNames,
+    Default,
+    IntoStaticStr,
+    Display,
+    Serialize,
+    Deserialize,
+    PartialEq,
+    Eq,
+)]
 #[strum(serialize_all = "kebab-case")]
+#[serde(rename_all = "kebab-case")]
 pub enum TransactionStructure {
     Sdk,
     #[default]
@@ -1685,6 +1711,7 @@ impl Validator {
             outstanding_repair_requests,
             cluster_slots,
             node: Some(node_multihoming),
+            banking_stage: tpu.banking_stage(),
         });
 
         Ok(Self {

+ 48 - 1
validator/src/admin_rpc_service.rs

@@ -11,9 +11,10 @@ use {
     solana_accounts_db::accounts_index::AccountIndex,
     solana_core::{
         admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
+        banking_stage::BankingStage,
         consensus::{tower_storage::TowerStorage, Tower},
         repair::repair_service,
-        validator::ValidatorStartProgress,
+        validator::{BlockProductionMethod, TransactionStructure, ValidatorStartProgress},
     },
     solana_geyser_plugin_manager::GeyserPluginManagerRequest,
     solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
@@ -28,6 +29,7 @@ use {
         env, error,
         fmt::{self, Display},
         net::{IpAddr, SocketAddr},
+        num::NonZeroUsize,
         path::{Path, PathBuf},
         sync::{
             atomic::{AtomicBool, Ordering},
@@ -259,6 +261,15 @@ pub trait AdminRpc {
         meta: Self::Metadata,
         public_tpu_forwards_addr: SocketAddr,
     ) -> Result<()>;
+
+    #[rpc(meta, name = "manageBlockProduction")]
+    fn manage_block_production(
+        &self,
+        meta: Self::Metadata,
+        block_production_method: BlockProductionMethod,
+        transaction_struct: TransactionStructure,
+        num_workers: NonZeroUsize,
+    ) -> Result<()>;
 }
 
 pub struct AdminRpcImpl;
@@ -739,6 +750,41 @@ impl AdminRpc for AdminRpcImpl {
             Ok(())
         })
     }
+
+    fn manage_block_production(
+        &self,
+        meta: Self::Metadata,
+        block_production_method: BlockProductionMethod,
+        transaction_struct: TransactionStructure,
+        num_workers: NonZeroUsize,
+    ) -> Result<()> {
+        debug!("manage_block_production rpc request received");
+
+        if num_workers > BankingStage::max_num_workers() {
+            return Err(jsonrpc_core::error::Error::invalid_params(format!(
+                "Number of workers ({}) exceeds maximum allowed ({})",
+                num_workers,
+                BankingStage::max_num_workers()
+            )));
+        }
+
+        meta.with_post_init(|post_init| {
+            let mut banking_stage = post_init.banking_stage.write().unwrap();
+            let Some(banking_stage) = banking_stage.as_mut() else {
+                error!("banking stage is not initialized");
+                return Err(jsonrpc_core::error::Error::internal_error());
+            };
+
+            banking_stage
+                .spawn_non_vote_threads(transaction_struct, block_production_method, num_workers)
+                .map_err(|err| {
+                    error!("Failed to spawn new non-vote threads: {err:?}");
+                    jsonrpc_core::error::Error::internal_error()
+                })?;
+
+            Ok(())
+        })
+    }
 }
 
 impl AdminRpcImpl {
@@ -1025,6 +1071,7 @@ mod tests {
                         solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default(),
                     ),
                     node: None,
+                    banking_stage: Arc::new(RwLock::new(None)),
                 }))),
                 staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
                 rpc_to_plugin_manager_sender: None,

+ 2 - 1
validator/src/cli.rs

@@ -73,7 +73,8 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
         .subcommand(commands::set_log_filter::command())
         .subcommand(commands::staked_nodes_overrides::command())
         .subcommand(commands::wait_for_restart_window::command())
-        .subcommand(commands::set_public_address::command());
+        .subcommand(commands::set_public_address::command())
+        .subcommand(commands::manage_block_production::command());
 
     commands::run::add_args(app, default_args)
         .args(&thread_args(&default_args.thread_args))

+ 140 - 0
validator/src/commands/manage_block_production/mod.rs

@@ -0,0 +1,140 @@
+use {
+    crate::{
+        admin_rpc_service,
+        commands::{FromClapArgMatches, Result},
+    },
+    clap::{value_t, App, Arg, ArgMatches, SubCommand},
+    solana_core::{
+        banking_stage::BankingStage,
+        validator::{BlockProductionMethod, TransactionStructure},
+    },
+    std::{num::NonZeroUsize, path::Path},
+};
+
+const COMMAND: &str = "manage-block-production";
+
+#[derive(Debug, PartialEq)]
+pub struct ManageBlockProductionArgs {
+    pub block_production_method: BlockProductionMethod,
+    pub transaction_structure: TransactionStructure,
+    pub num_workers: NonZeroUsize,
+}
+
+impl FromClapArgMatches for ManageBlockProductionArgs {
+    fn from_clap_arg_match(matches: &ArgMatches) -> Result<Self> {
+        Ok(ManageBlockProductionArgs {
+            block_production_method: value_t!(
+                matches,
+                "block_production_method",
+                BlockProductionMethod
+            )
+            .unwrap_or_default(),
+            transaction_structure: value_t!(matches, "transaction_struct", TransactionStructure)
+                .unwrap_or_default(),
+            num_workers: value_t!(matches, "block_production_num_workers", NonZeroUsize)
+                .unwrap_or(BankingStage::default_num_workers()),
+        })
+    }
+}
+
+pub fn command<'a>() -> App<'a, 'a> {
+    SubCommand::with_name(COMMAND)
+        .about("Manage block production")
+        .arg(
+            Arg::with_name("block_production_method")
+                .long("block-production-method")
+                .alias("method")
+                .value_name("METHOD")
+                .takes_value(true)
+                .possible_values(BlockProductionMethod::cli_names())
+                .default_value(BlockProductionMethod::default().into())
+                .help(BlockProductionMethod::cli_message()),
+        )
+        .arg(
+            Arg::with_name("transaction_struct")
+                .long("transaction-structure")
+                .alias("struct")
+                .value_name("STRUCT")
+                .takes_value(true)
+                .possible_values(TransactionStructure::cli_names())
+                .default_value(TransactionStructure::default().into())
+                .help(TransactionStructure::cli_message()),
+        )
+        .arg(
+            Arg::with_name("block_production_num_workers")
+                .long("block-production-num-workers")
+                .alias("num-workers")
+                .value_name("NUM")
+                .takes_value(true)
+                .help("Number of worker threads to use for block production"),
+        )
+}
+
+pub fn execute(matches: &ArgMatches, ledger_path: &Path) -> Result<()> {
+    let manage_block_production_args = ManageBlockProductionArgs::from_clap_arg_match(matches)?;
+
+    println!(
+        "Respawning block-production threads with method: {}, transaction structure: {} num_workers: {}",
+        manage_block_production_args.block_production_method,
+        manage_block_production_args.transaction_structure,
+        manage_block_production_args.num_workers,
+    );
+    let admin_client = admin_rpc_service::connect(ledger_path);
+    admin_rpc_service::runtime().block_on(async move {
+        admin_client
+            .await?
+            .manage_block_production(
+                manage_block_production_args.block_production_method,
+                manage_block_production_args.transaction_structure,
+                manage_block_production_args.num_workers,
+            )
+            .await
+    })?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn verify_args_struct_by_command_manage_block_production_default() {
+        let app = command();
+        let matches = app.get_matches_from(vec![COMMAND]);
+        let args = ManageBlockProductionArgs::from_clap_arg_match(&matches).unwrap();
+
+        assert_eq!(
+            args,
+            ManageBlockProductionArgs {
+                block_production_method: BlockProductionMethod::default(),
+                transaction_structure: TransactionStructure::default(),
+                num_workers: BankingStage::default_num_workers(),
+            }
+        );
+    }
+
+    #[test]
+    fn verify_args_struct_by_command_manage_block_production_with_args() {
+        let app = command();
+        let matches = app.get_matches_from(vec![
+            COMMAND,
+            "--block-production-method",
+            "central-scheduler",
+            "--transaction-structure",
+            "sdk",
+            "--block-production-num-workers",
+            "4",
+        ]);
+        let args = ManageBlockProductionArgs::from_clap_arg_match(&matches).unwrap();
+
+        assert_eq!(
+            args,
+            ManageBlockProductionArgs {
+                block_production_method: BlockProductionMethod::CentralScheduler,
+                transaction_structure: TransactionStructure::Sdk,
+                num_workers: NonZeroUsize::new(4).unwrap(),
+            }
+        );
+    }
+}

+ 1 - 0
validator/src/commands/mod.rs

@@ -1,6 +1,7 @@
 pub mod authorized_voter;
 pub mod contact_info;
 pub mod exit;
+pub mod manage_block_production;
 pub mod monitor;
 pub mod plugin;
 pub mod repair_shred_from_peer;

+ 3 - 0
validator/src/main.rs

@@ -74,6 +74,9 @@ pub fn main() {
         ("set-public-address", Some(subcommand_matches)) => {
             commands::set_public_address::execute(subcommand_matches, &ledger_path)
         }
+        ("manage-block-production", Some(subcommand_matches)) => {
+            commands::manage_block_production::execute(subcommand_matches, &ledger_path)
+        }
         _ => unreachable!(),
     }
     .unwrap_or_else(|err| {