(Alpenglow) Make consensus metrics switch to crossbeam channels. (#8689)

* (Alpenglow) Upstream consensus metrics switching to crossbeam channels.

* Fix votor to work with new ConsensusMetrics.

* Make linter happy.

* Wrap the comments.

* Change unbounded channels in tests to bounded.

* Change metrics_sender send to try_send and do not fail on queue full.

* Address pr comments.

* Remove unneeded Eq.

* Address pr comments.

* Address pr comments.

* Undo all the renaming changes.

* Shrink thread name to 15 chars.

* Update votor/src/event_handler/stats.rs

Co-authored-by: Akhilesh Singhania <akhi3030@gmail.com>

* Rename metrics_queue_full to metrics_queue_became_full.

* Forgot to update event_handler.

* Make linter happy.

---------

Co-authored-by: Akhilesh Singhania <akhi3030@gmail.com>
Wen committed 2 weeks ago
Commit: 1253f83339

+ 88 - 17
votor/src/consensus_metrics.rs

@@ -2,16 +2,37 @@
 
 use {
     agave_votor_messages::vote::Vote,
+    crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
     histogram::Histogram,
     solana_clock::{Epoch, Slot},
     solana_metrics::datapoint_info,
     solana_pubkey::Pubkey,
     std::{
         collections::BTreeMap,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc,
+        },
+        thread::{Builder, JoinHandle},
         time::{Duration, Instant},
     },
 };
 
+#[derive(Debug, Clone, PartialEq)]
+pub enum ConsensusMetricsEvent {
+    /// A vote was received from the node with `id`.
+    Vote { id: Pubkey, vote: Vote },
+    /// A block hash was seen for `slot` and the `leader` is responsible for producing it.
+    BlockHashSeen { leader: Pubkey, slot: Slot },
+    /// Check whether `epoch` is new and, if so, perform end-of-epoch reporting.
+    MaybeNewEpoch { epoch: Epoch },
+    /// Marks the start of `slot`.
+    StartOfSlot { slot: Slot },
+}
+
+pub type ConsensusMetricsEventSender = Sender<(Instant, Vec<ConsensusMetricsEvent>)>;
+pub type ConsensusMetricsEventReceiver = Receiver<(Instant, Vec<ConsensusMetricsEvent>)>;
+
 /// Returns a [`Histogram`] configured for the use cases for this module.
 ///
 /// Keeps the default precision and reduces the max value to 10s to get finer grained resolution.
@@ -97,6 +118,8 @@ pub struct ConsensusMetrics {
     node_metrics: BTreeMap<Pubkey, NodeVoteMetrics>,
     /// Used to track when this node received blocks from different leaders in the network.
     leader_metrics: BTreeMap<Pubkey, Histogram>,
+    /// Counts the number of times metrics recording failed.
+    metrics_recording_failed: usize,
     /// Tracks when individual slots began.
     ///
     /// Relies on [`TimerManager`] to notify of start of slots.
@@ -104,39 +127,88 @@ pub struct ConsensusMetrics {
     start_of_slot: BTreeMap<Slot, Instant>,
     /// Tracks the current epoch, used for end of epoch reporting.
     current_epoch: Epoch,
+    /// Receiver for incoming [`ConsensusMetricsEvent`] batches.
+    receiver: ConsensusMetricsEventReceiver,
 }
 
 impl ConsensusMetrics {
-    pub fn new(epoch: Epoch) -> Self {
+    fn new(epoch: Epoch, receiver: ConsensusMetricsEventReceiver) -> Self {
         Self {
             node_metrics: BTreeMap::default(),
             leader_metrics: BTreeMap::default(),
+            metrics_recording_failed: 0,
             start_of_slot: BTreeMap::default(),
             current_epoch: epoch,
+            receiver,
+        }
+    }
+
+    pub(crate) fn start_metrics_loop(
+        epoch: Epoch,
+        receiver: ConsensusMetricsEventReceiver,
+        exit: Arc<AtomicBool>,
+    ) -> JoinHandle<()> {
+        Builder::new()
+            .name("solConsMetrics".into())
+            .spawn(move || {
+                info!("ConsensusMetricsService has started");
+                let mut metrics = Self::new(epoch, receiver);
+                metrics.run(exit);
+                info!("ConsensusMetricsService has stopped");
+            })
+            .expect("Failed to start consensus metrics thread")
+    }
+
+    fn run(&mut self, exit: Arc<AtomicBool>) {
+        while !exit.load(Ordering::Relaxed) {
+            match self.receiver.recv_timeout(Duration::from_secs(1)) {
+                Ok((recorded, events)) => {
+                    for event in events {
+                        match event {
+                            ConsensusMetricsEvent::Vote { id, vote } => {
+                                self.record_vote(id, &vote, recorded);
+                            }
+                            ConsensusMetricsEvent::BlockHashSeen { leader, slot } => {
+                                self.record_block_hash_seen(leader, slot, recorded);
+                            }
+                            ConsensusMetricsEvent::MaybeNewEpoch { epoch } => {
+                                self.maybe_new_epoch(epoch);
+                            }
+                            ConsensusMetricsEvent::StartOfSlot { slot } => {
+                                self.record_start_of_slot(slot, recorded);
+                            }
+                        }
+                    }
+                }
+                Err(err) => match err {
+                    RecvTimeoutError::Timeout => trace!("ConsensusMetricsEventReceiver timeout"),
+                    RecvTimeoutError::Disconnected => {
+                        warn!("ConsensusMetricsEventReceiver disconnected, exiting loop");
+                        return;
+                    }
+                },
+            }
         }
     }
 
     /// Records a `vote` from the node with `id`.
-    pub fn record_vote(&mut self, id: Pubkey, vote: &Vote) -> Result<(), RecordVoteError> {
+    fn record_vote(&mut self, id: Pubkey, vote: &Vote, recorded: Instant) {
         let Some(start) = self.start_of_slot.get(&vote.slot()) else {
-            return Err(RecordVoteError::SlotNotFound);
+            self.metrics_recording_failed = self.metrics_recording_failed.saturating_add(1);
+            return;
         };
         let node = self.node_metrics.entry(id).or_default();
-        let elapsed = start.elapsed();
+        let elapsed = recorded.duration_since(*start);
         node.record_vote(vote, elapsed);
-        Ok(())
     }
 
     /// Records when a block for `slot` was seen and the `leader` is responsible for producing it.
-    pub fn record_block_hash_seen(
-        &mut self,
-        leader: Pubkey,
-        slot: Slot,
-    ) -> Result<(), RecordBlockHashError> {
+    fn record_block_hash_seen(&mut self, leader: Pubkey, slot: Slot, recorded: Instant) {
         let Some(start) = self.start_of_slot.get(&slot) else {
-            return Err(RecordBlockHashError::SlotNotFound);
+            self.metrics_recording_failed = self.metrics_recording_failed.saturating_add(1);
+            return;
         };
-        let elapsed = start.elapsed().as_micros();
+        let elapsed = recorded.duration_since(*start).as_micros();
         let elapsed = match elapsed.try_into() {
             Ok(e) => e,
             Err(err) => {
@@ -144,7 +216,7 @@ impl ConsensusMetrics {
                     "recording duration {elapsed} for block hash for slot {slot}: conversion to \
                      u64 failed with {err}"
                 );
-                return Ok(());
+                return;
             }
         };
         let histogram = self
@@ -160,12 +232,11 @@ impl ConsensusMetrics {
                 );
             }
         }
-        Ok(())
     }
 
     /// Records when a given slot started.
-    pub fn record_start_of_slot(&mut self, slot: Slot) {
-        self.start_of_slot.entry(slot).or_insert(Instant::now());
+    fn record_start_of_slot(&mut self, slot: Slot, recorded: Instant) {
+        self.start_of_slot.entry(slot).or_insert(recorded);
     }
 
     /// Performs end of epoch reporting and reset all the statistics for the subsequent epoch.
@@ -218,7 +289,7 @@ impl ConsensusMetrics {
     }
 
     /// This function can be called if there is a new [`Epoch`] and it will carry out end of epoch reporting.
-    pub fn maybe_new_epoch(&mut self, epoch: Epoch) {
+    fn maybe_new_epoch(&mut self, epoch: Epoch) {
         assert!(epoch >= self.current_epoch);
         if epoch != self.current_epoch {
             self.current_epoch = epoch;
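
Taken together, the consensus_metrics.rs hunks replace lock-based recording with a single consumer thread draining batched events from a crossbeam channel. Below is a minimal, self-contained sketch of that pattern under stated assumptions: `MetricsEvent` and the capacity of 1024 are illustrative stand-ins, not the crate's definitions, and the loop polls the exit flag once per second via recv_timeout, just like `ConsensusMetrics::run` above.

use {
    crossbeam_channel::{bounded, RecvTimeoutError},
    std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread::Builder,
        time::{Duration, Instant},
    },
};

#[derive(Debug)]
enum MetricsEvent {
    StartOfSlot { slot: u64 },
}

fn main() {
    // Capacity 1024 is an assumption; the upstream code chooses its own bound.
    let (sender, receiver) = bounded::<(Instant, Vec<MetricsEvent>)>(1024);
    let exit = Arc::new(AtomicBool::new(false));

    let handle = {
        let exit = exit.clone();
        Builder::new()
            .name("solConsMetrics".into()) // 14 chars, within the 15-char limit
            .spawn(move || {
                while !exit.load(Ordering::Relaxed) {
                    match receiver.recv_timeout(Duration::from_secs(1)) {
                        // `recorded` is stamped on the producer side, so
                        // queueing delay does not skew the measured durations.
                        Ok((recorded, events)) => {
                            for event in events {
                                println!("{event:?} captured {:?} ago", recorded.elapsed());
                            }
                        }
                        Err(RecvTimeoutError::Timeout) => continue,
                        Err(RecvTimeoutError::Disconnected) => return,
                    }
                }
            })
            .expect("Failed to start consensus metrics thread")
    };

    sender
        .try_send((Instant::now(), vec![MetricsEvent::StartOfSlot { slot: 42 }]))
        .expect("queue has room");
    exit.store(true, Ordering::Relaxed);
    handle.join().unwrap();
}

Stamping `Instant::now()` on the producer side is what lets the consumer compute `recorded.duration_since(*start)` without the queueing delay leaking into the measurement.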

+ 88 - 29
votor/src/event_handler.rs

@@ -3,6 +3,7 @@
 use {
     crate::{
         commitment::{update_commitment_cache, CommitmentType},
+        consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
         event::{CompletedBlock, VotorEvent, VotorEventReceiver},
         event_handler::stats::EventHandlerStats,
         root_utils::{self, RootContext, SetRootError},
@@ -13,7 +14,7 @@ use {
         votor::{SharedContext, Votor},
     },
     agave_votor_messages::{consensus_message::Block, vote::Vote},
-    crossbeam_channel::{RecvTimeoutError, SendError},
+    crossbeam_channel::{RecvTimeoutError, TrySendError},
     parking_lot::RwLock,
     solana_clock::Slot,
     solana_hash::Hash,
@@ -31,7 +32,7 @@ use {
             Arc, Condvar, Mutex,
         },
         thread::{self, Builder, JoinHandle},
-        time::Duration,
+        time::{Duration, Instant},
     },
     thiserror::Error,
 };
@@ -62,7 +63,7 @@ enum EventLoopError {
     ReceiverDisconnected(#[from] RecvTimeoutError),
 
     #[error("Sender is disconnected")]
-    SenderDisconnected(#[from] SendError<()>),
+    SenderDisconnected,
 
     #[error("Error generating and inserting vote")]
     VotingError(#[from] VoteError),
@@ -164,7 +165,9 @@ impl EventHandler {
             let mut send_votes_batch_time = Measure::start("send_votes_batch");
             for vote in votes {
                 local_context.stats.incr_vote(&vote);
-                vctx.bls_sender.send(vote).map_err(|_| SendError(()))?;
+                vctx.bls_sender
+                    .send(vote)
+                    .map_err(|_| EventLoopError::SenderDisconnected)?;
             }
             send_votes_batch_time.stop();
             local_context.stats.send_votes_batch_time_us = local_context
@@ -208,6 +211,24 @@ impl EventHandler {
         Ok(())
     }
 
+    fn send_to_metrics(
+        consensus_metrics_sender: &ConsensusMetricsEventSender,
+        consensus_metrics_events: Vec<ConsensusMetricsEvent>,
+        stats: &mut EventHandlerStats,
+    ) -> Result<(), EventLoopError> {
+        // Do not kill or block the event handler thread just because a
+        // metrics send failed (e.g. because the queue is full).
+        match consensus_metrics_sender.try_send((Instant::now(), consensus_metrics_events)) {
+            Ok(()) => Ok(()),
+            Err(TrySendError::Disconnected(_)) => Err(EventLoopError::SenderDisconnected),
+            Err(TrySendError::Full(_)) => {
+                warn!("send_to_metrics failed: queue is full");
+                stats.metrics_queue_became_full = true;
+                Ok(())
+            }
+        }
+    }
+
     fn handle_event(
         event: VotorEvent,
         timer_manager: &RwLock<TimerManager>,
@@ -228,18 +249,25 @@ impl EventHandler {
             // Block has completed replay
             VotorEvent::Block(CompletedBlock { slot, bank }) => {
                 debug_assert!(bank.is_frozen());
-                {
-                    let mut metrics_guard = vctx.consensus_metrics.write();
-                    match metrics_guard.record_block_hash_seen(*bank.collector_id(), slot) {
-                        Ok(()) => (),
-                        Err(err) => {
-                            error!(
-                                "{my_pubkey}: recording block on slot {slot} failed with {err:?}"
-                            );
-                        }
-                    }
-                    metrics_guard.maybe_new_epoch(bank.epoch());
+                let mut consensus_metrics_events =
+                    vec![ConsensusMetricsEvent::StartOfSlot { slot }];
+                if slot == first_of_consecutive_leader_slots(slot) {
+                    // Only record BlockHashSeen for the first slot of the
+                    // leader window: later slots in the window typically start
+                    // when their block is seen, so the measurement would be ~0.
+                    consensus_metrics_events.push(ConsensusMetricsEvent::BlockHashSeen {
+                        leader: *bank.collector_id(),
+                        slot,
+                    });
                 }
+                consensus_metrics_events.push(ConsensusMetricsEvent::MaybeNewEpoch {
+                    epoch: bank.epoch(),
+                });
+                Self::send_to_metrics(
+                    &vctx.consensus_metrics_sender,
+                    consensus_metrics_events,
+                    stats,
+                )?;
                 let (block, parent_block) = Self::get_block_parent_block(&bank);
                 info!("{my_pubkey}: Block {block:?} parent {parent_block:?}");
                 if Self::try_notar(
@@ -296,6 +324,11 @@ impl EventHandler {
 
             // Received a parent ready notification for `slot`
             VotorEvent::ParentReady { slot, parent_block } => {
+                Self::send_to_metrics(
+                    &vctx.consensus_metrics_sender,
+                    vec![ConsensusMetricsEvent::StartOfSlot { slot }],
+                    stats,
+                )?;
                 Self::handle_parent_ready_event(
                     slot,
                     parent_block,
@@ -318,6 +351,15 @@ impl EventHandler {
             // Skip timer for the slot has fired
             VotorEvent::Timeout(slot) => {
                 info!("{my_pubkey}: Timeout {slot}");
+                if slot != last_of_consecutive_leader_slots(slot) {
+                    Self::send_to_metrics(
+                        &vctx.consensus_metrics_sender,
+                        vec![ConsensusMetricsEvent::StartOfSlot {
+                            slot: slot.saturating_add(1),
+                        }],
+                        stats,
+                    )?;
+                }
                 if vctx.vote_history.voted(slot) {
                     return Ok(votes);
                 }
@@ -776,7 +818,7 @@ mod tests {
         super::*,
         crate::{
             commitment::CommitmentAggregationData,
-            consensus_metrics::ConsensusMetrics,
+            consensus_metrics::ConsensusMetricsEventReceiver,
             event::{LeaderWindowInfo, VotorEventSender},
             vote_history_storage::{
                 FileVoteHistoryStorage, SavedVoteHistory, SavedVoteHistoryVersions,
@@ -789,7 +831,7 @@ mod tests {
             consensus_message::{ConsensusMessage, VoteMessage, BLS_KEYPAIR_DERIVE_SEED},
             vote::Vote,
         },
-        crossbeam_channel::{unbounded, Receiver, RecvTimeoutError},
+        crossbeam_channel::{bounded, Receiver, RecvTimeoutError},
         parking_lot::RwLock as PlRwLock,
         solana_bls_signatures::{
             keypair::Keypair as BLSKeypair, signature::Signature as BLSSignature,
@@ -838,22 +880,23 @@ mod tests {
         leader_window_notifier: Arc<LeaderWindowNotifier>,
         drop_bank_receiver: Receiver<Vec<BankWithScheduler>>,
         cluster_info: Arc<ClusterInfo>,
+        consensus_metrics_receiver: ConsensusMetricsEventReceiver,
     }
 
     impl EventHandlerTestContext {
         fn setup() -> EventHandlerTestContext {
-            let (bls_sender, bls_receiver) = unbounded();
-            let (commitment_sender, commitment_receiver) = unbounded();
-            let (own_vote_sender, own_vote_receiver) = unbounded();
-            let (drop_bank_sender, drop_bank_receiver) = unbounded();
+            // For tests, bound each queue at 100; that is more than enough.
+            let (bls_sender, bls_receiver) = bounded(100);
+            let (commitment_sender, commitment_receiver) = bounded(100);
+            let (own_vote_sender, own_vote_receiver) = bounded(100);
+            let (drop_bank_sender, drop_bank_receiver) = bounded(100);
             let exit = Arc::new(AtomicBool::new(false));
             let start = Arc::new((Mutex::new(true), Condvar::new()));
-            let (event_sender, event_receiver) = unbounded();
-            let consensus_metrics = Arc::new(PlRwLock::new(ConsensusMetrics::new(0)));
+            let (event_sender, event_receiver) = bounded(100);
+            let (consensus_metrics_sender, consensus_metrics_receiver) = bounded(100);
             let timer_manager = Arc::new(PlRwLock::new(TimerManager::new(
                 event_sender.clone(),
                 exit.clone(),
-                consensus_metrics.clone(),
             )));
 
             // Create 10 node validatorvotekeypairs vec
@@ -913,7 +956,7 @@ mod tests {
                 derived_bls_keypairs: HashMap::new(),
                 has_new_vote_been_rooted: false,
                 own_vote_sender,
-                consensus_metrics,
+                consensus_metrics_sender,
             };
 
             let root_context = RootContext {
@@ -949,6 +992,7 @@ mod tests {
                 leader_window_notifier,
                 drop_bank_receiver,
                 cluster_info,
+                consensus_metrics_receiver,
             }
         }
 
@@ -1121,6 +1165,14 @@ mod tests {
             assert!(self.timer_manager.read().is_timeout_set(expected_slot));
         }
 
+        fn check_for_metrics_event(&self, expected: ConsensusMetricsEvent) {
+            let event = self
+                .consensus_metrics_receiver
+                .recv_timeout(TEST_SHORT_TIMEOUT)
+                .expect("Should receive metrics event");
+            assert!(event.1.contains(&expected));
+        }
+
         fn crate_vote_history_storage_and_switch_identity(
             &self,
             new_identity: &Keypair,
@@ -1174,6 +1226,8 @@ mod tests {
         test_context.wait_for_event_to_be_processed();
         let block_id_1 = bank1.block_id().unwrap();
 
+        test_context.check_for_metrics_event(ConsensusMetricsEvent::StartOfSlot { slot });
+
         // We should receive Notarize Vote for block 1
         test_context.check_for_vote(&Vote::new_notarization_vote(slot, block_id_1));
         test_context.check_for_commitment(CommitmentType::Notarize, slot);
@@ -1635,32 +1689,37 @@ mod tests {
     #[test_case("bls_receiver")]
     #[test_case("commitment_receiver")]
     #[test_case("own_vote_receiver")]
+    #[test_case("consensus_metrics_receiver")]
     fn test_channel_disconnection(channel_name: &str) {
         agave_logger::setup();
         let mut test_context = EventHandlerTestContext::setup();
         match channel_name {
             "bls_receiver" => {
                 let bls_receiver = test_context.bls_receiver.clone();
-                test_context.bls_receiver = unbounded().1;
+                test_context.bls_receiver = bounded(100).1;
                 drop(bls_receiver);
             }
             "commitment_receiver" => {
                 let commitment_receiver = test_context.commitment_receiver.clone();
-                test_context.commitment_receiver = unbounded().1;
+                test_context.commitment_receiver = bounded(100).1;
                 drop(commitment_receiver);
             }
             "own_vote_receiver" => {
                 let own_vote_receiver = test_context.own_vote_receiver.clone();
-                test_context.own_vote_receiver = unbounded().1;
+                test_context.own_vote_receiver = bounded(100).1;
                 drop(own_vote_receiver);
             }
+            "consensus_metrics_receiver" => {
+                let consensus_metrics_receiver = test_context.consensus_metrics_receiver.clone();
+                test_context.consensus_metrics_receiver = bounded(100).1;
+                drop(consensus_metrics_receiver);
+            }
             _ => panic!("Unknown channel name"),
         }
         // We normally need some event hitting all the senders to trigger exit
         let root_bank = test_context.bank_forks.read().unwrap().root_bank();
         let _ = test_context.create_block_and_send_block_event(1, root_bank);
         test_context.send_parent_ready_event(1, (0, Hash::default()));
-        test_context.wait_for_event_to_be_processed();
         // Verify that the event_handler exits within 5 seconds
         let start = Instant::now();
         while !test_context.exit.load(Ordering::Relaxed) && start.elapsed() < Duration::from_secs(5)
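
The `send_to_metrics` helper above encodes a lossy-send policy: a full queue is tolerated (set a stats flag, drop the batch) while a disconnected receiver remains a hard error. Here is a small sketch of that policy in isolation; `send_lossy` and `QueueError` are hypothetical names, not upstream API.

use crossbeam_channel::{bounded, Sender, TrySendError};

#[derive(Debug)]
enum QueueError {
    Disconnected,
}

fn send_lossy<T>(sender: &Sender<T>, item: T, became_full: &mut bool) -> Result<(), QueueError> {
    match sender.try_send(item) {
        Ok(()) => Ok(()),
        // Metrics are best-effort: drop the item rather than block or kill
        // the event loop when the consumer falls behind.
        Err(TrySendError::Full(_)) => {
            *became_full = true;
            Ok(())
        }
        // A dropped receiver means the metrics thread is gone; propagate so
        // the caller can shut down.
        Err(TrySendError::Disconnected(_)) => Err(QueueError::Disconnected),
    }
}

fn main() {
    let (sender, _receiver) = bounded::<u32>(1);
    let mut became_full = false;
    send_lossy(&sender, 1, &mut became_full).unwrap(); // fills the queue
    send_lossy(&sender, 2, &mut became_full).unwrap(); // dropped, flag set
    assert!(became_full);
}

This keeps metrics strictly best-effort: a slow consumer can never block or kill consensus progress, and dropped batches surface later through the `metrics_queue_became_full` datapoint.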

+ 9 - 0
votor/src/event_handler/stats.rs

@@ -325,6 +325,9 @@ pub(crate) struct EventHandlerStats {
     /// Timing information for major events for each slot.
     slot_tracking_map: BTreeMap<Slot, SlotTracking>,
 
+    /// Whether the metrics send queue has ever become full.
+    pub(super) metrics_queue_became_full: bool,
+
     root_slot: Slot,
     last_report_time: Instant,
 }
@@ -368,6 +371,7 @@ impl EventHandlerStats {
             slot_tracking_map: BTreeMap::new(),
             root_slot: 0,
             last_report_time: Instant::now(),
+            metrics_queue_became_full: false,
         }
     }
 
@@ -473,6 +477,11 @@ impl EventHandlerStats {
             ),
             ("set_root_count", self.set_root_count as i64, i64),
             ("timeout_set", self.timeout_set as i64, i64),
+            (
+                "metrics_queue_became_full",
+                self.metrics_queue_became_full,
+                bool
+            )
         );
         datapoint_info!(
             "event_handler_timing",

+ 2 - 9
votor/src/timer_manager.rs

@@ -6,7 +6,6 @@ mod timers;
 use {
     crate::{
         common::{DELTA_BLOCK, DELTA_TIMEOUT},
-        consensus_metrics::ConsensusMetrics,
         event::VotorEvent,
     },
     crossbeam_channel::Sender,
@@ -30,16 +29,11 @@ pub(crate) struct TimerManager {
 }
 
 impl TimerManager {
-    pub(crate) fn new(
-        event_sender: Sender<VotorEvent>,
-        exit: Arc<AtomicBool>,
-        consensus_metrics: Arc<PlRwLock<ConsensusMetrics>>,
-    ) -> Self {
+    pub(crate) fn new(event_sender: Sender<VotorEvent>, exit: Arc<AtomicBool>) -> Self {
         let timers = Arc::new(PlRwLock::new(Timers::new(
             DELTA_TIMEOUT,
             DELTA_BLOCK,
             event_sender,
-            consensus_metrics,
         )));
         let handle = {
             let timers = Arc::clone(&timers);
@@ -80,8 +74,7 @@ mod tests {
     fn test_timer_manager() {
         let (event_sender, event_receiver) = unbounded();
         let exit = Arc::new(AtomicBool::new(false));
-        let consensus_metrics = Arc::new(PlRwLock::new(ConsensusMetrics::new(0)));
-        let timer_manager = TimerManager::new(event_sender, exit.clone(), consensus_metrics);
+        let timer_manager = TimerManager::new(event_sender, exit.clone());
         let slot = 52;
         let start = Instant::now();
         timer_manager.set_timeouts(slot);

+ 11 - 65
votor/src/timer_manager/timers.rs

@@ -1,16 +1,11 @@
 use {
-    crate::{
-        consensus_metrics::ConsensusMetrics, event::VotorEvent,
-        timer_manager::stats::TimerManagerStats,
-    },
+    crate::{event::VotorEvent, timer_manager::stats::TimerManagerStats},
     crossbeam_channel::Sender,
-    parking_lot::RwLock as PlRwLock,
     solana_clock::Slot,
     solana_ledger::leader_schedule_utils::last_of_consecutive_leader_slots,
     std::{
         cmp::Reverse,
         collections::{BinaryHeap, HashMap, VecDeque},
-        sync::Arc,
         time::{Duration, Instant},
     },
 };
@@ -47,12 +42,7 @@ impl TimerState {
     /// Call to make progress on the state machine.
     ///
     /// Returns a potentially empty list of events that should be sent.
-    fn progress(
-        &mut self,
-        delta_block: Duration,
-        now: Instant,
-        consensus_metrics: &PlRwLock<ConsensusMetrics>,
-    ) -> Option<VotorEvent> {
+    fn progress(&mut self, delta_block: Duration, now: Instant) -> Option<VotorEvent> {
         match self {
             Self::WaitDeltaTimeout { window, timeout } => {
                 assert!(!window.is_empty());
@@ -60,7 +50,6 @@ impl TimerState {
                     return None;
                 }
                 let slot = *window.front().unwrap();
-                consensus_metrics.write().record_start_of_slot(slot);
                 let timeout = now.checked_add(delta_block).unwrap();
                 *self = Self::WaitDeltaBlock {
                     window: window.to_owned(),
@@ -77,8 +66,7 @@ impl TimerState {
                 let ret = Some(VotorEvent::Timeout(window.pop_front().unwrap()));
                 match window.front() {
                     None => *self = Self::Done,
-                    Some(next_slot) => {
-                        consensus_metrics.write().record_start_of_slot(*next_slot);
+                    Some(_next_slot) => {
                         *timeout = now.checked_add(delta_block).unwrap();
                     }
                 }
@@ -106,7 +94,6 @@ pub(super) struct Timers {
     heap: BinaryHeap<Reverse<(Instant, Slot)>>,
     /// Channel to send events on.
     event_sender: Sender<VotorEvent>,
-    consensus_metrics: Arc<PlRwLock<ConsensusMetrics>>,
     /// Stats for the timer manager.
     stats: TimerManagerStats,
 }
@@ -115,7 +102,6 @@ impl Timers {
         delta_timeout: Duration,
         delta_block: Duration,
         event_sender: Sender<VotorEvent>,
-        consensus_metrics: Arc<PlRwLock<ConsensusMetrics>>,
     ) -> Self {
         Self {
             delta_timeout,
@@ -123,7 +109,6 @@ impl Timers {
             timers: HashMap::new(),
             heap: BinaryHeap::new(),
             event_sender,
-            consensus_metrics,
             stats: TimerManagerStats::new(),
         }
     }
@@ -160,9 +145,7 @@ impl Timers {
                     }
 
                     let mut timer = self.timers.remove(&slot).unwrap();
-                    if let Some(event) =
-                        timer.progress(self.delta_block, now, &self.consensus_metrics)
-                    {
+                    if let Some(event) = timer.progress(self.delta_block, now) {
                         self.event_sender.send(event).unwrap();
                     }
                     if let Some(next_fire) = timer.next_fire() {
@@ -189,80 +172,44 @@ impl Timers {
 
 #[cfg(test)]
 mod tests {
-    use {super::*, crossbeam_channel::unbounded, solana_pubkey::Pubkey};
+    use {super::*, crossbeam_channel::unbounded};
 
     #[test]
     fn timer_state_machine() {
-        let leader = Pubkey::default();
-        let consensus_metrics = PlRwLock::new(ConsensusMetrics::new(1));
         let one_micro = Duration::from_micros(1);
         let now = Instant::now();
         let slot = 0;
         let (mut timer_state, next_fire) = TimerState::new(slot, one_micro, now);
 
         assert!(matches!(
-            timer_state
-                .progress(one_micro, next_fire, &consensus_metrics)
-                .unwrap(),
+            timer_state.progress(one_micro, next_fire).unwrap(),
             VotorEvent::TimeoutCrashedLeader(0)
         ));
-        consensus_metrics
-            .write()
-            .record_block_hash_seen(leader, 0)
-            .unwrap();
 
         assert!(matches!(
             timer_state
-                .progress(
-                    one_micro,
-                    timer_state.next_fire().unwrap(),
-                    &consensus_metrics
-                )
+                .progress(one_micro, timer_state.next_fire().unwrap())
                 .unwrap(),
             VotorEvent::Timeout(0)
         ));
-        consensus_metrics
-            .write()
-            .record_block_hash_seen(leader, 1)
-            .unwrap();
 
         assert!(matches!(
             timer_state
-                .progress(
-                    one_micro,
-                    timer_state.next_fire().unwrap(),
-                    &consensus_metrics
-                )
+                .progress(one_micro, timer_state.next_fire().unwrap())
                 .unwrap(),
             VotorEvent::Timeout(1)
         ));
-        consensus_metrics
-            .write()
-            .record_block_hash_seen(leader, 2)
-            .unwrap();
 
         assert!(matches!(
             timer_state
-                .progress(
-                    one_micro,
-                    timer_state.next_fire().unwrap(),
-                    &consensus_metrics
-                )
+                .progress(one_micro, timer_state.next_fire().unwrap())
                 .unwrap(),
             VotorEvent::Timeout(2)
         ));
-        consensus_metrics
-            .write()
-            .record_block_hash_seen(leader, 3)
-            .unwrap();
 
         assert!(matches!(
             timer_state
-                .progress(
-                    one_micro,
-                    timer_state.next_fire().unwrap(),
-                    &consensus_metrics
-                )
+                .progress(one_micro, timer_state.next_fire().unwrap())
                 .unwrap(),
             VotorEvent::Timeout(3)
         ));
@@ -271,11 +218,10 @@ mod tests {
 
     #[test]
     fn timers_progress() {
-        let consensus_metrics = Arc::new(PlRwLock::new(ConsensusMetrics::new(1)));
         let one_micro = Duration::from_micros(1);
         let mut now = Instant::now();
         let (sender, receiver) = unbounded();
-        let mut timers = Timers::new(one_micro, one_micro, sender, consensus_metrics);
+        let mut timers = Timers::new(one_micro, one_micro, sender);
         assert!(timers.progress(now).is_none());
         assert!(receiver.try_recv().unwrap_err().is_empty());
 

+ 3 - 16
votor/src/voting_utils.rs

@@ -3,7 +3,7 @@
 use {
     crate::{
         commitment::{CommitmentAggregationData, CommitmentError},
-        consensus_metrics::ConsensusMetrics,
+        consensus_metrics::ConsensusMetricsEventSender,
         vote_history::{VoteHistory, VoteHistoryError},
         vote_history_storage::{SavedVoteHistory, SavedVoteHistoryVersions},
         voting_service::BLSOp,
@@ -13,7 +13,6 @@ use {
         vote::Vote,
     },
     crossbeam_channel::{SendError, Sender},
-    parking_lot::RwLock as PlRwLock,
     solana_bls_signatures::{
         keypair::Keypair as BLSKeypair, pubkey::PubkeyCompressed as BLSPubkeyCompressed, BlsError,
         Pubkey as BLSPubkey,
@@ -126,7 +125,7 @@ pub struct VotingContext {
     pub commitment_sender: Sender<CommitmentAggregationData>,
     pub wait_to_vote_slot: Option<u64>,
     pub sharable_banks: SharableBanks,
-    pub consensus_metrics: Arc<PlRwLock<ConsensusMetrics>>,
+    pub consensus_metrics_sender: ConsensusMetricsEventSender,
 }
 
 fn get_bls_keypair(
@@ -283,18 +282,6 @@ fn insert_vote_and_create_bls_message(
     let saved_vote_history =
         SavedVoteHistory::new(&context.vote_history, &context.identity_keypair)?;
 
-    match context
-        .consensus_metrics
-        .write()
-        .record_vote(context.vote_account_pubkey, &vote)
-    {
-        Ok(()) => (),
-        Err(err) => {
-            let slot = vote.slot();
-            error!("recording vote on slot {slot} failed with {err:?}");
-        }
-    }
-
     // Return vote for sending
     Ok(BLSOp::PushVote {
         message: Arc::new(message),
@@ -387,7 +374,7 @@ mod tests {
             commitment_sender: unbounded().0,
             wait_to_vote_slot: None,
             sharable_banks,
-            consensus_metrics: Arc::new(PlRwLock::new(ConsensusMetrics::new(0))),
+            consensus_metrics_sender: unbounded().0,
         }
     }
 

+ 20 - 8
votor/src/votor.rs

@@ -45,7 +45,9 @@ use {
     crate::{
         commitment::CommitmentAggregationData,
         common::DELTA_STANDSTILL,
-        consensus_metrics::ConsensusMetrics,
+        consensus_metrics::{
+            ConsensusMetrics, ConsensusMetricsEventReceiver, ConsensusMetricsEventSender,
+        },
         consensus_pool_service::{ConsensusPoolContext, ConsensusPoolService},
         event::{LeaderWindowInfo, VotorEventReceiver, VotorEventSender},
         event_handler::{EventHandler, EventHandlerContext},
@@ -79,7 +81,7 @@ use {
             atomic::{AtomicBool, Ordering},
             Arc, Condvar, Mutex, RwLock,
         },
-        thread,
+        thread::{self, JoinHandle},
         time::Duration,
     },
 };
@@ -119,10 +121,12 @@ pub struct VotorConfig {
     pub leader_window_notifier: Arc<LeaderWindowNotifier>,
     pub event_sender: VotorEventSender,
     pub own_vote_sender: Sender<ConsensusMessage>,
+    pub consensus_metrics_sender: ConsensusMetricsEventSender,
 
     // Receivers
     pub event_receiver: VotorEventReceiver,
     pub consensus_message_receiver: Receiver<ConsensusMessage>,
+    pub consensus_metrics_receiver: ConsensusMetricsEventReceiver,
 }
 
 /// Context shared with block creation, replay, gossip, banking stage etc
@@ -143,6 +147,7 @@ pub struct Votor {
     event_handler: EventHandler,
     consensus_pool_service: ConsensusPoolService,
     timer_manager: Arc<PlRwLock<TimerManager>>,
+    consensus_metrics_handle: JoinHandle<()>,
 }
 
 impl Votor {
@@ -170,6 +175,8 @@ impl Votor {
             event_receiver,
             own_vote_sender,
             consensus_message_receiver,
+            consensus_metrics_sender,
+            consensus_metrics_receiver,
         } = config;
 
         let start = Arc::new((Mutex::new(false), Condvar::new()));
@@ -189,9 +196,6 @@ impl Votor {
             vote_history_storage,
         };
 
-        let consensus_metrics = Arc::new(PlRwLock::new(ConsensusMetrics::new(
-            sharable_banks.root().epoch(),
-        )));
         let voting_context = VotingContext {
             vote_history,
             vote_account_pubkey: vote_account,
@@ -204,7 +208,7 @@ impl Votor {
             commitment_sender: commitment_sender.clone(),
             wait_to_vote_slot,
             sharable_banks: sharable_banks.clone(),
-            consensus_metrics: consensus_metrics.clone(),
+            consensus_metrics_sender: consensus_metrics_sender.clone(),
         };
 
         let root_context = RootContext {
@@ -217,7 +221,6 @@ impl Votor {
         let timer_manager = Arc::new(PlRwLock::new(TimerManager::new(
             event_sender.clone(),
             exit.clone(),
-            consensus_metrics,
         )));
 
         let event_handler_context = EventHandlerContext {
@@ -230,6 +233,8 @@ impl Votor {
             root_context,
         };
 
+        let root_epoch = sharable_banks.root().epoch();
+
         let consensus_pool_context = ConsensusPoolContext {
             exit: exit.clone(),
             start: start.clone(),
@@ -245,6 +250,11 @@ impl Votor {
             delta_standstill: DELTA_STANDSTILL,
         };
 
+        let consensus_metrics_handle = ConsensusMetrics::start_metrics_loop(
+            root_epoch,
+            consensus_metrics_receiver,
+            exit.clone(),
+        );
         let event_handler = EventHandler::new(event_handler_context);
         let consensus_pool_service = ConsensusPoolService::new(consensus_pool_context);
 
@@ -253,6 +263,7 @@ impl Votor {
             event_handler,
             consensus_pool_service,
             timer_manager,
+            consensus_metrics_handle,
         }
     }
 
@@ -297,6 +308,7 @@ impl Votor {
                 }
             }
         }
-        self.event_handler.join()
+        self.event_handler.join()?;
+        self.consensus_metrics_handle.join()
     }
 }
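
Finally, a hedged sketch of the wiring `Votor::new` and `Votor::join` now perform: one bounded channel pair created up front, the sender cloned into each producing context (e.g. `VotingContext`), the receiver moved into the metrics thread, and the handle joined during shutdown. The event type and capacity below are assumptions for illustration.

use {
    crossbeam_channel::bounded,
    std::{thread, time::Instant},
};

#[derive(Debug)]
enum Event {
    MaybeNewEpoch { epoch: u64 },
}

fn main() {
    // Assumed capacity for illustration; the real bound is chosen upstream.
    let (metrics_sender, metrics_receiver) = bounded::<(Instant, Vec<Event>)>(4096);

    // The receiver moves into the consumer thread (start_metrics_loop above);
    // producers each get a clone of the sender.
    let consensus_metrics_handle = thread::spawn(move || {
        while let Ok((recorded, events)) = metrics_receiver.recv() {
            println!("{events:?} captured at {recorded:?}");
        }
        // recv() fails once every sender clone has been dropped.
    });

    let producer = metrics_sender.clone();
    producer
        .try_send((Instant::now(), vec![Event::MaybeNewEpoch { epoch: 1 }]))
        .ok();

    drop(producer);
    drop(metrics_sender);
    // Mirrors Votor::join chaining event_handler.join() and then
    // consensus_metrics_handle.join().
    consensus_metrics_handle.join().unwrap();
}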