Преглед изворни кода

PohRecorder: SharedWorkingBank (#7280)

Andrew Fitzgerald пре 3 месеци
родитељ
комит
dc0f51a3c7

+ 1 - 0
Cargo.lock

@@ -9379,6 +9379,7 @@ dependencies = [
 name = "solana-poh"
 name = "solana-poh"
 version = "3.0.0"
 version = "3.0.0"
 dependencies = [
 dependencies = [
+ "arc-swap",
  "assert_matches",
  "assert_matches",
  "bincode",
  "bincode",
  "core_affinity",
  "core_affinity",

+ 1 - 1
core/src/banking_stage.rs

@@ -549,7 +549,7 @@ impl BankingStage {
                     log_messages_bytes_limit,
                     log_messages_bytes_limit,
                 ),
                 ),
                 finished_work_sender.clone(),
                 finished_work_sender.clone(),
-                poh_recorder.read().unwrap().new_leader_bank_notifier(),
+                poh_recorder.read().unwrap().shared_working_bank(),
             );
             );
 
 
             worker_metrics.push(consume_worker.metrics_handle());
             worker_metrics.push(consume_worker.metrics_handle());

+ 27 - 18
core/src/banking_stage/consume_worker.rs

@@ -6,7 +6,7 @@ use {
     },
     },
     crossbeam_channel::{Receiver, RecvError, SendError, Sender},
     crossbeam_channel::{Receiver, RecvError, SendError, Sender},
     solana_measure::measure_us,
     solana_measure::measure_us,
-    solana_poh::leader_bank_notifier::LeaderBankNotifier,
+    solana_poh::poh_recorder::SharedWorkingBank,
     solana_runtime::bank::Bank,
     solana_runtime::bank::Bank,
     solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
     solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
     solana_svm::transaction_error_metrics::TransactionErrorMetrics,
     solana_svm::transaction_error_metrics::TransactionErrorMetrics,
@@ -16,7 +16,7 @@ use {
             atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             Arc,
             Arc,
         },
         },
-        time::Duration,
+        time::{Duration, Instant},
     },
     },
     thiserror::Error,
     thiserror::Error,
 };
 };
@@ -34,7 +34,7 @@ pub(crate) struct ConsumeWorker<Tx> {
     consumer: Consumer,
     consumer: Consumer,
     consumed_sender: Sender<FinishedConsumeWork<Tx>>,
     consumed_sender: Sender<FinishedConsumeWork<Tx>>,
 
 
-    leader_bank_notifier: Arc<LeaderBankNotifier>,
+    shared_working_bank: SharedWorkingBank,
     metrics: Arc<ConsumeWorkerMetrics>,
     metrics: Arc<ConsumeWorkerMetrics>,
 }
 }
 
 
@@ -44,13 +44,13 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
         consume_receiver: Receiver<ConsumeWork<Tx>>,
         consume_receiver: Receiver<ConsumeWork<Tx>>,
         consumer: Consumer,
         consumer: Consumer,
         consumed_sender: Sender<FinishedConsumeWork<Tx>>,
         consumed_sender: Sender<FinishedConsumeWork<Tx>>,
-        leader_bank_notifier: Arc<LeaderBankNotifier>,
+        shared_working_bank: SharedWorkingBank,
     ) -> Self {
     ) -> Self {
         Self {
         Self {
             consume_receiver,
             consume_receiver,
             consumer,
             consumer,
             consumed_sender,
             consumed_sender,
-            leader_bank_notifier,
+            shared_working_bank,
             metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
             metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
         }
         }
     }
     }
