|
|
@@ -1,35 +1,24 @@
|
|
|
use {
|
|
|
crate::{
|
|
|
- api::{self, BlockchainState, ChainId},
|
|
|
+ api::{self, BlockchainState, ChainName},
|
|
|
config::{Config, EthereumConfig, RunOptions},
|
|
|
keeper::{self, keeper_metrics::KeeperMetrics},
|
|
|
},
|
|
|
anyhow::{anyhow, Error, Result},
|
|
|
axum::Router,
|
|
|
- ethers::{middleware::Middleware, types::BlockNumber},
|
|
|
- fortuna::eth_utils::traced_client::{RpcMetrics, TracedClient},
|
|
|
+ fortuna::eth_utils::traced_client::RpcMetrics,
|
|
|
futures::future::join_all,
|
|
|
- prometheus_client::{
|
|
|
- encoding::EncodeLabelSet,
|
|
|
- metrics::{family::Family, gauge::Gauge},
|
|
|
- registry::Registry,
|
|
|
- },
|
|
|
- std::{
|
|
|
- collections::HashMap,
|
|
|
- net::SocketAddr,
|
|
|
- sync::Arc,
|
|
|
- time::{Duration, SystemTime, UNIX_EPOCH},
|
|
|
- },
|
|
|
+ prometheus_client::{encoding::EncodeLabelSet, registry::Registry},
|
|
|
+ std::{collections::HashMap, net::SocketAddr, sync::Arc},
|
|
|
tokio::{
|
|
|
spawn,
|
|
|
sync::{watch, RwLock},
|
|
|
- time,
|
|
|
},
|
|
|
tower_http::cors::CorsLayer,
|
|
|
};
|
|
|
|
|
|
/// Track metrics in this interval
|
|
|
-const TRACK_INTERVAL: Duration = Duration::from_secs(10);
|
|
|
+// const TRACK_INTERVAL: Duration = Duration::from_secs(10);
|
|
|
|
|
|
pub async fn run_api(
|
|
|
socket_addr: SocketAddr,
|
|
|
@@ -111,7 +100,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
|
|
|
}
|
|
|
let states = join_all(tasks).await;
|
|
|
|
|
|
- let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
|
|
|
+ let mut chains: HashMap<ChainName, BlockchainState> = HashMap::new();
|
|
|
for result in states {
|
|
|
let (chain_id, state) = result?;
|
|
|
|
|
|
@@ -153,11 +142,12 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
|
|
|
}
|
|
|
|
|
|
// Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
|
|
|
- spawn(track_block_timestamp_lag(
|
|
|
- config,
|
|
|
- metrics_registry.clone(),
|
|
|
- rpc_metrics.clone(),
|
|
|
- ));
|
|
|
+ // TODO: turn this into an actor.
|
|
|
+ // spawn(track_block_timestamp_lag(
|
|
|
+ // config,
|
|
|
+ // metrics_registry.clone(),
|
|
|
+ // rpc_metrics.clone(),
|
|
|
+ // ));
|
|
|
|
|
|
run_api(opts.addr, metrics_registry, rx_exit).await?;
|
|
|
|
|
|
@@ -165,11 +155,11 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
|
|
|
}
|
|
|
|
|
|
async fn setup_chain_state(
|
|
|
- chain_id: &ChainId,
|
|
|
+ chain_id: &ChainName,
|
|
|
chain_config: &EthereumConfig,
|
|
|
) -> Result<BlockchainState> {
|
|
|
let state = BlockchainState {
|
|
|
- id: chain_id.clone(),
|
|
|
+ name: chain_id.clone(),
|
|
|
confirmed_block_status: chain_config.confirmed_block_status,
|
|
|
};
|
|
|
Ok(state)
|
|
|
@@ -180,73 +170,73 @@ pub struct ChainLabel {
|
|
|
pub chain_id: String,
|
|
|
}
|
|
|
|
|
|
-#[tracing::instrument(name = "block_timestamp_lag", skip_all, fields(chain_id = chain_id))]
|
|
|
-pub async fn check_block_timestamp_lag(
|
|
|
- chain_id: String,
|
|
|
- chain_config: EthereumConfig,
|
|
|
- metrics: Family<ChainLabel, Gauge>,
|
|
|
- rpc_metrics: Arc<RpcMetrics>,
|
|
|
-) {
|
|
|
- let provider =
|
|
|
- match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) {
|
|
|
- Ok(r) => r,
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Failed to create provider for chain id - {:?}", e);
|
|
|
- return;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
|
|
|
- let lag = match provider.get_block(BlockNumber::Latest).await {
|
|
|
- Ok(block) => match block {
|
|
|
- Some(block) => {
|
|
|
- let block_timestamp = block.timestamp;
|
|
|
- let server_timestamp = SystemTime::now()
|
|
|
- .duration_since(UNIX_EPOCH)
|
|
|
- .unwrap()
|
|
|
- .as_secs();
|
|
|
- let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
|
|
|
- lag
|
|
|
- }
|
|
|
- None => {
|
|
|
- tracing::error!("Block is None");
|
|
|
- INF_LAG
|
|
|
- }
|
|
|
- },
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Failed to get block - {:?}", e);
|
|
|
- INF_LAG
|
|
|
- }
|
|
|
- };
|
|
|
- metrics
|
|
|
- .get_or_create(&ChainLabel {
|
|
|
- chain_id: chain_id.clone(),
|
|
|
- })
|
|
|
- .set(lag);
|
|
|
-}
|
|
|
-
|
|
|
-/// Tracks the difference between the server timestamp and the latest block timestamp for each chain
|
|
|
-pub async fn track_block_timestamp_lag(
|
|
|
- config: Config,
|
|
|
- metrics_registry: Arc<RwLock<Registry>>,
|
|
|
- rpc_metrics: Arc<RpcMetrics>,
|
|
|
-) {
|
|
|
- let metrics = Family::<ChainLabel, Gauge>::default();
|
|
|
- metrics_registry.write().await.register(
|
|
|
- "block_timestamp_lag",
|
|
|
- "The difference between server timestamp and latest block timestamp",
|
|
|
- metrics.clone(),
|
|
|
- );
|
|
|
- loop {
|
|
|
- for (chain_id, chain_config) in &config.chains {
|
|
|
- spawn(check_block_timestamp_lag(
|
|
|
- chain_id.clone(),
|
|
|
- chain_config.clone(),
|
|
|
- metrics.clone(),
|
|
|
- rpc_metrics.clone(),
|
|
|
- ));
|
|
|
- }
|
|
|
-
|
|
|
- time::sleep(TRACK_INTERVAL).await;
|
|
|
- }
|
|
|
-}
|
|
|
+// #[tracing::instrument(name = "block_timestamp_lag", skip_all, fields(chain_id = chain_id))]
|
|
|
+// pub async fn check_block_timestamp_lag(
|
|
|
+// chain_id: String,
|
|
|
+// chain_config: EthereumConfig,
|
|
|
+// metrics: Family<ChainLabel, Gauge>,
|
|
|
+// rpc_metrics: Arc<RpcMetrics>,
|
|
|
+// ) {
|
|
|
+// let provider =
|
|
|
+// match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) {
|
|
|
+// Ok(r) => r,
|
|
|
+// Err(e) => {
|
|
|
+// tracing::error!("Failed to create provider for chain id - {:?}", e);
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// };
|
|
|
+
|
|
|
+// const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
|
|
|
+// let lag = match provider.get_block(BlockNumber::Latest).await {
|
|
|
+// Ok(block) => match block {
|
|
|
+// Some(block) => {
|
|
|
+// let block_timestamp = block.timestamp;
|
|
|
+// let server_timestamp = SystemTime::now()
|
|
|
+// .duration_since(UNIX_EPOCH)
|
|
|
+// .unwrap()
|
|
|
+// .as_secs();
|
|
|
+// let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
|
|
|
+// lag
|
|
|
+// }
|
|
|
+// None => {
|
|
|
+// tracing::error!("Block is None");
|
|
|
+// INF_LAG
|
|
|
+// }
|
|
|
+// },
|
|
|
+// Err(e) => {
|
|
|
+// tracing::error!("Failed to get block - {:?}", e);
|
|
|
+// INF_LAG
|
|
|
+// }
|
|
|
+// };
|
|
|
+// metrics
|
|
|
+// .get_or_create(&ChainLabel {
|
|
|
+// chain_id: chain_id.clone(),
|
|
|
+// })
|
|
|
+// .set(lag);
|
|
|
+// }
|
|
|
+
|
|
|
+// /// Tracks the difference between the server timestamp and the latest block timestamp for each chain
|
|
|
+// pub async fn track_block_timestamp_lag(
|
|
|
+// config: Config,
|
|
|
+// metrics_registry: Arc<RwLock<Registry>>,
|
|
|
+// rpc_metrics: Arc<RpcMetrics>,
|
|
|
+// ) {
|
|
|
+// let metrics = Family::<ChainLabel, Gauge>::default();
|
|
|
+// metrics_registry.write().await.register(
|
|
|
+// "block_timestamp_lag",
|
|
|
+// "The difference between server timestamp and latest block timestamp",
|
|
|
+// metrics.clone(),
|
|
|
+// );
|
|
|
+// loop {
|
|
|
+// for (chain_id, chain_config) in &config.chains {
|
|
|
+// spawn(check_block_timestamp_lag(
|
|
|
+// chain_id.clone(),
|
|
|
+// chain_config.clone(),
|
|
|
+// metrics.clone(),
|
|
|
+// rpc_metrics.clone(),
|
|
|
+// ));
|
|
|
+// }
|
|
|
+
|
|
|
+// time::sleep(TRACK_INTERVAL).await;
|
|
|
+// }
|
|
|
+// }
|