add ProgressMessage::leader_state (#8788)

* add ProgressMessage::leader_state

* 0 goes before 1

* add LeaderState::next_leader_slot_range

* leader range on progress message
Andrew Fitzgerald, 3 weeks ago · commit 864843df49

+ 5 - 1
core/src/banking_stage/consume_worker.rs

@@ -1687,7 +1687,7 @@ mod tests {
             Arc::new(PrioritizationFeeCache::new(0u64)),
         );
         let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
-        let shared_leader_state = SharedLeaderState::new(0, None);
+        let shared_leader_state = SharedLeaderState::new(0, None, None);
 
         let (consume_sender, consume_receiver) = unbounded();
         let (consumed_sender, consumed_receiver) = unbounded();
@@ -1781,6 +1781,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
         record_receiver.restart(bank.bank_id());
 
@@ -1834,6 +1835,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
         record_receiver.restart(bank.bank_id());
 
@@ -1898,6 +1900,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
         record_receiver.restart(bank.bank_id());
 
@@ -1976,6 +1979,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
         record_receiver.restart(bank.bank_id());
         assert!(bank.slot() > 0);

+ 11 - 3
core/src/banking_stage/decision_maker.rs

@@ -128,7 +128,7 @@ mod tests {
         let genesis_config = create_genesis_config(2).genesis_config;
         let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
 
-        let mut shared_leader_state = SharedLeaderState::new(0, None);
+        let mut shared_leader_state = SharedLeaderState::new(0, None, None);
 
         let decision_maker = DecisionMaker::new(shared_leader_state.clone());
 
@@ -139,12 +139,17 @@ mod tests {
         );
 
         // Active bank.
-        shared_leader_state.store(Arc::new(LeaderState::new(Some(bank.clone()), 0, None)));
+        shared_leader_state.store(Arc::new(LeaderState::new(
+            Some(bank.clone()),
+            0,
+            None,
+            None,
+        )));
         assert_matches!(
             decision_maker.make_consume_or_forward_decision(),
             BufferedPacketsDecision::Consume(_)
         );
-        shared_leader_state.store(Arc::new(LeaderState::new(None, 0, None)));
+        shared_leader_state.store(Arc::new(LeaderState::new(None, 0, None, None)));
 
         // Will be leader shortly - Hold
         for next_leader_slot_offset in [0, 1].into_iter() {
@@ -153,6 +158,7 @@ mod tests {
                 None,
                 0,
                 Some(next_leader_slot * DEFAULT_TICKS_PER_SLOT),
+                Some((next_leader_slot, next_leader_slot + 4)),
             )));
 
             let decision = decision_maker.make_consume_or_forward_decision();
@@ -169,6 +175,7 @@ mod tests {
                 None,
                 0,
                 Some(next_leader_slot * DEFAULT_TICKS_PER_SLOT),
+                Some((next_leader_slot, next_leader_slot + 4)),
             )));
 
             let decision = decision_maker.make_consume_or_forward_decision();
@@ -184,6 +191,7 @@ mod tests {
             None,
             0,
             Some(next_leader_slot * DEFAULT_TICKS_PER_SLOT),
+            Some((next_leader_slot, next_leader_slot + 4)),
         )));
         let decision = decision_maker.make_consume_or_forward_decision();
         assert!(

+ 41 - 10
core/src/banking_stage/progress_tracker.rs

@@ -91,6 +91,9 @@ impl ProgressTracker {
     fn produce_progress_message(&mut self) -> (ProgressMessage, u64) {
         let leader_state = self.shared_leader_state.load();
         let tick_height = leader_state.tick_height();
+        let (next_leader_range_start, next_leader_range_end) = leader_state
+            .next_leader_slot_range()
+            .unwrap_or((u64::MAX, u64::MAX));
         let progress_message = if let Some(working_bank) = leader_state.working_bank() {
             // If this is a new leader slot, grab the cost tracker lock to get the limit and shared cost.
             // This avoids needing to lock except on new leader slots.
@@ -104,8 +107,10 @@ impl ProgressTracker {
             }
 
             ProgressMessage {
+                leader_state: agave_scheduler_bindings::IS_LEADER,
                 current_slot: working_bank.slot(),
-                next_leader_slot: working_bank.slot(),
+                next_leader_slot: next_leader_range_start,
+                leader_range_end: next_leader_range_end,
                 remaining_cost_units: self.remaining_block_cost(),
                 current_slot_progress: progress(
                     working_bank.slot(),
@@ -116,11 +121,10 @@ impl ProgressTracker {
         } else {
             let current_slot = slot_from_tick_height(tick_height, self.ticks_per_slot);
             ProgressMessage {
+                leader_state: agave_scheduler_bindings::IS_NOT_LEADER,
                 current_slot,
-                next_leader_slot: leader_state
-                    .leader_first_tick_height()
-                    .map(|tick_height| slot_from_tick_height(tick_height, self.ticks_per_slot))
-                    .unwrap_or(u64::MAX),
+                next_leader_slot: next_leader_range_start,
+                leader_range_end: next_leader_range_end,
                 remaining_cost_units: 0,
                 current_slot_progress: progress(current_slot, tick_height, self.ticks_per_slot),
             }
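
The hunk above derives the message's slot and progress fields from PoH tick heights via `slot_from_tick_height` and `progress`, which are not shown in this diff. Below is a rough, hypothetical sketch of what those helpers compute, consistent with the test expectations further down (progress runs from 0 at the start of a slot to 100 once its ticks are filled); the real implementations in progress_tracker.rs may handle slot boundaries and rounding differently.

```rust
// Hypothetical stand-ins for the helpers used by produce_progress_message();
// written only to illustrate the semantics of `current_slot` and
// `current_slot_progress`, not copied from the actual crate.
fn slot_from_tick_height(tick_height: u64, ticks_per_slot: u64) -> u64 {
    tick_height / ticks_per_slot
}

/// Percentage (0..=100) of the slot's ticks that have elapsed.
fn progress(current_slot: u64, tick_height: u64, ticks_per_slot: u64) -> u64 {
    let slot_start_tick = current_slot * ticks_per_slot;
    let ticks_into_slot = tick_height.saturating_sub(slot_start_tick);
    (ticks_into_slot * 100 / ticks_per_slot).min(100)
}

fn main() {
    let tps = 64;
    // Mirrors the assertions in the tests below: slot 2 at tick height 2 * tps,
    // 0% progress at a slot boundary, 100% once the slot's ticks are complete.
    assert_eq!(slot_from_tick_height(2 * tps, tps), 2);
    assert_eq!(progress(2, 2 * tps, tps), 0);
    assert_eq!(progress(0, tps, tps), 100);
}
```
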
@@ -161,7 +165,7 @@ mod tests {
 
     #[test]
     fn test_progress_tracker_produce_progress_message() {
-        let mut shared_leader_state = SharedLeaderState::new(0, None);
+        let mut shared_leader_state = SharedLeaderState::new(0, None, None);
         let ticks_per_slot = DEFAULT_TICKS_PER_SLOT;
 
         let mut progress_tracker =
@@ -169,27 +173,48 @@ mod tests {
 
         let (message, tick_height) = progress_tracker.produce_progress_message();
         assert_eq!(tick_height, 0);
+        assert_eq!(
+            message.leader_state,
+            agave_scheduler_bindings::IS_NOT_LEADER
+        );
         assert_eq!(message.current_slot, 0);
         assert_eq!(message.current_slot_progress, 0);
         assert_eq!(message.next_leader_slot, u64::MAX);
+        assert_eq!(message.leader_range_end, u64::MAX);
 
         let expected_tick_height = 2 * ticks_per_slot;
-        shared_leader_state.store(Arc::new(LeaderState::new(None, expected_tick_height, None)));
+        shared_leader_state.store(Arc::new(LeaderState::new(
+            None,
+            expected_tick_height,
+            None,
+            None,
+        )));
         let (message, tick_height) = progress_tracker.produce_progress_message();
         assert_eq!(tick_height, expected_tick_height);
+        assert_eq!(
+            message.leader_state,
+            agave_scheduler_bindings::IS_NOT_LEADER
+        );
         assert_eq!(message.current_slot, 2);
         assert_eq!(message.next_leader_slot, u64::MAX);
+        assert_eq!(message.leader_range_end, u64::MAX);
         assert_eq!(message.current_slot_progress, 0);
 
         shared_leader_state.store(Arc::new(LeaderState::new(
             None,
             expected_tick_height,
             Some(4 * ticks_per_slot),
+            Some((4, 7)),
         )));
         let (message, tick_height) = progress_tracker.produce_progress_message();
         assert_eq!(tick_height, expected_tick_height);
+        assert_eq!(
+            message.leader_state,
+            agave_scheduler_bindings::IS_NOT_LEADER
+        );
         assert_eq!(message.current_slot, 2);
         assert_eq!(message.next_leader_slot, 4);
+        assert_eq!(message.leader_range_end, 7);
         assert_eq!(message.current_slot_progress, 0);
 
         let bank = Arc::new(Bank::new_for_tests(
@@ -199,13 +224,16 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             Some(4 * ticks_per_slot),
+            Some((4, 7)),
         )));
 
         assert!(!bank.is_complete());
         let (message, tick_height) = progress_tracker.produce_progress_message();
         assert_eq!(tick_height, bank.tick_height());
+        assert_eq!(message.leader_state, agave_scheduler_bindings::IS_LEADER);
         assert_eq!(message.current_slot, bank.slot());
-        assert_eq!(message.next_leader_slot, bank.slot()); // currently leader
+        assert_eq!(message.next_leader_slot, 4);
+        assert_eq!(message.leader_range_end, 7);
         assert_eq!(message.current_slot_progress, 0);
 
         bank.fill_bank_with_ticks_for_tests();
@@ -214,11 +242,14 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             Some(4 * ticks_per_slot),
+            Some((4, 7)),
         )));
         let (message, tick_height) = progress_tracker.produce_progress_message();
         assert_eq!(tick_height, bank.tick_height());
+        assert_eq!(message.leader_state, agave_scheduler_bindings::IS_LEADER);
         assert_eq!(message.current_slot, bank.slot());
-        assert_eq!(message.next_leader_slot, bank.slot());
+        assert_eq!(message.next_leader_slot, 4);
+        assert_eq!(message.leader_range_end, 7);
         assert_eq!(message.current_slot_progress, 100);
     }
 
@@ -226,7 +257,7 @@ mod tests {
     fn test_progress_tracker_remaining_block_cost() {
         let mut progress_tracker = ProgressTracker::new(
             Arc::default(),
-            SharedLeaderState::new(0, None),
+            SharedLeaderState::new(0, None, None),
             DEFAULT_TICKS_PER_SLOT,
         );
 

+ 6 - 1
core/src/banking_stage/transaction_scheduler/scheduler_controller.rs

@@ -520,7 +520,7 @@ mod tests {
         genesis_config.fee_rate_governor = FeeRateGovernor::new(5000, 0);
         let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
 
-        let shared_leader_state = SharedLeaderState::new(0, None);
+        let shared_leader_state = SharedLeaderState::new(0, None, None);
 
         let decision_maker = DecisionMaker::new(shared_leader_state.clone());
 
@@ -671,6 +671,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
 
         // Send packet batch to the scheduler - should do nothing until we become the leader.
@@ -729,6 +730,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
 
         let pk = Pubkey::new_unique();
@@ -790,6 +792,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
 
         // Send multiple batches - all get scheduled
@@ -856,6 +859,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
 
         // Send 4 transactions w/o conflicts. 2 should be scheduled on each thread
@@ -926,6 +930,7 @@ mod tests {
             Some(bank.clone()),
             bank.tick_height(),
             None,
+            None,
         )));
 
         // Send packet batch to the scheduler - should do nothing until we become the leader.

+ 27 - 3
poh/src/poh_recorder.rs

@@ -252,7 +252,11 @@ impl PohRecorder {
                 poh,
                 tick_cache: vec![],
                 working_bank: None,
-                shared_leader_state: SharedLeaderState::new(tick_height, leader_first_tick_height),
+                shared_leader_state: SharedLeaderState::new(
+                    tick_height,
+                    leader_first_tick_height,
+                    next_leader_slot,
+                ),
                 working_bank_sender,
                 clear_bank_signal,
                 start_bank,
@@ -290,6 +294,7 @@ impl PohRecorder {
             None,
             tick_height,
             leader_first_tick_height,
+            next_leader_slot,
         )));
 
         self.leader_last_tick_height = leader_last_tick_height;
@@ -436,11 +441,15 @@ impl PohRecorder {
             }
         }
 
-        let leader_first_tick_height = self.shared_leader_state.load().leader_first_tick_height;
+        let leader_state = self.shared_leader_state.load();
+        let leader_first_tick_height = leader_state.leader_first_tick_height();
+        let next_leader_slot = leader_state.next_leader_slot_range();
+        drop(leader_state);
         self.shared_leader_state.store(Arc::new(LeaderState::new(
             Some(working_bank.bank.clone_without_scheduler()),
             tick_height,
             leader_first_tick_height,
+            next_leader_slot,
         )));
         self.working_bank = Some(working_bank);
 
@@ -475,6 +484,7 @@ impl PohRecorder {
                     None,
                     self.tick_height(),
                     leader_first_tick_height,
+                    next_leader_slot,
                 )));
             }
 
@@ -972,11 +982,16 @@ pub struct SharedLeaderState(Arc<ArcSwap<LeaderState>>);
 
 impl SharedLeaderState {
     #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
-    fn new(tick_height: u64, leader_first_tick_height: Option<u64>) -> Self {
+    fn new(
+        tick_height: u64,
+        leader_first_tick_height: Option<u64>,
+        next_leader_slot_range: Option<(Slot, Slot)>,
+    ) -> Self {
         let inner = LeaderState {
             working_bank: None,
             tick_height: AtomicU64::new(tick_height),
             leader_first_tick_height,
+            next_leader_slot_range,
         };
         Self(Arc::new(ArcSwap::from_pointee(inner)))
     }
@@ -1001,6 +1016,7 @@ pub struct LeaderState {
     working_bank: Option<Arc<Bank>>,
     tick_height: AtomicU64,
     leader_first_tick_height: Option<u64>,
+    next_leader_slot_range: Option<(Slot, Slot)>,
 }
 
 impl LeaderState {
@@ -1009,11 +1025,13 @@ impl LeaderState {
         working_bank: Option<Arc<Bank>>,
         tick_height: u64,
         leader_first_tick_height: Option<u64>,
+        next_leader_slot_range: Option<(u64, u64)>,
     ) -> Self {
         Self {
             working_bank,
             tick_height: AtomicU64::new(tick_height),
             leader_first_tick_height,
+            next_leader_slot_range,
         }
     }
 
@@ -1028,6 +1046,12 @@ impl LeaderState {
     pub fn leader_first_tick_height(&self) -> Option<u64> {
         self.leader_first_tick_height
     }
+
+    /// Returns the inclusive `[first_slot, last_slot]` range for the next
+    /// leader slots.
+    pub fn next_leader_slot_range(&self) -> Option<(Slot, Slot)> {
+        self.next_leader_slot_range
+    }
 }
 
 #[cfg(test)]
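
For context, `SharedLeaderState` wraps the state in `Arc<ArcSwap<LeaderState>>`: PoH publishes a freshly built `LeaderState` with `store` on each transition, and readers (decision maker, progress tracker, consume workers) take cheap lock-free snapshots with `load`. Below is a minimal standalone sketch of that pattern using the `arc_swap` crate and a simplified stand-in `State` type (not the real `LeaderState`).

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

// Simplified stand-in for LeaderState; only the field relevant to this change.
struct State {
    next_leader_slot_range: Option<(u64, u64)>,
}

fn main() {
    // Shared handle, analogous to SharedLeaderState(Arc<ArcSwap<LeaderState>>).
    let shared = Arc::new(ArcSwap::from_pointee(State {
        next_leader_slot_range: None,
    }));

    // Reader: take a snapshot once; it stays valid even if a newer state
    // is published afterwards.
    let snapshot = shared.load();
    assert!(snapshot.next_leader_slot_range.is_none());

    // Writer: publish a whole new state atomically; no locks on either side.
    shared.store(Arc::new(State {
        next_leader_slot_range: Some((4, 7)),
    }));
    assert_eq!(shared.load().next_leader_slot_range, Some((4, 7)));
}
```
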

+ 27 - 6
scheduler-bindings/src/lib.rs

@@ -167,6 +167,11 @@ pub mod tpu_message_flags {
     pub const FROM_STAKED_NODE: u8 = 1 << 2;
 }
 
+/// Indicates the node is not leader.
+pub const IS_NOT_LEADER: u8 = 0;
+/// Indicates the node is leader.
+pub const IS_LEADER: u8 = 1;
+
 /// Message: [Agave -> Pack]
 /// Agave passes leader status to the external pack process.
 #[cfg_attr(
@@ -175,11 +180,27 @@ pub mod tpu_message_flags {
 )]
 #[repr(C)]
 pub struct ProgressMessage {
-    /// The current slot.
+    /// Indicates whether the node is currently leader.
+    /// [`IS_LEADER`] if the node is leader.
+    /// [`IS_NOT_LEADER`] if the node is not leader.
+    /// Other values should be considered invalid.
+    pub leader_state: u8,
+    /// The current slot. This, combined with a leader schedule, is not
+    /// sufficient to determine whether the node is currently leader: there
+    /// is a slight delay between when a node is scheduled to begin its
+    /// leader slot and when a bank is ready to process transactions as
+    /// leader. Use [`Self::leader_state`] to determine whether the node is
+    /// leader and has a bank available.
     pub current_slot: u64,
     /// Next known leader slot or u64::MAX if unknown.
-    /// If currently leader, this is equal to `current_slot`.
+    /// If currently leader, this does **not** include the current slot.
+    /// The node is leader for the contiguous slots in the inclusive range
+    /// [[`Self::next_leader_slot`], [`Self::leader_range_end`]].
     pub next_leader_slot: u64,
+    /// Next known leader slot range end (inclusive) or u64::MAX if unknown.
+    /// The node is leader for the contiguous slots in the inclusive range
+    /// [[`Self::next_leader_slot`], [`Self::leader_range_end`]].
+    pub leader_range_end: u64,
     /// The remaining cost units allowed to be packed in the block.
     /// i.e. block_limit - current_cost_units_used.
     /// Only valid if currently leader, otherwise the value is undefined.
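
To make the new wire semantics concrete, here is a hypothetical consumer-side helper (not part of this change) showing how an external pack process might read these fields: `leader_state` says whether a bank is available for packing right now, while `next_leader_slot` / `leader_range_end` describe the upcoming contiguous leader window, with `u64::MAX` marking both ends as unknown.

```rust
use agave_scheduler_bindings::{IS_LEADER, ProgressMessage};

// Hypothetical helper on the pack-process side; the function itself is an
// illustration, only the struct and constants come from this crate.
fn describe(msg: &ProgressMessage) {
    if msg.leader_state == IS_LEADER {
        // A bank is available now; remaining_cost_units is only meaningful here.
        println!(
            "leader at slot {}, {} cost units remaining",
            msg.current_slot, msg.remaining_cost_units
        );
    } else if msg.next_leader_slot != u64::MAX {
        // Not leader yet; the next leader window is an inclusive slot range.
        println!(
            "at slot {}, next leader window is slots {}..={}",
            msg.current_slot, msg.next_leader_slot, msg.leader_range_end
        );
    } else {
        println!("at slot {}, no upcoming leader slots known", msg.current_slot);
    }
}
```
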
@@ -243,10 +264,10 @@ pub mod pack_message_flags {
     pub const RESOLVE: u16 = 1 << 1;
 }
 
-/// The message was processed.
-pub const PROCESSED: u8 = 1;
 /// The message was not processed.
 pub const NOT_PROCESSED: u8 = 0;
+/// The message was processed.
+pub const PROCESSED: u8 = 1;
 
 /// Message: [Worker -> Pack]
 /// Message from worker threads in response to a [`PackToWorkerMessage`].
@@ -406,10 +427,10 @@ pub mod worker_message_types {
     /// Tag indicating [`Resolved`] inner message.
     pub const RESOLVED: u8 = 1;
 
-    /// Resolving was successful.
-    pub const RESOLVE_SUCCESS: u8 = 1;
     /// Resolving was unsuccessful.
     pub const RESOLVE_FAILURE: u8 = 0;
+    /// Resolving was successful.
+    pub const RESOLVE_SUCCESS: u8 = 1;
 
     #[cfg_attr(
         feature = "dev-context-only-utils",

+ 2 - 0
scheduling-utils/src/handshake/tests.rs

@@ -30,8 +30,10 @@ fn message_passing_on_all_queues() {
         src_addr: [4; 16],
     };
     let progress_tracker = ProgressMessage {
+        leader_state: agave_scheduler_bindings::IS_LEADER,
         current_slot: 3,
         next_leader_slot: 12,
+        leader_range_end: 16,
         remaining_cost_units: 12_000_000,
         current_slot_progress: 32,
     };