|
|
@@ -9,7 +9,7 @@ use {
|
|
|
solana_poh::leader_bank_notifier::LeaderBankNotifier,
|
|
|
solana_runtime::bank::Bank,
|
|
|
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
|
|
|
- solana_sdk::clock::Slot,
|
|
|
+ solana_sdk::timing::AtomicInterval,
|
|
|
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
|
|
|
std::{
|
|
|
sync::{
|
|
|
@@ -183,8 +183,8 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
|
|
|
/// done.
|
|
|
pub(crate) struct ConsumeWorkerMetrics {
|
|
|
id: String,
|
|
|
+ interval: AtomicInterval,
|
|
|
has_data: AtomicBool,
|
|
|
- slot: AtomicU64,
|
|
|
|
|
|
count_metrics: ConsumeWorkerCountMetrics,
|
|
|
error_metrics: ConsumeWorkerTransactionErrorMetrics,
|
|
|
@@ -192,34 +192,23 @@ pub(crate) struct ConsumeWorkerMetrics {
|
|
|
}
|
|
|
|
|
|
impl ConsumeWorkerMetrics {
|
|
|
- /// Report and reset metrics when the worker did some work and:
|
|
|
- /// a) (when a leader) Previous slot is not the same as current.
|
|
|
- /// b) (when not a leader) report the metrics accumulated so far.
|
|
|
- pub fn maybe_report_and_reset(&self, slot: Option<Slot>) {
|
|
|
- let prev_slot_id: u64 = self.slot.load(Ordering::Relaxed);
|
|
|
- if let Some(slot) = slot {
|
|
|
- if slot != prev_slot_id {
|
|
|
- if !self.has_data.swap(false, Ordering::Relaxed) {
|
|
|
- return;
|
|
|
- }
|
|
|
- self.count_metrics.report_and_reset(&self.id, slot);
|
|
|
- self.timing_metrics.report_and_reset(&self.id, slot);
|
|
|
- self.error_metrics.report_and_reset(&self.id, slot);
|
|
|
- self.slot.swap(slot, Ordering::Relaxed);
|
|
|
- }
|
|
|
- } else if prev_slot_id != 0 {
|
|
|
- self.count_metrics.report_and_reset(&self.id, prev_slot_id);
|
|
|
- self.timing_metrics.report_and_reset(&self.id, prev_slot_id);
|
|
|
- self.error_metrics.report_and_reset(&self.id, prev_slot_id);
|
|
|
- self.slot.swap(0, Ordering::Relaxed);
|
|
|
+ /// Report and reset metrics iff the interval has elapsed and the worker did some work.
|
|
|
+ pub fn maybe_report_and_reset(&self) {
|
|
|
+ const REPORT_INTERVAL_MS: u64 = 1000;
|
|
|
+ if self.interval.should_update(REPORT_INTERVAL_MS)
|
|
|
+ && self.has_data.swap(false, Ordering::Relaxed)
|
|
|
+ {
|
|
|
+ self.count_metrics.report_and_reset(&self.id);
|
|
|
+ self.timing_metrics.report_and_reset(&self.id);
|
|
|
+ self.error_metrics.report_and_reset(&self.id);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
fn new(id: u32) -> Self {
|
|
|
Self {
|
|
|
id: id.to_string(),
|
|
|
+ interval: AtomicInterval::default(),
|
|
|
has_data: AtomicBool::new(false),
|
|
|
- slot: AtomicU64::new(0),
|
|
|
count_metrics: ConsumeWorkerCountMetrics::default(),
|
|
|
error_metrics: ConsumeWorkerTransactionErrorMetrics::default(),
|
|
|
timing_metrics: ConsumeWorkerTimingMetrics::default(),
|
|
|
@@ -468,7 +457,7 @@ impl Default for ConsumeWorkerCountMetrics {
|
|
|
}
|
|
|
|
|
|
impl ConsumeWorkerCountMetrics {
|
|
|
- fn report_and_reset(&self, id: &str, slot: u64) {
|
|
|
+ fn report_and_reset(&self, id: &str) {
|
|
|
datapoint_info!(
|
|
|
"banking_stage_worker_counts",
|
|
|
"id" => id,
|
|
|
@@ -516,11 +505,6 @@ impl ConsumeWorkerCountMetrics {
|
|
|
self.max_prioritization_fees.swap(0, Ordering::Relaxed),
|
|
|
i64
|
|
|
),
|
|
|
- (
|
|
|
- "slot",
|
|
|
- slot,
|
|
|
- i64
|
|
|
- ),
|
|
|
);
|
|
|
}
|
|
|
}
|
|
|
@@ -542,7 +526,7 @@ struct ConsumeWorkerTimingMetrics {
|
|
|
}
|
|
|
|
|
|
impl ConsumeWorkerTimingMetrics {
|
|
|
- fn report_and_reset(&self, id: &str, slot: u64) {
|
|
|
+ fn report_and_reset(&self, id: &str) {
|
|
|
datapoint_info!(
|
|
|
"banking_stage_worker_timing",
|
|
|
"id" => id,
|
|
|
@@ -598,11 +582,6 @@ impl ConsumeWorkerTimingMetrics {
|
|
|
self.wait_for_bank_failure_us.swap(0, Ordering::Relaxed),
|
|
|
i64
|
|
|
),
|
|
|
- (
|
|
|
- "slot",
|
|
|
- slot,
|
|
|
- i64
|
|
|
- ),
|
|
|
);
|
|
|
}
|
|
|
}
|
|
|
@@ -636,7 +615,7 @@ struct ConsumeWorkerTransactionErrorMetrics {
|
|
|
}
|
|
|
|
|
|
impl ConsumeWorkerTransactionErrorMetrics {
|
|
|
- fn report_and_reset(&self, id: &str, slot: u64) {
|
|
|
+ fn report_and_reset(&self, id: &str) {
|
|
|
datapoint_info!(
|
|
|
"banking_stage_worker_error_metrics",
|
|
|
"id" => id,
|
|
|
@@ -747,11 +726,6 @@ impl ConsumeWorkerTransactionErrorMetrics {
|
|
|
.swap(0, Ordering::Relaxed),
|
|
|
i64
|
|
|
),
|
|
|
- (
|
|
|
- "slot",
|
|
|
- slot,
|
|
|
- i64
|
|
|
- ),
|
|
|
);
|
|
|
}
|
|
|
}
|