Bladeren bron

feat(fortuna): Better metrics tracking for alerting (#2703)

* better metrics tracking for alerting

* better tracking

* pr comments
Jayant Krishnamurthy 6 maanden geleden
bovenliggende
commit
24149dba2b

+ 1 - 1
apps/fortuna/src/command/run.rs

@@ -99,6 +99,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
             .collect(),
     ));
     for (chain_id, chain_config) in config.chains.clone() {
+        keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
         let keeper_metrics = keeper_metrics.clone();
         let keeper_private_key_option = keeper_private_key_option.clone();
         let chains = chains.clone();
@@ -168,7 +169,6 @@ async fn setup_chain_and_run_keeper(
         rpc_metrics.clone(),
     )
     .await?;
-    keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
     chains.write().await.insert(
         chain_id.clone(),
         ApiBlockChainState::Initialized(state.clone()),

+ 49 - 38
apps/fortuna/src/keeper.rs

@@ -178,45 +178,56 @@ pub async fn run_keeper_threads(
             };
 
             loop {
-                // There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
-                // If rpc start fails all of these threads will just exit, instead of retrying.
-                // We are tracking rpc failures elsewhere, so it's fine.
-                spawn(
-                    track_provider(
-                        chain_id.clone(),
-                        contract.clone(),
-                        provider_address,
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-                spawn(
-                    track_balance(
-                        chain_id.clone(),
-                        contract.client(),
-                        keeper_address,
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-                spawn(
-                    track_accrued_pyth_fees(
-                        chain_id.clone(),
-                        contract.clone(),
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-                spawn(
-                    track_block_timestamp_lag(
-                        chain_id.clone(),
-                        contract.client(),
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-
                 time::sleep(TRACK_INTERVAL).await;
+
+                // Run the trackers sequentially, with the block timestamp tracker last. A failure in any tracker skips
+                // the rest of this iteration, so a persistent error in any of them leaves the timestamp metrics stale;
+                // they then lag behind the current time and trigger an alert.
+                if let Err(e) = track_provider(
+                    chain_id.clone(),
+                    contract.clone(),
+                    provider_address,
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking provider: {:?}", e);
+                    continue;
+                }
+
+                if let Err(e) = track_balance(
+                    chain_id.clone(),
+                    contract.client(),
+                    keeper_address,
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking balance: {:?}", e);
+                    continue;
+                }
+
+                if let Err(e) = track_accrued_pyth_fees(
+                    chain_id.clone(),
+                    contract.clone(),
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking accrued pyth fees: {:?}", e);
+                    continue;
+                }
+
+                if let Err(e) = track_block_timestamp_lag(
+                    chain_id.clone(),
+                    contract.client(),
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking block timestamp lag: {:?}", e);
+                    continue;
+                }
             }
         }
         .in_current_span(),

+ 35 - 2
apps/fortuna/src/keeper/block.rs

@@ -3,12 +3,16 @@ use {
         api::{self, BlockchainState},
         chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
         eth_utils::utils::EscalationPolicy,
-        keeper::keeper_metrics::KeeperMetrics,
+        keeper::keeper_metrics::{ChainIdLabel, KeeperMetrics},
         keeper::process_event::process_event_with_backoff,
     },
     anyhow::Result,
     ethers::types::U256,
-    std::{collections::HashSet, sync::Arc},
+    std::{
+        collections::HashSet,
+        sync::Arc,
+        time::{SystemTime, UNIX_EPOCH},
+    },
     tokio::{
         spawn,
         sync::{mpsc, RwLock},
@@ -115,6 +119,10 @@ pub async fn process_single_block_batch(
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
 ) {
+    let label = ChainIdLabel {
+        chain_id: chain_state.id.clone(),
+    };
+
     loop {
         let events_res = chain_state
             .contract
@@ -125,6 +133,31 @@ pub async fn process_single_block_batch(
             )
             .await;
 
+        // Only update metrics if we successfully retrieved events.
+        if events_res.is_ok() {
+            // Track the last time blocks were processed. If anything happens to the processing thread, the
+            // timestamp will lag, which will trigger an alert.
+            let server_timestamp = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .map(|duration| duration.as_secs() as i64)
+                .unwrap_or(0);
+            metrics
+                .process_event_timestamp
+                .get_or_create(&label)
+                .set(server_timestamp);
+
+            let current_block = metrics
+                .process_event_block_number
+                .get_or_create(&label)
+                .get();
+            if block_range.to > current_block as u64 {
+                metrics
+                    .process_event_block_number
+                    .get_or_create(&label)
+                    .set(block_range.to as i64);
+            }
+        }
+
         match events_res {
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);

+ 38 - 0
apps/fortuna/src/keeper/keeper_metrics.rs

@@ -44,6 +44,10 @@ pub struct KeeperMetrics {
     pub gas_price_estimate: Family<AccountLabel, Gauge<f64, AtomicU64>>,
     pub accrued_pyth_fees: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
     pub block_timestamp_lag: Family<ChainIdLabel, Gauge>,
+    pub latest_block_timestamp: Family<ChainIdLabel, Gauge>,
+    pub process_event_timestamp: Family<ChainIdLabel, Gauge>,
+    pub latest_block_number: Family<ChainIdLabel, Gauge>,
+    pub process_event_block_number: Family<ChainIdLabel, Gauge>,
 }
 
 impl Default for KeeperMetrics {
@@ -87,6 +91,10 @@ impl Default for KeeperMetrics {
             gas_price_estimate: Family::default(),
             accrued_pyth_fees: Family::default(),
             block_timestamp_lag: Family::default(),
+            latest_block_timestamp: Family::default(),
+            process_event_timestamp: Family::default(),
+            latest_block_number: Family::default(),
+            process_event_block_number: Family::default(),
         }
     }
 }
@@ -228,6 +236,30 @@ impl KeeperMetrics {
             keeper_metrics.block_timestamp_lag.clone(),
         );
 
+        writable_registry.register(
+            "latest_block_timestamp",
+            "The current block timestamp",
+            keeper_metrics.latest_block_timestamp.clone(),
+        );
+
+        writable_registry.register(
+            "process_event_timestamp",
+            "Timestamp of the last time the keeper updated the events",
+            keeper_metrics.process_event_timestamp.clone(),
+        );
+
+        writable_registry.register(
+            "latest_block_number",
+            "The current block number",
+            keeper_metrics.latest_block_number.clone(),
+        );
+
+        writable_registry.register(
+            "process_event_block_number",
+            "The highest block number for which events have been successfully retrieved and processed",
+            keeper_metrics.process_event_block_number.clone(),
+        );
+
         // *Important*: When adding a new metric:
         // 1. Register it above using `writable_registry.register(...)`
         // 2. Add a get_or_create call in the add_chain function below to initialize it for each chain/provider pair
@@ -241,6 +273,12 @@ impl KeeperMetrics {
         };
         let _ = self.accrued_pyth_fees.get_or_create(&chain_id_label);
         let _ = self.block_timestamp_lag.get_or_create(&chain_id_label);
+        let _ = self.latest_block_timestamp.get_or_create(&chain_id_label);
+        let _ = self.process_event_timestamp.get_or_create(&chain_id_label);
+        let _ = self.latest_block_number.get_or_create(&chain_id_label);
+        let _ = self
+            .process_event_block_number
+            .get_or_create(&chain_id_label);
 
         let account_label = AccountLabel {
             chain_id,

+ 44 - 56
apps/fortuna/src/keeper/track.rs

@@ -4,6 +4,7 @@ use {
         api::ChainId, chain::ethereum::InstrumentedPythContract,
         eth_utils::traced_client::TracedClient,
     },
+    anyhow::{anyhow, Result},
     ethers::middleware::Middleware,
     ethers::{prelude::BlockNumber, providers::Provider, types::Address},
     std::{
@@ -14,23 +15,17 @@ use {
 };
 
 /// tracks the balance of the given address on the given chain
-/// if there was an error, the function will just return
 #[tracing::instrument(skip_all)]
 pub async fn track_balance(
     chain_id: String,
     provider: Arc<Provider<TracedClient>>,
     address: Address,
     metrics: Arc<KeeperMetrics>,
-) {
-    let balance = match provider.get_balance(address, None).await {
-        // This conversion to u128 is fine as the total balance will never cross the limits
-        // of u128 practically.
-        Ok(r) => r.as_u128(),
-        Err(e) => {
-            tracing::error!("Error while getting balance. error: {:?}", e);
-            return;
-        }
-    };
+) -> Result<()> {
+    let balance = provider.get_balance(address, None).await?;
+    // This conversion to u128 is fine as the total balance will never cross the limits
+    // of u128 practically.
+    let balance = balance.as_u128();
     // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
     // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
     let balance = balance as f64 / 1e18;
@@ -42,6 +37,8 @@ pub async fn track_balance(
             address: address.to_string(),
         })
         .set(balance);
+
+    Ok(())
 }
 
 /// Tracks the difference between the server timestamp and the latest block timestamp for each chain
@@ -50,53 +47,47 @@ pub async fn track_block_timestamp_lag(
     chain_id: String,
     provider: Arc<Provider<TracedClient>>,
     metrics: Arc<KeeperMetrics>,
-) {
-    const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
-    let lag = match provider.get_block(BlockNumber::Latest).await {
-        Ok(block) => match block {
-            Some(block) => {
-                let block_timestamp = block.timestamp;
-                let server_timestamp = SystemTime::now()
-                    .duration_since(UNIX_EPOCH)
-                    .unwrap()
-                    .as_secs();
-                let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
-                lag
-            }
-            None => {
-                tracing::error!("Block is None");
-                INF_LAG
-            }
-        },
-        Err(e) => {
-            tracing::error!("Failed to get block - {:?}", e);
-            INF_LAG
-        }
+) -> Result<()> {
+    let label = ChainIdLabel {
+        chain_id: chain_id.clone(),
     };
+
+    let block = provider.get_block(BlockNumber::Latest).await?;
+    let block = block.ok_or(anyhow!("block was none"))?;
+    let block_timestamp = block.timestamp.as_u64();
+    let block_timestamp = i64::try_from(block_timestamp)?;
+    let block_number = block
+        .number
+        .ok_or(anyhow!("block number was none"))?
+        .as_u64();
+
     metrics
-        .block_timestamp_lag
-        .get_or_create(&ChainIdLabel {
-            chain_id: chain_id.clone(),
-        })
-        .set(lag);
+        .latest_block_timestamp
+        .get_or_create(&label)
+        .set(block_timestamp);
+
+    metrics
+        .latest_block_number
+        .get_or_create(&label)
+        .set(block_number as i64);
+
+    let server_timestamp = i64::try_from(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())?;
+
+    let lag = server_timestamp - block_timestamp;
+    metrics.block_timestamp_lag.get_or_create(&label).set(lag);
+
+    Ok(())
 }
 
 /// tracks the collected fees and the hashchain data of the given provider address on the given chain
-/// if there is a error the function will just return
 #[tracing::instrument(skip_all)]
 pub async fn track_provider(
     chain_id: ChainId,
     contract: InstrumentedPythContract,
     provider_address: Address,
     metrics: Arc<KeeperMetrics>,
-) {
-    let provider_info = match contract.get_provider_info(provider_address).call().await {
-        Ok(info) => info,
-        Err(e) => {
-            tracing::error!("Error while getting provider info. error: {:?}", e);
-            return;
-        }
-    };
+) -> Result<()> {
+    let provider_info = contract.get_provider_info(provider_address).call().await?;
 
     // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
     // The fee is in wei, so we divide by 1e18 to convert it to eth.
@@ -150,23 +141,18 @@ pub async fn track_provider(
             address: provider_address.to_string(),
         })
         .set(end_sequence_number as i64);
+
+    Ok(())
 }
 
 /// tracks the accrued pyth fees on the given chain
-/// if there is an error the function will just return
 #[tracing::instrument(skip_all)]
 pub async fn track_accrued_pyth_fees(
     chain_id: ChainId,
     contract: InstrumentedPythContract,
     metrics: Arc<KeeperMetrics>,
-) {
-    let accrued_pyth_fees = match contract.get_accrued_pyth_fees().call().await {
-        Ok(fees) => fees,
-        Err(e) => {
-            tracing::error!("Error while getting accrued pyth fees. error: {:?}", e);
-            return;
-        }
-    };
+) -> Result<()> {
+    let accrued_pyth_fees = contract.get_accrued_pyth_fees().call().await?;
 
     // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
     // The fee is in wei, so we divide by 1e18 to convert it to eth.
@@ -178,4 +164,6 @@ pub async fn track_accrued_pyth_fees(
             chain_id: chain_id.clone(),
         })
         .set(accrued_pyth_fees);
+
+    Ok(())
 }