Forráskód Böngészése

Poh: RecordSummary (#7859)

Andrew Fitzgerald 2 hónapja
szülő
commit
2746dd239a

+ 5 - 1
core/src/banking_stage/consumer.rs

@@ -843,7 +843,11 @@ mod tests {
                             record.transaction_batches,
                             record.transaction_batches,
                         );
                         );
                         poh_recorder.write().unwrap().tick();
                         poh_recorder.write().unwrap().tick();
-                        if record.sender.send(record_response).is_err() {
+                        if record
+                            .sender
+                            .send(record_response.map(|r| r.starting_transaction_index))
+                            .is_err()
+                        {
                             panic!("Error returning mixin hash");
                             panic!("Error returning mixin hash");
                         }
                         }
                     }
                     }

+ 41 - 25
entry/src/poh.rs

@@ -12,7 +12,7 @@ pub struct Poh {
     pub hash: Hash,
     pub hash: Hash,
     num_hashes: u64,
     num_hashes: u64,
     hashes_per_tick: u64,
     hashes_per_tick: u64,
-    remaining_hashes: u64,
+    remaining_hashes_until_tick: u64,
     tick_number: u64,
     tick_number: u64,
     slot_start_time: Instant,
     slot_start_time: Instant,
 }
 }
@@ -36,7 +36,7 @@ impl Poh {
             hash,
             hash,
             num_hashes: 0,
             num_hashes: 0,
             hashes_per_tick,
             hashes_per_tick,
-            remaining_hashes: hashes_per_tick,
+            remaining_hashes_until_tick: hashes_per_tick,
             tick_number,
             tick_number,
             slot_start_time: now,
             slot_start_time: now,
         }
         }
