|
|
@@ -1,47 +1,30 @@
|
|
|
use {
|
|
|
crate::{
|
|
|
- api::{self, BlockchainState, ChainId},
|
|
|
+ api::{self, ApiBlockChainState, BlockchainState, ChainId},
|
|
|
chain::ethereum::InstrumentedPythContract,
|
|
|
command::register_provider::CommitmentMetadata,
|
|
|
- config::{Commitment, Config, EthereumConfig, RunOptions},
|
|
|
- eth_utils::traced_client::{RpcMetrics, TracedClient},
|
|
|
+ config::{Commitment, Config, EthereumConfig, ProviderConfig, RunOptions},
|
|
|
+ eth_utils::traced_client::RpcMetrics,
|
|
|
keeper::{self, keeper_metrics::KeeperMetrics},
|
|
|
state::{HashChainState, PebbleHashChain},
|
|
|
},
|
|
|
anyhow::{anyhow, Error, Result},
|
|
|
axum::Router,
|
|
|
- ethers::{
|
|
|
- middleware::Middleware,
|
|
|
- types::{Address, BlockNumber},
|
|
|
- },
|
|
|
- futures::future::join_all,
|
|
|
- prometheus_client::{
|
|
|
- encoding::EncodeLabelSet,
|
|
|
- metrics::{family::Family, gauge::Gauge},
|
|
|
- registry::Registry,
|
|
|
- },
|
|
|
- std::{
|
|
|
- collections::HashMap,
|
|
|
- net::SocketAddr,
|
|
|
- sync::Arc,
|
|
|
- time::{Duration, SystemTime, UNIX_EPOCH},
|
|
|
- },
|
|
|
+ ethers::types::Address,
|
|
|
+ prometheus_client::{encoding::EncodeLabelSet, registry::Registry},
|
|
|
+ std::{collections::HashMap, net::SocketAddr, sync::Arc},
|
|
|
tokio::{
|
|
|
spawn,
|
|
|
sync::{watch, RwLock},
|
|
|
- time,
|
|
|
},
|
|
|
tower_http::cors::CorsLayer,
|
|
|
utoipa::OpenApi,
|
|
|
utoipa_swagger_ui::SwaggerUi,
|
|
|
};
|
|
|
|
|
|
-/// Track metrics in this interval
|
|
|
-const TRACK_INTERVAL: Duration = Duration::from_secs(10);
|
|
|
-
|
|
|
pub async fn run_api(
|
|
|
socket_addr: SocketAddr,
|
|
|
- chains: HashMap<String, api::BlockchainState>,
|
|
|
+ chains: Arc<RwLock<HashMap<String, ApiBlockChainState>>>,
|
|
|
metrics_registry: Arc<RwLock<Registry>>,
|
|
|
mut rx_exit: watch::Receiver<bool>,
|
|
|
) -> Result<()> {
|
|
|
@@ -93,40 +76,6 @@ pub async fn run_api(
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
-pub async fn run_keeper(
|
|
|
- chains: HashMap<String, api::BlockchainState>,
|
|
|
- config: Config,
|
|
|
- private_key: String,
|
|
|
- metrics_registry: Arc<RwLock<Registry>>,
|
|
|
- rpc_metrics: Arc<RpcMetrics>,
|
|
|
-) -> Result<()> {
|
|
|
- let mut handles = Vec::new();
|
|
|
- let keeper_metrics: Arc<KeeperMetrics> = Arc::new({
|
|
|
- let chain_labels: Vec<(String, Address)> = chains
|
|
|
- .iter()
|
|
|
- .map(|(id, state)| (id.clone(), state.provider_address))
|
|
|
- .collect();
|
|
|
- KeeperMetrics::new(metrics_registry.clone(), chain_labels).await
|
|
|
- });
|
|
|
- for (chain_id, chain_config) in chains {
|
|
|
- let chain_eth_config = config
|
|
|
- .chains
|
|
|
- .get(&chain_id)
|
|
|
- .expect("All chains should be present in the config file")
|
|
|
- .clone();
|
|
|
- let private_key = private_key.clone();
|
|
|
- handles.push(spawn(keeper::run_keeper_threads(
|
|
|
- private_key,
|
|
|
- chain_eth_config,
|
|
|
- chain_config.clone(),
|
|
|
- keeper_metrics.clone(),
|
|
|
- rpc_metrics.clone(),
|
|
|
- )));
|
|
|
- }
|
|
|
-
|
|
|
- Ok(())
|
|
|
-}
|
|
|
-
|
|
|
pub async fn run(opts: &RunOptions) -> Result<()> {
|
|
|
let config = Config::load(&opts.config.config)?;
|
|
|
let secret = config.provider.secret.load()?.ok_or(anyhow!(
|
|
|
@@ -136,41 +85,51 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
|
|
|
let metrics_registry = Arc::new(RwLock::new(Registry::default()));
|
|
|
let rpc_metrics = Arc::new(RpcMetrics::new(metrics_registry.clone()).await);
|
|
|
|
|
|
- let mut tasks = Vec::new();
|
|
|
+ let keeper_metrics: Arc<KeeperMetrics> =
|
|
|
+ Arc::new(KeeperMetrics::new(metrics_registry.clone()).await);
|
|
|
+ let keeper_private_key_option = config.keeper.private_key.load()?;
|
|
|
+ if keeper_private_key_option.is_none() {
|
|
|
+ tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
|
|
|
+ }
|
|
|
+ let chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>> = Arc::new(RwLock::new(
|
|
|
+ config
|
|
|
+ .chains
|
|
|
+ .keys()
|
|
|
+ .map(|chain_id| (chain_id.clone(), ApiBlockChainState::Uninitialized))
|
|
|
+ .collect(),
|
|
|
+ ));
|
|
|
for (chain_id, chain_config) in config.chains.clone() {
|
|
|
+ let keeper_metrics = keeper_metrics.clone();
|
|
|
+ let keeper_private_key_option = keeper_private_key_option.clone();
|
|
|
+ let chains = chains.clone();
|
|
|
let secret_copy = secret.clone();
|
|
|
let rpc_metrics = rpc_metrics.clone();
|
|
|
- tasks.push(spawn(async move {
|
|
|
- let state = setup_chain_state(
|
|
|
- &config.provider.address,
|
|
|
- &secret_copy,
|
|
|
- config.provider.chain_sample_interval,
|
|
|
- &chain_id,
|
|
|
- &chain_config,
|
|
|
- rpc_metrics,
|
|
|
- )
|
|
|
- .await;
|
|
|
-
|
|
|
- (chain_id, state)
|
|
|
- }));
|
|
|
- }
|
|
|
- let states = join_all(tasks).await;
|
|
|
-
|
|
|
- let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
|
|
|
- for result in states {
|
|
|
- let (chain_id, state) = result?;
|
|
|
-
|
|
|
- match state {
|
|
|
- Ok(state) => {
|
|
|
- chains.insert(chain_id.clone(), state);
|
|
|
+ let provider_config = config.provider.clone();
|
|
|
+ spawn(async move {
|
|
|
+ loop {
|
|
|
+ let setup_result = setup_chain_and_run_keeper(
|
|
|
+ provider_config.clone(),
|
|
|
+ &chain_id,
|
|
|
+ chain_config.clone(),
|
|
|
+ keeper_metrics.clone(),
|
|
|
+ keeper_private_key_option.clone(),
|
|
|
+ chains.clone(),
|
|
|
+ &secret_copy,
|
|
|
+ rpc_metrics.clone(),
|
|
|
+ )
|
|
|
+ .await;
|
|
|
+ match setup_result {
|
|
|
+ Ok(_) => {
|
|
|
+ tracing::info!("Chain {} initialized successfully", chain_id);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ tracing::error!("Failed to initialize chain {}: {}", chain_id, e);
|
|
|
+ tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Failed to setup {} {}", chain_id, e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if chains.is_empty() {
|
|
|
- return Err(anyhow!("No chains were successfully setup"));
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
|
|
|
@@ -185,27 +144,45 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
|
|
|
Ok::<(), Error>(())
|
|
|
});
|
|
|
|
|
|
- if let Some(keeper_private_key) = config.keeper.private_key.load()? {
|
|
|
- spawn(run_keeper(
|
|
|
- chains.clone(),
|
|
|
- config.clone(),
|
|
|
+ run_api(opts.addr, chains.clone(), metrics_registry.clone(), rx_exit).await?;
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
+#[allow(clippy::too_many_arguments)]
|
|
|
+async fn setup_chain_and_run_keeper(
|
|
|
+ provider_config: ProviderConfig,
|
|
|
+ chain_id: &ChainId,
|
|
|
+ chain_config: EthereumConfig,
|
|
|
+ keeper_metrics: Arc<KeeperMetrics>,
|
|
|
+ keeper_private_key_option: Option<String>,
|
|
|
+ chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
|
|
|
+ secret_copy: &str,
|
|
|
+ rpc_metrics: Arc<RpcMetrics>,
|
|
|
+) -> Result<()> {
|
|
|
+ let state = setup_chain_state(
|
|
|
+ &provider_config.address,
|
|
|
+ secret_copy,
|
|
|
+ provider_config.chain_sample_interval,
|
|
|
+ chain_id,
|
|
|
+ &chain_config,
|
|
|
+ rpc_metrics.clone(),
|
|
|
+ )
|
|
|
+ .await?;
|
|
|
+ keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
|
|
|
+ chains.write().await.insert(
|
|
|
+ chain_id.clone(),
|
|
|
+ ApiBlockChainState::Initialized(state.clone()),
|
|
|
+ );
|
|
|
+ if let Some(keeper_private_key) = keeper_private_key_option {
|
|
|
+ keeper::run_keeper_threads(
|
|
|
keeper_private_key,
|
|
|
- metrics_registry.clone(),
|
|
|
+ chain_config,
|
|
|
+ state,
|
|
|
+ keeper_metrics.clone(),
|
|
|
rpc_metrics.clone(),
|
|
|
- ));
|
|
|
- } else {
|
|
|
- tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
|
|
|
+ )
|
|
|
+ .await?;
|
|
|
}
|
|
|
-
|
|
|
- // Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
|
|
|
- spawn(track_block_timestamp_lag(
|
|
|
- config,
|
|
|
- metrics_registry.clone(),
|
|
|
- rpc_metrics.clone(),
|
|
|
- ));
|
|
|
-
|
|
|
- run_api(opts.addr, chains, metrics_registry, rx_exit).await?;
|
|
|
-
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
@@ -267,7 +244,7 @@ async fn setup_chain_state(
|
|
|
let offset = commitment.original_commitment_sequence_number.try_into()?;
|
|
|
offsets.push(offset);
|
|
|
|
|
|
- let pebble_hash_chain = PebbleHashChain::from_config(
|
|
|
+ let pebble_hash_chain = PebbleHashChain::from_config_async(
|
|
|
secret,
|
|
|
chain_id,
|
|
|
provider,
|
|
|
@@ -276,6 +253,7 @@ async fn setup_chain_state(
|
|
|
commitment.chain_length,
|
|
|
chain_sample_interval,
|
|
|
)
|
|
|
+ .await
|
|
|
.map_err(|e| anyhow!("Failed to create hash chain: {}", e))?;
|
|
|
hash_chains.push(pebble_hash_chain);
|
|
|
}
|
|
|
@@ -308,74 +286,3 @@ async fn setup_chain_state(
|
|
|
pub struct ChainLabel {
|
|
|
pub chain_id: String,
|
|
|
}
|
|
|
-
|
|
|
-#[tracing::instrument(name = "block_timestamp_lag", skip_all, fields(chain_id = chain_id))]
|
|
|
-pub async fn check_block_timestamp_lag(
|
|
|
- chain_id: String,
|
|
|
- chain_config: EthereumConfig,
|
|
|
- metrics: Family<ChainLabel, Gauge>,
|
|
|
- rpc_metrics: Arc<RpcMetrics>,
|
|
|
-) {
|
|
|
- let provider =
|
|
|
- match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) {
|
|
|
- Ok(r) => r,
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Failed to create provider for chain id - {:?}", e);
|
|
|
- return;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
|
|
|
- let lag = match provider.get_block(BlockNumber::Latest).await {
|
|
|
- Ok(block) => match block {
|
|
|
- Some(block) => {
|
|
|
- let block_timestamp = block.timestamp;
|
|
|
- let server_timestamp = SystemTime::now()
|
|
|
- .duration_since(UNIX_EPOCH)
|
|
|
- .unwrap()
|
|
|
- .as_secs();
|
|
|
- let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
|
|
|
- lag
|
|
|
- }
|
|
|
- None => {
|
|
|
- tracing::error!("Block is None");
|
|
|
- INF_LAG
|
|
|
- }
|
|
|
- },
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Failed to get block - {:?}", e);
|
|
|
- INF_LAG
|
|
|
- }
|
|
|
- };
|
|
|
- metrics
|
|
|
- .get_or_create(&ChainLabel {
|
|
|
- chain_id: chain_id.clone(),
|
|
|
- })
|
|
|
- .set(lag);
|
|
|
-}
|
|
|
-
|
|
|
-/// Tracks the difference between the server timestamp and the latest block timestamp for each chain
|
|
|
-pub async fn track_block_timestamp_lag(
|
|
|
- config: Config,
|
|
|
- metrics_registry: Arc<RwLock<Registry>>,
|
|
|
- rpc_metrics: Arc<RpcMetrics>,
|
|
|
-) {
|
|
|
- let metrics = Family::<ChainLabel, Gauge>::default();
|
|
|
- metrics_registry.write().await.register(
|
|
|
- "block_timestamp_lag",
|
|
|
- "The difference between server timestamp and latest block timestamp",
|
|
|
- metrics.clone(),
|
|
|
- );
|
|
|
- loop {
|
|
|
- for (chain_id, chain_config) in &config.chains {
|
|
|
- spawn(check_block_timestamp_lag(
|
|
|
- chain_id.clone(),
|
|
|
- chain_config.clone(),
|
|
|
- metrics.clone(),
|
|
|
- rpc_metrics.clone(),
|
|
|
- ));
|
|
|
- }
|
|
|
-
|
|
|
- time::sleep(TRACK_INTERVAL).await;
|
|
|
- }
|
|
|
-}
|