Browse Source

Hold certain retryable transactions until next slot (#6864)

Andrew Fitzgerald 2 months ago
parent
commit
8b734e5a6a

+ 16 - 7
core/src/banking_stage/consume_worker.rs

@@ -4,6 +4,7 @@ use {
         leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
         leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
         scheduler_messages::{ConsumeWork, FinishedConsumeWork},
         scheduler_messages::{ConsumeWork, FinishedConsumeWork},
     },
     },
+    crate::banking_stage::consumer::RetryableIndex,
     crossbeam_channel::{Receiver, RecvError, SendError, Sender},
     crossbeam_channel::{Receiver, RecvError, SendError, Sender},
     solana_measure::measure_us,
     solana_measure::measure_us,
     solana_poh::poh_recorder::SharedWorkingBank,
     solana_poh::poh_recorder::SharedWorkingBank,
@@ -179,7 +180,12 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
 
 
     /// Send transactions back to scheduler as retryable.
     /// Send transactions back to scheduler as retryable.
     fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
     fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
-        let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect();
+        let retryable_indexes: Vec<_> = (0..work.transactions.len())
+            .map(|index| RetryableIndex {
+                index,
+                immediately_retryable: true,
+            })
+            .collect();
         let num_retryable = retryable_indexes.len();
         let num_retryable = retryable_indexes.len();
         self.metrics
         self.metrics
             .count_metrics
             .count_metrics
