Bläddra i källkod

isolate keeper metrics

0xfirefist 1 år sedan
förälder
incheckning
6663a651e9
2 ändrade filer med 247 tillägg och 188 borttagningar
  1. 4 166
      apps/fortuna/src/command/run.rs
  2. 243 22
      apps/fortuna/src/keeper.rs

+ 4 - 166
apps/fortuna/src/command/run.rs

@@ -13,11 +13,7 @@ use {
             ProviderConfig,
             RunOptions,
         },
-        keeper,
-        metrics::{
-            self,
-            AccountLabel,
-        },
+        keeper::{self,},
         state::{
             HashChainState,
             PebbleHashChain,
@@ -29,24 +25,11 @@ use {
         Result,
     },
     axum::Router,
-    ethers::{
-        middleware::Middleware,
-        providers::{
-            Http,
-            Provider,
-        },
-        signers::{
-            LocalWallet,
-            Signer,
-        },
-        types::Address,
-    },
     prometheus_client::registry::Registry,
     std::{
         collections::HashMap,
         net::SocketAddr,
         sync::Arc,
-        time::Duration,
     },
     tokio::{
         spawn,
@@ -54,15 +37,12 @@ use {
             watch,
             RwLock,
         },
-        time,
     },
     tower_http::cors::CorsLayer,
     utoipa::OpenApi,
     utoipa_swagger_ui::SwaggerUi,
 };
 
-const TRACK_INTERVAL: Duration = Duration::from_secs(10);
-
 pub async fn run_api(
     socket_addr: SocketAddr,
     chains: HashMap<String, api::BlockchainState>,
@@ -122,7 +102,7 @@ pub async fn run_keeper(
     chains: HashMap<String, api::BlockchainState>,
     config: Config,
     private_key: String,
-    metrics_registry: Arc<metrics::Metrics>,
+    metrics_registry: Arc<RwLock<Registry>>,
 ) -> Result<()> {
     let mut handles = Vec::new();
     for (chain_id, chain_config) in chains {
@@ -241,160 +221,18 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         Ok::<(), Error>(())
     });
 
-    let registry = Arc::new(RwLock::new(Registry::default()));
-
-
-    let metrics_registry = Arc::new(metrics::Metrics::new());
+    let metrics_registry = Arc::new(RwLock::new(Registry::default()));
 
     if let Some(keeper_private_key) = opts.load_keeper_private_key()? {
-        let keeper_address = keeper_private_key.parse::<LocalWallet>()?.address();
-
         spawn(run_keeper(
             chains.clone(),
             config.clone(),
             keeper_private_key,
             metrics_registry.clone(),
         ));
-
-        spawn(track_balance(
-            config.clone(),
-            keeper_address,
-            metrics_registry.clone(),
-        ));
     }
 
-    spawn(track_hashchain(
-        config.clone(),
-        opts.provider.clone(),
-        metrics_registry.clone(),
-    ));
-    spawn(track_collected_fee(
-        config.clone(),
-        opts.provider.clone(),
-        metrics_registry.clone(),
-    ));
-    run_api(opts.addr.clone(), chains, registry.clone(), rx_exit).await?;
+    run_api(opts.addr.clone(), chains, metrics_registry.clone(), rx_exit).await?;
 
     Ok(())
 }
