소스 검색

v3.1: fix(poh): don't process service messages if we have a record (backport of #9047) (#9063)

fix(poh): don't process service messages if we have a record (#9047)

(cherry picked from commit 34943b03f30b1b119551b99385b406cafda89a09)

Co-authored-by: OliverNChalk <11343499+OliverNChalk@users.noreply.github.com>
mergify[bot] 2일 전
부모
커밋
cae3526cb4
2개의 변경된 파일, 90개의 추가 및 2개의 삭제
  1. 84 2
      poh/src/poh_service.rs
  2. 6 0
      poh/src/record_channels.rs

+ 84 - 2
poh/src/poh_service.rs

@@ -543,7 +543,9 @@ impl PohService {
                 }
 
                 // Check if we can break the inner loop to handle a service message.
-                if Self::can_process_service_message(&service_message, &record_receiver) {
+                if next_record.is_none()
+                    && Self::can_process_service_message(&service_message, &record_receiver)
+                {
                     break;
                 }
             }
@@ -633,7 +635,8 @@ mod tests {
             record_channels::record_channels,
         },
         rand::{thread_rng, Rng},
-        solana_clock::{DEFAULT_HASHES_PER_TICK, DEFAULT_MS_PER_SLOT},
+        solana_clock::{DEFAULT_HASHES_PER_TICK, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT},
+        solana_hash::Hash,
         solana_ledger::{
             blockstore::Blockstore,
             genesis_utils::{create_genesis_config, GenesisConfigInfo},
@@ -845,4 +848,83 @@ mod tests {
         poh_service.join().unwrap();
         entry_producer.join().unwrap();
     }
+
+    #[test]
+    fn test_poh_service_record_race() {
+        agave_logger::setup();
+        let GenesisConfigInfo {
+            mut genesis_config, ..
+        } = create_genesis_config(2);
+        let hashes_per_tick = Some(DEFAULT_HASHES_PER_TICK);
+        genesis_config.poh_config.hashes_per_tick = hashes_per_tick;
+        let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
+        let prev_hash = bank.last_blockhash();
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+
+        let default_target_tick_duration =
+            PohConfig::default().target_tick_duration.as_micros() as u64;
+        let target_tick_duration = Duration::from_micros(default_target_tick_duration);
+        let poh_config = PohConfig {
+            hashes_per_tick,
+            target_tick_duration,
+            target_tick_count: None,
+        };
+        let exit = Arc::new(AtomicBool::new(false));
+
+        let ticks_per_slot = bank.ticks_per_slot();
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
+        let blockstore = Arc::new(blockstore);
+        let next_leader_slot = Some((1_000_000, 1_000_000));
+        let (poh_recorder, _entry_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            prev_hash,
+            bank.clone(),
+            next_leader_slot,
+            ticks_per_slot,
+            blockstore,
+            &leader_schedule_cache,
+            &poh_config,
+            exit.clone(),
+        );
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+
+        // Queue a new record & service message at the same time.
+        let (record_sender, mut record_receiver) = record_channels(false);
+        record_receiver.restart(bank.bank_id());
+        let (mut poh_controller, poh_service_message_receiver) = PohController::new();
+        poh_controller.reset(bank.clone(), None).unwrap();
+        record_sender
+            .try_send(Record {
+                mixins: vec![Hash::new_unique()],
+                transaction_batches: vec![vec![VersionedTransaction::from(test_tx())]],
+                bank_id: bank.bank_id(),
+            })
+            .unwrap();
+
+        // Spawn the poh service.
+        poh_recorder.write().unwrap().set_bank_for_test(bank);
+        let poh_service = PohService::new(
+            poh_recorder.clone(),
+            &poh_config,
+            exit.clone(),
+            DEFAULT_TICKS_PER_SLOT,
+            DEFAULT_PINNED_CPU_CORE,
+            DEFAULT_HASHES_PER_BATCH,
+            record_receiver,
+            poh_service_message_receiver,
+        );
+
+        // Let poh service process the messages.
+        let start = Instant::now();
+        while !record_sender.is_empty() {
+            assert!(start.elapsed() < Duration::from_secs(1));
+            std::thread::sleep(Duration::from_millis(1));
+        }
+
+        // Shutdown.
+        exit.store(true, Ordering::Relaxed);
+        poh_service.join().unwrap();
+    }
 }

+ 6 - 0
poh/src/record_channels.rs

@@ -58,6 +58,7 @@ pub fn record_channels(track_transaction_indexes: bool) -> (RecordSender, Record
     )
 }
 
+#[derive(Debug)]
 pub enum RecordSenderError {
     /// The channel is full, the record was not sent.
     Full,
@@ -84,6 +85,11 @@ pub struct RecordSender {
 }
 
 impl RecordSender {
+    #[cfg(test)]
+    pub(crate) fn is_empty(&self) -> bool {
+        self.sender.is_empty()
+    }
+
     pub fn try_send(&self, record: Record) -> Result<Option<usize>, RecordSenderError> {
         let num_transactions: usize = record
             .transaction_batches