فهرست منبع

metrics for request reveal processed

0xfirefist 1 سال پیش
والد
کامیت
5111998716
3فایلهای تغییر یافته به همراه81 افزوده شده و 8 حذف شده
  1. 3 0
      apps/fortuna/src/command/run.rs
  2. 45 5
      apps/fortuna/src/keeper.rs
  3. 33 3
      apps/fortuna/src/metrics.rs

+ 3 - 0
apps/fortuna/src/command/run.rs

@@ -108,6 +108,7 @@ pub async fn run_keeper(
     chains: HashMap<String, api::BlockchainState>,
     config: Config,
     private_key: String,
+    metrics_registry: Arc<metrics::Metrics>,
 ) -> Result<()> {
     let mut handles = Vec::new();
     for (chain_id, chain_config) in chains {
@@ -121,6 +122,7 @@ pub async fn run_keeper(
             private_key,
             chain_eth_config,
             chain_config.clone(),
+            metrics_registry.clone(),
         )));
     }
 
@@ -232,6 +234,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
             chains.clone(),
             config.clone(),
             keeper_private_key,
+            metrics_registry.clone(),
         ));
     }
 

+ 45 - 5
apps/fortuna/src/keeper.rs

@@ -12,6 +12,10 @@ use {
             },
         },
         config::EthereumConfig,
+        metrics::{
+            Metrics,
+            ProviderLabel,
+        },
     },
     anyhow::{
         anyhow,
@@ -88,6 +92,7 @@ pub async fn run_keeper_threads(
     private_key: String,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
+    metrics: Arc<Metrics>,
 ) {
     tracing::info!("starting keeper");
     let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
@@ -109,6 +114,7 @@ pub async fn run_keeper_threads(
             contract.clone(),
             chain_eth_config.gas_limit,
             chain_state.clone(),
+            metrics.clone(),
         )
         .in_current_span(),
     );
@@ -131,6 +137,7 @@ pub async fn run_keeper_threads(
             rx,
             Arc::clone(&contract),
             chain_eth_config.gas_limit,
+            metrics.clone(),
         )
         .in_current_span(),
     );
@@ -146,6 +153,7 @@ pub async fn process_event(
     chain_config: &BlockchainState,
     contract: &Arc<SignablePythContract>,
     gas_limit: U256,
+    metrics: Arc<Metrics>,
 ) -> Result<()> {
     if chain_config.provider_address != event.provider_address {
         return Ok(());
@@ -230,6 +238,13 @@ pub async fn process_event(
                                 "Revealed with res: {:?}",
                                 res
                             );
+                            metrics
+                                .reveals
+                                .get_or_create(&ProviderLabel {
+                                    chain_id: chain_config.id.clone(),
+                                    address:  chain_config.provider_address.to_string(),
+                                })
+                                .inc();
                             Ok(())
                         }
                         None => {
@@ -280,6 +295,7 @@ pub async fn process_block_range(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: api::BlockchainState,
+    metrics: Arc<Metrics>,
 ) {
     let BlockRange {
         from: first_block,
@@ -300,6 +316,7 @@ pub async fn process_block_range(
             contract.clone(),
             gas_limit,
             chain_state.clone(),
+            metrics.clone(),
         )
         .in_current_span()
         .await;
@@ -316,6 +333,7 @@ pub async fn process_single_block_batch(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: api::BlockchainState,
+    metrics: Arc<Metrics>,
 ) {
     loop {
         let events_res = chain_state
@@ -327,11 +345,23 @@ pub async fn process_single_block_batch(
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);
                 for event in &events {
+                    metrics
+                        .requests
+                        .get_or_create(&ProviderLabel {
+                            chain_id: chain_state.id.clone(),
+                            address:  chain_state.provider_address.to_string(),
+                        })
+                        .inc();
                     tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
-                    while let Err(e) =
-                        process_event(event.clone(), &chain_state, &contract, gas_limit)
-                            .in_current_span()
-                            .await
+                    while let Err(e) = process_event(
+                        event.clone(),
+                        &chain_state,
+                        &contract,
+                        gas_limit,
+                        metrics.clone(),
+                    )
+                    .in_current_span()
+                    .await
                     {
                         tracing::error!(
                             sequence_number = &event.sequence_number,
@@ -342,6 +372,13 @@ pub async fn process_single_block_batch(
                         time::sleep(RETRY_INTERVAL).await;
                     }
                     tracing::info!(sequence_number = &event.sequence_number, "Processed event",);
+                    metrics
+                        .requests_processed
+                        .get_or_create(&ProviderLabel {
+                            chain_id: chain_state.id.clone(),
+                            address:  chain_state.provider_address.to_string(),
+                        })
+                        .inc();
                 }
                 tracing::info!(num_of_events = &events.len(), "Processed",);
                 break;
@@ -469,6 +506,7 @@ pub async fn process_new_blocks(
     mut rx: mpsc::Receiver<BlockRange>,
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
+    metrics: Arc<Metrics>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
     loop {
@@ -478,6 +516,7 @@ pub async fn process_new_blocks(
                 Arc::clone(&contract),
                 gas_limit,
                 chain_state.clone(),
+                metrics.clone(),
             )
             .in_current_span()
             .await;
@@ -492,9 +531,10 @@ pub async fn process_backlog(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: BlockchainState,
+    metrics: Arc<Metrics>,
 ) {
     tracing::info!("Processing backlog");
-    process_block_range(backlog_range, contract, gas_limit, chain_state)
+    process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
         .in_current_span()
         .await;
     tracing::info!("Backlog processed");

+ 33 - 3
apps/fortuna/src/metrics.rs

@@ -41,9 +41,9 @@ pub struct Metrics {
     //
     // pub rpc: Family<Label, Counter>,
     //
-    // pub requests:     Family<Label, Counter>,
-    // pub reveal:       Family<Label, Counter>,
-
+    pub requests:                Family<ProviderLabel, Counter>,
+    pub requests_processed:      Family<ProviderLabel, Counter>,
+    pub reveals:                 Family<ProviderLabel, Counter>,
     // NOTE: gas_spending is not part of metrics.
     // why?
     // - it is not a value that increases or decreases over time. Not a counter or a gauge
@@ -81,11 +81,41 @@ impl Metrics {
             end_sequence_number.clone(),
         );
 
+        let requests = Family::<ProviderLabel, Counter>::default();
+        metrics_registry.register(
+            // With the metric name.
+            "requests",
+            // And the metric help text.
+            "Number of requests received",
+            requests.clone(),
+        );
+
+        let requests_processed = Family::<ProviderLabel, Counter>::default();
+        metrics_registry.register(
+            // With the metric name.
+            "requests_processed",
+            // And the metric help text.
+            "Number of requests processed",
+            requests_processed.clone(),
+        );
+
+        let reveals = Family::<ProviderLabel, Counter>::default();
+        metrics_registry.register(
+            // With the metric name.
+            "reveal",
+            // And the metric help text.
+            "Number of reveals",
+            reveals.clone(),
+        );
+
         Metrics {
             registry: RwLock::new(metrics_registry),
             request_counter: http_requests,
             current_sequence_number,
             end_sequence_number,
+            requests,
+            requests_processed,
+            reveals,
         }
     }
 }