Discard buffer of inactive BP unified scheduler (#6246)

* Discard buffer of inactive BP unified scheduler

* Document rationale of apparently naive clearing algo
Ryo Onodera, 5 months ago
commit 528908ab38

+ 1 - 0
Cargo.lock

@@ -7738,6 +7738,7 @@ dependencies = [
  "criterion",
  "criterion",
  "crossbeam-channel",
  "crossbeam-channel",
  "dashmap",
  "dashmap",
+ "derive_more 1.0.0",
  "etcd-client",
  "etcd-client",
  "fs_extra",
  "fs_extra",
  "futures 0.3.31",
  "futures 0.3.31",

+ 1 - 0
core/Cargo.toml

@@ -30,6 +30,7 @@ chrono = { workspace = true, features = ["default", "serde"] }
 conditional-mod = { workspace = true }
 crossbeam-channel = { workspace = true }
 dashmap = { workspace = true, features = ["rayon", "raw-api"] }
+derive_more = { workspace = true }
 etcd-client = { workspace = true, features = ["tls"] }
 futures = { workspace = true }
 histogram = { workspace = true }

+ 16 - 1
core/src/banking_stage/decision_maker.rs

@@ -5,6 +5,7 @@ use {
         HOLD_TRANSACTIONS_SLOT_OFFSET,
     },
     solana_pubkey::Pubkey,
+    solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus},
     std::{
         sync::{Arc, RwLock},
         time::{Duration, Instant},
@@ -29,9 +30,10 @@ impl BufferedPacketsDecision {
     }
 }
 
-#[derive(Clone)]
+#[derive(Clone, derive_more::Debug)]
 pub struct DecisionMaker {
     my_pubkey: Pubkey,
+    #[debug("{poh_recorder:p}")]
     poh_recorder: Arc<RwLock<PohRecorder>>,
 
     cached_decision: Option<BufferedPacketsDecision>,
@@ -134,6 +136,19 @@ impl DecisionMaker {
     }
 }
 
+impl BankingStageMonitor for DecisionMaker {
+    fn status(&mut self) -> BankingStageStatus {
+        if matches!(
+            self.make_consume_or_forward_decision(),
+            BufferedPacketsDecision::Forward,
+        ) {
+            BankingStageStatus::Inactive
+        } else {
+            BankingStageStatus::Active
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use {

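An aside on the `derive_more::Debug` usage above: the `#[debug("{poh_recorder:p}")]` field attribute substitutes a custom format string for that field, here rendering the `Arc` as a raw pointer via `fmt::Pointer` instead of requiring (and recursing into) a `Debug` impl. A minimal standalone sketch of the same pattern, using a hypothetical `Handle` type that is not part of this change:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical type for illustration only. derive_more 1.0 (with its "debug"
// feature) lets a field override its rendering with a format string;
// `{buffer:p}` prints the Arc's pointer, so the field's contents never need
// a useful Debug impl of their own.
#[derive(derive_more::Debug)]
struct Handle {
    name: &'static str,
    #[debug("{buffer:p}")]
    buffer: Arc<RwLock<Vec<u8>>>,
}

fn main() {
    let handle = Handle {
        name: "demo",
        buffer: Arc::new(RwLock::new(Vec::new())),
    };
    // Prints e.g.: Handle { name: "demo", buffer: 0x55a3... }
    println!("{handle:?}");
}
```

This keeps the derive usable even when a field's own `Debug` output would be unavailable or too noisy, which is presumably why `DecisionMaker` formats its `PohRecorder` handle as a pointer.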
+ 5 - 0
core/src/banking_stage/unified_scheduler.rs

@@ -54,11 +54,15 @@ pub(crate) fn ensure_banking_stage_setup(
     let mut root_bank_cache = RootBankCache::new(bank_forks.clone());
     let unified_receiver = channels.unified_receiver().clone();
     let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
+    let banking_stage_monitor = Box::new(decision_maker.clone());
 
     let banking_packet_handler = Box::new(
         move |helper: &BankingStageHelper, batches: BankingPacketBatch| {
             let decision = decision_maker.make_consume_or_forward_decision();
             if matches!(decision, BufferedPacketsDecision::Forward) {
+                // Discard newly-arriving packets. Note that already-handled packets (thus
+                // buffered internally by the scheduler) will be discarded as well, via the
+                // BankingStageMonitor API driven by solScCleaner.
                 return;
             }
             let bank = root_bank_cache.root_bank();
@@ -92,5 +96,6 @@ pub(crate) fn ensure_banking_stage_setup(
         unified_receiver,
         banking_packet_handler,
         transaction_recorder,
+        banking_stage_monitor,
     );
 }

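Taken together, the wiring in this hunk closes the loop: `DecisionMaker` doubles as the `BankingStageMonitor`, the pool's cleaner thread (solScCleaner) polls it, and an `Inactive` status leads to a buffer discard on the pooled block-production scheduler, complementing the packet drop in the handler above. A self-contained toy model of that control loop, where every type is a simplified stand-in rather than the real pool API:

```rust
use std::sync::Mutex;

// Simplified stand-in for BankingStageStatus.
#[derive(Debug, PartialEq)]
enum Status {
    Active,
    Inactive,
}

// Simplified stand-in for BankingStageMonitor; the real impl is DecisionMaker,
// which reports Inactive when its decision is BufferedPacketsDecision::Forward.
trait Monitor: Send {
    fn status(&mut self) -> Status;
}

struct FlagMonitor(bool);

impl Monitor for FlagMonitor {
    fn status(&mut self) -> Status {
        if self.0 { Status::Inactive } else { Status::Active }
    }
}

// Stand-in for the pooled scheduler inner: it buffers tasks and can discard
// them, like PooledSchedulerInner::discard_buffer() requesting a reset.
struct Scheduler {
    buffered: Vec<u64>,
}

impl Scheduler {
    fn discard_buffer(&mut self) -> usize {
        let count = self.buffered.len();
        self.buffered.clear();
        count
    }
}

// One iteration of a solScCleaner-like loop: poll the monitor and discard the
// pooled scheduler's buffer only while the banking stage is inactive. The lock
// is held across the discard, mirroring how the real cleaner holds the inner
// lock while the discard message is sent.
fn cleaner_tick(monitor: &mut dyn Monitor, pooled: &Mutex<Scheduler>) {
    if monitor.status() == Status::Inactive {
        let mut scheduler = pooled.lock().unwrap();
        let discarded = scheduler.discard_buffer();
        println!("discarded {discarded} stale task(s)");
    }
}

fn main() {
    let pooled = Mutex::new(Scheduler { buffered: vec![1, 2, 3] });
    let mut monitor = FlagMonitor(true);
    cleaner_tick(&mut monitor, &pooled); // prints: discarded 3 stale task(s)
}
```

In the real code the discard request travels as a `Reset` message over the scheduler's new-task channel, as shown in the unified-scheduler-pool hunks below.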
+ 1 - 0
programs/sbf/Cargo.lock

@@ -6003,6 +6003,7 @@ dependencies = [
  "conditional-mod",
  "conditional-mod",
  "crossbeam-channel",
  "crossbeam-channel",
  "dashmap",
  "dashmap",
+ "derive_more 1.0.0",
  "etcd-client",
  "etcd-client",
  "futures 0.3.31",
  "futures 0.3.31",
  "histogram",
  "histogram",

+ 1 - 0
svm/examples/Cargo.lock

@@ -5850,6 +5850,7 @@ dependencies = [
  "conditional-mod",
  "conditional-mod",
  "crossbeam-channel",
  "crossbeam-channel",
  "dashmap",
  "dashmap",
+ "derive_more 1.0.0",
  "etcd-client",
  "etcd-client",
  "futures 0.3.31",
  "futures 0.3.31",
  "histogram",
  "histogram",

+ 36 - 1
unified-scheduler-logic/src/lib.rs

@@ -927,13 +927,15 @@ impl SchedulingStateMachine {
     /// Rewind the inactive state machine to be initialized
     ///
     /// This isn't called _reset_ to indicate that it isn't safe to call at any given moment.
-    /// This panics if the state machine hasn't properly been finished (i.e.  there should be no
+    /// This panics if the state machine hasn't properly been finished (i.e. there should be no
     /// active task) to uphold invariants of [`UsageQueue`]s.
     ///
     /// This method is intended to reuse the SchedulingStateMachine instance (to avoid its `unsafe`
     /// [constructor](SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling)
     /// as much as possible) and its (possibly cached) associated [`UsageQueue`]s for processing
     /// other slots.
+    ///
+    /// There's a related method called [`clear_and_reinitialize()`](Self::clear_and_reinitialize).
     pub fn reinitialize(&mut self) {
         assert!(self.has_no_active_task());
         assert_eq!(self.running_task_count.current(), 0);
@@ -957,6 +959,39 @@ impl SchedulingStateMachine {
         total_task_count.reset_to_zero();
     }
 
+    /// Clear all buffered tasks and immediately rewind the state machine to be initialized
+    ///
+    /// This method _may_ panic if there are tasks which have been scheduled but haven't been
+    /// descheduled yet (called active tasks). This is due to the invocation of
+    /// [`reinitialize()`](Self::reinitialize) at the end. On the other hand, it's guaranteed not
+    /// to panic otherwise. That's because the first clearing step effectively relaxes the runtime
+    /// invariant of `reinitialize()` by making the state machine _inactive_ beforehand. After a
+    /// successful operation, this method returns the number of cleared tasks.
+    ///
+    /// Somewhat surprisingly, the clearing logic is the same as the normal (de-)scheduling
+    /// operation, because that is still the fastest way to just clear all tasks, considering the
+    /// potential later reuse of [`UsageQueue`]s. That's because `state_machine` doesn't maintain
+    /// _the global list_ of tasks. Maintaining such a list would incur a needless overhead on
+    /// scheduling, which isn't strictly needed otherwise.
+    ///
+    /// Moreover, the descheduling operation is rather heavily optimized to begin with. All
+    /// collection ops are just O(1) over the total N of addresses accessed by all active tasks,
+    /// with no amortized mem ops.
+    ///
+    /// Whichever algorithm is chosen, the ultimate goal of this operation is to clear all usage
+    /// queues. Toward that end, one could alternatively build a temporary hash set of
+    /// [`UsageQueue`]s on the fly. However, that would be costlier than the usual descheduling
+    /// approach above due to extra mem ops and many lookups/insertions.
+    pub fn clear_and_reinitialize(&mut self) -> usize {
+        let mut count = ShortCounter::zero();
+        while let Some(task) = self.schedule_next_unblocked_task() {
+            self.deschedule_task(&task);
+            count.increment_self();
+        }
+        self.reinitialize();
+        count.current().try_into().unwrap()
+    }
+
     /// Creates a new instance of [`SchedulingStateMachine`] with its `unsafe` fields created as
     /// well, thus carrying over `unsafe`.
     ///

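The new doc comment argues that draining tasks through the ordinary (de-)scheduling path is both the cheapest way to clear the usage queues and exactly what makes the trailing `reinitialize()` panic-free. A toy model of that flow, with a drastically simplified stand-in for `SchedulingStateMachine` (the real usage-queue bookkeeping is elided):

```rust
use std::collections::VecDeque;

// Drastically simplified stand-in for SchedulingStateMachine: it only tracks
// buffered (unblocked) task ids, not the real per-address usage queues.
struct ToyStateMachine {
    unblocked: VecDeque<u64>,
    active: usize, // tasks scheduled out but not yet descheduled
}

impl ToyStateMachine {
    fn schedule_next_unblocked_task(&mut self) -> Option<u64> {
        let task = self.unblocked.pop_front()?;
        self.active += 1;
        Some(task)
    }

    fn deschedule_task(&mut self, _task: &u64) {
        self.active -= 1;
    }

    fn reinitialize(&mut self) {
        // The real method panics unless every scheduled task was descheduled.
        assert_eq!(self.active, 0);
    }

    // Mirrors clear_and_reinitialize(): clearing reuses the normal
    // (de-)scheduling path because no global task list is kept, and the loop
    // is what makes the final reinitialize() above panic-free.
    fn clear_and_reinitialize(&mut self) -> usize {
        let mut count = 0;
        while let Some(task) = self.schedule_next_unblocked_task() {
            self.deschedule_task(&task);
            count += 1;
        }
        self.reinitialize();
        count
    }
}

fn main() {
    let mut sm = ToyStateMachine {
        unblocked: VecDeque::from([7, 8, 9]),
        active: 0,
    };
    assert_eq!(sm.clear_and_reinitialize(), 3);
}
```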
+ 190 - 0
unified-scheduler-pool/src/lib.rs

@@ -85,6 +85,8 @@ enum CheckPoint<'a> {
     IdleSchedulerCleaned(usize),
     TrashedSchedulerCleaned(usize),
     TimeoutListenerTriggered(usize),
+    DiscardRequested,
+    Discarded(usize),
 }
 
 type CountOrDefault = Option<usize>;
@@ -171,6 +173,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> BlockProductionSchedulerInner<S
         assert_matches!(mem::replace(self, Self::Pooled(inner)), Self::Taken(old) if old == new);
     }
 
+    fn peek_pooled(&self) -> Option<&S::Inner> {
+        match self {
+            Self::NotSpawned | Self::Taken(_) => None,
+            Self::Pooled(inner) => Some(inner),
+        }
+    }
+
     fn take_pooled(&mut self) -> S::Inner {
         let id = {
             let Self::Pooled(inner) = &self else {
@@ -250,6 +259,7 @@ struct BankingStageHandlerContext {
     #[debug("{banking_packet_handler:p}")]
     banking_packet_handler: Box<dyn BankingPacketHandler>,
     transaction_recorder: TransactionRecorder,
+    banking_stage_monitor: Box<dyn BankingStageMonitor>,
 }
 
 trait_set! {
@@ -419,6 +429,8 @@ where
                     idle_inner_count
                 };
 
+                let banking_stage_status = scheduler_pool.banking_stage_status();
+
                 let trashed_inner_count = {
                     let Ok(mut trashed_scheduler_inners) =
                         scheduler_pool.trashed_scheduler_inners.lock()
@@ -457,6 +469,22 @@ where
                     count
                 };
 
+                if matches!(banking_stage_status, Some(BankingStageStatus::Inactive)) {
+                    let inner = scheduler_pool
+                        .block_production_scheduler_inner
+                        .lock()
+                        .unwrap();
+                    if let Some(pooled) = inner.peek_pooled() {
+                        pooled.discard_buffer();
+                        // Prevent replay stage's OpenSubchannel from winning the race by holding
+                        // the inner lock for the duration of the discard message sending just
+                        // above. The message (internally SubchanneledPayload::Reset) must be sent
+                        // only during gaps between subchannels of the new-task channel.
+                        sleepless_testing::at(CheckPoint::DiscardRequested);
+                        drop(inner);
+                    }
+                }
+
                 info!(
                     "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners, triggered {} timeout listeners",
                     idle_inner_count, trashed_inner_count, triggered_timeout_listener_count,
@@ -603,12 +631,14 @@ where
         banking_packet_receiver: BankingPacketReceiver,
         banking_packet_handler: Box<dyn BankingPacketHandler>,
         transaction_recorder: TransactionRecorder,
+        banking_stage_monitor: Box<dyn BankingStageMonitor>,
     ) {
         *self.banking_stage_handler_context.lock().unwrap() = Some(BankingStageHandlerContext {
             banking_thread_count,
             banking_packet_receiver,
             banking_packet_handler,
             transaction_recorder,
+            banking_stage_monitor,
         });
         // Immediately start a block production scheduler, so that the scheduler can start
         // buffering tasks, which are preprocessed as much as possible.
@@ -617,6 +647,14 @@ where
         );
     }
 
+    fn banking_stage_status(&self) -> Option<BankingStageStatus> {
+        self.banking_stage_handler_context
+            .lock()
+            .unwrap()
+            .as_mut()
+            .map(|respawner| respawner.banking_stage_monitor.status())
+    }
+
     fn create_handler_context(
         &self,
         mode: SchedulingMode,
@@ -918,6 +956,7 @@ enum SubchanneledPayload<P1, P2> {
     OpenSubchannel(P2),
     UnpauseOpenedSubchannel,
     CloseSubchannel,
+    Reset,
 }
 
 type NewTaskPayload = SubchanneledPayload<Task, Box<(SchedulingContext, ResultWithTimings)>>;
@@ -1749,6 +1788,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                                     Ok(
                                         NewTaskPayload::OpenSubchannel(_)
                                         | NewTaskPayload::UnpauseOpenedSubchannel
+                                        | NewTaskPayload::Reset
                                     ) => unreachable!(),
                                     Err(RecvError) => {
                                         // Most likely is that this scheduler is dropped for pruned blocks of
@@ -1804,7 +1844,20 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                     // session_result_sender just above
                     let mut new_result_with_timings = None;
 
+                    let mut discard_on_reset = false;
                     loop {
+                        if discard_on_reset {
+                            discard_on_reset = false;
+                            // Gracefully clear all buffered tasks to discard all outstanding stale
+                            // tasks; we're not aborting the scheduler here, so `state_machine`
+                            // needs to be reusable after this.
+                            //
+                            // As for panic safety of .clear_and_reinitialize(), it's safe because
+                            // there should be _no_ scheduled tasks (i.e. owned by us, not by
+                            // state_machine) on the call stack by now.
+                            let count = state_machine.clear_and_reinitialize();
+                            sleepless_testing::at(CheckPoint::Discarded(count));
+                        }
                         // Prepare for the new session.
                         match new_task_receiver.recv() {
                             Ok(NewTaskPayload::Payload(task)) => {
@@ -1849,6 +1902,10 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                                 // or abort is hinted from task results, before explicit
                                 // session ending is sent from the poh or the replay thread.
                             }
+                            Ok(NewTaskPayload::Reset) => {
+                                assert_matches!(scheduling_mode, BlockProduction);
+                                discard_on_reset = true;
+                            }
                             Err(RecvError) => {
                                 // This unusual condition must be triggered by ThreadManager::drop().
                                 // Initialize result_with_timings with a harmless value...
@@ -2128,6 +2185,10 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
             .expect("no new session after aborted");
     }
 
+    fn discard_buffered_tasks(&self) {
+        self.new_task_sender.send(NewTaskPayload::Reset).unwrap();
+    }
+
     fn disconnect_new_task_sender(&mut self) {
         self.new_task_sender = crossbeam_channel::unbounded().0;
     }
@@ -2136,6 +2197,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
 pub trait SchedulerInner {
     fn id(&self) -> SchedulerId;
     fn is_trashed(&self) -> bool;
+    fn discard_buffer(&self);
 }
 
 pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
@@ -2198,6 +2260,16 @@ impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
     }
 }
 
+#[derive(Debug)]
+pub enum BankingStageStatus {
+    Active,
+    Inactive,
+}
+
+pub trait BankingStageMonitor: Send + Debug {
+    fn status(&mut self) -> BankingStageStatus;
+}
+
 impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
     fn id(&self) -> SchedulerId {
         self.inner.id()
@@ -2265,6 +2337,10 @@ where
     fn is_trashed(&self) -> bool {
         self.is_aborted() || self.is_overgrown()
     }
+
+    fn discard_buffer(&self) {
+        self.thread_manager.discard_buffered_tasks();
+    }
 }
 
 impl<S, TH> UninstalledScheduler for PooledSchedulerInner<S, TH>
@@ -2332,6 +2408,7 @@ mod tests {
         BeforeThreadManagerDrop,
         BeforeEndSession,
         AfterSession,
+        AfterDiscarded,
     }
 
     #[test]
@@ -3462,6 +3539,7 @@ mod tests {
                 banking_packet_receiver,
                 Box::new(|_, _| unreachable!()),
                 transaction_recorder,
+                Box::new(DummyBankingMonitor),
             );
         }
 
@@ -3589,6 +3667,7 @@ mod tests {
             banking_packet_receiver,
             Box::new(|_, _| unreachable!()),
             transaction_recorder,
+            Box::new(DummyBankingMonitor),
         );
 
         let bank = Arc::new(Bank::new_from_parent(
@@ -3837,6 +3916,10 @@ mod tests {
         fn is_trashed(&self) -> bool {
             false
         }
+
+        fn discard_buffer(&self) {
+            unimplemented!()
+        }
     }
 
     impl<const TRIGGER_RACE_CONDITION: bool> UninstalledScheduler
@@ -4151,6 +4234,15 @@ mod tests {
         poh_service.join().unwrap();
     }
 
+    #[derive(Debug)]
+    struct DummyBankingMonitor;
+
+    impl BankingStageMonitor for DummyBankingMonitor {
+        fn status(&mut self) -> BankingStageStatus {
+            BankingStageStatus::Active
+        }
+    }
+
     #[test]
     fn test_block_production_scheduler_schedule_execution_success() {
         solana_logger::setup();
@@ -4185,6 +4277,7 @@ mod tests {
             // We don't use the banking packet channel in this test, so pass a panicking handler.
             Box::new(|_, _| unreachable!()),
             transaction_recorder,
+            Box::new(DummyBankingMonitor),
         );
 
         assert_eq!(bank.transaction_count(), 0);
@@ -4261,6 +4354,7 @@ mod tests {
             banking_packet_receiver,
             fixed_banking_packet_handler,
             transaction_recorder,
+            Box::new(DummyBankingMonitor),
         );
 
         // Confirm the banking packet channel is cleared, even before taking scheduler
@@ -4329,6 +4423,7 @@ mod tests {
             banking_packet_receiver,
             fixed_banking_packet_handler,
             transaction_recorder,
+            Box::new(DummyBankingMonitor),
         );
 
         // Quickly take and return the scheduler so that this test can test the behavior while
@@ -4404,6 +4499,7 @@ mod tests {
             banking_packet_receiver,
             Box::new(|_, _| unreachable!()),
             transaction_recorder,
+            Box::new(DummyBankingMonitor),
         );
 
         let context = SchedulingContext::for_production(bank.clone());
@@ -4457,6 +4553,7 @@ mod tests {
             banking_packet_receiver,
             Box::new(|_, _| unreachable!()),
             transaction_recorder,
+            Box::new(DummyBankingMonitor),
         );
 
         let context = SchedulingContext::for_production(bank);
@@ -4513,6 +4610,7 @@ mod tests {
             banking_packet_receiver,
             Box::new(|_, _| unreachable!()),
             transaction_recorder,
+            Box::new(DummyBankingMonitor),
         );
 
         // Make sure the assertion in BlockProductionSchedulerInner::can_put() doesn't cause false
@@ -4525,4 +4623,96 @@ mod tests {
         exit.store(true, Ordering::Relaxed);
         poh_service.join().unwrap();
     }
+
+    #[test]
+    fn test_block_production_scheduler_discard_on_reset() {
+        #[derive(Debug)]
+        struct SimpleBankingMonitor;
+        static START_DISCARD: Mutex<bool> = Mutex::new(false);
+
+        impl BankingStageMonitor for SimpleBankingMonitor {
+            fn status(&mut self) -> BankingStageStatus {
+                if *START_DISCARD.lock().unwrap() {
+                    BankingStageStatus::Inactive
+                } else {
+                    BankingStageStatus::Active
+                }
+            }
+        }
+
+        solana_logger::setup();
+
+        let GenesisConfigInfo {
+            genesis_config,
+            mint_keypair,
+            ..
+        } = create_genesis_config_for_block_production(10_000);
+
+        const DISCARDED_TASK_COUNT: usize = 3;
+        let _progress = sleepless_testing::setup(&[
+            &CheckPoint::NewBufferedTask(DISCARDED_TASK_COUNT - 1),
+            &CheckPoint::DiscardRequested,
+            &CheckPoint::Discarded(DISCARDED_TASK_COUNT),
+            &TestCheckPoint::AfterDiscarded,
+        ]);
+
+        let bank = Bank::new_for_tests(&genesis_config);
+        let (bank, _bank_forks) = setup_dummy_fork_graph(bank);
+
+        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let pool = DefaultSchedulerPool::do_new(
+            None,
+            None,
+            None,
+            None,
+            ignored_prioritization_fee_cache,
+            SHORTENED_POOL_CLEANER_INTERVAL,
+            DEFAULT_MAX_POOLING_DURATION,
+            DEFAULT_MAX_USAGE_QUEUE_COUNT,
+            DEFAULT_TIMEOUT_DURATION,
+        );
+
+        let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
+            &mint_keypair,
+            &solana_pubkey::new_rand(),
+            2,
+            genesis_config.hash(),
+        ));
+        let fixed_banking_packet_handler =
+            Box::new(move |helper: &BankingStageHelper, _banking_packet| {
+                for index in 0..DISCARDED_TASK_COUNT {
+                    helper.send_new_task(helper.create_new_task(tx0.clone(), index))
+                }
+            });
+
+        let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
+        banking_packet_sender
+            .send(BankingPacketBatch::default())
+            .unwrap();
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (exit, _poh_recorder, transaction_recorder, poh_service, _signal_receiver) =
+            create_test_recorder_with_index_tracking(
+                bank.clone(),
+                blockstore.clone(),
+                None,
+                Some(leader_schedule_cache),
+            );
+        pool.register_banking_stage(
+            None,
+            banking_packet_receiver,
+            fixed_banking_packet_handler,
+            transaction_recorder,
+            Box::new(SimpleBankingMonitor),
+        );
+
+        // By now, there should be buffered tasks. Let's discard them.
+        *START_DISCARD.lock().unwrap() = true;
+
+        sleepless_testing::at(TestCheckPoint::AfterDiscarded);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
+    }
 }
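A final note on the race comment in the cleaner hunk above: the correctness argument is simply that `Reset` and `OpenSubchannel` serialize through the same lock, so a `Reset` can only ever land in a gap between subchannels. A minimal sketch of that protocol, with toy message types and `std::sync::mpsc` standing in for the real new-task channel:

```rust
use std::sync::{mpsc, Mutex};

// Toy messages mirroring the relevant SubchanneledPayload variants.
#[derive(Debug)]
enum Msg {
    OpenSubchannel,
    Reset,
}

// The cleaner-side path: keep the lock held for the whole send, so no
// OpenSubchannel can be interleaved while the Reset is in flight.
fn request_reset(inner: &Mutex<mpsc::Sender<Msg>>) {
    let sender = inner.lock().unwrap();
    sender.send(Msg::Reset).unwrap();
    // The guard drops here, only after the message is already queued.
}

// The replay-stage-side path takes the same lock before opening a subchannel.
fn open_subchannel(inner: &Mutex<mpsc::Sender<Msg>>) {
    let sender = inner.lock().unwrap();
    sender.send(Msg::OpenSubchannel).unwrap();
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    let inner = Mutex::new(sender);
    request_reset(&inner);
    open_subchannel(&inner);
    // The mutex ordered the sends: Reset arrived strictly before the
    // subchannel opened, i.e. within a gap between subchannels.
    assert!(matches!(receiver.recv().unwrap(), Msg::Reset));
    assert!(matches!(receiver.recv().unwrap(), Msg::OpenSubchannel));
}
```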