-
-/// tracks the balance of the given address for each chain in the given config periodically
-pub async fn track_balance(
-    config: Config,
-    address: Address,
-    metrics_registry: Arc<metrics::Metrics>,
-) {
-    loop {
-        for (chain_id, chain_config) in &config.chains {
-            let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
-                Ok(r) => r,
-                Err(_e) => continue,
-            };
-
-            let balance = match provider.get_balance(address, None).await {
-                // This conversion to u128 is fine as the total balance will never cross the limits
-                // of u128 practically.
-                Ok(r) => r.as_u128(),
-                Err(_e) => continue,
-            };
-            // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
-            // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
-            let balance = balance as f64 / 1e18;
-
-            metrics_registry
-                .balance
-                .get_or_create(&AccountLabel {
-                    chain_id: chain_id.clone(),
-                    address:  address.to_string(),
-                })
-                .set(balance);
-        }
-
-        time::sleep(TRACK_INTERVAL).await;
-    }
-}
-
-/// tracks the collected fees of the given address for each chain in the given config periodically
-pub async fn track_collected_fee(
-    config: Config,
-    provider_address: Address,
-    metrics_registry: Arc<metrics::Metrics>,
-) {
-    loop {
-        for (chain_id, chain_config) in &config.chains {
-            let contract = match PythContract::from_config(chain_config) {
-                Ok(r) => r,
-                Err(_e) => continue,
-            };
-
-            let provider_info = match contract.get_provider_info(provider_address).call().await {
-                Ok(info) => info,
-                Err(_e) => {
-                    time::sleep(Duration::from_secs(5)).await;
-                    continue;
-                }
-            };
-
-            // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
-            // The fee is in wei, so we need to divide by 1e18 to convert it to eth.
-            let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18;
-
-            metrics_registry
-                .collected_fee
-                .get_or_create(&AccountLabel {
-                    chain_id: chain_id.clone(),
-                    address:  provider_address.to_string(),
-                })
-                .set(collected_fee);
-        }
-
-        time::sleep(TRACK_INTERVAL).await;
-    }
-}
-
-/// tracks the current sequence number and end sequence number of the given provider address for
-/// each chain in the given config periodically
-pub async fn track_hashchain(
-    config: Config,
-    provider_address: Address,
-    metrics_registry: Arc<metrics::Metrics>,
-) {
-    loop {
-        for (chain_id, chain_config) in &config.chains {
-            let contract = match PythContract::from_config(chain_config) {
-                Ok(r) => r,
-                Err(_e) => continue,
-            };
-
-            let provider_info = match contract.get_provider_info(provider_address).call().await {
-                Ok(info) => info,
-                Err(_e) => {
-                    time::sleep(Duration::from_secs(5)).await;
-                    continue;
-                }
-            };
-            let current_sequence_number = provider_info.sequence_number;
-            let end_sequence_number = provider_info.end_sequence_number;
-
-            metrics_registry
-                .current_sequence_number
-                .get_or_create(&AccountLabel {
-                    chain_id: chain_id.clone(),
-                    address:  provider_address.to_string(),
-                })
-                // sequence_number type on chain is u64 but practically it will take
-                // a long time for it to cross the limits of i64.
-                // currently prometheus only supports i64 for Gauge types
-                .set(current_sequence_number as i64);
-            metrics_registry
-                .end_sequence_number
-                .get_or_create(&AccountLabel {
-                    chain_id: chain_id.clone(),
-                    address:  provider_address.to_string(),
-                })
-                .set(end_sequence_number as i64);
-        }
-
-        time::sleep(TRACK_INTERVAL).await;
-    }
-}

+ 243 - 22
apps/fortuna/src/keeper.rs

@@ -5,17 +5,16 @@ use {
             BlockchainState,
         },
         chain::{
-            ethereum::SignablePythContract,
+            ethereum::{
+                PythContract,
+                SignablePythContract,
+            },
             reader::{
                 BlockNumber,
                 RequestedWithCallbackEvent,
             },
         },
         config::EthereumConfig,