@@ -942,7 +948,10 @@ mod tests {
         assert_eq!(consumed.work.batch_id, bid);
         assert_eq!(consumed.work.batch_id, bid);
         assert_eq!(consumed.work.ids, vec![id]);
         assert_eq!(consumed.work.ids, vec![id]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
-        assert_eq!(consumed.retryable_indexes, vec![0]);
+        assert_eq!(
+            consumed.retryable_indexes,
+            vec![RetryableIndex::new(0, true)]
+        );
 
 
         drop(test_frame);
         drop(test_frame);
         let _ = worker_thread.join().unwrap();
         let _ = worker_thread.join().unwrap();
@@ -991,7 +1000,7 @@ mod tests {
         assert_eq!(consumed.work.batch_id, bid);
         assert_eq!(consumed.work.batch_id, bid);
         assert_eq!(consumed.work.ids, vec![id]);
         assert_eq!(consumed.work.ids, vec![id]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
-        assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
+        assert_eq!(consumed.retryable_indexes, Vec::new());
 
 
         drop(test_frame);
         drop(test_frame);
         let _ = worker_thread.join().unwrap();
         let _ = worker_thread.join().unwrap();
@@ -1051,7 +1060,7 @@ mod tests {
             if relax_intrabatch_account_locks {
             if relax_intrabatch_account_locks {
                 vec![]
                 vec![]
             } else {
             } else {
-                vec![1]
+                vec![RetryableIndex::new(1, true)]
             }
             }
         );
         );
 
 
@@ -1122,13 +1131,13 @@ mod tests {
         assert_eq!(consumed.work.batch_id, bid1);
         assert_eq!(consumed.work.batch_id, bid1);
         assert_eq!(consumed.work.ids, vec![id1]);
         assert_eq!(consumed.work.ids, vec![id1]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
-        assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
+        assert_eq!(consumed.retryable_indexes, Vec::new());
 
 
         let consumed = consumed_receiver.recv().unwrap();
         let consumed = consumed_receiver.recv().unwrap();
         assert_eq!(consumed.work.batch_id, bid2);
         assert_eq!(consumed.work.batch_id, bid2);
         assert_eq!(consumed.work.ids, vec![id2]);
         assert_eq!(consumed.work.ids, vec![id2]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
         assert_eq!(consumed.work.max_ages, vec![max_age]);
-        assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
+        assert_eq!(consumed.retryable_indexes, Vec::new());
 
 
         drop(test_frame);
         drop(test_frame);
         let _ = worker_thread.join().unwrap();
         let _ = worker_thread.join().unwrap();
@@ -1258,7 +1267,7 @@ mod tests {
             .unwrap();
             .unwrap();
 
 
         let consumed = consumed_receiver.recv().unwrap();
         let consumed = consumed_receiver.recv().unwrap();
-        assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
+        assert_eq!(consumed.retryable_indexes, Vec::new());
         // all but one succeed. 6 for initial funding
         // all but one succeed. 6 for initial funding
         assert_eq!(bank.transaction_count(), 6 + 5);
         assert_eq!(bank.transaction_count(), 6 + 5);
 
 

+ 69 - 15
core/src/banking_stage/consumer.rs

@@ -34,6 +34,21 @@ use {
 /// Consumer will create chunks of transactions from buffer with up to this size.
 /// Consumer will create chunks of transactions from buffer with up to this size.
 pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 64;
 pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 64;
 
 
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct RetryableIndex {
+    pub index: usize,
+    pub immediately_retryable: bool,
+}
+
+impl RetryableIndex {
+    pub fn new(index: usize, immediately_retryable: bool) -> Self {
+        Self {
+            index,
+            immediately_retryable,
+        }
+    }
+}
+
 pub struct ProcessTransactionBatchOutput {
 pub struct ProcessTransactionBatchOutput {
     // The number of transactions filtered out by the cost model
     // The number of transactions filtered out by the cost model
     pub(crate) cost_model_throttled_transactions_count: u64,
     pub(crate) cost_model_throttled_transactions_count: u64,
@@ -48,7 +63,7 @@ pub struct ExecuteAndCommitTransactionsOutput {
     pub(crate) transaction_counts: LeaderProcessedTransactionCounts,
     pub(crate) transaction_counts: LeaderProcessedTransactionCounts,
     // Transactions that either were not executed, or were executed and failed to be committed due
     // Transactions that either were not executed, or were executed and failed to be committed due
     // to the block ending.
     // to the block ending.
-    pub(crate) retryable_transaction_indexes: Vec<usize>,
+    pub(crate) retryable_transaction_indexes: Vec<RetryableIndex>,
     // A result that indicates whether transactions were successfully
     // A result that indicates whether transactions were successfully
     // committed into the Poh stream.
     // committed into the Poh stream.
     pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
     pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
@@ -256,23 +271,39 @@ impl Consumer {
                 // following are retryable errors
                 // following are retryable errors
                 Err(TransactionError::AccountInUse) => {
                 Err(TransactionError::AccountInUse) => {
                     error_counters.account_in_use += 1;
                     error_counters.account_in_use += 1;
-                    Some(index)
+                    // locking failure due to vote conflict or jito - immediately retry.
+                    Some(RetryableIndex {
+                        index,
+                        immediately_retryable: true,
+                    })
                 }
                 }
                 Err(TransactionError::WouldExceedMaxBlockCostLimit) => {
                 Err(TransactionError::WouldExceedMaxBlockCostLimit) => {
                     error_counters.would_exceed_max_block_cost_limit += 1;
                     error_counters.would_exceed_max_block_cost_limit += 1;
-                    Some(index)
+                    Some(RetryableIndex {
+                        index,
+                        immediately_retryable: false,
+                    })
                 }
                 }
                 Err(TransactionError::WouldExceedMaxVoteCostLimit) => {
                 Err(TransactionError::WouldExceedMaxVoteCostLimit) => {
                     error_counters.would_exceed_max_vote_cost_limit += 1;
                     error_counters.would_exceed_max_vote_cost_limit += 1;
-                    Some(index)
+                    Some(RetryableIndex {
+                        index,
+                        immediately_retryable: false,
+                    })
                 }
                 }
                 Err(TransactionError::WouldExceedMaxAccountCostLimit) => {
                 Err(TransactionError::WouldExceedMaxAccountCostLimit) => {
                     error_counters.would_exceed_max_account_cost_limit += 1;
                     error_counters.would_exceed_max_account_cost_limit += 1;
-                    Some(index)
+                    Some(RetryableIndex {
+                        index,
+                        immediately_retryable: false,
+                    })
                 }
                 }
                 Err(TransactionError::WouldExceedAccountDataBlockLimit) => {
                 Err(TransactionError::WouldExceedAccountDataBlockLimit) => {
                     error_counters.would_exceed_account_data_block_limit += 1;
                     error_counters.would_exceed_account_data_block_limit += 1;
-                    Some(index)
+                    Some(RetryableIndex {
+                        index,
+                        immediately_retryable: false,
+                    })
                 }
                 }
                 // following are non-retryable errors
                 // following are non-retryable errors
                 Err(TransactionError::TooManyAccountLocks) => {
                 Err(TransactionError::TooManyAccountLocks) => {
@@ -369,7 +400,12 @@ impl Consumer {
 
 
         if let Err(recorder_err) = record_transactions_result {
         if let Err(recorder_err) = record_transactions_result {
             retryable_transaction_indexes.extend(processing_results.iter().enumerate().filter_map(
             retryable_transaction_indexes.extend(processing_results.iter().enumerate().filter_map(
-                |(index, processing_result)| processing_result.was_processed().then_some(index),
+                |(index, processing_result)| {
+                    processing_result.was_processed().then_some(RetryableIndex {
+                        index,
+                        immediately_retryable: true, // recording errors are always immediately retryable
+                    })
+                },
             ));
             ));
 
 
             // retryable indexes are expected to be sorted - in this case the
             // retryable indexes are expected to be sorted - in this case the
@@ -754,7 +790,13 @@ mod tests {
                 processed_with_successful_result_count: 1,
                 processed_with_successful_result_count: 1,
             }
             }
         );
         );
-        assert_eq!(retryable_transaction_indexes, vec![0]);
+        assert_eq!(
+            retryable_transaction_indexes,
+            vec![RetryableIndex {
+                index: 0,
+                immediately_retryable: true
+            }]
+        );
         assert_matches!(
         assert_matches!(
             commit_transactions_result,
             commit_transactions_result,
             Err(PohRecorderError::MaxHeightReached)
             Err(PohRecorderError::MaxHeightReached)
@@ -1152,7 +1194,10 @@ mod tests {
             commit_transactions_result.get(1),
             commit_transactions_result.get(1),
             Some(CommitTransactionDetails::NotCommitted(_))
             Some(CommitTransactionDetails::NotCommitted(_))
         );
         );
-        assert_eq!(retryable_transaction_indexes, vec![1]);
+        assert_eq!(
+            retryable_transaction_indexes,
+            vec![RetryableIndex::new(1, true)]
+        );
 
 
         let expected_block_cost = {
         let expected_block_cost = {
             let (actual_programs_execution_cost, actual_loaded_accounts_data_size_cost) =
             let (actual_programs_execution_cost, actual_loaded_accounts_data_size_cost) =
@@ -1311,9 +1356,12 @@ mod tests {
 
 
         // with simd3, duplicate transactions are not retryable
         // with simd3, duplicate transactions are not retryable
         if relax_intrabatch_account_locks && use_duplicate_transaction {
         if relax_intrabatch_account_locks && use_duplicate_transaction {
-            assert_eq!(retryable_transaction_indexes, Vec::<usize>::new());
+            assert_eq!(retryable_transaction_indexes, Vec::<_>::new());
         } else {
         } else {
-            assert_eq!(retryable_transaction_indexes, vec![1]);
+            assert_eq!(
+                retryable_transaction_indexes,
+                vec![RetryableIndex::new(1, true)]
+            );
         }
         }
     }
     }
 
 
@@ -1369,7 +1417,9 @@ mod tests {
 
 
         assert_eq!(
         assert_eq!(
             execute_and_commit_transactions_output.retryable_transaction_indexes,
             execute_and_commit_transactions_output.retryable_transaction_indexes,
-            (1..transactions_len - 1).collect::<Vec<usize>>()
+            (1..transactions_len - 1)
+                .map(|index| RetryableIndex::new(index, true))
+                .collect::<Vec<_>>()
         );
         );
     }
     }
 
 
@@ -1455,12 +1505,14 @@ mod tests {
         if relax_intrabatch_account_locks {
         if relax_intrabatch_account_locks {
             assert_eq!(
             assert_eq!(
                 execute_and_commit_transactions_output.retryable_transaction_indexes,
                 execute_and_commit_transactions_output.retryable_transaction_indexes,
-                Vec::<usize>::new()
+                Vec::<_>::new()
             );
             );
         } else {
         } else {
             assert_eq!(
             assert_eq!(
                 execute_and_commit_transactions_output.retryable_transaction_indexes,
                 execute_and_commit_transactions_output.retryable_transaction_indexes,
-                (1..transactions_len).collect::<Vec<usize>>()
+                (1..transactions_len)
+                    .map(|index| RetryableIndex::new(index, true))
+                    .collect::<Vec<_>>()
             );
             );
         }
         }
     }
     }
@@ -1548,7 +1600,9 @@ mod tests {
         execute_and_commit_transactions_output
         execute_and_commit_transactions_output
             .retryable_transaction_indexes
             .retryable_transaction_indexes
             .sort_unstable();
             .sort_unstable();
-        let expected: Vec<usize> = (0..transactions.len()).collect();
+        let expected: Vec<_> = (0..transactions.len())
+            .map(|index| RetryableIndex::new(index, true))
+            .collect();
         assert_eq!(
         assert_eq!(
             execute_and_commit_transactions_output.retryable_transaction_indexes,
             execute_and_commit_transactions_output.retryable_transaction_indexes,
             expected
             expected

+ 2 - 1
core/src/banking_stage/scheduler_messages.rs

@@ -1,4 +1,5 @@
 use {
 use {
+    crate::banking_stage::consumer::RetryableIndex,
     solana_clock::{Epoch, Slot},
     solana_clock::{Epoch, Slot},
     std::fmt::Display,
     std::fmt::Display,
 };
 };
@@ -47,5 +48,5 @@ pub struct ConsumeWork<Tx> {
 /// Processed transactions.
 /// Processed transactions.
 pub struct FinishedConsumeWork<Tx> {
 pub struct FinishedConsumeWork<Tx> {
     pub work: ConsumeWork<Tx>,
     pub work: ConsumeWork<Tx>,
-    pub retryable_indexes: Vec<usize>,
+    pub retryable_indexes: Vec<RetryableIndex>,
 }
 }

+ 34 - 14
core/src/banking_stage/transaction_scheduler/scheduler_common.rs

@@ -242,9 +242,13 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
                 // Assumption - retryable indexes are in order (sorted by workers).
                 // Assumption - retryable indexes are in order (sorted by workers).
                 let mut retryable_iter = retryable_indexes.iter().peekable();
                 let mut retryable_iter = retryable_indexes.iter().peekable();
                 for (index, (id, transaction)) in izip!(ids, transactions).enumerate() {
                 for (index, (id, transaction)) in izip!(ids, transactions).enumerate() {
-                    if let Some(&&retryable_index) = retryable_iter.peek() {
-                        if retryable_index == index {
-                            container.retry_transaction(id, transaction);
+                    if let Some(&retryable_index) = retryable_iter.peek() {
+                        if retryable_index.index == index {
+                            container.retry_transaction(
+                                id,
+                                transaction,
+                                retryable_index.immediately_retryable,
+                            );
                             retryable_iter.next();
                             retryable_iter.next();
                             continue;
                             continue;
                         }
                         }
@@ -254,7 +258,11 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
 
 
                 debug_assert!(
                 debug_assert!(
                     retryable_iter.peek().is_none(),
                     retryable_iter.peek().is_none(),
-                    "retryable indexes were not in order: {retryable_indexes:?}"
+                    "retryable indexes were not in order: {:?}",
+                    retryable_indexes
+                        .iter()
+                        .map(|index| index.index)
+                        .collect::<Vec<_>>(),
                 );
                 );
 
 
                 Ok((num_transactions, num_retryable))
                 Ok((num_transactions, num_retryable))
@@ -290,11 +298,18 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
 mod tests {
 mod tests {
     use {
     use {
         super::*,
         super::*,
-        crate::banking_stage::transaction_scheduler::transaction_state_container::TransactionStateContainer,
-        crossbeam_channel::unbounded, solana_hash::Hash, solana_keypair::Keypair,
-        solana_pubkey::Pubkey, solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
+        crate::banking_stage::{
+            consumer::RetryableIndex,
+            transaction_scheduler::transaction_state_container::TransactionStateContainer,
+        },
+        crossbeam_channel::unbounded,
+        solana_hash::Hash,
+        solana_keypair::Keypair,
+        solana_pubkey::Pubkey,
+        solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
         solana_system_transaction as system_transaction,
         solana_system_transaction as system_transaction,
-        solana_transaction::sanitized::SanitizedTransaction, test_case::test_case,
+        solana_transaction::sanitized::SanitizedTransaction,
+        test_case::test_case,
     };
     };
 
 
     const NUM_WORKERS: usize = 4;
     const NUM_WORKERS: usize = 4;
@@ -521,17 +536,22 @@ mod tests {
         let num_scheduled = common.send_batch(0).unwrap();
         let num_scheduled = common.send_batch(0).unwrap();
         let work = work_receivers[0].try_recv().unwrap();
         let work = work_receivers[0].try_recv().unwrap();
         assert_eq!(work.ids.len(), num_scheduled);
         assert_eq!(work.ids.len(), num_scheduled);
-        let retryable_indexes = vec![0, 1];
+        let retryable_indexes = vec![
+            RetryableIndex::new(0, true),
+            RetryableIndex::new(1, false), // should be held by container.
+        ];
+        let expected_num_retryable = retryable_indexes.len();
         let finished_work = FinishedConsumeWork {
         let finished_work = FinishedConsumeWork {
             work,
             work,
-            retryable_indexes: retryable_indexes.clone(),
+            retryable_indexes,
         };
         };
         finished_work_sender.send(finished_work).unwrap();
         finished_work_sender.send(finished_work).unwrap();
         let (num_transactions, num_retryable) =
         let (num_transactions, num_retryable) =
             common.try_receive_completed(&mut container).unwrap();
             common.try_receive_completed(&mut container).unwrap();
         assert_eq!(num_transactions, num_scheduled);
         assert_eq!(num_transactions, num_scheduled);
-        assert_eq!(num_retryable, retryable_indexes.len());
-        assert_eq!(container.buffer_size(), retryable_indexes.len());
+        assert_eq!(num_retryable, expected_num_retryable);
+        assert_eq!(container.buffer_size(), expected_num_retryable);
+        assert_eq!(container.queue_size(), expected_num_retryable - 1); // held transaction not in queue.
     }
     }
 
 
     #[test]
     #[test]
@@ -550,10 +570,10 @@ mod tests {
         let num_scheduled = common.send_batch(0).unwrap();
         let num_scheduled = common.send_batch(0).unwrap();
         let work = work_receivers[0].try_recv().unwrap();
         let work = work_receivers[0].try_recv().unwrap();
         assert_eq!(work.ids.len(), num_scheduled);
         assert_eq!(work.ids.len(), num_scheduled);
-        let retryable_indexes = vec![1, 0];
+        let retryable_indexes = vec![RetryableIndex::new(1, true), RetryableIndex::new(0, true)];
         let finished_work = FinishedConsumeWork {
         let finished_work = FinishedConsumeWork {
             work,
             work,
-            retryable_indexes: retryable_indexes.clone(),
+            retryable_indexes,
         };
         };
         finished_work_sender.send(finished_work).unwrap();
         finished_work_sender.send(finished_work).unwrap();
 
 

+ 7 - 2
core/src/banking_stage/transaction_scheduler/scheduler_controller.rs

@@ -87,6 +87,7 @@ where
     }
     }
 
 
     pub fn run(mut self) -> Result<(), SchedulerError> {
     pub fn run(mut self) -> Result<(), SchedulerError> {
+        let mut last_slot = None;
         while !self.exit.load(Ordering::Relaxed) {
         while !self.exit.load(Ordering::Relaxed) {
             // BufferedPacketsDecision is shared with legacy BankingStage, which will forward
             // BufferedPacketsDecision is shared with legacy BankingStage, which will forward
             // packets. Initially, not renaming these decision variants but the actions taken
             // packets. Initially, not renaming these decision variants but the actions taken
@@ -110,6 +111,10 @@ where
                 .maybe_report_and_reset_slot(new_leader_slot);
                 .maybe_report_and_reset_slot(new_leader_slot);
 
 
             self.receive_completed()?;
             self.receive_completed()?;
+            if last_slot != new_leader_slot {
+                self.container.flush_held_transactions();
+                last_slot = new_leader_slot;
+            }
             self.process_transactions(&decision)?;
             self.process_transactions(&decision)?;
             if self.receive_and_buffer_packets(&decision).is_err() {
             if self.receive_and_buffer_packets(&decision).is_err() {
                 break;
                 break;
@@ -353,7 +358,7 @@ mod tests {
     use {
     use {
         super::*,
         super::*,
         crate::banking_stage::{
         crate::banking_stage::{
-            consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
+            consumer::{RetryableIndex, TARGET_NUM_TRANSACTIONS_PER_BATCH},
             packet_deserializer::PacketDeserializer,
             packet_deserializer::PacketDeserializer,
             scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
             scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
             tests::create_slow_genesis_config,
             tests::create_slow_genesis_config,
@@ -874,7 +879,7 @@ mod tests {
         finished_consume_work_sender
         finished_consume_work_sender
             .send(FinishedConsumeWork {
             .send(FinishedConsumeWork {
                 work: consume_work,
                 work: consume_work,
-                retryable_indexes: vec![1],
+                retryable_indexes: vec![RetryableIndex::new(1, true)],
             })
             })
             .unwrap();
             .unwrap();
 
 

+ 39 - 2
core/src/banking_stage/transaction_scheduler/transaction_state_container.rs

@@ -44,6 +44,7 @@ pub(crate) struct TransactionStateContainer<Tx: TransactionWithMeta> {
     capacity: usize,
     capacity: usize,
     priority_queue: MinMaxHeap<TransactionPriorityId>,
     priority_queue: MinMaxHeap<TransactionPriorityId>,
     id_to_transaction_state: Slab<TransactionState<Tx>>,
     id_to_transaction_state: Slab<TransactionState<Tx>>,
+    held_transactions: Vec<TransactionPriorityId>,
 }
 }
 
 
 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
@@ -71,13 +72,23 @@ pub(crate) trait StateContainer<Tx: TransactionWithMeta> {
 
 
     /// Retries a transaction - inserts transaction back into map.
     /// Retries a transaction - inserts transaction back into map.
     /// This transitions the transaction to `Unprocessed` state.
     /// This transitions the transaction to `Unprocessed` state.
-    fn retry_transaction(&mut self, transaction_id: TransactionId, transaction: Tx) {
+    fn retry_transaction(
+        &mut self,
+        transaction_id: TransactionId,
+        transaction: Tx,
+        immediately_retryable: bool,
+    ) {
         let transaction_state = self
         let transaction_state = self
             .get_mut_transaction_state(transaction_id)
             .get_mut_transaction_state(transaction_id)
             .expect("transaction must exist");
             .expect("transaction must exist");
         let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id);
         let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id);
         transaction_state.retry_transaction(transaction);
         transaction_state.retry_transaction(transaction);
-        self.push_ids_into_queue(std::iter::once(priority_id));
+
+        if immediately_retryable {
+            self.push_ids_into_queue(std::iter::once(priority_id));
+        } else {
+            self.hold_transaction(priority_id);
+        }
     }
     }
 
 
     /// Pushes transaction ids into the priority queue. If the queue if full,
     /// Pushes transaction ids into the priority queue. If the queue if full,
@@ -91,9 +102,14 @@ pub(crate) trait StateContainer<Tx: TransactionWithMeta> {
         priority_ids: impl Iterator<Item = TransactionPriorityId>,
         priority_ids: impl Iterator<Item = TransactionPriorityId>,
     ) -> usize;
     ) -> usize;
 
 
+    /// Hold the tarnsaction until the next flush (next slot).
+    fn hold_transaction(&mut self, priority_id: TransactionPriorityId);
+
     /// Remove transaction by id.
     /// Remove transaction by id.
     fn remove_by_id(&mut self, id: TransactionId);
     fn remove_by_id(&mut self, id: TransactionId);
 
 
+    fn flush_held_transactions(&mut self);
+
     fn get_min_max_priority(&self) -> MinMaxResult<u64>;
     fn get_min_max_priority(&self) -> MinMaxResult<u64>;
 
 
     #[cfg(feature = "dev-context-only-utils")]
     #[cfg(feature = "dev-context-only-utils")]
@@ -110,6 +126,7 @@ impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<T
             capacity,
             capacity,
             priority_queue: MinMaxHeap::with_capacity(capacity + EXTRA_CAPACITY),
             priority_queue: MinMaxHeap::with_capacity(capacity + EXTRA_CAPACITY),
             id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY),
             id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY),
+            held_transactions: Vec::with_capacity(capacity),
         }
         }
     }
     }
 
 
@@ -167,10 +184,20 @@ impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<T
         num_dropped
         num_dropped
     }
     }
 
 
+    fn hold_transaction(&mut self, priority_id: TransactionPriorityId) {
+        self.held_transactions.push(priority_id);
+    }
+
     fn remove_by_id(&mut self, id: TransactionId) {
     fn remove_by_id(&mut self, id: TransactionId) {
         self.id_to_transaction_state.remove(id);
         self.id_to_transaction_state.remove(id);
     }
     }
 
 
+    fn flush_held_transactions(&mut self) {
+        let mut held_transactions = core::mem::take(&mut self.held_transactions);
+        self.push_ids_into_queue(held_transactions.drain(..));
+        core::mem::swap(&mut self.held_transactions, &mut held_transactions);
+    }
+
     fn get_min_max_priority(&self) -> MinMaxResult<u64> {
     fn get_min_max_priority(&self) -> MinMaxResult<u64> {
         match self.priority_queue.peek_min() {
         match self.priority_queue.peek_min() {
             Some(min) => match self.priority_queue.peek_max() {
             Some(min) => match self.priority_queue.peek_max() {
@@ -323,11 +350,21 @@ impl StateContainer<RuntimeTransactionView> for TransactionViewStateContainer {
         self.inner.push_ids_into_queue(priority_ids)
         self.inner.push_ids_into_queue(priority_ids)
     }
     }
 
 
+    #[inline]
+    fn hold_transaction(&mut self, priority_id: TransactionPriorityId) {
+        self.inner.hold_transaction(priority_id);
+    }
+
     #[inline]
     #[inline]
     fn remove_by_id(&mut self, id: TransactionId) {
     fn remove_by_id(&mut self, id: TransactionId) {
         self.inner.remove_by_id(id);
         self.inner.remove_by_id(id);
     }
     }
 
 
+    #[inline]
+    fn flush_held_transactions(&mut self) {
+        self.inner.flush_held_transactions();
+    }
+
     #[inline]
     #[inline]
     fn get_min_max_priority(&self) -> MinMaxResult<u64> {
     fn get_min_max_priority(&self) -> MinMaxResult<u64> {
         self.inner.get_min_max_priority()
         self.inner.get_min_max_priority()

+ 4 - 1
core/src/banking_stage/vote_worker.rs

@@ -429,7 +429,10 @@ impl VoteWorker {
         ProcessTransactionsSummary {
         ProcessTransactionsSummary {
             reached_max_poh_height,
             reached_max_poh_height,
             transaction_counts: total_transaction_counts,
             transaction_counts: total_transaction_counts,
-            retryable_transaction_indexes,
+            retryable_transaction_indexes: retryable_transaction_indexes
+                .into_iter()
+                .map(|retryable_index| retryable_index.index)
+                .collect(),
             cost_model_throttled_transactions_count,
             cost_model_throttled_transactions_count,
             cost_model_us,
             cost_model_us,
             execute_and_commit_timings,
             execute_and_commit_timings,