@@ -67,7 +67,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
     }
     }
 
 
     fn consume_loop(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
     fn consume_loop(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
-        let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
+        let (maybe_consume_bank, get_bank_us) = measure_us!(self.working_bank_with_timeout());
         let Some(mut bank) = maybe_consume_bank else {
         let Some(mut bank) = maybe_consume_bank else {
             self.metrics
             self.metrics
                 .timing_metrics
                 .timing_metrics
@@ -82,10 +82,12 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
 
 
         for work in try_drain_iter(work, &self.consume_receiver) {
         for work in try_drain_iter(work, &self.consume_receiver) {
             if bank.is_complete() || {
             if bank.is_complete() || {
-                // check if the bank got interrupted before completion
-                self.get_consume_bank_id() != Some(bank.bank_id())
+                // if working bank has changed, then try to get a new bank.
+                self.working_bank()
+                    .map(|working_bank| Arc::ptr_eq(&working_bank, &bank))
+                    .unwrap_or(true)
             } {
             } {
-                let (maybe_new_bank, get_bank_us) = measure_us!(self.get_consume_bank());
+                let (maybe_new_bank, get_bank_us) = measure_us!(self.working_bank_with_timeout());
                 if let Some(new_bank) = maybe_new_bank {
                 if let Some(new_bank) = maybe_new_bank {
                     self.metrics
                     self.metrics
                         .timing_metrics
                         .timing_metrics
@@ -130,16 +132,23 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
         Ok(())
         Ok(())
     }
     }
 
 
-    /// Try to get a bank for consuming.
-    fn get_consume_bank(&self) -> Option<Arc<Bank>> {
-        self.leader_bank_notifier
-            .get_or_wait_for_in_progress(Duration::from_millis(50))
-            .upgrade()
+    /// Get the current poh working bank with a timeout - if the Bank is
+    /// not available within the timeout, return None.
+    fn working_bank_with_timeout(&self) -> Option<Arc<Bank>> {
+        const TIMEOUT: Duration = Duration::from_millis(50);
+        let now = Instant::now();
+        while now.elapsed() < TIMEOUT {
+            if let Some(bank) = self.working_bank() {
+                return Some(bank);
+            }
+        }
+
+        None
     }
     }
 
 
-    /// Try to get the id for the bank that should be used for consuming
-    fn get_consume_bank_id(&self) -> Option<u64> {
-        self.leader_bank_notifier.get_current_bank_id()
+    /// Get the current poh working bank without a timeout.
+    fn working_bank(&self) -> Option<Arc<Bank>> {
+        self.shared_working_bank.load()
     }
     }
 
 
     /// Retry current batch and all outstanding batches.
     /// Retry current batch and all outstanding batches.
@@ -845,7 +854,7 @@ mod tests {
             consume_receiver,
             consume_receiver,
             consumer,
             consumer,
             consumed_sender,
             consumed_sender,
-            poh_recorder.read().unwrap().new_leader_bank_notifier(),
+            poh_recorder.read().unwrap().shared_working_bank(),
         );
         );
 
 
         (
         (

+ 1 - 0
poh/Cargo.toml

@@ -20,6 +20,7 @@ name = "solana_poh"
 dev-context-only-utils = []
 dev-context-only-utils = []
 
 
 [dependencies]
 [dependencies]
+arc-swap = { workspace = true }
 core_affinity = { workspace = true }
 core_affinity = { workspace = true }
 crossbeam-channel = { workspace = true }
 crossbeam-channel = { workspace = true }
 log = { workspace = true }
 log = { workspace = true }

+ 0 - 315
poh/src/leader_bank_notifier.rs

@@ -1,315 +0,0 @@
-use {
-    solana_clock::Slot,
-    solana_runtime::bank::Bank,
-    std::{
-        sync::{
-            atomic::{AtomicU64, Ordering},
-            Arc, Condvar, Mutex, MutexGuard, Weak,
-        },
-        time::{Duration, Instant},
-    },
-};
-
-const STAND_BY_SENTINEL_ID: u64 = u64::MAX;
-
-/// Tracks leader status of the validator node and notifies when:
-///     1. A leader bank initiates (=PoH-initiated)
-///     2. A leader slot completes (=PoH-completed)
-#[derive(Debug)]
-pub struct LeaderBankNotifier {
-    /// Current state (slot, bank, and status) of the system
-    state: Mutex<SlotAndBankWithStatus>,
-    /// CondVar to notify status changes and waiting
-    condvar: Condvar,
-    /// Lightweight atomic variable that can be used to check the id of the
-    /// latest leader bank
-    current_bank_id: AtomicU64,
-}
-
-impl Default for LeaderBankNotifier {
-    fn default() -> Self {
-        Self {
-            state: Mutex::default(),
-            condvar: Condvar::default(),
-            current_bank_id: AtomicU64::new(STAND_BY_SENTINEL_ID),
-        }
-    }
-}
-
-/// Leader status state machine for the validator.
-#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
-enum Status {
-    /// The leader bank is not currently available. Either not initialized, or PoH-completed bank.
-    #[default]
-    StandBy,
-    /// PoH-initiated bank is available.
-    InProgress,
-}
-
-#[derive(Debug, Default)]
-struct SlotAndBankWithStatus {
-    status: Status,
-    slot: Option<Slot>,
-    bank: Weak<Bank>,
-}
-
-impl LeaderBankNotifier {
-    /// Set the status to `InProgress` and notify any waiting threads.
-    /// Panics if the status is not `StandBy` - cannot have multiple
-    /// leader banks in progress.
-    pub(crate) fn set_in_progress(&self, bank: &Arc<Bank>) {
-        let mut state = self.state.lock().unwrap();
-        assert_eq!(state.status, Status::StandBy);
-
-        self.current_bank_id
-            .store(bank.bank_id(), Ordering::Relaxed);
-        *state = SlotAndBankWithStatus {
-            status: Status::InProgress,
-            slot: Some(bank.slot()),
-            bank: Arc::downgrade(bank),
-        };
-        drop(state);
-
-        self.condvar.notify_all();
-    }
-
-    /// Set the status to `StandBy` and notify any waiting threads.
-    /// Panics if the current status is not `InProgress` or the stored slot does not match
-    /// the given slot.
-    pub(crate) fn set_completed(&self, slot: Slot) {
-        let mut state = self.state.lock().unwrap();
-        assert_eq!(state.status, Status::InProgress);
-        assert_eq!(state.slot, Some(slot));
-
-        self.current_bank_id
-            .store(STAND_BY_SENTINEL_ID, Ordering::Relaxed);
-        state.status = Status::StandBy;
-        drop(state);
-
-        self.condvar.notify_all();
-    }
-
-    /// Fetch the bank id of the bank inside the mutex wrapped state field. Due
-    /// to the usage of relaxed ordering, this is not a guarantee that the
-    /// caller thread will see the updated bank in the mutex wrapped state yet.
-    pub fn get_current_bank_id(&self) -> Option<u64> {
-        let current_bank_id = self.current_bank_id.load(Ordering::Relaxed);
-        if current_bank_id == STAND_BY_SENTINEL_ID {
-            None
-        } else {
-            Some(current_bank_id)
-        }
-    }
-
-    /// If the status is `InProgress`, immediately return a weak reference to the bank.
-    /// Otherwise, wait up to the `timeout` for the status to become `InProgress`.
-    /// If the timeout is reached, the weak reference is unupgradable.
-    pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Weak<Bank> {
-        let state = self.state.lock().unwrap();
-        Self::get_or_wait_for_in_progress_state(&self.condvar, state, timeout)
-            .map(|state| state.bank.clone())
-            .unwrap_or_default()
-    }
-
-    /// Wait for next notification for a completed leader slot.
-    /// Returns `None` if the timeout is reached
-    pub fn wait_for_completed(&self, mut remaining_timeout: Duration) -> Option<Slot> {
-        let state = self.state.lock().unwrap();
-
-        // If currently `StandBy`, need to wait for `InProgress` to begin.
-        let now = Instant::now();
-        let state =
-            Self::get_or_wait_for_in_progress_state(&self.condvar, state, remaining_timeout)?;
-        remaining_timeout = remaining_timeout.checked_sub(now.elapsed())?;
-
-        // Wait for `StandBy` to be set.
-        let (state, wait_timeout_result) = self
-            .condvar
-            .wait_timeout_while(state, remaining_timeout, |state| {
-                matches!(state.status, Status::InProgress)
-            })
-            .unwrap();
-
-        (!wait_timeout_result.timed_out()).then(|| state.slot.expect("some slot when completed"))
-    }
-
-    /// Helper function to get or wait for the `InProgress` status with a given `MutexGuard`.
-    /// If `InProgress` status is reached, the state `MutexGuard` is returned, otherwise None.
-    fn get_or_wait_for_in_progress_state<'a>(
-        condvar: &'a Condvar,
-        state: MutexGuard<'a, SlotAndBankWithStatus>,
-        timeout: Duration,
-    ) -> Option<MutexGuard<'a, SlotAndBankWithStatus>> {
-        let (state, wait_timeout_result) = condvar
-            .wait_timeout_while(state, timeout, |state| {
-                matches!(state.status, Status::StandBy)
-            })
-            .unwrap();
-
-        (!wait_timeout_result.timed_out()).then_some(state)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_leader_bank_notifier_default() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        let state = leader_bank_notifier.state.lock().unwrap();
-        assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
-        assert_eq!(state.status, Status::StandBy);
-        assert_eq!(state.slot, None);
-        assert!(state.bank.upgrade().is_none());
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_leader_bank_notifier_set_in_progress_already_in_progress() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-        leader_bank_notifier.set_in_progress(&bank);
-    }
-
-    #[test]
-    fn test_leader_bank_notifier_set_in_progress() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-
-        let state = leader_bank_notifier.state.lock().unwrap();
-        assert_eq!(
-            leader_bank_notifier.get_current_bank_id(),
-            Some(bank.bank_id())
-        );
-        assert_eq!(state.status, Status::InProgress);
-        assert_eq!(state.slot, Some(bank.slot()));
-        assert_eq!(state.bank.upgrade(), Some(bank));
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_leader_bank_notifier_set_completed_uninitialized() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        leader_bank_notifier.set_completed(0);
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_leader_bank_notifier_set_completed_mismatched_in_progress_slot() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-        leader_bank_notifier.set_completed(bank.slot() + 1);
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_leader_bank_notifier_set_completed_mismatched_completed_slot() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-        leader_bank_notifier.set_completed(bank.slot());
-        leader_bank_notifier.set_completed(bank.slot() + 1);
-    }
-
-    #[test]
-    fn test_leader_bank_notifier_set_completed() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-        leader_bank_notifier.set_completed(bank.slot());
-
-        let state = leader_bank_notifier.state.lock().unwrap();
-        assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
-        assert_eq!(state.status, Status::StandBy);
-        assert_eq!(state.slot, Some(bank.slot()));
-        assert_eq!(state.bank.upgrade(), Some(bank));
-    }
-
-    #[test]
-    fn test_leader_bank_notifier_get_or_wait_for_in_progress_timeout() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-
-        // Uninitialized
-        assert!(leader_bank_notifier
-            .get_or_wait_for_in_progress(Duration::from_millis(1))
-            .upgrade()
-            .is_none());
-
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-        leader_bank_notifier.set_completed(bank.slot());
-
-        // Completed
-        assert!(leader_bank_notifier
-            .get_or_wait_for_in_progress(Duration::from_millis(1))
-            .upgrade()
-            .is_none());
-    }
-
-    #[test]
-    fn test_leader_bank_notifier_get_in_progress() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-        let weak_bank = leader_bank_notifier.get_or_wait_for_in_progress(Duration::ZERO);
-        assert!(weak_bank.upgrade().is_some());
-    }
-
-    #[test]
-    fn test_leader_bank_notifier_wait_for_in_progress() {
-        let leader_bank_notifier = Arc::new(LeaderBankNotifier::default());
-        let bank = Arc::new(Bank::default_for_tests());
-
-        // Need to spawn a separate thread so we wait for the condvar in `get_or_wait_for_in_progress`
-        let jh = std::thread::spawn({
-            let leader_bank_notifier = leader_bank_notifier.clone();
-            let bank = bank.clone();
-            move || {
-                std::thread::sleep(Duration::from_millis(10));
-                leader_bank_notifier.set_in_progress(&bank);
-            }
-        });
-
-        let weak_bank = leader_bank_notifier.get_or_wait_for_in_progress(Duration::from_secs(1));
-        let upgraded_bank = weak_bank.upgrade().unwrap();
-        assert_eq!(upgraded_bank.slot(), bank.slot());
-
-        jh.join().unwrap();
-    }
-
-    #[test]
-    fn test_leader_bank_notifier_wait_for_completed() {
-        let leader_bank_notifier = Arc::new(LeaderBankNotifier::default());
-        let bank = Arc::new(Bank::default_for_tests());
-
-        let jh = std::thread::spawn({
-            let leader_bank_notifier = leader_bank_notifier.clone();
-            let bank = bank.clone();
-            move || {
-                leader_bank_notifier.set_in_progress(&bank);
-                std::thread::sleep(Duration::from_millis(10));
-                leader_bank_notifier.set_completed(bank.slot());
-            }
-        });
-
-        let slot = leader_bank_notifier.wait_for_completed(Duration::from_secs(1));
-        assert_eq!(slot, Some(bank.slot()));
-
-        jh.join().unwrap();
-    }
-
-    #[test]
-    fn test_leader_bank_notifier_wait_for_completed_timeout() {
-        let leader_bank_notifier = LeaderBankNotifier::default();
-        let bank = Arc::new(Bank::default_for_tests());
-        leader_bank_notifier.set_in_progress(&bank);
-        assert!(leader_bank_notifier
-            .wait_for_completed(Duration::from_millis(1))
-            .is_none());
-    }
-}

+ 0 - 1
poh/src/lib.rs

@@ -1,5 +1,4 @@
 #![allow(clippy::arithmetic_side_effects)]
 #![allow(clippy::arithmetic_side_effects)]
-pub mod leader_bank_notifier;
 pub mod poh_recorder;
 pub mod poh_recorder;
 pub mod poh_service;
 pub mod poh_service;
 pub mod transaction_recorder;
 pub mod transaction_recorder;

+ 44 - 10
poh/src/poh_recorder.rs

@@ -13,10 +13,8 @@
 #[cfg(feature = "dev-context-only-utils")]
 #[cfg(feature = "dev-context-only-utils")]
 use qualifier_attr::qualifiers;
 use qualifier_attr::qualifiers;
 use {
 use {
-    crate::{
-        leader_bank_notifier::LeaderBankNotifier, poh_service::PohService,
-        transaction_recorder::TransactionRecorder,
-    },
+    crate::{poh_service::PohService, transaction_recorder::TransactionRecorder},
+    arc_swap::ArcSwapOption,
     crossbeam_channel::{unbounded, Receiver, SendError, Sender, TrySendError},
     crossbeam_channel::{unbounded, Receiver, SendError, Sender, TrySendError},
     log::*,
     log::*,
     solana_clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
     solana_clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
@@ -177,7 +175,14 @@ pub struct PohRecorder {
     start_bank_active_descendants: Vec<Slot>,
     start_bank_active_descendants: Vec<Slot>,
     start_tick_height: u64, // first tick_height this recorder will observe
     start_tick_height: u64, // first tick_height this recorder will observe
     tick_cache: Vec<(Entry, u64)>, // cache of entry and its tick_height
     tick_cache: Vec<(Entry, u64)>, // cache of entry and its tick_height
+    /// This stores the current working bank + scheduler and other metadata,
+    /// if they exist.
+    /// This field MUST be kept consistent with the `shared_working_bank` field.
     working_bank: Option<WorkingBank>,
     working_bank: Option<WorkingBank>,
+    /// This is used to store the current working bank - just the Arc<Bank> without
+    /// schduler or any other metadata. It MUST be kept consistent with
+    /// the `working_bank` field of this struct.
+    shared_working_bank: SharedWorkingBank,
     working_bank_sender: Sender<WorkingBankEntry>,
     working_bank_sender: Sender<WorkingBankEntry>,
     leader_first_tick_height: Option<u64>,
     leader_first_tick_height: Option<u64>,
     leader_last_tick_height: u64, // zero if none
     leader_last_tick_height: u64, // zero if none
@@ -187,7 +192,6 @@ pub struct PohRecorder {
     ticks_per_slot: u64,
     ticks_per_slot: u64,
     target_ns_per_tick: u64,
     target_ns_per_tick: u64,
     metrics: PohRecorderMetrics,
     metrics: PohRecorderMetrics,
-    leader_bank_notifier: Arc<LeaderBankNotifier>,
     delay_leader_block_for_pending_fork: bool,
     delay_leader_block_for_pending_fork: bool,
     last_reported_slot_for_pending_fork: Arc<Mutex<Slot>>,
     last_reported_slot_for_pending_fork: Arc<Mutex<Slot>>,
     pub is_exited: Arc<AtomicBool>,
     pub is_exited: Arc<AtomicBool>,
@@ -262,6 +266,7 @@ impl PohRecorder {
                 tick_height,
                 tick_height,
                 tick_cache: vec![],
                 tick_cache: vec![],
                 working_bank: None,
                 working_bank: None,
+                shared_working_bank: SharedWorkingBank::empty(),
                 working_bank_sender,
                 working_bank_sender,
                 clear_bank_signal,
                 clear_bank_signal,
                 start_bank,
                 start_bank,
@@ -275,7 +280,6 @@ impl PohRecorder {
                 ticks_per_slot,
                 ticks_per_slot,
                 target_ns_per_tick,
                 target_ns_per_tick,
                 metrics: PohRecorderMetrics::default(),
                 metrics: PohRecorderMetrics::default(),
-                leader_bank_notifier: Arc::default(),
                 delay_leader_block_for_pending_fork,
                 delay_leader_block_for_pending_fork,
                 last_reported_slot_for_pending_fork: Arc::default(),
                 last_reported_slot_for_pending_fork: Arc::default(),
                 is_exited,
                 is_exited,
@@ -434,7 +438,6 @@ impl PohRecorder {
 
 
     pub fn set_bank(&mut self, bank: BankWithScheduler) {
     pub fn set_bank(&mut self, bank: BankWithScheduler) {
         assert!(self.working_bank.is_none());
         assert!(self.working_bank.is_none());
-        self.leader_bank_notifier.set_in_progress(&bank);
         let working_bank = WorkingBank {
         let working_bank = WorkingBank {
             min_tick_height: bank.tick_height(),
             min_tick_height: bank.tick_height(),
             max_tick_height: bank.max_tick_height(),
             max_tick_height: bank.max_tick_height(),
@@ -457,6 +460,9 @@ impl PohRecorder {
                 self.reset_poh(working_bank.bank.clone(), false);
                 self.reset_poh(working_bank.bank.clone(), false);
             }
             }
         }
         }
+
+        // `shared_working_bank` and `working_bank` must be kept consistent.
+        self.shared_working_bank.store(working_bank.bank.clone());
         self.working_bank = Some(working_bank);
         self.working_bank = Some(working_bank);
 
 
         // TODO: adjust the working_bank.start time based on number of ticks
         // TODO: adjust the working_bank.start time based on number of ticks
@@ -466,7 +472,9 @@ impl PohRecorder {
 
 
     fn clear_bank(&mut self) {
     fn clear_bank(&mut self) {
         if let Some(WorkingBank { bank, start, .. }) = self.working_bank.take() {
         if let Some(WorkingBank { bank, start, .. }) = self.working_bank.take() {
-            self.leader_bank_notifier.set_completed(bank.slot());
+            // clear `shared_working_bank` to keep it consistent with `working_bank`
+            self.shared_working_bank.clear();
+
             let next_leader_slot = self.leader_schedule_cache.next_leader_slot(
             let next_leader_slot = self.leader_schedule_cache.next_leader_slot(
                 bank.collector_id(),
                 bank.collector_id(),
                 bank.slot(),
                 bank.slot(),
@@ -660,8 +668,11 @@ impl PohRecorder {
         self.ticks_per_slot
         self.ticks_per_slot
     }
     }
 
 
-    pub fn new_leader_bank_notifier(&self) -> Arc<LeaderBankNotifier> {
-        self.leader_bank_notifier.clone()
+    /// Returns a shared reference to the working bank, if it exists.
+    /// This allows for other threads to access the working bank
+    /// without needing to lock poh recorder.
+    pub fn shared_working_bank(&self) -> SharedWorkingBank {
+        self.shared_working_bank.clone()
     }
     }
 
 
     pub fn start_slot(&self) -> Slot {
     pub fn start_slot(&self) -> Slot {
@@ -972,6 +983,29 @@ pub fn create_test_recorder_with_index_tracking(
     do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true)
     do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true)
 }
 }
 
 
+/// Wrapper around an arc-swapped bank that prevents modifying outside
+/// of `PohRecorder`.
+#[derive(Clone)]
+pub struct SharedWorkingBank(Arc<ArcSwapOption<Bank>>);
+
+impl SharedWorkingBank {
+    pub fn load(&self) -> Option<Arc<Bank>> {
+        self.0.load_full()
+    }
+
+    fn store(&self, bank: Arc<Bank>) {
+        self.0.store(Some(bank));
+    }
+
+    fn clear(&self) {
+        self.0.store(None);
+    }
+
+    fn empty() -> Self {
+        Self(Arc::new(ArcSwapOption::empty()))
+    }
+}
+
 #[cfg(test)]
 #[cfg(test)]
 mod tests {
 mod tests {
     use {
     use {

+ 1 - 0
programs/sbf/Cargo.lock

@@ -7184,6 +7184,7 @@ dependencies = [
 name = "solana-poh"
 name = "solana-poh"
 version = "3.0.0"
 version = "3.0.0"
 dependencies = [
 dependencies = [
+ "arc-swap",
  "core_affinity",
  "core_affinity",
  "crossbeam-channel",
  "crossbeam-channel",
  "log",
  "log",

+ 1 - 0
svm/examples/Cargo.lock

@@ -6984,6 +6984,7 @@ dependencies = [
 name = "solana-poh"
 name = "solana-poh"
 version = "3.0.0"
 version = "3.0.0"
 dependencies = [
 dependencies = [
+ "arc-swap",
  "core_affinity",
  "core_affinity",
  "crossbeam-channel",
  "crossbeam-channel",
  "log",
  "log",