-        metrics::{
-            AccountLabel,
-            Metrics,
-        },
     },
     anyhow::{
         anyhow,
@@ -24,18 +23,37 @@ use {
     ethers::{
         contract::ContractError,
         providers::{
+            Http,
             Middleware,
             Provider,
             Ws,
         },
         signers::Signer,
-        types::U256,
+        types::{
+            Address,
+            U256,
+        },
     },
     futures::StreamExt,
-    std::sync::Arc,
+    prometheus_client::{
+        encoding::EncodeLabelSet,
+        metrics::{
+            counter::Counter,
+            family::Family,
+            gauge::Gauge,
+        },
+        registry::Registry,
+    },
+    std::sync::{
+        atomic::AtomicU64,
+        Arc,
+    },
     tokio::{
         spawn,
-        sync::mpsc,
+        sync::{
+            mpsc,
+            RwLock,
+        },
         time::{
             self,
             Duration,
@@ -47,12 +65,6 @@ use {
     },
 };
 
-#[derive(Debug)]
-pub struct BlockRange {
-    pub from: BlockNumber,
-    pub to:   BlockNumber,
-}
-
 /// How much to wait before retrying in case of an RPC error
 const RETRY_INTERVAL: Duration = Duration::from_secs(5);
 /// How many blocks to look back for events that might be missed when starting the keeper
@@ -61,7 +73,96 @@ const BACKLOG_RANGE: u64 = 1000;
 const BLOCK_BATCH_SIZE: u64 = 100;
 /// How much to wait before polling the next latest block
 const POLL_INTERVAL: Duration = Duration::from_secs(2);
+/// Track metrics in this interval
+const TRACK_INTERVAL: Duration = Duration::from_secs(10);
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct AccountLabel {
+    pub chain_id: String,
+    pub address:  String,
+}
 
+pub struct KeeperMetrics {
+    pub current_sequence_number: Family<AccountLabel, Gauge>,
+    pub end_sequence_number:     Family<AccountLabel, Gauge>,
+    pub balance:                 Family<AccountLabel, Gauge<f64, AtomicU64>>,
+    pub collected_fee:           Family<AccountLabel, Gauge<f64, AtomicU64>>,
+    pub total_gas_spent:         Family<AccountLabel, Gauge<f64, AtomicU64>>,
+    pub requests:                Family<AccountLabel, Counter>,
+    pub requests_processed:      Family<AccountLabel, Counter>,
+    pub reveals:                 Family<AccountLabel, Counter>,
+}
+
+impl KeeperMetrics {
+    pub async fn new(registry: Arc<RwLock<Registry>>) -> Self {
+        let mut writable_registry = registry.write().await;
+
+        let current_sequence_number = Family::<AccountLabel, Gauge>::default();
+        writable_registry.register(
+            "current_sequence_number",
+            "The sequence number for a new request.",
+            current_sequence_number.clone(),
+        );
+
+        let end_sequence_number = Family::<AccountLabel, Gauge>::default();
+        writable_registry.register(
+            "end_sequence_number",
+            "The sequence number for the end request.",
+            end_sequence_number.clone(),
+        );
+
+        let requests = Family::<AccountLabel, Counter>::default();
+        writable_registry.register(
+            "requests",
+            "Number of requests received through events",
+            requests.clone(),
+        );
+
+        let requests_processed = Family::<AccountLabel, Counter>::default();
+        writable_registry.register(
+            "requests_processed",
+            "Number of requests processed",
+            requests_processed.clone(),
+        );
+
+        let reveals = Family::<AccountLabel, Counter>::default();
+        writable_registry.register("reveal", "Number of reveals", reveals.clone());
+
+        let balance = Family::<AccountLabel, Gauge<f64, AtomicU64>>::default();
+        writable_registry.register("balance", "Balance of the keeper", balance.clone());
+
+        let collected_fee = Family::<AccountLabel, Gauge<f64, AtomicU64>>::default();
+        writable_registry.register(
+            "collected_fee",
+            "Collected fee on the contract",
+            collected_fee.clone(),
+        );
+
+        let total_gas_spent = Family::<AccountLabel, Gauge<f64, AtomicU64>>::default();
+        writable_registry.register(
+            "total_gas_spent",
+            "Total gas spent revealing requests",
+            total_gas_spent.clone(),
+        );
+
+        KeeperMetrics {
+            current_sequence_number,
+            end_sequence_number,
+            requests,
+            requests_processed,
+            reveals,
+            balance,
+            collected_fee,
+            total_gas_spent,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct BlockRange {
+    pub from: BlockNumber,
+    pub to:   BlockNumber,
+}
 
 /// Get the latest safe block number for the chain. Retry internally if there is an error.
 async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
@@ -93,8 +194,11 @@ pub async fn run_keeper_threads(
     private_key: String,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
-    metrics: Arc<Metrics>,
+    metrics: Arc<RwLock<Registry>>,
 ) {
+    // Register metrics
+    let keeper_metrics = Arc::new(KeeperMetrics::new(metrics.clone()).await);
+
     tracing::info!("starting keeper");
     let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
     tracing::info!("latest safe block: {}", &latest_safe_block);
@@ -104,6 +208,7 @@ pub async fn run_keeper_threads(
             .await
             .expect("Chain config should be valid"),
     );
+    let keeper_address = contract.client().inner().inner().signer().address();
 
     // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
     spawn(
@@ -115,7 +220,7 @@ pub async fn run_keeper_threads(
             contract.clone(),
             chain_eth_config.gas_limit,
             chain_state.clone(),
-            metrics.clone(),
+            keeper_metrics.clone(),
         )
         .in_current_span(),
     );
@@ -138,7 +243,29 @@ pub async fn run_keeper_threads(
             rx,
             Arc::clone(&contract),
             chain_eth_config.gas_limit,
-            metrics.clone(),
+            keeper_metrics.clone(),
+        )
+        .in_current_span(),
+    );
+
+    // spawn a thread to track keeper balance
+    spawn(
+        track_balance(
+            chain_state.id.clone(),
+            chain_eth_config.clone(),
+            keeper_address.clone(),
+            keeper_metrics.clone(),
+        )
+        .in_current_span(),
+    );
+
+    // spawn a thread to track provider info
+    spawn(
+        track_provider(
+            chain_state.id.clone(),
+            chain_eth_config.clone(),
+            chain_state.provider_address.clone(),
+            keeper_metrics.clone(),
         )
         .in_current_span(),
     );
@@ -154,7 +281,7 @@ pub async fn process_event(
     chain_config: &BlockchainState,
     contract: &Arc<SignablePythContract>,
     gas_limit: U256,
-    metrics: Arc<Metrics>,
+    metrics: Arc<KeeperMetrics>,
 ) -> Result<()> {
     if chain_config.provider_address != event.provider_address {
         return Ok(());
@@ -314,7 +441,7 @@ pub async fn process_block_range(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: api::BlockchainState,
-    metrics: Arc<Metrics>,
+    metrics: Arc<KeeperMetrics>,
 ) {
     let BlockRange {
         from: first_block,
@@ -352,7 +479,7 @@ pub async fn process_single_block_batch(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: api::BlockchainState,
-    metrics: Arc<Metrics>,
+    metrics: Arc<KeeperMetrics>,
 ) {
     loop {
         let events_res = chain_state
@@ -525,7 +652,7 @@ pub async fn process_new_blocks(
     mut rx: mpsc::Receiver<BlockRange>,
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
-    metrics: Arc<Metrics>,
+    metrics: Arc<KeeperMetrics>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
     loop {
@@ -550,7 +677,7 @@ pub async fn process_backlog(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: BlockchainState,
-    metrics: Arc<Metrics>,
+    metrics: Arc<KeeperMetrics>,
 ) {
     tracing::info!("Processing backlog");
     process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
@@ -558,3 +685,97 @@ pub async fn process_backlog(
         .await;
     tracing::info!("Backlog processed");
 }
+
+
+/// tracks the balance of the given address for each chain in the given config periodically
+pub async fn track_balance(
+    chain_id: String,
+    chain_config: EthereumConfig,
+    address: Address,
+    metrics_registry: Arc<KeeperMetrics>,
+) {
+    loop {
+        let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
+            Ok(r) => r,
+            Err(_e) => continue,
+        };
+
+        let balance = match provider.get_balance(address, None).await {
+            // This conversion to u128 is fine as the total balance will never cross the limits
+            // of u128 practically.
+            Ok(r) => r.as_u128(),
+            Err(_e) => continue,
+        };
+        // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
+        // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
+        let balance = balance as f64 / 1e18;
+
+        metrics_registry
+            .balance
+            .get_or_create(&AccountLabel {
+                chain_id: chain_id.clone(),
+                address:  address.to_string(),
+            })
+            .set(balance);
+
+        time::sleep(TRACK_INTERVAL).await;
+    }
+}
+
+/// tracks the collected fees and the hashchain data of the given provider address for each chain in the given config periodically
+pub async fn track_provider(
+    chain_id: String,
+    chain_config: EthereumConfig,
+    provider_address: Address,
+    metrics_registry: Arc<KeeperMetrics>,
+) {
+    loop {
+        let contract = match PythContract::from_config(&chain_config) {
+            Ok(r) => r,
+            Err(_e) => continue,
+        };
+
+        let provider_info = match contract.get_provider_info(provider_address).call().await {
+            Ok(info) => info,
+            Err(_e) => {
+                time::sleep(Duration::from_secs(5)).await;
+                continue;
+            }
+        };
+
+        // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
+        // The fee is in wei, so we need to divide by 1e18 to convert it to eth.
+        let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18;
+
+        let current_sequence_number = provider_info.sequence_number;
+        let end_sequence_number = provider_info.end_sequence_number;
+
+        metrics_registry
+            .collected_fee
+            .get_or_create(&AccountLabel {
+                chain_id: chain_id.clone(),
+                address:  provider_address.to_string(),
+            })
+            .set(collected_fee);
+
+        metrics_registry
+            .current_sequence_number
+            .get_or_create(&AccountLabel {
+                chain_id: chain_id.clone(),
+                address:  provider_address.to_string(),
+            })
+            // sequence_number type on chain is u64 but practically it will take
+            // a long time for it to cross the limits of i64.
+            // currently prometheus only supports i64 for Gauge types
+            .set(current_sequence_number as i64);
+        metrics_registry
+            .end_sequence_number
+            .get_or_create(&AccountLabel {
+                chain_id: chain_id.clone(),
+                address:  provider_address.to_string(),
+            })
+            .set(end_sequence_number as i64);
+
+        time::sleep(TRACK_INTERVAL).await;
+    }
+}