
Revert "poh recording performance improvement (anza-xyz#7898)" (#8031)

Revert "poh recording performance improvement (#7898)"

This reverts commit 7616b2be4d6de5ded21cf02a19d8a8e0805e3d6f.
Andrew Fitzgerald, 2 months ago
parent commit 302ff5ecf0
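
For orientation, a minimal sketch of the recorder wiring this revert restores, assembled from the hunks below. It mirrors the test setup repeated in core/src/banking_stage.rs and core/src/banking_stage/consumer.rs and assumes the same imports those test modules use (PohRecorder, TransactionRecorder, Blockstore, LeaderScheduleCache, PohConfig, crossbeam_channel::unbounded):

    // Build a PohRecorder over a test ledger, as in the restored tests.
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path())
        .expect("Expected to be able to open database ledger");
    let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
        bank.tick_height(),
        bank.last_blockhash(),
        bank.clone(),
        Some((4, 4)),
        bank.ticks_per_slot(),
        Arc::new(blockstore),
        &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
        &PohConfig::default(),
        Arc::new(AtomicBool::default()),
    );
    // Transaction-index tracking moves back onto PohRecorder itself instead of
    // being a property of the record channel.
    poh_recorder.track_transaction_indexes();
    // The record channel goes back to a plain crossbeam unbounded channel of
    // Records, and TransactionRecorder again takes the shared exit flag.
    let (record_sender, record_receiver) = unbounded();
    let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());

The record_receiver is then handed either to PohService::new (validator, banking simulator, benches) or to the simulate_poh test helper added back in core/src/banking_stage.rs.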

+ 2 - 9
core/src/banking_simulation.rs

@@ -29,7 +29,6 @@ use {
         poh_controller::PohController,
         poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
         poh_service::{PohService, DEFAULT_HASHES_PER_BATCH, DEFAULT_PINNED_CPU_CORE},
-        record_channels::record_channels,
         transaction_recorder::TransactionRecorder,
     },
     solana_pubkey::Pubkey,
@@ -502,12 +501,6 @@ impl SimulatorLoop {
                     &mut self.poh_controller,
                     new_bank,
                 );
-                // Wait for the controller message to be processed.
-                // This loop assumes that
-                // `update_bank_forks_and_poh_recorder_for_new_tpu_bank`
-                // takes immediate effect in PohRecorder's working bank.
-                while self.poh_controller.has_pending_message() {}
-
                 (bank, bank_created) = (
                     self.bank_forks
                         .read()
@@ -748,8 +741,8 @@ impl BankingSimulator {
             exit.clone(),
         );
         let poh_recorder = Arc::new(RwLock::new(poh_recorder));
-        let (record_sender, record_receiver) = record_channels(false);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
+        let (record_sender, record_receiver) = unbounded();
+        let transaction_recorder = TransactionRecorder::new(record_sender, exit.clone());
         let (poh_controller, poh_service_message_receiver) = PohController::new();
         let poh_service = PohService::new(
             poh_recorder.clone(),

+ 64 - 15
core/src/banking_stage.rs

@@ -672,7 +672,7 @@ mod tests {
         super::*,
         crate::banking_trace::{BankingTracer, Channels},
         agave_banking_stage_ingress_types::BankingPacketBatch,
-        crossbeam_channel::unbounded,
+        crossbeam_channel::{unbounded, Receiver},
         itertools::Itertools,
         solana_entry::entry::{self, EntrySlice},
         solana_hash::Hash,
@@ -683,11 +683,12 @@ mod tests {
                 create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
             },
             get_tmp_ledger_path_auto_delete,
+            leader_schedule_cache::LeaderScheduleCache,
         },
         solana_perf::packet::to_packet_batches,
         solana_poh::{
-            poh_recorder::{create_test_recorder, PohRecorderError},
-            record_channels::record_channels,
+            poh_recorder::{create_test_recorder, PohRecorderError, Record},
+            poh_service::PohService,
             transaction_recorder::RecordTransactionsSummary,
         },
         solana_poh_config::PohConfig,
@@ -699,7 +700,11 @@ mod tests {
         solana_transaction::{sanitized::SanitizedTransaction, Transaction},
         solana_vote::vote_transaction::new_tower_sync_transaction,
         solana_vote_program::vote_state::TowerSync,
-        std::{sync::atomic::Ordering, thread::sleep, time::Instant},
+        std::{
+            sync::atomic::{AtomicBool, Ordering},
+            thread::sleep,
+            time::Instant,
+        },
         test_case::test_case,
     };
 
@@ -1093,11 +1098,31 @@ mod tests {
             ..
         } = create_genesis_config(10_000);
         let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, entry_receiver) = PohRecorder::new(
+            // TODO use record_receiver
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            None,
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
 
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
         let pubkey = solana_pubkey::new_rand();
         let keypair2 = Keypair::new();
         let pubkey2 = solana_pubkey::new_rand();
@@ -1107,13 +1132,9 @@ mod tests {
             system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(),
         ];
 
-        let summary = recorder.record_transactions(bank.slot(), txs.clone());
-        assert!(summary.result.is_ok());
-        assert_eq!(
-            record_receiver.try_recv().unwrap().transaction_batches,
-            vec![txs.clone()]
-        );
-        assert!(record_receiver.try_recv().is_err());
+        let _ = recorder.record_transactions(bank.slot(), txs.clone());
+        let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
+        assert_eq!(entry.transactions, txs);
 
         // Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions),
         // record_transactions should throw MaxHeightReached
@@ -1121,7 +1142,14 @@ mod tests {
         let RecordTransactionsSummary { result, .. } = recorder.record_transactions(next_slot, txs);
         assert_matches!(result, Err(PohRecorderError::MaxHeightReached));
         // Should receive nothing from PohRecorder b/c record failed
-        assert!(record_receiver.try_recv().is_err());
+        assert!(entry_receiver.try_recv().is_err());
+
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
     }
 
     pub(crate) fn create_slow_genesis_config(lamports: u64) -> GenesisConfigInfo {
@@ -1144,6 +1172,27 @@ mod tests {
         config_info
     }
 
+    pub(crate) fn simulate_poh(
+        record_receiver: Receiver<Record>,
+        poh_recorder: &Arc<RwLock<PohRecorder>>,
+    ) -> JoinHandle<()> {
+        let poh_recorder = poh_recorder.clone();
+        let is_exited = poh_recorder.read().unwrap().is_exited.clone();
+        let tick_producer = Builder::new()
+            .name("solana-simulate_poh".to_string())
+            .spawn(move || loop {
+                PohService::read_record_receiver_and_process(
+                    &poh_recorder,
+                    &record_receiver,
+                    Duration::from_millis(10),
+                );
+                if is_exited.load(Ordering::Relaxed) {
+                    break;
+                }
+            });
+        tick_producer.unwrap()
+    }
+
     #[test_case(TransactionStructure::Sdk)]
     #[test_case(TransactionStructure::View)]
     fn test_vote_storage_full_send(transaction_struct: TransactionStructure) {

+ 64 - 35
core/src/banking_stage/consume_worker.rs

@@ -765,21 +765,25 @@ mod tests {
             committer::Committer,
             qos_service::QosService,
             scheduler_messages::{MaxAge, TransactionBatchId},
-            tests::{create_slow_genesis_config, sanitize_transactions},
+            tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh},
         },
         crossbeam_channel::unbounded,
         solana_clock::{Slot, MAX_PROCESSING_AGE},
         solana_genesis_config::GenesisConfig,
         solana_keypair::Keypair,
-        solana_ledger::genesis_utils::GenesisConfigInfo,
+        solana_ledger::{
+            blockstore::Blockstore, genesis_utils::GenesisConfigInfo,
+            get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
+        },
         solana_message::{
             v0::{self, LoadedAddresses},
             AddressLookupTableAccount, SimpleAddressLoader, VersionedMessage,
         },
         solana_poh::{
-            record_channels::{record_channels, RecordReceiver},
+            poh_recorder::{PohRecorder, WorkingBankEntry},
             transaction_recorder::TransactionRecorder,
         },
+        solana_poh_config::PohConfig,
         solana_pubkey::Pubkey,
         solana_runtime::{
             bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
@@ -798,7 +802,9 @@ mod tests {
         std::{
             collections::HashSet,
             sync::{atomic::AtomicBool, RwLock},
+            thread::JoinHandle,
         },
+        tempfile::TempDir,
         test_case::test_case,
     };
 
@@ -809,9 +815,11 @@ mod tests {
         genesis_config: GenesisConfig,
         bank: Arc<Bank>,
         _bank_forks: Arc<RwLock<BankForks>>,
+        _ledger_path: TempDir,
+        _entry_receiver: Receiver<WorkingBankEntry>,
+        poh_recorder: Arc<RwLock<PohRecorder>>,
+        _poh_simulator: JoinHandle<()>,
         _replay_vote_receiver: ReplayVoteReceiver,
-        record_receiver: RecordReceiver,
-        shared_working_bank: SharedWorkingBank,
 
         consume_sender: Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
         consumed_receiver: Receiver<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
@@ -840,8 +848,24 @@ mod tests {
         }
         let bank = Arc::new(bank);
 
-        let (record_sender, record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
 
         let (replay_vote_sender, replay_vote_receiver) = unbounded();
         let committer = Committer::new(
@@ -850,7 +874,6 @@ mod tests {
             Arc::new(PrioritizationFeeCache::new(0u64)),
         );
         let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
-        let shared_working_bank = SharedWorkingBank::empty();
 
         let (consume_sender, consume_receiver) = unbounded();
         let (consumed_sender, consumed_receiver) = unbounded();
@@ -860,7 +883,7 @@ mod tests {
             consume_receiver,
             consumer,
             consumed_sender,
-            shared_working_bank.clone(),
+            poh_recorder.read().unwrap().shared_working_bank(),
         );
 
         (
@@ -869,9 +892,11 @@ mod tests {
                 genesis_config,
                 bank,
                 _bank_forks: bank_forks,
+                _ledger_path: ledger_path,
+                _entry_receiver: entry_receiver,
+                poh_recorder,
+                _poh_simulator: poh_simulator,
                 _replay_vote_receiver: replay_vote_receiver,
-                record_receiver,
-                shared_working_bank,
                 consume_sender,
                 consumed_receiver,
             },
@@ -925,20 +950,21 @@ mod tests {
 
     #[test]
     fn test_worker_consume_simple() {
-        let (mut test_frame, worker) = setup_test_frame(true);
+        let (test_frame, worker) = setup_test_frame(true);
         let TestFrame {
             mint_keypair,
             genesis_config,
             bank,
-            ref mut record_receiver,
-            ref mut shared_working_bank,
+            poh_recorder,
             consume_sender,
             consumed_receiver,
             ..
-        } = &mut test_frame;
+        } = &test_frame;
         let worker_thread = std::thread::spawn(move || worker.run());
-        shared_working_bank.store(bank.clone());
-        record_receiver.restart(bank.slot());
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
 
         let pubkey1 = Pubkey::new_unique();
 
@@ -974,20 +1000,21 @@ mod tests {
     #[test_case(false; "old")]
     #[test_case(true; "simd83")]
     fn test_worker_consume_self_conflicting(relax_intrabatch_account_locks: bool) {
-        let (mut test_frame, worker) = setup_test_frame(relax_intrabatch_account_locks);
+        let (test_frame, worker) = setup_test_frame(relax_intrabatch_account_locks);
         let TestFrame {
             mint_keypair,
             genesis_config,
             bank,
-            ref mut record_receiver,
-            ref mut shared_working_bank,
+            poh_recorder,
             consume_sender,
             consumed_receiver,
             ..
-        } = &mut test_frame;
+        } = &test_frame;
         let worker_thread = std::thread::spawn(move || worker.run());
-        shared_working_bank.store(bank.clone());
-        record_receiver.restart(bank.slot());
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
 
         let pubkey1 = Pubkey::new_unique();
         let pubkey2 = Pubkey::new_unique();
@@ -1034,20 +1061,21 @@ mod tests {
 
     #[test]
     fn test_worker_consume_multiple_messages() {
-        let (mut test_frame, worker) = setup_test_frame(true);
+        let (test_frame, worker) = setup_test_frame(true);
         let TestFrame {
             mint_keypair,
             genesis_config,
             bank,
-            ref mut record_receiver,
-            ref mut shared_working_bank,
+            poh_recorder,
             consume_sender,
             consumed_receiver,
             ..
-        } = &mut test_frame;
+        } = &test_frame;
         let worker_thread = std::thread::spawn(move || worker.run());
-        shared_working_bank.store(bank.clone());
-        record_receiver.restart(bank.slot());
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
 
         let pubkey1 = Pubkey::new_unique();
         let pubkey2 = Pubkey::new_unique();
@@ -1108,20 +1136,21 @@ mod tests {
 
     #[test]
     fn test_worker_ttl() {
-        let (mut test_frame, worker) = setup_test_frame(true);
+        let (test_frame, worker) = setup_test_frame(true);
         let TestFrame {
             mint_keypair,
             genesis_config,
             bank,
-            ref mut record_receiver,
-            ref mut shared_working_bank,
+            poh_recorder,
             consume_sender,
             consumed_receiver,
             ..
-        } = &mut test_frame;
+        } = &test_frame;
         let worker_thread = std::thread::spawn(move || worker.run());
-        shared_working_bank.store(bank.clone());
-        record_receiver.restart(bank.slot());
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
         assert!(bank.slot() > 0);
         assert!(bank.epoch() > 0);
 

+ 353 - 45
core/src/banking_stage/consumer.rs

@@ -483,9 +483,11 @@ impl Consumer {
 mod tests {
     use {
         super::*,
-        crate::banking_stage::tests::{create_slow_genesis_config, sanitize_transactions},
+        crate::banking_stage::tests::{
+            create_slow_genesis_config, sanitize_transactions, simulate_poh,
+        },
         agave_reserved_account_keys::ReservedAccountKeys,
-        crossbeam_channel::unbounded,
+        crossbeam_channel::{unbounded, Receiver},
         solana_account::{state_traits::StateMut, AccountSharedData},
         solana_address_lookup_table_interface::{
             self as address_lookup_table,
@@ -505,6 +507,7 @@ mod tests {
                 GenesisConfigInfo,
             },
             get_tmp_ledger_path_auto_delete,
+            leader_schedule_cache::LeaderScheduleCache,
         },
         solana_message::{
             v0::{self, MessageAddressTableLookup},
@@ -512,7 +515,8 @@ mod tests {
         },
         solana_nonce::{self as nonce, state::DurableNonce},
         solana_nonce_account::verify_nonce_account,
-        solana_poh::record_channels::record_channels,
+        solana_poh::poh_recorder::{PohRecorder, Record},
+        solana_poh_config::PohConfig,
         solana_pubkey::Pubkey,
         solana_rpc::transaction_status_service::TransactionStatusService,
         solana_runtime::prioritization_fee_cache::PrioritizationFeeCache,
@@ -527,22 +531,44 @@ mod tests {
         std::{
             borrow::Cow,
             sync::{
-                atomic::{AtomicBool, AtomicU64},
-                Arc,
+                atomic::{AtomicBool, AtomicU64, Ordering},
+                Arc, RwLock,
             },
+            thread::{Builder, JoinHandle},
+            time::Duration,
         },
         test_case::test_case,
     };
 
-    fn execute_transactions_for_test(
+    fn execute_transactions_with_dummy_poh_service(
         bank: Arc<Bank>,
         transactions: Vec<Transaction>,
     ) -> ProcessTransactionBatchOutput {
         let transactions = sanitize_transactions(transactions);
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
 
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(
@@ -551,7 +577,17 @@ mod tests {
             Arc::new(PrioritizationFeeCache::new(0u64)),
         );
         let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
-        consumer.process_and_record_transactions(&bank, &transactions)
+        let process_transactions_summary =
+            consumer.process_and_record_transactions(&bank, &transactions);
+
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
+
+        process_transactions_summary
     }
 
     fn generate_new_address_lookup_table(
@@ -620,10 +656,30 @@ mod tests {
             genesis_config.hash(),
         )]);
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
 
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(
             None,
@@ -651,14 +707,27 @@ mod tests {
         );
         assert!(commit_transactions_result.is_ok());
 
-        // When poh is near end of slot, it will be shutdown.
-        record_receiver.shutdown();
+        // Tick up to max tick height
+        while poh_recorder.read().unwrap().tick_height() != bank.max_tick_height() {
+            poh_recorder.write().unwrap().tick();
+        }
 
-        let record = record_receiver.drain().next().unwrap();
-        assert_eq!(record.slot, bank.slot());
-        assert_eq!(record.transaction_batches.len(), 1);
-        let transaction_batch = record.transaction_batches[0].clone();
-        assert_eq!(transaction_batch.len(), 1);
+        let mut done = false;
+        // read entries until I find mine, might be ticks...
+        while let Ok((_bank, (entry, _tick_height))) = entry_receiver.recv() {
+            if !entry.is_tick() {
+                trace!("got entry");
+                assert_eq!(entry.transactions.len(), transactions.len());
+                assert_eq!(bank.get_balance(&pubkey), 1);
+                done = true;
+            }
+            if done {
+                break;
+            }
+        }
+        trace!("done ticking");
+
+        assert!(done);
 
         let transactions = sanitize_transactions(vec![system_transaction::transfer(
             &mint_keypair,
@@ -691,6 +760,13 @@ mod tests {
             Err(PohRecorderError::MaxHeightReached)
         );
 
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
+
         assert_eq!(bank.get_balance(&pubkey), 1);
     }
 
@@ -732,13 +808,72 @@ mod tests {
             nonce_hash,
         )]);
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::new(false)),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        fn poh_tick_before_returning_record_response(
+            record_receiver: Receiver<Record>,
+            poh_recorder: Arc<RwLock<PohRecorder>>,
+        ) -> JoinHandle<()> {
+            let is_exited = poh_recorder.read().unwrap().is_exited.clone();
+            let tick_producer = Builder::new()
+                .name("solana-simulate_poh".to_string())
+                .spawn(move || loop {
+                    let timeout = Duration::from_millis(10);
+                    let record = record_receiver.recv_timeout(timeout);
+                    if let Ok(record) = record {
+                        let record_response = poh_recorder.write().unwrap().record(
+                            record.slot,
+                            record.mixins,
+                            record.transaction_batches,
+                        );
+                        poh_recorder.write().unwrap().tick();
+                        if record
+                            .sender
+                            .send(record_response.map(|r| r.starting_transaction_index))
+                            .is_err()
+                        {
+                            panic!("Error returning mixin hash");
+                        }
+                    }
+                    if is_exited.load(Ordering::Relaxed) {
+                        break;
+                    }
+                });
+            tick_producer.unwrap()
+        }
 
-        record_receiver.restart(bank.slot());
+        // Simulate a race condition by setting up poh to do the last tick
+        // right before returning the transaction record response so that
+        // bank blockhash queue is updated before transactions are
+        // committed.
+        let poh_simulator =
+            poh_tick_before_returning_record_response(record_receiver, poh_recorder.clone());
 
-        while bank.tick_height() != bank.max_tick_height() - 1 {
-            bank.register_default_tick_for_test();
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
+
+        // Tick up to max tick height - 1 so that only one tick remains
+        // before recording transactions to poh
+        while poh_recorder.read().unwrap().tick_height() != bank.max_tick_height() - 1 {
+            poh_recorder.write().unwrap().tick();
         }
 
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
@@ -766,7 +901,30 @@ mod tests {
             }
         );
         assert!(commit_transactions_result.is_ok());
-        bank.register_default_tick_for_test();
+
+        // Ensure that poh did the last tick after recording transactions
+        assert_eq!(
+            poh_recorder.read().unwrap().tick_height(),
+            bank.max_tick_height()
+        );
+
+        let mut done = false;
+        // read entries until I find mine, might be ticks...
+        while let Ok((_bank, (entry, _tick_height))) = entry_receiver.recv() {
+            if !entry.is_tick() {
+                assert_eq!(entry.transactions.len(), transactions.len());
+                done = true;
+                break;
+            }
+        }
+        assert!(done);
+
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
 
         // check that the nonce was advanced to the current bank's last blockhash
         // rather than the current bank's blockhash as would occur had the update
@@ -796,10 +954,30 @@ mod tests {
             sanitize_transactions(vec![tx])
         };
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
 
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(
             None,
@@ -836,6 +1014,13 @@ mod tests {
                 1
             ])
         );
+
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
     }
 
     #[test_case(false; "old")]
@@ -857,10 +1042,30 @@ mod tests {
         let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests();
         let pubkey = solana_pubkey::new_rand();
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
 
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(
             None,
@@ -980,6 +1185,13 @@ mod tests {
 
         assert_eq!(get_block_cost(), expected_block_cost);
         assert_eq!(get_tx_count(), 2);
+
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
     }
 
     #[test_case(false, false; "old::locked")]
@@ -1023,9 +1235,30 @@ mod tests {
             use_duplicate_transaction
         );
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
+
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
 
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(
@@ -1052,6 +1285,13 @@ mod tests {
         let process_transactions_batch_output =
             consumer.process_and_record_transactions(&bank, &transactions);
 
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
+
         let ExecuteAndCommitTransactionsOutput {
             transaction_counts,
             retryable_transaction_indexes,
@@ -1105,7 +1345,7 @@ mod tests {
         let ProcessTransactionBatchOutput {
             execute_and_commit_transactions_output,
             ..
-        } = execute_transactions_for_test(bank, transactions);
+        } = execute_transactions_with_dummy_poh_service(bank, transactions);
 
         // All the transactions should have been replayed
         assert_eq!(
@@ -1180,7 +1420,7 @@ mod tests {
         let ProcessTransactionBatchOutput {
             execute_and_commit_transactions_output,
             ..
-        } = execute_transactions_for_test(bank, transactions);
+        } = execute_transactions_with_dummy_poh_service(bank, transactions);
 
         // If SIMD-83 is enabled *and* the transactions are distinct, all are executed.
         // In the three other cases, only one is executed. In all four cases, all are attempted.
@@ -1244,10 +1484,27 @@ mod tests {
             genesis_config.hash(),
         )]);
 
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+
         // Poh Recorder has no working bank, so should throw MaxHeightReached error on
         // record
-        let (record_sender, _record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+
+        let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder)));
 
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(
@@ -1296,6 +1553,9 @@ mod tests {
             execute_and_commit_transactions_output.retryable_transaction_indexes,
             expected
         );
+
+        recorder.is_exited.store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
     }
 
     #[test]
@@ -1341,9 +1601,27 @@ mod tests {
         let blockstore = Blockstore::open(ledger_path.path())
             .expect("Expected to be able to open database ledger");
         let blockstore = Arc::new(blockstore);
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            blockstore.clone(),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
 
         let shreds = entries_to_test_shreds(
             &entries,
@@ -1404,6 +1682,13 @@ mod tests {
             ),
         ];
         assert_eq!(actual_tx_results, expected_tx_results);
+
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
     }
 
     #[test]
@@ -1462,10 +1747,27 @@ mod tests {
         let blockstore = Blockstore::open(ledger_path.path())
             .expect("Expected to be able to open database ledger");
         let blockstore = Arc::new(blockstore);
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            blockstore.clone(),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let (record_sender, record_receiver) = unbounded();
+        let recorder = TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
 
-        let (record_sender, mut record_receiver) = record_channels(false);
-        let recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
 
         let shreds = entries_to_test_shreds(
             &entries,
@@ -1547,5 +1849,11 @@ mod tests {
                 ..TransactionStatusMeta::default()
             }
         );
+        poh_recorder
+            .read()
+            .unwrap()
+            .is_exited
+            .store(true, Ordering::Relaxed);
+        let _ = poh_simulator.join();
     }
 }
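
The consumer and consume-worker tests above all drive PoH the same way after this revert; condensed into one hedged sketch (assuming poh_recorder, recorder, record_receiver, bank, and txs are set up as in the hunks above):

    // Spawn the simulated PoH thread and give PohRecorder a working bank.
    let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
    poh_recorder
        .write()
        .unwrap()
        .set_bank_for_test(bank.clone());

    // Record through the TransactionRecorder; the simulated thread replies
    // with the record result.
    let summary = recorder.record_transactions(bank.slot(), txs);
    assert!(summary.result.is_ok());

    // Shut the simulated thread down via the shared exit flag and join it.
    poh_recorder
        .read()
        .unwrap()
        .is_exited
        .store(true, Ordering::Relaxed);
    let _ = poh_simulator.join();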

+ 7 - 4
core/src/validator.rs

@@ -86,7 +86,6 @@ use {
         poh_controller::PohController,
         poh_recorder::PohRecorder,
         poh_service::{self, PohService},
-        record_channels::record_channels,
         transaction_recorder::TransactionRecorder,
     },
     solana_pubkey::Pubkey,
@@ -941,7 +940,7 @@ impl Validator {
         let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
 
         let leader_schedule_cache = Arc::new(leader_schedule_cache);
-        let (poh_recorder, entry_receiver) = {
+        let (mut poh_recorder, entry_receiver) = {
             let bank = &bank_forks.read().unwrap().working_bank();
             PohRecorder::new_with_clear_signal(
                 bank.tick_height(),
@@ -957,8 +956,12 @@ impl Validator {
                 exit.clone(),
             )
         };
-        let (record_sender, record_receiver) = record_channels(transaction_status_sender.is_some());
-        let transaction_recorder = TransactionRecorder::new(record_sender);
+        if transaction_status_sender.is_some() {
+            poh_recorder.track_transaction_indexes();
+        }
+        let (record_sender, record_receiver) = unbounded();
+        let transaction_recorder =
+            TransactionRecorder::new(record_sender, poh_recorder.is_exited.clone());
         let poh_recorder = Arc::new(RwLock::new(poh_recorder));
         let (poh_controller, poh_service_message_receiver) = PohController::new();
 

+ 5 - 1
poh/benches/poh.rs

@@ -82,7 +82,7 @@ fn bench_poh_lock_time_per_batch(bencher: &mut Bencher) {
 }
 
 #[bench]
-fn bench_poh_recorder_record(bencher: &mut Bencher) {
+fn bench_poh_recorder_record_transaction_index(bencher: &mut Bencher) {
     let ledger_path = get_tmp_ledger_path_auto_delete!();
     let blockstore =
         Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
@@ -101,6 +101,7 @@ fn bench_poh_recorder_record(bencher: &mut Bencher) {
         &PohConfig::default(),
         Arc::new(AtomicBool::default()),
     );
+    poh_recorder.track_transaction_indexes();
     let h1 = hash(b"hello Agave, hello Anza!");
 
     poh_recorder.set_bank_for_test(bank.clone());
@@ -123,6 +124,8 @@ fn bench_poh_recorder_record(bencher: &mut Bencher) {
                 vec![test::black_box(h1)],
                 vec![test::black_box(txs.clone())],
             )
+            .unwrap()
+            .starting_transaction_index
             .unwrap();
     });
     poh_recorder.tick();
@@ -148,6 +151,7 @@ fn bench_poh_recorder_set_bank(bencher: &mut Bencher) {
         &PohConfig::default(),
         Arc::new(AtomicBool::default()),
     );
+    poh_recorder.track_transaction_indexes();
     bencher.iter(|| {
         poh_recorder.set_bank_for_test(bank.clone());
         poh_recorder.tick();

+ 13 - 18
poh/benches/transaction_recorder.rs

@@ -10,12 +10,11 @@ use {
         poh_controller::PohController,
         poh_recorder::PohRecorder,
         poh_service::{PohService, DEFAULT_HASHES_PER_BATCH, DEFAULT_PINNED_CPU_CORE},
-        record_channels::record_channels,
         transaction_recorder::TransactionRecorder,
     },
     solana_poh_config::PohConfig,
     solana_pubkey::Pubkey,
-    solana_runtime::{bank::Bank, installed_scheduler_pool::BankWithScheduler},
+    solana_runtime::bank::Bank,
     solana_transaction::versioned::VersionedTransaction,
     std::{
         sync::{atomic::AtomicBool, Arc, RwLock},
@@ -48,7 +47,7 @@ fn bench_record_transactions(c: &mut Criterion) {
     let blockstore = Arc::new(
         Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
     );
-    let (poh_recorder, _entry_receiver) = PohRecorder::new(
+    let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
         bank.tick_height(),
         bank.last_blockhash(),
         bank.clone(),
@@ -59,9 +58,10 @@ fn bench_record_transactions(c: &mut Criterion) {
         &genesis_config_info.genesis_config.poh_config,
         exit.clone(),
     );
+    poh_recorder.set_bank_for_test(bank.clone());
 
-    let (record_sender, record_receiver) = record_channels(false);
-    let transaction_recorder = TransactionRecorder::new(record_sender);
+    let (record_sender, record_receiver) = crossbeam_channel::unbounded();
+    let transaction_recorder = TransactionRecorder::new(record_sender, exit.clone());
 
     let txs: Vec<_> = (0..NUM_TRANSACTIONS)
         .map(|_| {
@@ -74,8 +74,8 @@ fn bench_record_transactions(c: &mut Criterion) {
         })
         .collect();
 
-    let (mut poh_controller, poh_service_receiver) = PohController::new();
     let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+    let (_poh_controller, poh_service_message_receiver) = PohController::new();
     let poh_service = PohService::new(
         poh_recorder.clone(),
         &genesis_config_info.genesis_config.poh_config,
@@ -84,11 +84,8 @@ fn bench_record_transactions(c: &mut Criterion) {
         DEFAULT_PINNED_CPU_CORE,
         DEFAULT_HASHES_PER_BATCH,
         record_receiver,
-        poh_service_receiver,
+        poh_service_message_receiver,
     );
-    poh_controller
-        .set_bank_sync(BankWithScheduler::new_without_scheduler(bank.clone()))
-        .unwrap();
 
     let mut group = c.benchmark_group("record_transactions");
     group.throughput(criterion::Throughput::Elements(
@@ -100,18 +97,16 @@ fn bench_record_transactions(c: &mut Criterion) {
 
             for _ in 0..iters {
                 let tx_batches: Vec<_> = (0..NUM_BATCHES).map(|_| txs.clone()).collect();
-                let next_slot = bank.slot().wrapping_add(1);
-                poh_controller
-                    .reset_sync(bank.clone(), Some((next_slot, next_slot)))
-                    .unwrap();
+                poh_recorder.write().unwrap().clear_bank_for_test();
                 bank = Arc::new(Bank::new_from_parent(
                     bank.clone(),
                     &Pubkey::default(),
-                    next_slot,
+                    bank.slot().wrapping_add(1),
                 ));
-                poh_controller
-                    .set_bank_sync(BankWithScheduler::new_without_scheduler(bank.clone()))
-                    .unwrap();
+                poh_recorder
+                    .write()
+                    .unwrap()
+                    .set_bank_for_test(bank.clone());
 
                 let start = Instant::now();
                 for txs in tx_batches {

+ 0 - 1
poh/src/lib.rs

@@ -2,7 +2,6 @@
 pub mod poh_controller;
 pub mod poh_recorder;
 pub mod poh_service;
-pub mod record_channels;
 pub mod transaction_recorder;
 
 #[macro_use]

+ 108 - 16
poh/src/poh_recorder.rs

@@ -14,7 +14,7 @@
 use qualifier_attr::qualifiers;
 use {
     crate::{
-        poh_controller::PohController, poh_service::PohService, record_channels::record_channels,
+        poh_controller::PohController, poh_service::PohService,
         transaction_recorder::TransactionRecorder,
     },
     arc_swap::ArcSwapOption,
@@ -56,39 +56,40 @@ pub enum PohRecorderError {
 
     #[error("send WorkingBankEntry error")]
     SendError(#[from] SendError<WorkingBankEntry>),
-
-    #[error("channel full")]
-    ChannelFull,
-
-    #[error("channel disconnected")]
-    ChannelDisconnected,
 }
 
 pub(crate) type Result<T> = std::result::Result<T, PohRecorderError>;
 
 pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
 
+// Sends the Result of the record operation, including the index in the slot of the first
+// transaction, if being tracked by WorkingBank
+type RecordResultSender = Sender<Result<Option<usize>>>;
+
 #[derive(Debug)]
 pub struct RecordSummary {
     pub remaining_hashes_in_slot: u64,
+    pub starting_transaction_index: Option<usize>,
 }
 
 pub struct Record {
     pub mixins: Vec<Hash>,
     pub transaction_batches: Vec<Vec<VersionedTransaction>>,
     pub slot: Slot,
+    pub sender: RecordResultSender,
 }
-
 impl Record {
     pub fn new(
         mixins: Vec<Hash>,
         transaction_batches: Vec<Vec<VersionedTransaction>>,
         slot: Slot,
+        sender: RecordResultSender,
     ) -> Self {
         Self {
             mixins,
             transaction_batches,
             slot,
+            sender,
         }
     }
 }
@@ -98,6 +99,7 @@ pub struct WorkingBank {
     pub start: Arc<Instant>,
     pub min_tick_height: u64,
     pub max_tick_height: u64,
+    pub transaction_index: Option<usize>,
 }
 
 #[derive(Debug, PartialEq, Eq)]
@@ -193,6 +195,7 @@ pub struct PohRecorder {
 
     // Allocation to hold PohEntrys recorded into PoHStream.
     entries: Vec<PohEntry>,
+    track_transaction_indexes: bool,
 
     // Alpenglow related migration things
     pub is_alpenglow_enabled: bool,
@@ -283,12 +286,17 @@ impl PohRecorder {
                 last_reported_slot_for_pending_fork: Arc::default(),
                 is_exited,
                 entries: Vec::with_capacity(64),
+                track_transaction_indexes: false,
                 is_alpenglow_enabled: false,
             },
             working_bank_receiver,
         )
     }
 
+    pub fn track_transaction_indexes(&mut self) {
+        self.track_transaction_indexes = true;
+    }
+
     // synchronize PoH with a bank
     pub(crate) fn reset(&mut self, reset_bank: Arc<Bank>, next_leader_slot: Option<(Slot, Slot)>) {
         self.clear_bank();
@@ -350,6 +358,10 @@ impl PohRecorder {
             drop(poh_lock);
 
             if mixed_in {
+                let num_transactions = transaction_batches
+                    .iter()
+                    .map(|batch| batch.len())
+                    .sum::<usize>();
                 debug_assert_eq!(self.entries.len(), mixins.len());
                 for (entry, transactions) in self.entries.drain(..).zip(transaction_batches) {
                     let (send_entry_res, send_batches_us) =
@@ -368,8 +380,15 @@ impl PohRecorder {
                     send_entry_res?;
                 }
 
+                let starting_transaction_index =
+                    working_bank.transaction_index.inspect(|transaction_index| {
+                        let next_starting_transaction_index =
+                            transaction_index.saturating_add(num_transactions);
+                        working_bank.transaction_index = Some(next_starting_transaction_index);
+                    });
                 return Ok(RecordSummary {
                     remaining_hashes_in_slot,
+                    starting_transaction_index,
                 });
             }
 
@@ -434,6 +453,7 @@ impl PohRecorder {
             max_tick_height: bank.max_tick_height(),
             bank,
             start: Arc::new(Instant::now()),
+            transaction_index: self.track_transaction_indexes.then_some(0),
         };
         trace!("new working bank");
         assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot());
@@ -910,7 +930,7 @@ fn do_create_test_recorder(
     };
     let exit = Arc::new(AtomicBool::new(false));
     let poh_config = poh_config.unwrap_or_default();
-    let (poh_recorder, entry_receiver) = PohRecorder::new(
+    let (mut poh_recorder, entry_receiver) = PohRecorder::new(
         bank.tick_height(),
         bank.last_blockhash(),
         bank.clone(),
@@ -921,12 +941,17 @@ fn do_create_test_recorder(
         &poh_config,
         exit.clone(),
     );
+    if track_transaction_indexes {
+        poh_recorder.track_transaction_indexes();
+    }
     let ticks_per_slot = bank.ticks_per_slot();
 
-    let (record_sender, record_receiver) = record_channels(track_transaction_indexes);
-    let transaction_recorder = TransactionRecorder::new(record_sender);
+    poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank));
+
+    let (record_sender, record_receiver) = unbounded();
+    let transaction_recorder = TransactionRecorder::new(record_sender, exit.clone());
     let poh_recorder = Arc::new(RwLock::new(poh_recorder));
-    let (mut poh_controller, poh_service_message_receiver) = PohController::new();
+    let (poh_controller, poh_service_message_receiver) = PohController::new();
     let poh_service = PohService::new(
         poh_recorder.clone(),
         &poh_config,
@@ -938,10 +963,6 @@ fn do_create_test_recorder(
         poh_service_message_receiver,
     );
 
-    poh_controller
-        .set_bank_sync(BankWithScheduler::new_without_scheduler(bank))
-        .unwrap();
-
     (
         exit,
         poh_recorder,
@@ -1471,6 +1492,77 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_poh_recorder_record_transaction_index() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
+        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
+        let prev_hash = bank.last_blockhash();
+        let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
+            0,
+            prev_hash,
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        poh_recorder.track_transaction_indexes();
+
+        poh_recorder.set_bank_for_test(bank.clone());
+        poh_recorder.tick();
+        assert_eq!(
+            poh_recorder
+                .working_bank
+                .as_ref()
+                .unwrap()
+                .transaction_index
+                .unwrap(),
+            0
+        );
+
+        let tx0 = test_tx();
+        let tx1 = test_tx();
+        let h1 = hash(b"hello world!");
+        let record_result = poh_recorder
+            .record(bank.slot(), vec![h1], vec![vec![tx0.into(), tx1.into()]])
+            .unwrap()
+            .starting_transaction_index
+            .unwrap();
+        assert_eq!(record_result, 0);
+        assert_eq!(
+            poh_recorder
+                .working_bank
+                .as_ref()
+                .unwrap()
+                .transaction_index
+                .unwrap(),
+            2
+        );
+
+        let tx = test_tx();
+        let h2 = hash(b"foobar");
+        let starting_transaction_index = poh_recorder
+            .record(bank.slot(), vec![h2], vec![vec![tx.into()]])
+            .unwrap()
+            .starting_transaction_index
+            .unwrap();
+        assert_eq!(starting_transaction_index, 2);
+        assert_eq!(
+            poh_recorder
+                .working_bank
+                .as_ref()
+                .unwrap()
+                .transaction_index
+                .unwrap(),
+            3
+        );
+    }
+
     #[test]
     fn test_poh_cache_on_disconnect() {
         let ledger_path = get_tmp_ledger_path_auto_delete!();
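
The poh_recorder.rs changes above put the reply channel back inside each Record (RecordResultSender is a Sender<Result<Option<usize>>>). A hedged sketch of the per-record round trip on the service side, mirroring read_record_receiver_and_process below and the test simulator in consumer.rs; the sending TransactionRecorder side (not shown in this excerpt) blocks on the other end of record.sender:

    fn process_one_record(
        poh_recorder: &Arc<RwLock<PohRecorder>>,
        record_receiver: &Receiver<Record>,
        timeout: Duration,
    ) {
        if let Ok(record) = record_receiver.recv_timeout(timeout) {
            // Mix the batches into PoH; RecordSummary carries the starting
            // transaction index when track_transaction_indexes() is enabled.
            let record_response = poh_recorder.write().unwrap().record(
                record.slot,
                record.mixins,
                record.transaction_batches,
            );
            // Reply through the per-record sender; an error here only means
            // the recording caller already gave up waiting.
            let _ = record
                .sender
                .send(record_response.map(|summary| summary.starting_transaction_index));
        }
    }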

+ 94 - 249
poh/src/poh_service.rs

@@ -4,12 +4,12 @@ use {
     crate::{
         poh_controller::{PohServiceMessage, PohServiceMessageGuard, PohServiceMessageReceiver},
         poh_recorder::{PohRecorder, Record},
-        record_channels::RecordReceiver,
     },
+    crossbeam_channel::Receiver,
     log::*,
     solana_clock::DEFAULT_HASHES_PER_SECOND,
     solana_entry::poh::Poh,
-    solana_measure::measure::Measure,
+    solana_measure::{measure::Measure, measure_us},
     solana_poh_config::PohConfig,
     std::{
         sync::{
@@ -49,6 +49,7 @@ struct PohTiming {
     total_tick_time_ns: u64,
     last_metric: Instant,
     total_record_time_us: u64,
+    total_send_record_result_us: u64,
 }
 
 impl PohTiming {
@@ -62,6 +63,7 @@ impl PohTiming {
             total_tick_time_ns: 0,
             last_metric: Instant::now(),
             total_record_time_us: 0,
+            total_send_record_result_us: 0,
         }
     }
     fn report(&mut self, ticks_per_slot: u64) {
@@ -78,6 +80,11 @@ impl PohTiming {
                 ("total_lock_time_us", self.total_lock_time_ns / 1000, i64),
                 ("total_hash_time_us", self.total_hash_time_ns / 1000, i64),
                 ("total_record_time_us", self.total_record_time_us, i64),
+                (
+                    "total_send_record_result_us",
+                    self.total_send_record_result_us,
+                    i64
+                ),
             );
             self.total_sleep_us = 0;
             self.num_ticks = 0;
@@ -87,6 +94,7 @@ impl PohTiming {
             self.total_hash_time_ns = 0;
             self.last_metric = Instant::now();
             self.total_record_time_us = 0;
+            self.total_send_record_result_us = 0;
         }
     }
 }
@@ -99,7 +107,7 @@ impl PohService {
         ticks_per_slot: u64,
         pinned_cpu_core: usize,
         hashes_per_batch: u64,
-        record_receiver: RecordReceiver,
+        record_receiver: Receiver<Record>,
         poh_service_receiver: PohServiceMessageReceiver,
     ) -> Self {
         let poh_config = poh_config.clone();
@@ -114,7 +122,6 @@ impl PohService {
                             &poh_exit,
                             record_receiver,
                             poh_service_receiver,
-                            ticks_per_slot,
                         );
                     } else {
                         Self::short_lived_low_power_tick_producer(
@@ -123,7 +130,6 @@ impl PohService {
                             &poh_exit,
                             record_receiver,
                             poh_service_receiver,
-                            ticks_per_slot,
                         );
                     }
                 } else {
@@ -168,94 +174,51 @@ impl PohService {
         poh_recorder: Arc<RwLock<PohRecorder>>,
         poh_config: &PohConfig,
         poh_exit: &AtomicBool,
-        mut record_receiver: RecordReceiver,
+        record_receiver: Receiver<Record>,
         poh_service_receiver: PohServiceMessageReceiver,
-        ticks_per_slot: u64,
     ) {
-        let poh = poh_recorder.read().unwrap().poh.clone();
         let mut last_tick = Instant::now();
         while !poh_exit.load(Ordering::Relaxed) {
-            let service_message =
-                Self::check_for_service_message(&poh_service_receiver, &mut record_receiver);
-            loop {
-                let remaining_tick_time = poh_config
-                    .target_tick_duration
-                    .saturating_sub(last_tick.elapsed());
-                Self::read_record_receiver_and_process(
-                    &poh_recorder,
-                    &mut record_receiver,
-                    remaining_tick_time,
-                    ticks_per_slot,
-                );
-                if remaining_tick_time.is_zero() {
-                    last_tick = Instant::now();
-                    poh_recorder.write().unwrap().tick();
-
-                    let last_tick = {
-                        let poh_recorder = poh_recorder.read().unwrap();
-                        poh_recorder
-                            .bank()
-                            .map(|bank| {
-                                bank.max_tick_height().wrapping_sub(1) == poh_recorder.tick_height()
-                            })
-                            .unwrap_or(false)
-                    };
-
-                    if last_tick
-                        || record_receiver.should_shutdown(
-                            poh.lock().unwrap().remaining_hashes_in_slot(ticks_per_slot),
-                            ticks_per_slot,
-                        )
-                    {
-                        record_receiver.shutdown();
-                    }
+            let service_message = poh_service_receiver.try_recv();
 
-                    // Check if we can break the inner loop to handle a service message.
-                    if Self::can_process_service_message(&service_message, &record_receiver) {
-                        break;
-                    }
-                }
-            }
-
-            if let Some(service_message) = service_message {
-                Self::handle_service_message(&poh_recorder, service_message, &mut record_receiver);
-            }
-        }
-
-        record_receiver.shutdown();
-        while !record_receiver.is_empty() {
+            let remaining_tick_time = poh_config
+                .target_tick_duration
+                .saturating_sub(last_tick.elapsed());
             Self::read_record_receiver_and_process(
                 &poh_recorder,
-                &mut record_receiver,
-                Duration::ZERO,
-                ticks_per_slot,
+                &record_receiver,
+                remaining_tick_time,
             );
+            if remaining_tick_time.is_zero() {
+                last_tick = Instant::now();
+                poh_recorder.write().unwrap().tick();
+            }
+
+            if let Ok(service_message) = service_message {
+                Self::handle_service_message(&poh_recorder, service_message);
+            }
         }
     }
 
     pub fn read_record_receiver_and_process(
         poh_recorder: &Arc<RwLock<PohRecorder>>,
-        record_receiver: &mut RecordReceiver,
+        record_receiver: &Receiver<Record>,
         timeout: Duration,
-        ticks_per_slot: u64,
     ) {
         let record = record_receiver.recv_timeout(timeout);
         if let Ok(record) = record {
-            match poh_recorder.write().unwrap().record(
-                record.slot,
-                record.mixins,
-                record.transaction_batches,
-            ) {
-                Ok(record_summary) => {
-                    if record_receiver
-                        .should_shutdown(record_summary.remaining_hashes_in_slot, ticks_per_slot)
-                    {
-                        record_receiver.shutdown();
-                    }
-                }
-                Err(err) => {
-                    panic!("PohRecorder::record failed: {err:?}");
-                }
+            if record
+                .sender
+                .send(
+                    poh_recorder
+                        .write()
+                        .unwrap()
+                        .record(record.slot, record.mixins, record.transaction_batches)
+                        .map(|summary| summary.starting_transaction_index),
+                )
+                .is_err()
+            {
+                panic!("Error returning mixin hash");
             }
         }
     }
@@ -264,78 +227,37 @@ impl PohService {
         poh_recorder: Arc<RwLock<PohRecorder>>,
         poh_config: &PohConfig,
         poh_exit: &AtomicBool,
-        mut record_receiver: RecordReceiver,
+        record_receiver: Receiver<Record>,
         poh_service_receiver: PohServiceMessageReceiver,
-        ticks_per_slot: u64,
     ) {
         let mut warned = false;
         let mut elapsed_ticks = 0;
         let mut last_tick = Instant::now();
         let num_ticks = poh_config.target_tick_count.unwrap();
-        let poh = poh_recorder.read().unwrap().poh.clone();
-
         while elapsed_ticks < num_ticks {
-            let service_message =
-                Self::check_for_service_message(&poh_service_receiver, &mut record_receiver);
-
-            loop {
-                let remaining_tick_time = poh_config
-                    .target_tick_duration
-                    .saturating_sub(last_tick.elapsed());
-                Self::read_record_receiver_and_process(
-                    &poh_recorder,
-                    &mut record_receiver,
-                    Duration::from_millis(0),
-                    ticks_per_slot,
-                );
-                if remaining_tick_time.is_zero() {
-                    last_tick = Instant::now();
-                    poh_recorder.write().unwrap().tick();
-                    elapsed_ticks += 1;
-
-                    let last_tick = {
-                        let poh_recorder = poh_recorder.read().unwrap();
-                        poh_recorder
-                            .bank()
-                            .map(|bank| {
-                                bank.max_tick_height().wrapping_sub(1) == poh_recorder.tick_height()
-                            })
-                            .unwrap_or(false)
-                    };
-
-                    if last_tick
-                        || record_receiver.should_shutdown(
-                            poh.lock().unwrap().remaining_hashes_in_slot(ticks_per_slot),
-                            ticks_per_slot,
-                        )
-                    {
-                        record_receiver.shutdown();
-                    }
-                }
+            let service_message = poh_service_receiver.try_recv();
 
-                // Check if we can break the inner loop to handle a service message.
-                if Self::can_process_service_message(&service_message, &record_receiver) {
-                    break;
-                }
+            let remaining_tick_time = poh_config
+                .target_tick_duration
+                .saturating_sub(last_tick.elapsed());
+            Self::read_record_receiver_and_process(
+                &poh_recorder,
+                &record_receiver,
+                Duration::from_millis(0),
+            );
+            if remaining_tick_time.is_zero() {
+                last_tick = Instant::now();
+                poh_recorder.write().unwrap().tick();
+                elapsed_ticks += 1;
             }
-
             if poh_exit.load(Ordering::Relaxed) && !warned {
                 warned = true;
                 warn!("exit signal is ignored because PohService is scheduled to exit soon");
             }
-            if let Some(service_message) = service_message {
-                Self::handle_service_message(&poh_recorder, service_message, &mut record_receiver);
-            }
-        }
 
-        record_receiver.shutdown();
-        while !record_receiver.is_empty() {
-            Self::read_record_receiver_and_process(
-                &poh_recorder,
-                &mut record_receiver,
-                Duration::ZERO,
-                ticks_per_slot,
-            );
+            if let Ok(service_message) = service_message {
+                Self::handle_service_message(&poh_recorder, service_message);
+            }
         }
     }
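
Both restored low-power producers above drive ticking off wall-clock time: each pass computes how much of the target tick duration remains, uses that remainder as the receive timeout while draining records, and ticks once the remainder reaches zero. A minimal standalone sketch of that pattern, assuming the crossbeam_channel crate, with println! calls standing in for the real record/tick handling (not the crate's actual code):

use crossbeam_channel::unbounded;
use std::time::{Duration, Instant};

fn main() {
    let (sender, receiver) = unbounded::<&'static str>();
    sender.send("record").unwrap();

    let target_tick_duration = Duration::from_millis(10);
    let mut last_tick = Instant::now();
    let mut ticks = 0;
    while ticks < 3 {
        // Time left before the next tick is due; zero means "tick now".
        let remaining = target_tick_duration.saturating_sub(last_tick.elapsed());
        // Drain records opportunistically, waiting at most until the next tick.
        if let Ok(record) = receiver.recv_timeout(remaining) {
            println!("processing {record}"); // stand-in for PohRecorder::record()
        }
        if last_tick.elapsed() >= target_tick_duration {
            last_tick = Instant::now();
            ticks += 1;
            println!("tick {ticks}"); // stand-in for PohRecorder::tick()
        }
    }
}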
 
@@ -344,11 +266,10 @@ impl PohService {
         next_record: &mut Option<Record>,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         timing: &mut PohTiming,
-        record_receiver: &mut RecordReceiver,
+        record_receiver: &Receiver<Record>,
         hashes_per_batch: u64,
         poh: &Arc<Mutex<Poh>>,
         target_ns_per_tick: u64,
-        ticks_per_slot: u64,
     ) -> bool {
         match next_record.take() {
             Some(mut record) => {
@@ -360,24 +281,17 @@ impl PohService {
                 timing.total_lock_time_ns += lock_time.as_ns();
                 let mut record_time = Measure::start("record");
                 loop {
-                    match poh_recorder_l.record(
+                    let res = poh_recorder_l.record(
                         record.slot,
                         record.mixins,
                         std::mem::take(&mut record.transaction_batches),
-                    ) {
-                        Ok(record_summary) => {
-                            if record_receiver.should_shutdown(
-                                record_summary.remaining_hashes_in_slot,
-                                ticks_per_slot,
-                            ) {
-                                record_receiver.shutdown();
-                            }
-                        }
-                        Err(err) => {
-                            panic!("PohRecorder::record failed: {err:?}");
-                        }
-                    }
+                    );
+                    let (send_res, send_record_result_us) = measure_us!(record
+                        .sender
+                        .send(res.map(|summary| summary.starting_transaction_index)));
+                    debug_assert!(send_res.is_ok(), "Record wasn't sent.");
 
+                    timing.total_send_record_result_us += send_record_result_us;
                     timing.num_hashes += 1; // note: may have also ticked inside record
                     if let Ok(new_record) = record_receiver.try_recv() {
                         // we already have second request to record, so record again while we still have the mutex
@@ -402,17 +316,6 @@ impl PohService {
                     let should_tick = poh_l.hash(hashes_per_batch);
                     let ideal_time = poh_l.target_poh_time(target_ns_per_tick);
                     hash_time.stop();
-
-                    // shutdown if another batch would push us over the shutdown threshold.
-                    let remaining_hashes_in_slot = poh_l.remaining_hashes_in_slot(ticks_per_slot);
-                    let remaining_hashes_after_next_batch =
-                        remaining_hashes_in_slot.saturating_sub(hashes_per_batch);
-                    if record_receiver
-                        .should_shutdown(remaining_hashes_after_next_batch, ticks_per_slot)
-                    {
-                        record_receiver.shutdown();
-                    }
-
                     timing.total_hash_time_ns += hash_time.as_ns();
                     if should_tick {
                         // nothing else can be done. tick required.
@@ -454,96 +357,54 @@ impl PohService {
         poh_exit: &AtomicBool,
         ticks_per_slot: u64,
         hashes_per_batch: u64,
-        mut record_receiver: RecordReceiver,
+        record_receiver: Receiver<Record>,
         poh_service_receiver: PohServiceMessageReceiver,
         target_ns_per_tick: u64,
     ) {
         let poh = poh_recorder.read().unwrap().poh.clone();
         let mut timing = PohTiming::new();
         let mut next_record = None;
-        let mut should_exit = poh_exit.load(Ordering::Relaxed);
 
         loop {
-            // If we should exit, close the channel so no more records are accepted,
-            // but we still want to process any pending records.
-            // We should **not** however process any service messages once we have detected
-            // the exit signal.
-            should_exit |= poh_exit.load(Ordering::Relaxed); // once set, stay set.
-            if should_exit {
-                record_receiver.shutdown();
-            }
-
-            let service_message =
-                Self::check_for_service_message(&poh_service_receiver, &mut record_receiver);
-            loop {
-                let should_tick = Self::record_or_hash(
-                    &mut next_record,
-                    &poh_recorder,
-                    &mut timing,
-                    &mut record_receiver,
-                    hashes_per_batch,
-                    &poh,
-                    target_ns_per_tick,
-                    ticks_per_slot,
-                );
-                if should_tick {
-                    // Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing.
-                    {
-                        let mut lock_time = Measure::start("lock");
-                        let mut poh_recorder_l = poh_recorder.write().unwrap();
-                        lock_time.stop();
-                        timing.total_lock_time_ns += lock_time.as_ns();
-                        let mut tick_time = Measure::start("tick");
-                        poh_recorder_l.tick();
-                        tick_time.stop();
-                        timing.total_tick_time_ns += tick_time.as_ns();
-                    }
-                    timing.num_ticks += 1;
-
-                    timing.report(ticks_per_slot);
+            let service_message = poh_service_receiver.try_recv();
+            let should_tick = Self::record_or_hash(
+                &mut next_record,
+                &poh_recorder,
+                &mut timing,
+                &record_receiver,
+                hashes_per_batch,
+                &poh,
+                target_ns_per_tick,
+            );
+            if should_tick {
+                // Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing.
+                {
+                    let mut lock_time = Measure::start("lock");
+                    let mut poh_recorder_l = poh_recorder.write().unwrap();
+                    lock_time.stop();
+                    timing.total_lock_time_ns += lock_time.as_ns();
+                    let mut tick_time = Measure::start("tick");
+                    poh_recorder_l.tick();
+                    tick_time.stop();
+                    timing.total_tick_time_ns += tick_time.as_ns();
                 }
+                timing.num_ticks += 1;
 
-                // Check if we can break the inner loop to handle a service message.
-                if Self::can_process_service_message(&service_message, &record_receiver) {
+                timing.report(ticks_per_slot);
+                if poh_exit.load(Ordering::Relaxed) {
                     break;
                 }
             }
 
-            if let Some(service_message) = service_message {
-                if !should_exit {
-                    Self::handle_service_message(
-                        &poh_recorder,
-                        service_message,
-                        &mut record_receiver,
-                    );
-                }
-            }
-
-            // If exit signal is set and there are no more records to process, exit.
-            if should_exit && record_receiver.is_empty() {
-                break;
+            if let Ok(service_message) = service_message {
+                Self::handle_service_message(&poh_recorder, service_message);
             }
         }
     }
 
-    /// Check for a service message and shutdown the channel if there is one.
-    fn check_for_service_message<'a>(
-        service_message_receiver: &'a PohServiceMessageReceiver,
-        record_receiver: &mut RecordReceiver,
-    ) -> Option<PohServiceMessageGuard<'a>> {
-        match service_message_receiver.try_recv() {
-            Ok(bank_message) => {
-                record_receiver.shutdown();
-                Some(bank_message)
-            }
-            Err(_) => None,
-        }
-    }
-
     fn handle_service_message(
         poh_recorder: &RwLock<PohRecorder>,
         mut service_message: PohServiceMessageGuard,
-        record_receiver: &mut RecordReceiver,
     ) {
         {
             let mut recorder = poh_recorder.write().unwrap();
@@ -555,26 +416,12 @@ impl PohService {
                     recorder.reset(reset_bank, next_leader_slot);
                 }
                 PohServiceMessage::SetBank { bank } => {
-                    let slot = bank.slot();
                     recorder.set_bank(bank);
-                    record_receiver.restart(slot);
                 }
             }
         }
     }
 
-    /// If we have a service message and there are no more records to process,
-    /// we can break inner recording loops and handle the service message.
-    /// However, if there are still records to process, we must continue processing
-    /// records before handling the service message, to ensure we do not lose
-    /// any records.
-    fn can_process_service_message(
-        service_message: &Option<PohServiceMessageGuard>,
-        record_receiver: &RecordReceiver,
-    ) -> bool {
-        service_message.is_none() || record_receiver.is_empty()
-    }
-
     pub fn join(self) -> thread::Result<()> {
         self.tick_producer.join()
     }
@@ -584,10 +431,8 @@ impl PohService {
 mod tests {
     use {
         super::*,
-        crate::{
-            poh_controller::PohController, poh_recorder::PohRecorderError::MaxHeightReached,
-            record_channels::record_channels,
-        },
+        crate::{poh_controller::PohController, poh_recorder::PohRecorderError::MaxHeightReached},
+        crossbeam_channel::unbounded,
         rand::{thread_rng, Rng},
         solana_clock::{DEFAULT_HASHES_PER_TICK, DEFAULT_MS_PER_SLOT},
         solana_ledger::{
@@ -716,7 +561,7 @@ mod tests {
         let hashes_per_batch = std::env::var("HASHES_PER_BATCH")
             .map(|x| x.parse().unwrap())
             .unwrap_or(DEFAULT_HASHES_PER_BATCH);
-        let (_record_sender, record_receiver) = record_channels(false);
+        let (_record_sender, record_receiver) = unbounded();
         let (_poh_controller, poh_service_message_receiver) = PohController::new();
         let poh_service = PohService::new(
             poh_recorder.clone(),

+ 0 - 417
poh/src/record_channels.rs

@@ -1,417 +0,0 @@
-use {
-    crate::poh_recorder::Record,
-    crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender, TryRecvError},
-    solana_clock::Slot,
-    std::{
-        sync::{
-            atomic::{AtomicU64, Ordering},
-            Arc, Mutex,
-        },
-        time::Duration,
-    },
-};
-
-/// Create a channel pair for communicating [`Record`]s.
-/// Transaction processing threads (workers/vote thread) send records, and
-/// PohService receives them.
-///
-/// The receiver can shutdown the channel, preventing any further sends,
-/// and can restart the channel for a new slot, re-enabling sends.
-/// The sender does not wait for the receiver to pick up records, and will return
-/// immediately if the channel is full, shutdown, or if the slot has changed.
-///
-/// The channel has a bounded capacity based on the maximum number of allowed
-/// insertions at a given time. This is for guaranteeing that once shutdown the
-/// service can always process all sent records correctly without dropping any
-/// i.e. once sent records can be guaranteed to be recorded.
-pub fn record_channels(track_transaction_indexes: bool) -> (RecordSender, RecordReceiver) {
-    const CAPACITY: usize = SlotAllowedInsertions::MAX_ALLOWED_INSERTIONS as usize;
-    let (sender, receiver) = bounded(CAPACITY);
-
-    // Begin in a shutdown state.
-    let slot_allowed_insertions = SlotAllowedInsertions::new_shutdown();
-    let transaction_indexes = if track_transaction_indexes {
-        Some(Arc::new(Mutex::new(0)))
-    } else {
-        None
-    };
-
-    (
-        RecordSender {
-            slot_allowed_insertions: slot_allowed_insertions.clone(),
-            sender,
-            transaction_indexes: transaction_indexes.clone(),
-        },
-        RecordReceiver {
-            slot_allowed_insertions,
-            receiver,
-            capacity: CAPACITY as u64,
-            transaction_indexes,
-        },
-    )
-}
-
-pub enum RecordSenderError {
-    /// The channel is full, the record was not sent.
-    Full(Record),
-    /// The channel is in a shutdown state, it is not valid to
-    /// send records for this slot anymore.
-    Shutdown,
-    /// The record's slot does not match the current slot of the channel.
-    InactiveSlot,
-    /// The receiver has been dropped, the channel is disconnected.
-    Disconnected,
-}
-
-/// A sender for sending [`Record`]s to PohService.
-/// The sender does not wait for service to pick up the records, and will return
-/// immediately if the channel is full, shutdown, or if the slot has changed.
-#[derive(Clone, Debug)]
-pub struct RecordSender {
-    slot_allowed_insertions: SlotAllowedInsertions,
-    sender: Sender<Record>,
-    transaction_indexes: Option<Arc<Mutex<usize>>>,
-}
-
-impl RecordSender {
-    pub fn try_send(&self, record: Record) -> Result<Option<usize>, RecordSenderError> {
-        let num_transactions: usize = record
-            .transaction_batches
-            .iter()
-            .map(|batch| batch.len())
-            .sum();
-        loop {
-            // Grab lock on `transaction_indexes` here to ensure we are sending
-            // sequentially, ONLY if this exists.
-            let transaction_indexes = self
-                .transaction_indexes
-                .as_ref()
-                .map(|transaction_indexes| transaction_indexes.lock().unwrap());
-
-            // Get the current slot and allowed insertions.
-            // If the number of allowed insertions is less than the number of
-            // batches, the channel is full - just return immediately.
-            // If the `record`'s slot is different from the current slot,
-            // return immediately.
-            let current_slot_allowed_insertions =
-                self.slot_allowed_insertions.0.load(Ordering::Acquire);
-            let (slot, allowed_insertions) = (
-                SlotAllowedInsertions::slot(current_slot_allowed_insertions),
-                SlotAllowedInsertions::allowed_insertions(current_slot_allowed_insertions),
-            );
-
-            if slot == SlotAllowedInsertions::DISABLED_SLOT {
-                return Err(RecordSenderError::Shutdown);
-            }
-            if slot != record.slot {
-                return Err(RecordSenderError::InactiveSlot);
-            }
-            if allowed_insertions < record.transaction_batches.len() as u64 {
-                return Err(RecordSenderError::Full(record));
-            }
-
-            let new_slot_allowed_insertions = SlotAllowedInsertions::encoded_value(
-                slot,
-                allowed_insertions.wrapping_sub(record.transaction_batches.len() as u64),
-            );
-
-            if self
-                .slot_allowed_insertions
-                .0
-                .compare_exchange(
-                    current_slot_allowed_insertions,
-                    new_slot_allowed_insertions,
-                    Ordering::AcqRel,
-                    Ordering::Acquire,
-                )
-                .is_ok()
-            {
-                // Send the record over the channel, space has been reserved successfully.
-                if let Err(err) = self.sender.try_send(record) {
-                    assert!(err.is_disconnected());
-                    return Err(RecordSenderError::Disconnected);
-                }
-                return Ok(transaction_indexes.map(|mut transaction_indexes| {
-                    let transaction_starting_index = *transaction_indexes;
-                    *transaction_indexes += num_transactions;
-                    transaction_starting_index
-                }));
-            }
-        }
-    }
-}
-
-/// A receiver for receiving [`Record`]s in PohService.
-/// The receiver can shutdown the channel, preventing any further sends,
-/// and can restart the channel for a new slot, re-enabling sends.
-pub struct RecordReceiver {
-    capacity: u64,
-    slot_allowed_insertions: SlotAllowedInsertions,
-    receiver: Receiver<Record>,
-    transaction_indexes: Option<Arc<Mutex<usize>>>,
-}
-
-impl RecordReceiver {
-    /// Returns true if the channel should be shutdown.
-    pub fn should_shutdown(&self, remaining_hashes_in_slot: u64, ticks_per_slot: u64) -> bool {
-        // This channel must guarantee that all sent records are recorded.
-        // Each batch in a record consumes one hash in the PoH stream,
-        // each tick also consumes at least one hash in the PoH stream.
-        // As a conservative estimate, we assume no ticks have been recorded.
-        remaining_hashes_in_slot.saturating_sub(ticks_per_slot) <= self.capacity
-    }
-
-    /// Shutdown the channel immediately.
-    pub fn shutdown(&mut self) {
-        self.slot_allowed_insertions.shutdown();
-    }
-
-    /// Re-enable the channel after a shutdown.
-    pub fn restart(&mut self, slot: Slot) {
-        assert!(slot <= SlotAllowedInsertions::MAX_SLOT);
-        assert!(self.receiver.is_empty()); // Should be empty before restarting.
-
-        // Reset transaction indexes if tracking them - BEFORE allowing new insertions.
-        let transaction_indexes_lock =
-            self.transaction_indexes
-                .as_ref()
-                .map(|transaction_indexes| {
-                    let mut lock = transaction_indexes.lock().unwrap();
-                    *lock = 0;
-                    lock
-                });
-
-        self.slot_allowed_insertions.0.store(
-            SlotAllowedInsertions::encoded_value(slot, self.capacity),
-            Ordering::Release,
-        );
-
-        // Drop lock AFTER allowing new insertions. This makes any sends grabbing locks
-        // wait until after the slot has been changed. Meaning the CAS in try_send
-        // will always succeed, if passing previous checks.
-        drop(transaction_indexes_lock);
-    }
-
-    /// Drain all available records from the channel with `try_recv` loop.
-    pub fn drain(&self) -> impl Iterator<Item = Record> + '_ {
-        core::iter::from_fn(|| self.try_recv().ok())
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.receiver.is_empty()
-    }
-
-    /// Try to receive a record from the channel.
-    pub fn try_recv(&self) -> Result<Record, TryRecvError> {
-        let record = self.receiver.try_recv()?;
-        self.on_received_record(record.transaction_batches.len() as u64);
-        Ok(record)
-    }
-
-    /// Receive a record from the channel, waiting up to `duration`.
-    pub fn recv_timeout(&self, duration: Duration) -> Result<Record, RecvTimeoutError> {
-        let record = self.receiver.recv_timeout(duration)?;
-        self.on_received_record(record.transaction_batches.len() as u64);
-        Ok(record)
-    }
-
-    fn on_received_record(&self, num_batches: u64) {
-        // The record has been received and processed, so increment the number
-        // of allowed insertions, so that new records can be sent.
-        self.slot_allowed_insertions
-            .0
-            .fetch_add(num_batches, Ordering::AcqRel);
-    }
-}
-
-/// Encoded u64 where the upper 54 bits are the slot and the lower 10 bits are
-/// the number of allowed insertions at the current time.
-/// The number of allowed insertions is based on the number of **batches** sent,
-/// not the number of [`Record`]. This is because each batch is a separate hash
-/// in the PoH stream, and we must guarantee enough space for each hash, if we
-/// allow a [`Record`] to be sent.
-/// The allowed insertions uses 10 bits allowing up to 1023 insertions at a
-/// given time. This is for messages that have been sent but not yet processed
-/// by the receiver.
-/// The `allowed_insertions` is a budget and is decremented when something is
-/// sent/inserted into the channel, and incremented when something is received
-/// from the channel.
-#[derive(Clone, Debug)]
-struct SlotAllowedInsertions(Arc<AtomicU64>);
-
-impl SlotAllowedInsertions {
-    const NUM_BITS: u64 = 64;
-    /// Number of bits used to track allowed insertions.
-    const ALLOWED_INSERTIONS_BITS: u64 = 10;
-    const SLOT_BITS: u64 = Self::NUM_BITS - Self::ALLOWED_INSERTIONS_BITS;
-
-    const DISABLED_SLOT: Slot = (1 << Self::SLOT_BITS) - 1;
-    const MAX_SLOT: Slot = Self::DISABLED_SLOT - 1;
-    const MAX_ALLOWED_INSERTIONS: u64 = (1 << Self::ALLOWED_INSERTIONS_BITS) - 1;
-
-    const SHUTDOWN: u64 = Self::encoded_value(Self::DISABLED_SLOT, 0);
-
-    /// Create a new `SlotAllowedInsertions` with state consistent with a
-    /// shutdown state:
-    /// - slot = `DISABLED_SLOT`
-    /// - allowed_insertions = 0
-    fn new_shutdown() -> Self {
-        Self(Arc::new(AtomicU64::new(Self::SHUTDOWN)))
-    }
-
-    /// Shutdown the channel immediately.
-    fn shutdown(&self) {
-        self.0.store(Self::SHUTDOWN, Ordering::Release);
-    }
-
-    const fn encoded_value(slot: Slot, allowed_insertions: u64) -> u64 {
-        assert!(slot <= Self::DISABLED_SLOT);
-        assert!(allowed_insertions <= Self::MAX_ALLOWED_INSERTIONS);
-        (slot << Self::ALLOWED_INSERTIONS_BITS) | allowed_insertions
-    }
-
-    /// The current slot, or `DISABLED_SLOT` if shutdown.
-    fn slot(value: u64) -> Slot {
-        (value >> Self::ALLOWED_INSERTIONS_BITS) & Self::DISABLED_SLOT
-    }
-
-    /// How many insertions/sends are allowed at this time.
-    fn allowed_insertions(value: u64) -> u64 {
-        value & Self::MAX_ALLOWED_INSERTIONS
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use {super::*, solana_transaction::versioned::VersionedTransaction};
-
-    #[test]
-    fn test_record_channels() {
-        let (sender, mut receiver) = record_channels(false);
-
-        // Initially shutdown.
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 0,
-                transaction_batches: vec![],
-                mixins: vec![],
-            }),
-            Err(RecordSenderError::Shutdown)
-        ));
-
-        // Restart for slot 1.
-        receiver.restart(1);
-
-        // Record for slot 0 fails.
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 0,
-                transaction_batches: vec![],
-                mixins: vec![],
-            }),
-            Err(RecordSenderError::InactiveSlot)
-        ));
-
-        // Record for slot 1 with 1 batch succeeds.
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 1,
-                transaction_batches: vec![vec![]],
-                mixins: vec![],
-            }),
-            Ok(None)
-        ));
-
-        // Record for slot 1 with 1023 batches fails (channel full).
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 1,
-                transaction_batches: vec![vec![]; 1_023],
-                mixins: vec![],
-            }),
-            Err(RecordSenderError::Full(_))
-        ));
-
-        // Record for slot 1 with 1022 more batches succeeds (fills the channel).
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 1,
-                transaction_batches: vec![vec![]; 1_022],
-                mixins: vec![],
-            }),
-            Ok(None)
-        ));
-
-        // Record for slot 1 with 1 batch fails (channel full).
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 1,
-                transaction_batches: vec![vec![]],
-                mixins: vec![],
-            }),
-            Err(RecordSenderError::Full(_))
-        ));
-
-        // Receive 1 record.
-        assert!(receiver.try_recv().is_ok());
-        assert!(!receiver.is_empty());
-        assert!(receiver.try_recv().is_ok());
-        assert!(receiver.is_empty());
-    }
-
-    #[test]
-    fn test_record_channels_track_indexes() {
-        let (sender, mut receiver) = record_channels(true);
-
-        // Initially shutdown.
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 0,
-                transaction_batches: vec![],
-                mixins: vec![],
-            }),
-            Err(RecordSenderError::Shutdown)
-        ));
-
-        // Restart for slot 1.
-        receiver.restart(1);
-
-        // Record for slot 0 fails.
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 0,
-                transaction_batches: vec![],
-                mixins: vec![],
-            }),
-            Err(RecordSenderError::InactiveSlot)
-        ));
-
-        // Record for slot 1 with 1 batch succeeds.
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 1,
-                transaction_batches: vec![vec![VersionedTransaction::default()]],
-                mixins: vec![],
-            }),
-            Ok(Some(0))
-        ));
-
-        // Record for slot 1 with 2 batches (3 transactions) succeeds.
-        assert!(matches!(
-            sender.try_send(Record {
-                slot: 1,
-                transaction_batches: vec![
-                    vec![VersionedTransaction::default()],
-                    vec![
-                        VersionedTransaction::default(),
-                        VersionedTransaction::default()
-                    ],
-                ],
-                mixins: vec![],
-            }),
-            Ok(Some(1))
-        ));
-
-        assert!(*sender.transaction_indexes.as_ref().unwrap().lock().unwrap() == 4);
-    }
-}
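
For reference, the removed module above packed the active slot and the remaining insertion budget into a single AtomicU64: 54 bits of slot and 10 bits of budget, per the constants in the deleted SlotAllowedInsertions. A standalone sketch of just that packing and unpacking; the bit split is taken from the code above, while the free-function helper names are illustrative:

// Sketch of the 54/10-bit split used by the deleted SlotAllowedInsertions.
const ALLOWED_INSERTIONS_BITS: u64 = 10;
const SLOT_BITS: u64 = 64 - ALLOWED_INSERTIONS_BITS;
const DISABLED_SLOT: u64 = (1 << SLOT_BITS) - 1;
const MAX_ALLOWED_INSERTIONS: u64 = (1 << ALLOWED_INSERTIONS_BITS) - 1;

fn encode(slot: u64, allowed_insertions: u64) -> u64 {
    assert!(slot <= DISABLED_SLOT);
    assert!(allowed_insertions <= MAX_ALLOWED_INSERTIONS);
    (slot << ALLOWED_INSERTIONS_BITS) | allowed_insertions
}

fn slot(value: u64) -> u64 {
    (value >> ALLOWED_INSERTIONS_BITS) & DISABLED_SLOT
}

fn allowed_insertions(value: u64) -> u64 {
    value & MAX_ALLOWED_INSERTIONS
}

fn main() {
    // Slot 1 with the full 1023-batch budget, as restart() would set it.
    let v = encode(1, MAX_ALLOWED_INSERTIONS);
    assert_eq!(slot(v), 1);
    assert_eq!(allowed_insertions(v), 1023);
    // The shutdown sentinel pairs DISABLED_SLOT with a zero budget.
    let shutdown = encode(DISABLED_SLOT, 0);
    assert_eq!(allowed_insertions(shutdown), 0);
}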

+ 58 - 22
poh/src/transaction_recorder.rs

@@ -1,14 +1,19 @@
 use {
-    crate::{
-        poh_recorder::{PohRecorderError, Record},
-        record_channels::{RecordSender, RecordSenderError},
-    },
+    crate::poh_recorder::{PohRecorderError, Record, Result},
+    crossbeam_channel::{bounded, RecvTimeoutError, Sender},
     solana_clock::Slot,
     solana_entry::entry::hash_transactions,
     solana_hash::Hash,
     solana_measure::measure_us,
     solana_transaction::versioned::VersionedTransaction,
-    std::num::Saturating,
+    std::{
+        num::Saturating,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc,
+        },
+        time::Duration,
+    },
 };
 
 #[derive(Default, Debug)]
@@ -30,7 +35,7 @@ pub struct RecordTransactionsSummary {
     // Metrics describing how time was spent recording transactions
     pub record_transactions_timings: RecordTransactionsTimings,
     // Result of trying to record the transactions into the PoH stream
-    pub result: Result<(), PohRecorderError>,
+    pub result: Result<()>,
     // Index in the slot of the first transaction recorded
     pub starting_transaction_index: Option<usize>,
 }
@@ -39,12 +44,16 @@ pub struct RecordTransactionsSummary {
 #[derive(Clone, Debug)]
 pub struct TransactionRecorder {
     // shared by all users of PohRecorder
-    pub record_sender: RecordSender,
+    pub record_sender: Sender<Record>,
+    pub is_exited: Arc<AtomicBool>,
 }
 
 impl TransactionRecorder {
-    pub fn new(record_sender: RecordSender) -> Self {
-        Self { record_sender }
+    pub fn new(record_sender: Sender<Record>, is_exited: Arc<AtomicBool>) -> Self {
+        Self {
+            record_sender,
+            is_exited,
+        }
     }
 
     /// Hashes `transactions` and sends to PoH service for recording. Waits for response up to 1s.
@@ -69,27 +78,21 @@ impl TransactionRecorder {
                 Ok(starting_index) => {
                     starting_transaction_index = starting_index;
                 }
-                Err(RecordSenderError::InactiveSlot | RecordSenderError::Shutdown) => {
+                Err(PohRecorderError::MaxHeightReached) => {
                     return RecordTransactionsSummary {
                         record_transactions_timings,
                         result: Err(PohRecorderError::MaxHeightReached),
                         starting_transaction_index: None,
-                    }
-                }
-                Err(RecordSenderError::Full(_)) => {
-                    return RecordTransactionsSummary {
-                        record_transactions_timings,
-                        result: Err(PohRecorderError::ChannelFull),
-                        starting_transaction_index: None,
                     };
                 }
-                Err(RecordSenderError::Disconnected) => {
+                Err(PohRecorderError::SendError(e)) => {
                     return RecordTransactionsSummary {
                         record_transactions_timings,
-                        result: Err(PohRecorderError::ChannelDisconnected),
+                        result: Err(PohRecorderError::SendError(e)),
                         starting_transaction_index: None,
                     };
                 }
+                Err(e) => panic!("Poh recorder returned unexpected error: {e:?}"),
             }
         }
 
@@ -106,8 +109,41 @@ impl TransactionRecorder {
         bank_slot: Slot,
         mixins: Vec<Hash>,
         transaction_batches: Vec<Vec<VersionedTransaction>>,
-    ) -> Result<Option<usize>, RecordSenderError> {
-        self.record_sender
-            .try_send(Record::new(mixins, transaction_batches, bank_slot))
+    ) -> Result<Option<usize>> {
+        // create a new channel so that there is only 1 sender and when it goes out of scope, the receiver fails
+        let (result_sender, result_receiver) = bounded(1);
+        let res = self.record_sender.send(Record::new(
+            mixins,
+            transaction_batches,
+            bank_slot,
+            result_sender,
+        ));
+        if res.is_err() {
+            // If the channel is dropped, then the validator is shutting down so return that we are hitting
+            //  the max tick height to stop transaction processing and flush any transactions in the pipeline.
+            return Err(PohRecorderError::MaxHeightReached);
+        }
+        // Besides validator exit, this timeout should primarily be seen to affect test execution environments where the various pieces can be shutdown abruptly
+        let mut is_exited = false;
+        loop {
+            let res = result_receiver.recv_timeout(Duration::from_millis(1000));
+            match res {
+                Err(RecvTimeoutError::Timeout) => {
+                    if is_exited {
+                        return Err(PohRecorderError::MaxHeightReached);
+                    } else {
+                        // A result may have come in between when we timed out checking this
+                        // bool, so check the channel again, even if is_exited == true
+                        is_exited = self.is_exited.load(Ordering::SeqCst);
+                    }
+                }
+                Err(RecvTimeoutError::Disconnected) => {
+                    return Err(PohRecorderError::MaxHeightReached);
+                }
+                Ok(result) => {
+                    return result;
+                }
+            }
+        }
     }
 }
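
The restored record() above is a per-call request/response exchange: every Record carries its own bounded(1) result channel, and the caller blocks on that channel with a 1s timeout, re-checking the exit flag between timeouts. A reduced sketch of the pattern, with a plain worker thread standing in for PohService and hypothetical names throughout:

use crossbeam_channel::{bounded, unbounded, RecvTimeoutError, Sender};
use std::{thread, time::Duration};

struct Request {
    payload: u64,
    result_sender: Sender<Result<u64, String>>,
}

fn main() {
    let (request_sender, request_receiver) = unbounded::<Request>();

    // Stand-in for the PoH service thread: answer each request on its own channel.
    let worker = thread::spawn(move || {
        for req in request_receiver {
            let _ = req.result_sender.send(Ok(req.payload * 2));
        }
    });

    // Caller side: send the request, then wait (with a timeout) for the reply.
    let (result_sender, result_receiver) = bounded(1);
    request_sender
        .send(Request { payload: 21, result_sender })
        .expect("service is still running");
    loop {
        match result_receiver.recv_timeout(Duration::from_millis(1000)) {
            Ok(result) => {
                assert_eq!(result, Ok(42));
                break;
            }
            // The real code re-checks the exit flag here before waiting again.
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    drop(request_sender); // ends the worker's `for` loop
    worker.join().unwrap();
}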

+ 254 - 55
unified-scheduler-pool/src/lib.rs

@@ -2725,8 +2725,13 @@ mod tests {
         solana_clock::{Slot, MAX_PROCESSING_AGE},
         solana_hash::Hash,
         solana_keypair::Keypair,
-        solana_ledger::blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage},
-        solana_poh::record_channels::record_channels,
+        solana_ledger::{
+            blockstore::Blockstore,
+            blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage},
+            create_new_tmp_ledger_auto_delete,
+            leader_schedule_cache::LeaderScheduleCache,
+        },
+        solana_poh::poh_recorder::create_test_recorder_with_index_tracking,
         solana_pubkey::Pubkey,
         solana_runtime::{
             bank::Bank,
@@ -2742,7 +2747,7 @@ mod tests {
         solana_unified_scheduler_logic::NO_CONSUMED_BLOCK_SIZE,
         std::{
             num::Saturating,
-            sync::{Arc, RwLock},
+            sync::{atomic::Ordering, Arc, RwLock},
             thread::JoinHandle,
         },
         test_case::test_matrix,
@@ -3873,11 +3878,31 @@ mod tests {
             None,
             ignored_prioritization_fee_cache,
         );
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
-
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = {
+            // Create a dummy bank to prevent it from being frozen; otherwise, the following panic
+            // will happen:
+            //    thread 'solPohTickProd' panicked at runtime/src/bank.rs:LL:CC:
+            //    register_tick() working on a bank that is already frozen or is undergoing freezing!
+            let dummy_bank = Bank::new_for_tests(&genesis_config);
+            let (dummy_bank, _bank_forks) = setup_dummy_fork_graph(dummy_bank);
+            create_test_recorder_with_index_tracking(
+                dummy_bank,
+                blockstore.clone(),
+                None,
+                Some(leader_schedule_cache),
+            )
+        };
 
         if matches!(scheduling_mode, BlockProduction) {
             pool.register_banking_stage(
@@ -3947,6 +3972,9 @@ mod tests {
             expected_transaction_count += 1;
         }
         assert_eq!(bank.transaction_count(), expected_transaction_count.0);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[test]
@@ -3988,10 +4016,28 @@ mod tests {
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool =
             DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
-
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
+        let (
+            exit,
+            _poh_recorder,
+            mut poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = {
+            create_test_recorder_with_index_tracking(
+                bank.clone(),
+                blockstore.clone(),
+                None,
+                Some(leader_schedule_cache),
+            )
+        };
+        poh_controller
+            .reset_sync(bank.clone(), Some((bank.slot(), bank.slot() + 1)))
+            .unwrap();
 
         pool.register_banking_stage(
             None,
@@ -4013,7 +4059,9 @@ mod tests {
         let scheduler = pool.take_scheduler(context);
         let old_scheduler_id = scheduler.id();
         let bank = BankWithScheduler::new(bank, Some(scheduler));
-        record_receiver.restart(bank.slot());
+        poh_controller
+            .set_bank_sync(bank.clone_with_scheduler())
+            .unwrap();
         bank.schedule_transaction_executions([(tx, ORIGINAL_TRANSACTION_INDEX)].into_iter())
             .unwrap();
         bank.unpause_new_block_production_scheduler();
@@ -4026,8 +4074,10 @@ mod tests {
         // There should be no executed transaction yet.
         assert_eq!(bank.transaction_count(), 0);
 
-        // Shutdown channel to observe behavior difference around session ending
-        record_receiver.shutdown();
+        // Create new bank to observe behavior difference around session ending
+        poh_controller
+            .reset_sync(bank.clone(), Some((bank.slot(), bank.slot() + 1)))
+            .unwrap();
         let bank = Arc::new(Bank::new_from_parent(
             bank.clone_without_scheduler(),
             &Pubkey::default(),
@@ -4043,7 +4093,9 @@ mod tests {
         // Make sure the same scheduler is used to test its internal cross-session behavior
         assert_eq!(scheduler.id(), old_scheduler_id);
         let bank = BankWithScheduler::new(bank, Some(scheduler));
-        record_receiver.restart(bank.slot());
+        poh_controller
+            .set_bank_sync(bank.clone_with_scheduler())
+            .unwrap();
         bank.unpause_new_block_production_scheduler();
 
         // Calling wait_for_completed_scheduler() for block production scheduler causes it to be
@@ -4053,6 +4105,9 @@ mod tests {
         // Block production scheduler should carry over the temporarily-failed transaction itself
         // and the transaction should now have been executed.
         assert_eq!(bank.transaction_count(), 1);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[test]
@@ -4480,10 +4535,22 @@ mod tests {
         let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let scheduling_context = &SchedulingContext::for_production(bank.clone());
         let (sender, receiver) = crossbeam_channel::unbounded();
-
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
         let handler_context = &HandlerContext {
             thread_count: 0,
             log_messages_bytes_limit: None,
@@ -4502,10 +4569,13 @@ mod tests {
         let task =
             SchedulingStateMachine::create_task(tx.clone(), 0, &mut |_| UsageQueue::default());
 
-        // Recording will succeed based upon if the channel is shutdown or not.
-        if should_succeed_to_record_to_poh {
-            // If we should succeed, we reset the channel to accept records.
-            record_receiver.restart(bank.slot());
+        // wait until the poh's working bank is cleared.
+        // also flush signal_receiver after that.
+        if !should_succeed_to_record_to_poh {
+            while poh_recorder.read().unwrap().bank().is_some() {
+                sleep(Duration::from_millis(100));
+            }
+            while signal_receiver.try_recv().is_ok() {}
         }
 
         assert_eq!(bank.transaction_count(), 0);
@@ -4528,15 +4598,17 @@ mod tests {
                         None, // no work id
                     )))
                 );
-                // check that the `Record` is correctly sent through the channel;
-                // in reality this would then be picked up by PoH service.
-                assert!(record_receiver.try_recv().is_ok());
+                assert_matches!(
+                    signal_receiver.try_recv(),
+                    Ok((_, (solana_entry::entry::Entry {transactions, ..} , _)))
+                        if transactions == vec![tx.to_versioned_transaction()]
+                );
             } else {
                 assert_eq!(result, &expected_tx_result);
                 assert_eq!(bank.transaction_count(), 0);
                 assert_eq!(bank.transaction_error_count(), 0);
                 assert_matches!(receiver.try_recv(), Err(_));
-                assert!(record_receiver.try_recv().is_err());
+                assert_matches!(signal_receiver.try_recv(), Err(_));
             }
         } else {
             if expected_tx_result.is_ok() {
@@ -4547,8 +4619,11 @@ mod tests {
 
             assert_eq!(bank.transaction_count(), 0);
             assert_matches!(receiver.try_recv(), Err(_));
-            assert!(record_receiver.try_recv().is_err());
+            assert_matches!(signal_receiver.try_recv(), Err(_));
         }
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[derive(Debug)]
@@ -4578,10 +4653,22 @@ mod tests {
             DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
 
         let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
-
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
         pool.register_banking_stage(
             None,
             banking_packet_receiver,
@@ -4605,6 +4692,9 @@ mod tests {
         let bank = BankWithScheduler::new(bank, Some(scheduler));
         assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
         assert_eq!(bank.transaction_count(), 1);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     impl BankingStageHelper {
@@ -4638,9 +4728,22 @@ mod tests {
         let pool =
             DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
 
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
 
         // send fake packet batch to trigger banking_packet_handler
         let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
@@ -4679,6 +4782,9 @@ mod tests {
         let bank = BankWithScheduler::new(bank, Some(scheduler));
         assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
         assert_eq!(bank.transaction_count(), 1);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[test]
@@ -4702,9 +4808,22 @@ mod tests {
         let pool =
             DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
 
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
 
         // Create a dummy handler which unconditionally sends tx0 back to the scheduler thread
         let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
@@ -4751,6 +4870,9 @@ mod tests {
         let bank = BankWithScheduler::new(bank, Some(scheduler));
         assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
         assert_eq!(bank.transaction_count(), 1);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[test]
@@ -4781,10 +4903,22 @@ mod tests {
         let pool =
             DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
 
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
-
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
         let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
         pool.register_banking_stage(
             None,
@@ -4800,6 +4934,9 @@ mod tests {
 
         Box::new(scheduler1.into_inner().1).return_to_pool();
         Box::new(scheduler2.into_inner().1).return_to_pool();
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[test]
@@ -4832,9 +4969,22 @@ mod tests {
             DEFAULT_TIMEOUT_DURATION,
         );
 
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
 
         let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
         pool.register_banking_stage(
@@ -4866,6 +5016,9 @@ mod tests {
         // id should be different
         assert_ne!(trashed_old_scheduler_id, respawned_new_scheduler_id);
 
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
+
         // Ensure the actual async trashing by solScCleaner
         sleepless_testing::at(&TestCheckPoint::AfterTrashedSchedulerCleaned);
     }
@@ -4908,9 +5061,22 @@ mod tests {
             DEFAULT_TIMEOUT_DURATION,
         );
 
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
 
         let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
         pool.register_banking_stage(
@@ -4939,6 +5105,9 @@ mod tests {
 
         // id should be different
         assert_ne!(trashed_old_scheduler_id, respawned_new_scheduler_id);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[test]
@@ -4955,10 +5124,22 @@ mod tests {
             DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
 
         let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
-
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
         pool.register_banking_stage(
             None,
             banking_packet_receiver,
@@ -4973,6 +5154,9 @@ mod tests {
         let scheduler = pool.take_scheduler(context);
         let bank_tmp = BankWithScheduler::new(bank, Some(scheduler));
         assert_matches!(bank_tmp.wait_for_completed_scheduler(), Some((Ok(()), _)));
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 
     #[test]
@@ -5040,10 +5224,22 @@ mod tests {
         banking_packet_sender
             .send(BankingPacketBatch::default())
             .unwrap();
-        let (record_sender, mut record_receiver) = record_channels(true);
-        let transaction_recorder = TransactionRecorder::new(record_sender);
-        record_receiver.restart(bank.slot());
-
+        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let (
+            exit,
+            _poh_recorder,
+            _poh_controller,
+            transaction_recorder,
+            poh_service,
+            _signal_receiver,
+        ) = create_test_recorder_with_index_tracking(
+            bank.clone(),
+            blockstore.clone(),
+            None,
+            Some(leader_schedule_cache),
+        );
         pool.register_banking_stage(
             None,
             banking_packet_receiver,
@@ -5056,5 +5252,8 @@ mod tests {
         *START_DISCARD.lock().unwrap() = true;
 
         sleepless_testing::at(TestCheckPoint::AfterDiscarded);
+
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
     }
 }
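
Taken together, the hunks above restore the same setup/teardown shape in every affected scheduler test: build a full test PoH recorder instead of the bare record channel, drive the test body through the returned `transaction_recorder`, then flag `exit` and join the PoH service. A minimal sketch of that shared pattern follows, assuming the crate-local test helpers keep the shapes shown in this diff (`create_new_tmp_ledger_auto_delete!`, `create_test_recorder_with_index_tracking`); the wrapper name `run_with_test_poh` is illustrative only, not part of the change.

    use std::sync::{atomic::Ordering, Arc};
    use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache};

    // Hypothetical wrapper showing the setup/teardown each test repeats above.
    fn run_with_test_poh(genesis_config: &GenesisConfig, bank: Arc<Bank>) {
        // Temporary ledger and blockstore backing the test recorder.
        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(genesis_config);
        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));

        // Full PoH recorder/controller/service, replacing the record_channels() pair.
        let (
            exit,
            _poh_recorder,
            _poh_controller,
            transaction_recorder,
            poh_service,
            _signal_receiver,
        ) = create_test_recorder_with_index_tracking(
            bank.clone(),
            blockstore.clone(),
            None,
            Some(leader_schedule_cache),
        );

        // ... test body drives `transaction_recorder` here ...
        let _ = &transaction_recorder;

        // Teardown: signal shutdown and join the PoH service before returning.
        exit.store(true, Ordering::Relaxed);
        poh_service.join().unwrap();
    }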