Răsfoiți Sursa

BankingStage: shutdown and respawn vote thread (#7890)

Co-authored-by: Tao Zhu <82401714+tao-stones@users.noreply.github.com>
Andrew Fitzgerald 2 luni în urmă
părinte
comite
8d9fe7445c

+ 49 - 60
core/src/banking_stage.rs

@@ -339,10 +339,9 @@ pub struct BatchedTransactionErrorDetails {
 }
 
 pub struct BankingStage {
-    vote_thread_hdl: JoinHandle<()>,
     // Only None during final join of BankingStage.
-    non_vote_context: Option<BankingStageNonVoteContext>,
-    non_vote_thread_hdls: Vec<JoinHandle<()>>,
+    context: Option<BankingStageContext>,
+    thread_hdls: Vec<JoinHandle<()>>,
 }
 
 pub trait LikeClusterInfo: Send + Sync + 'static + Clone {
@@ -383,23 +382,11 @@ impl BankingStage {
             replay_vote_sender,
             prioritization_fee_cache,
         );
-        let vote_thread_hdl = Self::spawn_vote_worker(
+
+        let context = BankingStageContext {
+            exit_signal: Arc::new(AtomicBool::new(false)),
             tpu_vote_receiver,
             gossip_vote_receiver,
-            transaction_recorder.clone(),
-            poh_recorder.clone(),
-            bank_forks.clone(),
-            committer.clone(),
-            log_messages_bytes_limit,
-            VoteStorage::new(&bank_forks.read().unwrap().working_bank()),
-        );
-
-        let use_greedy_scheduler = matches!(
-            block_production_method,
-            BlockProductionMethod::CentralSchedulerGreedy
-        );
-        let non_vote_context = BankingStageNonVoteContext {
-            non_vote_exit_signal: Arc::new(AtomicBool::new(false)),
             non_vote_receiver,
             transaction_recorder,
             poh_recorder,
@@ -407,44 +394,52 @@ impl BankingStage {
             committer,
             log_messages_bytes_limit,
         };
+        // + 1 for vote worker
         // + 1 for the scheduler thread
-        let mut non_vote_thread_hdls = Vec::with_capacity(num_workers.get() + 1);
+        let mut thread_hdls = Vec::with_capacity(num_workers.get() + 2);
+        thread_hdls.push(Self::spawn_vote_worker(&context));
+
+        let use_greedy_scheduler = matches!(
+            block_production_method,
+            BlockProductionMethod::CentralSchedulerGreedy
+        );
+
         Self::new_central_scheduler(
-            &mut non_vote_thread_hdls,
+            &mut thread_hdls,
             transaction_struct,
             use_greedy_scheduler,
             num_workers,
-            &non_vote_context,
+            &context,
         );
 
         Self {
-            vote_thread_hdl,
-            non_vote_context: Some(non_vote_context),
-            non_vote_thread_hdls,
+            context: Some(context),
+            thread_hdls,
         }
     }
 
-    pub fn spawn_non_vote_threads(
+    pub fn spawn_threads(
         &mut self,
         transaction_struct: TransactionStructure,
         block_production_method: BlockProductionMethod,
         num_workers: NonZeroUsize,
     ) -> thread::Result<()> {
-        if let Some(context) = self.non_vote_context.as_ref() {
-            info!("Shutting down banking stage non-vote threads");
-            context.non_vote_exit_signal.store(true, Ordering::Relaxed);
-            for bank_thread_hdl in self.non_vote_thread_hdls.drain(..) {
+        if let Some(context) = self.context.as_ref() {
+            info!("Shutting down banking stage threads");
+            context.exit_signal.store(true, Ordering::Relaxed);
+            for bank_thread_hdl in self.thread_hdls.drain(..) {
                 bank_thread_hdl.join()?;
             }
 
             info!(
-                "Spawning new banking stage non-vote threads with block-production-method: \
+                "Spawning new banking stage threads with block-production-method: \
                  {block_production_method:?} transaction-structure: {transaction_struct:?} \
                  num-workers: {num_workers}"
             );
-            context.non_vote_exit_signal.store(false, Ordering::Relaxed);
+            context.exit_signal.store(false, Ordering::Relaxed);
+            self.thread_hdls.push(Self::spawn_vote_worker(context));
             Self::new_central_scheduler(
-                &mut self.non_vote_thread_hdls,
+                &mut self.thread_hdls,
                 transaction_struct,
                 matches!(
                     block_production_method,
@@ -463,9 +458,8 @@ impl BankingStage {
         transaction_struct: TransactionStructure,
         use_greedy_scheduler: bool,
         num_workers: NonZeroUsize,
-        context: &BankingStageNonVoteContext,
+        context: &BankingStageContext,
     ) {
-        assert!(non_vote_thread_hdls.is_empty());
         match transaction_struct {
             TransactionStructure::Sdk => {
                 let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new(
@@ -501,12 +495,12 @@ impl BankingStage {
         receive_and_buffer: R,
         use_greedy_scheduler: bool,
         num_workers: NonZeroUsize,
-        context: &BankingStageNonVoteContext,
+        context: &BankingStageContext,
     ) {
         assert!(num_workers <= BankingStage::max_num_workers());
         let num_workers = num_workers.get();
 
-        let exit = context.non_vote_exit_signal.clone();
+        let exit = context.exit_signal.clone();
 
         // Create channels for communication between scheduler and workers
         let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
@@ -594,30 +588,25 @@ impl BankingStage {
         }
     }
 
-    fn spawn_vote_worker(
-        tpu_receiver: BankingPacketReceiver,
-        gossip_receiver: BankingPacketReceiver,
-        transaction_recorder: TransactionRecorder,
-        poh_recorder: Arc<RwLock<PohRecorder>>,
-        bank_forks: Arc<RwLock<BankForks>>,
-        committer: Committer,
-        log_messages_bytes_limit: Option<usize>,
-        vote_storage: VoteStorage,
-    ) -> JoinHandle<()> {
-        let tpu_receiver = PacketReceiver::new(tpu_receiver);
-        let gossip_receiver = PacketReceiver::new(gossip_receiver);
+    fn spawn_vote_worker(context: &BankingStageContext) -> JoinHandle<()> {
+        let vote_storage = VoteStorage::new(&context.bank_forks.read().unwrap().working_bank());
+        let tpu_receiver = PacketReceiver::new(context.tpu_vote_receiver.clone());
+        let gossip_receiver = PacketReceiver::new(context.gossip_vote_receiver.clone());
         let consumer = Consumer::new(
-            committer,
-            transaction_recorder,
+            context.committer.clone(),
+            context.transaction_recorder.clone(),
             QosService::new(0),
-            log_messages_bytes_limit,
+            context.log_messages_bytes_limit,
         );
-        let decision_maker = DecisionMaker::from(poh_recorder.read().unwrap().deref());
+        let decision_maker = DecisionMaker::from(context.poh_recorder.read().unwrap().deref());
 
+        let exit_signal = context.exit_signal.clone();
+        let bank_forks = context.bank_forks.clone();
         Builder::new()
             .name("solBanknStgVote".to_string())
             .spawn(move || {
                 VoteWorker::new(
+                    exit_signal,
                     decision_maker,
                     tpu_receiver,
                     gossip_receiver,
@@ -639,24 +628,24 @@ impl BankingStage {
     }
 
     pub fn join(mut self) -> thread::Result<()> {
-        self.vote_thread_hdl.join()?;
-
-        self.non_vote_context
+        self.context
             .take()
             .expect("non-vote context must be Some")
-            .non_vote_exit_signal
+            .exit_signal
             .store(true, Ordering::Relaxed);
-        for bank_thread_hdl in self.non_vote_thread_hdls {
+        for bank_thread_hdl in self.thread_hdls {
             bank_thread_hdl.join()?;
         }
         Ok(())
     }
 }
 
-// Context for spawning non-vote threads in the banking stage.
+// Context for spawning threads in the banking stage.
 #[derive(Clone)]
-struct BankingStageNonVoteContext {
-    non_vote_exit_signal: Arc<AtomicBool>,
+struct BankingStageContext {
+    exit_signal: Arc<AtomicBool>,
+    tpu_vote_receiver: BankingPacketReceiver,
+    gossip_vote_receiver: BankingPacketReceiver,
     non_vote_receiver: BankingPacketReceiver,
     transaction_recorder: TransactionRecorder,
     poh_recorder: Arc<RwLock<PohRecorder>>,

+ 8 - 2
core/src/banking_stage/vote_worker.rs

@@ -31,7 +31,10 @@ use {
     solana_transaction::sanitized::SanitizedTransaction,
     solana_transaction_error::TransactionError,
     std::{
-        sync::{atomic::Ordering, Arc, RwLock},
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc, RwLock,
+        },
         time::Instant,
     },
 };
@@ -46,6 +49,7 @@ mod transaction {
 pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 16;
 
 pub struct VoteWorker {
+    exit: Arc<AtomicBool>,
     decision_maker: DecisionMaker,
     tpu_receiver: PacketReceiver,
     gossip_receiver: PacketReceiver,
@@ -56,6 +60,7 @@ pub struct VoteWorker {
 
 impl VoteWorker {
     pub fn new(
+        exit: Arc<AtomicBool>,
         decision_maker: DecisionMaker,
         tpu_receiver: PacketReceiver,
         gossip_receiver: PacketReceiver,
@@ -64,6 +69,7 @@ impl VoteWorker {
         consumer: Consumer,
     ) -> Self {
         Self {
+            exit,
             decision_maker,
             tpu_receiver,
             gossip_receiver,
@@ -79,7 +85,7 @@ impl VoteWorker {
 
         let mut last_metrics_update = Instant::now();
 
-        loop {
+        while !self.exit.load(Ordering::Relaxed) {
             if !self.storage.is_empty()
                 || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
             {

+ 1 - 1
validator/src/admin_rpc_service.rs

@@ -776,7 +776,7 @@ impl AdminRpc for AdminRpcImpl {
             };
 
             banking_stage
-                .spawn_non_vote_threads(transaction_struct, block_production_method, num_workers)
+                .spawn_threads(transaction_struct, block_production_method, num_workers)
                 .map_err(|err| {
                     error!("Failed to spawn new non-vote threads: {err:?}");
                     jsonrpc_core::error::Error::internal_error()