@@ -62,27 +62,27 @@ impl Poh {
     /// Return `true` if the caller needs to `tick()` next, i.e. if the
     /// Return `true` if the caller needs to `tick()` next, i.e. if the
     /// remaining_hashes is 1.
     /// remaining_hashes is 1.
     pub fn hash(&mut self, max_num_hashes: u64) -> bool {
     pub fn hash(&mut self, max_num_hashes: u64) -> bool {
-        let num_hashes = std::cmp::min(self.remaining_hashes - 1, max_num_hashes);
+        let num_hashes = std::cmp::min(self.remaining_hashes_until_tick - 1, max_num_hashes);
 
 
         for _ in 0..num_hashes {
         for _ in 0..num_hashes {
             self.hash = hash(self.hash.as_ref());
             self.hash = hash(self.hash.as_ref());
         }
         }
         self.num_hashes += num_hashes;
         self.num_hashes += num_hashes;
-        self.remaining_hashes -= num_hashes;
+        self.remaining_hashes_until_tick -= num_hashes;
 
 
-        assert!(self.remaining_hashes > 0);
-        self.remaining_hashes == 1
+        assert!(self.remaining_hashes_until_tick > 0);
+        self.remaining_hashes_until_tick == 1
     }
     }
 
 
     pub fn record(&mut self, mixin: Hash) -> Option<PohEntry> {
     pub fn record(&mut self, mixin: Hash) -> Option<PohEntry> {
-        if self.remaining_hashes == 1 {
+        if self.remaining_hashes_until_tick == 1 {
             return None; // Caller needs to `tick()` first
             return None; // Caller needs to `tick()` first
         }
         }
 
 
         self.hash = hashv(&[self.hash.as_ref(), mixin.as_ref()]);
         self.hash = hashv(&[self.hash.as_ref(), mixin.as_ref()]);
         let num_hashes = self.num_hashes + 1;
         let num_hashes = self.num_hashes + 1;
         self.num_hashes = 0;
         self.num_hashes = 0;
-        self.remaining_hashes -= 1;
+        self.remaining_hashes_until_tick -= 1;
 
 
         Some(PohEntry {
         Some(PohEntry {
             num_hashes,
             num_hashes,
@@ -98,7 +98,7 @@ impl Poh {
         let num_mixins = mixins.len() as u64;
         let num_mixins = mixins.len() as u64;
         debug_assert_ne!(num_mixins, 0, "mixins.len() == 0");
         debug_assert_ne!(num_mixins, 0, "mixins.len() == 0");
 
 
-        if self.remaining_hashes < num_mixins + 1 {
+        if self.remaining_hashes_until_tick < num_mixins + 1 {
             return false; // Not enough hashes remaining to record all mixins
             return false; // Not enough hashes remaining to record all mixins
         }
         }
 
 
@@ -120,7 +120,7 @@ impl Poh {
         }));
         }));
 
 
         self.num_hashes = 0;
         self.num_hashes = 0;
-        self.remaining_hashes -= num_mixins;
+        self.remaining_hashes_until_tick -= num_mixins;
 
 
         true
         true
     }
     }
@@ -128,16 +128,16 @@ impl Poh {
     pub fn tick(&mut self) -> Option<PohEntry> {
     pub fn tick(&mut self) -> Option<PohEntry> {
         self.hash = hash(self.hash.as_ref());
         self.hash = hash(self.hash.as_ref());
         self.num_hashes += 1;
         self.num_hashes += 1;
-        self.remaining_hashes -= 1;
+        self.remaining_hashes_until_tick -= 1;
 
 
         // If we are in low power mode then always generate a tick.
         // If we are in low power mode then always generate a tick.
         // Otherwise only tick if there are no remaining hashes
         // Otherwise only tick if there are no remaining hashes
-        if self.hashes_per_tick != LOW_POWER_MODE && self.remaining_hashes != 0 {
+        if self.hashes_per_tick != LOW_POWER_MODE && self.remaining_hashes_until_tick != 0 {
             return None;
             return None;
         }
         }
 
 
         let num_hashes = self.num_hashes;
         let num_hashes = self.num_hashes;
-        self.remaining_hashes = self.hashes_per_tick;
+        self.remaining_hashes_until_tick = self.hashes_per_tick;
         self.num_hashes = 0;
         self.num_hashes = 0;
         self.tick_number += 1;
         self.tick_number += 1;
         Some(PohEntry {
         Some(PohEntry {
@@ -145,6 +145,15 @@ impl Poh {
             hash: self.hash,
             hash: self.hash,
         })
         })
     }
     }
+
+    pub fn remaining_hashes_in_slot(&self, ticks_per_slot: u64) -> u64 {
+        // ticks_per_slot must be a power of two so we can use a bitmask
+        debug_assert!(ticks_per_slot.is_power_of_two() && ticks_per_slot > 0);
+        ticks_per_slot
+            .saturating_sub((self.tick_number & (ticks_per_slot.wrapping_sub(1))).wrapping_add(1))
+            .wrapping_mul(self.hashes_per_tick)
+            .wrapping_add(self.remaining_hashes_until_tick)
+    }
 }
 }
 
 
 pub fn compute_hash_time(hashes_sample_size: u64) -> Duration {
 pub fn compute_hash_time(hashes_sample_size: u64) -> Duration {
@@ -328,29 +337,32 @@ mod tests {
     #[test]
     #[test]
     fn test_poh_tick() {
     fn test_poh_tick() {
         let mut poh = Poh::new(Hash::default(), Some(2));
         let mut poh = Poh::new(Hash::default(), Some(2));
-        assert_eq!(poh.remaining_hashes, 2);
+        assert_eq!(poh.remaining_hashes_until_tick, 2);
         assert!(poh.tick().is_none());
         assert!(poh.tick().is_none());
-        assert_eq!(poh.remaining_hashes, 1);
+        assert_eq!(poh.remaining_hashes_until_tick, 1);
         assert_matches!(poh.tick(), Some(PohEntry { num_hashes: 2, .. }));
         assert_matches!(poh.tick(), Some(PohEntry { num_hashes: 2, .. }));
-        assert_eq!(poh.remaining_hashes, 2); // Ready for the next tick
+        assert_eq!(poh.remaining_hashes_until_tick, 2); // Ready for the next tick
     }
     }
 
 
     #[test]
     #[test]
     fn test_poh_tick_large_batch() {
     fn test_poh_tick_large_batch() {
         let mut poh = Poh::new(Hash::default(), Some(2));
         let mut poh = Poh::new(Hash::default(), Some(2));
-        assert_eq!(poh.remaining_hashes, 2);
+        assert_eq!(poh.remaining_hashes_until_tick, 2);
         assert!(poh.hash(1_000_000)); // Stop hashing before the next tick
         assert!(poh.hash(1_000_000)); // Stop hashing before the next tick
-        assert_eq!(poh.remaining_hashes, 1);
+        assert_eq!(poh.remaining_hashes_until_tick, 1);
         assert!(poh.hash(1_000_000)); // Does nothing...
         assert!(poh.hash(1_000_000)); // Does nothing...
-        assert_eq!(poh.remaining_hashes, 1);
+        assert_eq!(poh.remaining_hashes_until_tick, 1);
+        assert_eq!(poh.remaining_hashes_in_slot(2), 3);
         poh.tick();
         poh.tick();
-        assert_eq!(poh.remaining_hashes, 2); // Ready for the next tick
+        assert_eq!(poh.remaining_hashes_until_tick, 2); // Ready for the next tick
+        assert_eq!(poh.remaining_hashes_in_slot(2), 2);
     }
     }
 
 
     #[test]
     #[test]
     fn test_poh_tick_too_soon() {
     fn test_poh_tick_too_soon() {
         let mut poh = Poh::new(Hash::default(), Some(2));
         let mut poh = Poh::new(Hash::default(), Some(2));
-        assert_eq!(poh.remaining_hashes, 2);
+        assert_eq!(poh.remaining_hashes_until_tick, 2);
+        assert_eq!(poh.remaining_hashes_in_slot(2), 4);
         assert!(poh.tick().is_none());
         assert!(poh.tick().is_none());
     }
     }
 
 
@@ -358,14 +370,16 @@ mod tests {
     fn test_poh_record_not_permitted_at_final_hash() {
     fn test_poh_record_not_permitted_at_final_hash() {
         let mut poh = Poh::new(Hash::default(), Some(10));
         let mut poh = Poh::new(Hash::default(), Some(10));
         assert!(poh.hash(9));
         assert!(poh.hash(9));
-        assert_eq!(poh.remaining_hashes, 1);
+        assert_eq!(poh.remaining_hashes_until_tick, 1);
+        assert_eq!(poh.remaining_hashes_in_slot(2), 11);
         assert!(poh.record(Hash::default()).is_none()); // <-- record() rejected to avoid exceeding hashes_per_tick
         assert!(poh.record(Hash::default()).is_none()); // <-- record() rejected to avoid exceeding hashes_per_tick
         assert_matches!(poh.tick(), Some(PohEntry { num_hashes: 10, .. }));
         assert_matches!(poh.tick(), Some(PohEntry { num_hashes: 10, .. }));
         assert_matches!(
         assert_matches!(
             poh.record(Hash::default()),
             poh.record(Hash::default()),
             Some(PohEntry { num_hashes: 1, .. }) // <-- record() ok
             Some(PohEntry { num_hashes: 1, .. }) // <-- record() ok
         );
         );
-        assert_eq!(poh.remaining_hashes, 9);
+        assert_eq!(poh.remaining_hashes_until_tick, 9);
+        assert_eq!(poh.remaining_hashes_in_slot(2), 9);
     }
     }
 
 
     #[test]
     #[test]
@@ -380,7 +394,8 @@ mod tests {
         assert_eq!(entries[0].num_hashes, 5);
         assert_eq!(entries[0].num_hashes, 5);
         assert_eq!(entries[1].num_hashes, 1);
         assert_eq!(entries[1].num_hashes, 1);
         assert_eq!(entries[2].num_hashes, 1);
         assert_eq!(entries[2].num_hashes, 1);
-        assert!(poh.remaining_hashes == 3);
+        assert_eq!(poh.remaining_hashes_until_tick, 3);
+        assert_eq!(poh.remaining_hashes_in_slot(2), 13);
 
 
         // Cannot record more than number of remaining hashes
         // Cannot record more than number of remaining hashes
         assert!(!poh.record_batches(&dummy_hashes[..4], &mut entries,));
         assert!(!poh.record_batches(&dummy_hashes[..4], &mut entries,));
@@ -393,6 +408,7 @@ mod tests {
         assert_eq!(entries.len(), 2);
         assert_eq!(entries.len(), 2);
         assert_eq!(entries[0].num_hashes, 1);
         assert_eq!(entries[0].num_hashes, 1);
         assert_eq!(entries[1].num_hashes, 1);
         assert_eq!(entries[1].num_hashes, 1);
-        assert!(poh.remaining_hashes == 1);
+        assert_eq!(poh.remaining_hashes_until_tick, 1);
+        assert_eq!(poh.remaining_hashes_in_slot(2), 11);
     }
     }
 }
 }

+ 1 - 1
local-cluster/tests/local_cluster.rs

@@ -295,7 +295,7 @@ fn test_two_unbalanced_stakes() {
     error!("test_two_unbalanced_stakes");
     error!("test_two_unbalanced_stakes");
     let validator_config = ValidatorConfig::default_for_test();
     let validator_config = ValidatorConfig::default_for_test();
     let num_ticks_per_second = 100;
     let num_ticks_per_second = 100;
-    let num_ticks_per_slot = 10;
+    let num_ticks_per_slot = 16;
     let num_slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH;
     let num_slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH;
 
 
     let mut cluster = LocalCluster::new(
     let mut cluster = LocalCluster::new(

+ 1 - 0
poh/benches/poh.rs

@@ -125,6 +125,7 @@ fn bench_poh_recorder_record_transaction_index(bencher: &mut Bencher) {
                 vec![test::black_box(txs.clone())],
                 vec![test::black_box(txs.clone())],
             )
             )
             .unwrap()
             .unwrap()
+            .starting_transaction_index
             .unwrap();
             .unwrap();
     });
     });
     poh_recorder.tick();
     poh_recorder.tick();

+ 17 - 4
poh/src/poh_recorder.rs

@@ -66,6 +66,12 @@ pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
 // transaction, if being tracked by WorkingBank
 // transaction, if being tracked by WorkingBank
 type RecordResultSender = Sender<Result<Option<usize>>>;
 type RecordResultSender = Sender<Result<Option<usize>>>;
 
 
+#[derive(Debug)]
+pub struct RecordSummary {
+    pub remaining_hashes_in_slot: u64,
+    pub starting_transaction_index: Option<usize>,
+}
+
 pub struct Record {
 pub struct Record {
     pub mixins: Vec<Hash>,
     pub mixins: Vec<Hash>,
     pub transaction_batches: Vec<Vec<VersionedTransaction>>,
     pub transaction_batches: Vec<Vec<VersionedTransaction>>,
@@ -311,7 +317,7 @@ impl PohRecorder {
         bank_slot: Slot,
         bank_slot: Slot,
         mixins: Vec<Hash>,
         mixins: Vec<Hash>,
         transaction_batches: Vec<Vec<VersionedTransaction>>,
         transaction_batches: Vec<Vec<VersionedTransaction>>,
-    ) -> Result<Option<usize>> {
+    ) -> Result<RecordSummary> {
         // Entries without transactions are used to track real-time passing in the ledger and
         // Entries without transactions are used to track real-time passing in the ledger and
         // cannot be generated by `record()`
         // cannot be generated by `record()`
         assert!(
         assert!(
@@ -346,6 +352,8 @@ impl PohRecorder {
             let (mixed_in, record_mixin_us) =
             let (mixed_in, record_mixin_us) =
                 measure_us!(poh_lock.record_batches(&mixins, &mut self.entries));
                 measure_us!(poh_lock.record_batches(&mixins, &mut self.entries));
             self.metrics.record_us += record_mixin_us;
             self.metrics.record_us += record_mixin_us;
+            let remaining_hashes_in_slot =
+                poh_lock.remaining_hashes_in_slot(working_bank.bank.ticks_per_slot());
 
 
             drop(poh_lock);
             drop(poh_lock);
 
 
@@ -378,7 +386,10 @@ impl PohRecorder {
                             transaction_index.saturating_add(num_transactions);
                             transaction_index.saturating_add(num_transactions);
                         working_bank.transaction_index = Some(next_starting_transaction_index);
                         working_bank.transaction_index = Some(next_starting_transaction_index);
                     });
                     });
-                return Ok(starting_transaction_index);
+                return Ok(RecordSummary {
+                    remaining_hashes_in_slot,
+                    starting_transaction_index,
+                });
             }
             }
 
 
             // record() might fail if the next PoH hash needs to be a tick.  But that's ok, tick()
             // record() might fail if the next PoH hash needs to be a tick.  But that's ok, tick()
@@ -1520,6 +1531,7 @@ mod tests {
         let record_result = poh_recorder
         let record_result = poh_recorder
             .record(bank.slot(), vec![h1], vec![vec![tx0.into(), tx1.into()]])
             .record(bank.slot(), vec![h1], vec![vec![tx0.into(), tx1.into()]])
             .unwrap()
             .unwrap()
+            .starting_transaction_index
             .unwrap();
             .unwrap();
         assert_eq!(record_result, 0);
         assert_eq!(record_result, 0);
         assert_eq!(
         assert_eq!(
@@ -1534,11 +1546,12 @@ mod tests {
 
 
         let tx = test_tx();
         let tx = test_tx();
         let h2 = hash(b"foobar");
         let h2 = hash(b"foobar");
-        let record_result = poh_recorder
+        let starting_transaction_index = poh_recorder
             .record(bank.slot(), vec![h2], vec![vec![tx.into()]])
             .record(bank.slot(), vec![h2], vec![vec![tx.into()]])
             .unwrap()
             .unwrap()
+            .starting_transaction_index
             .unwrap();
             .unwrap();
-        assert_eq!(record_result, 2);
+        assert_eq!(starting_transaction_index, 2);
         assert_eq!(
         assert_eq!(
             poh_recorder
             poh_recorder
                 .working_bank
                 .working_bank

+ 10 - 6
poh/src/poh_service.rs

@@ -209,11 +209,13 @@ impl PohService {
         if let Ok(record) = record {
         if let Ok(record) = record {
             if record
             if record
                 .sender
                 .sender
-                .send(poh_recorder.write().unwrap().record(
-                    record.slot,
-                    record.mixins,
-                    record.transaction_batches,
-                ))
+                .send(
+                    poh_recorder
+                        .write()
+                        .unwrap()
+                        .record(record.slot, record.mixins, record.transaction_batches)
+                        .map(|summary| summary.starting_transaction_index),
+                )
                 .is_err()
                 .is_err()
             {
             {
                 panic!("Error returning mixin hash");
                 panic!("Error returning mixin hash");
@@ -284,7 +286,9 @@ impl PohService {
                         record.mixins,
                         record.mixins,
                         std::mem::take(&mut record.transaction_batches),
                         std::mem::take(&mut record.transaction_batches),
                     );
                     );
-                    let (send_res, send_record_result_us) = measure_us!(record.sender.send(res));
+                    let (send_res, send_record_result_us) = measure_us!(record
+                        .sender
+                        .send(res.map(|summary| summary.starting_transaction_index)));
                     debug_assert!(send_res.is_ok(), "Record wasn't sent.");
                     debug_assert!(send_res.is_ok(), "Record wasn't sent.");
 
 
                     timing.total_send_record_result_us += send_record_result_us;
                     timing.total_send_record_result_us += send_record_result_us;