Przeglądaj źródła

ConsumeWorker report max queue len and num msg processes (#7587)

Andrew Fitzgerald 3 miesięcy temu
rodzic
commit
3e9d89a58f
1 zmienionych plików z 18 dodań i 0 usunięć
  1. 18 0
      core/src/banking_stage/consume_worker.rs

+ 18 - 0
core/src/banking_stage/consume_worker.rs

@@ -85,6 +85,10 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
             .fetch_add(get_bank_us, Ordering::Relaxed);
 
         for work in try_drain_iter(work, &self.consume_receiver) {
+            self.metrics
+                .count_metrics
+                .max_queue_len
+                .fetch_max(self.consume_receiver.len() as u64, Ordering::Relaxed);
             if self.exit.load(Ordering::Relaxed) {
                 return Ok(());
             }
@@ -109,6 +113,10 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
                     return self.retry_drain(work);
                 }
             }
+            self.metrics
+                .count_metrics
+                .num_messages_processed
+                .fetch_add(1, Ordering::Relaxed);
             self.consume(&bank, work)?;
         }
 
@@ -446,6 +454,8 @@ impl ConsumeWorkerMetrics {
 }
 
 struct ConsumeWorkerCountMetrics {
+    max_queue_len: AtomicU64,
+    num_messages_processed: AtomicU64,
     transactions_attempted_processing_count: AtomicU64,
     processed_transactions_count: AtomicU64,
     processed_with_successful_result_count: AtomicU64,
@@ -459,6 +469,8 @@ struct ConsumeWorkerCountMetrics {
 impl Default for ConsumeWorkerCountMetrics {
     fn default() -> Self {
         Self {
+            max_queue_len: AtomicU64::default(),
+            num_messages_processed: AtomicU64::default(),
             transactions_attempted_processing_count: AtomicU64::default(),
             processed_transactions_count: AtomicU64::default(),
             processed_with_successful_result_count: AtomicU64::default(),
@@ -476,6 +488,12 @@ impl ConsumeWorkerCountMetrics {
         datapoint_info!(
             "banking_stage_worker_counts",
             "id" => id,
+            ("max_queue_len", self.max_queue_len.swap(0, Ordering::Relaxed), i64),
+            (
+                "num_messages_processed",
+                self.num_messages_processed.swap(0, Ordering::Relaxed),
+                i64
+            ),
             (
                 "transactions_attempted_processing_count",
                 self.transactions_attempted_processing_count