Răsfoiți Sursa

DecisionMaker cache decision (#4724)

Andrew Fitzgerald 9 luni în urmă
părinte
comite
0d94844f5c
2 a modificat fișierele cu 34 adăugiri și 10 ștergeri
  1. 4 4
      core/src/banking_stage.rs
  2. 30 6
      core/src/banking_stage/decision_maker.rs

+ 4 - 4
core/src/banking_stage.rs

@@ -634,7 +634,7 @@ impl BankingStage {
     fn spawn_thread_local_multi_iterator_thread<T: LikeClusterInfo>(
         id: u32,
         packet_receiver: BankingPacketReceiver,
-        decision_maker: DecisionMaker,
+        mut decision_maker: DecisionMaker,
         committer: Committer,
         transaction_recorder: TransactionRecorder,
         log_messages_bytes_limit: Option<usize>,
@@ -654,7 +654,7 @@ impl BankingStage {
             .spawn(move || {
                 Self::process_loop(
                     &mut packet_receiver,
-                    &decision_maker,
+                    &mut decision_maker,
                     &mut forwarder,
                     &consumer,
                     id,
@@ -666,7 +666,7 @@ impl BankingStage {
 
     #[allow(clippy::too_many_arguments)]
     fn process_buffered_packets<T: LikeClusterInfo>(
-        decision_maker: &DecisionMaker,
+        decision_maker: &mut DecisionMaker,
         forwarder: &mut Forwarder<T>,
         consumer: &Consumer,
         unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
@@ -730,7 +730,7 @@ impl BankingStage {
 
     fn process_loop<T: LikeClusterInfo>(
         packet_receiver: &mut PacketReceiver,
-        decision_maker: &DecisionMaker,
+        decision_maker: &mut DecisionMaker,
         forwarder: &mut Forwarder<T>,
         consumer: &Consumer,
         id: u32,

+ 30 - 6
core/src/banking_stage/decision_maker.rs

@@ -7,7 +7,10 @@ use {
         },
         pubkey::Pubkey,
     },
-    std::sync::{Arc, RwLock},
+    std::{
+        sync::{Arc, RwLock},
+        time::{Duration, Instant},
+    },
 };
 
 #[derive(Debug, Clone)]
@@ -32,6 +35,9 @@ impl BufferedPacketsDecision {
 pub struct DecisionMaker {
     my_pubkey: Pubkey,
     poh_recorder: Arc<RwLock<PohRecorder>>,
+
+    cached_decision: Option<BufferedPacketsDecision>,
+    last_decision_time: Instant,
 }
 
 impl DecisionMaker {
@@ -39,10 +45,28 @@ impl DecisionMaker {
         Self {
             my_pubkey,
             poh_recorder,
+            cached_decision: None,
+            last_decision_time: Instant::now(),
+        }
+    }
+
+    pub(crate) fn make_consume_or_forward_decision(&mut self) -> BufferedPacketsDecision {
+        const CACHE_DURATION: Duration = Duration::from_millis(5);
+        let now = Instant::now();
+
+        // If there is a cached decision that has not expired, return it now.
+        if let Some(decision) = &self.cached_decision {
+            if now.duration_since(self.last_decision_time) < CACHE_DURATION {
+                return decision.clone();
+            }
         }
+
+        self.last_decision_time = now;
+        self.cached_decision = Some(self.make_consume_or_forward_decision_no_cache());
+        self.cached_decision.as_ref().unwrap().clone()
     }
 
-    pub(crate) fn make_consume_or_forward_decision(&self) -> BufferedPacketsDecision {
+    fn make_consume_or_forward_decision_no_cache(&self) -> BufferedPacketsDecision {
         let decision;
         {
             let poh_recorder = self.poh_recorder.read().unwrap();
@@ -169,7 +193,7 @@ mod tests {
                 .write()
                 .unwrap()
                 .set_bank_for_test(bank.clone());
-            let decision = decision_maker.make_consume_or_forward_decision();
+            let decision = decision_maker.make_consume_or_forward_decision_no_cache();
             assert_matches!(decision, BufferedPacketsDecision::Consume(_));
         }
 
@@ -183,7 +207,7 @@ mod tests {
                     next_leader_slot + NUM_CONSECUTIVE_LEADER_SLOTS,
                 )),
             );
-            let decision = decision_maker.make_consume_or_forward_decision();
+            let decision = decision_maker.make_consume_or_forward_decision_no_cache();
             assert!(
                 matches!(decision, BufferedPacketsDecision::Hold),
                 "next_leader_slot_offset: {next_leader_slot_offset}",
@@ -200,7 +224,7 @@ mod tests {
                     next_leader_slot + NUM_CONSECUTIVE_LEADER_SLOTS + 1,
                 )),
             );
-            let decision = decision_maker.make_consume_or_forward_decision();
+            let decision = decision_maker.make_consume_or_forward_decision_no_cache();
             assert!(
                 matches!(decision, BufferedPacketsDecision::ForwardAndHold),
                 "next_leader_slot_offset: {next_leader_slot_offset}",
@@ -210,7 +234,7 @@ mod tests {
         // Known leader, not me - Forward
         {
             poh_recorder.write().unwrap().reset(bank, None);
-            let decision = decision_maker.make_consume_or_forward_decision();
+            let decision = decision_maker.make_consume_or_forward_decision_no_cache();
             assert_matches!(decision, BufferedPacketsDecision::Forward);
         }
     }