Browse Source

fix: use atomic to check if leader bank changed (#4596)

* fix: use atomic to check if leader bank changed

* fix default value and tests

* ordering update

* feedback

* add comment
Justin Starry 10 months ago
parent
commit
4cc49ac80a
2 changed files with 51 additions and 3 deletions
  1. 9 1
      core/src/banking_stage/consume_worker.rs
  2. 42 2
      poh/src/leader_bank_notifier.rs

+ 9 - 1
core/src/banking_stage/consume_worker.rs

@@ -81,7 +81,10 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
             .fetch_add(get_bank_us, Ordering::Relaxed);
 
         for work in try_drain_iter(work, &self.consume_receiver) {
-            if bank.is_complete() {
+            if bank.is_complete() || {
+                // check if the bank got interrupted before completion
+                self.get_consume_bank_id() != Some(bank.bank_id())
+            } {
                 let (maybe_new_bank, get_bank_us) = measure_us!(self.get_consume_bank());
                 if let Some(new_bank) = maybe_new_bank {
                     self.metrics
@@ -134,6 +137,11 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
             .upgrade()
     }
 
+    /// Try to get the id for the bank that should be used for consuming
+    fn get_consume_bank_id(&self) -> Option<u64> {
+        self.leader_bank_notifier.get_current_bank_id()
+    }
+
     /// Retry current batch and all outstanding batches.
     fn retry_drain(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
         for work in try_drain_iter(work, &self.consume_receiver) {

+ 42 - 2
poh/src/leader_bank_notifier.rs

@@ -2,20 +2,38 @@ use {
     solana_clock::Slot,
     solana_runtime::bank::Bank,
     std::{
-        sync::{Arc, Condvar, Mutex, MutexGuard, Weak},
+        sync::{
+            atomic::{AtomicU64, Ordering},
+            Arc, Condvar, Mutex, MutexGuard, Weak,
+        },
         time::{Duration, Instant},
     },
 };
 
+const STAND_BY_SENTINEL_ID: u64 = u64::MAX;
+
 /// Tracks leader status of the validator node and notifies when:
 ///     1. A leader bank initiates (=PoH-initiated)
 ///     2. A leader slot completes (=PoH-completed)
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct LeaderBankNotifier {
     /// Current state (slot, bank, and status) of the system
     state: Mutex<SlotAndBankWithStatus>,
     /// CondVar to notify status changes and waiting
     condvar: Condvar,
+    /// Lightweight atomic variable that can be used to check the id of the
+    /// latest leader bank
+    current_bank_id: AtomicU64,
+}
+
+impl Default for LeaderBankNotifier {
+    fn default() -> Self {
+        Self {
+            state: Mutex::default(),
+            condvar: Condvar::default(),
+            current_bank_id: AtomicU64::new(STAND_BY_SENTINEL_ID),
+        }
+    }
 }
 
 /// Leader status state machine for the validator.
@@ -43,6 +61,8 @@ impl LeaderBankNotifier {
         let mut state = self.state.lock().unwrap();
         assert_eq!(state.status, Status::StandBy);
 
+        self.current_bank_id
+            .store(bank.bank_id(), Ordering::Relaxed);
         *state = SlotAndBankWithStatus {
             status: Status::InProgress,
             slot: Some(bank.slot()),
@@ -61,12 +81,26 @@ impl LeaderBankNotifier {
         assert_eq!(state.status, Status::InProgress);
         assert_eq!(state.slot, Some(slot));
 
+        self.current_bank_id
+            .store(STAND_BY_SENTINEL_ID, Ordering::Relaxed);
         state.status = Status::StandBy;
         drop(state);
 
         self.condvar.notify_all();
     }
 
+    /// Fetch the bank id of the bank inside the mutex wrapped state field. Due
+    /// to the usage of relaxed ordering, this is not a guarantee that the
+    /// caller thread will see the updated bank in the mutex wrapped state yet.
+    pub fn get_current_bank_id(&self) -> Option<u64> {
+        let current_bank_id = self.current_bank_id.load(Ordering::Relaxed);
+        if current_bank_id == STAND_BY_SENTINEL_ID {
+            None
+        } else {
+            Some(current_bank_id)
+        }
+    }
+
     /// If the status is `InProgress`, immediately return a weak reference to the bank.
     /// Otherwise, wait up to the `timeout` for the status to become `InProgress`.
     /// If the timeout is reached, the weak reference is unupgradable.
@@ -124,6 +158,7 @@ mod tests {
     fn test_leader_bank_notifier_default() {
         let leader_bank_notifier = LeaderBankNotifier::default();
         let state = leader_bank_notifier.state.lock().unwrap();
+        assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
         assert_eq!(state.status, Status::StandBy);
         assert_eq!(state.slot, None);
         assert!(state.bank.upgrade().is_none());
@@ -145,6 +180,10 @@ mod tests {
         leader_bank_notifier.set_in_progress(&bank);
 
         let state = leader_bank_notifier.state.lock().unwrap();
+        assert_eq!(
+            leader_bank_notifier.get_current_bank_id(),
+            Some(bank.bank_id())
+        );
         assert_eq!(state.status, Status::InProgress);
         assert_eq!(state.slot, Some(bank.slot()));
         assert_eq!(state.bank.upgrade(), Some(bank));
@@ -184,6 +223,7 @@ mod tests {
         leader_bank_notifier.set_completed(bank.slot());
 
         let state = leader_bank_notifier.state.lock().unwrap();
+        assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
         assert_eq!(state.status, Status::StandBy);
         assert_eq!(state.slot, Some(bank.slot()));
         assert_eq!(state.bank.upgrade(), Some(bank));