|
|
@@ -5,7 +5,10 @@ use {
|
|
|
BlockchainState,
|
|
|
},
|
|
|
chain::{
|
|
|
- ethereum::SignablePythContract,
|
|
|
+ ethereum::{
|
|
|
+ PythContract,
|
|
|
+ SignablePythContract,
|
|
|
+ },
|
|
|
reader::{
|
|
|
BlockNumber,
|
|
|
RequestedWithCallbackEvent,
|
|
|
@@ -20,17 +23,37 @@ use {
|
|
|
ethers::{
|
|
|
contract::ContractError,
|
|
|
providers::{
|
|
|
+ Http,
|
|
|
Middleware,
|
|
|
Provider,
|
|
|
Ws,
|
|
|
},
|
|
|
- types::U256,
|
|
|
+ signers::Signer,
|
|
|
+ types::{
|
|
|
+ Address,
|
|
|
+ U256,
|
|
|
+ },
|
|
|
},
|
|
|
futures::StreamExt,
|
|
|
- std::sync::Arc,
|
|
|
+ prometheus_client::{
|
|
|
+ encoding::EncodeLabelSet,
|
|
|
+ metrics::{
|
|
|
+ counter::Counter,
|
|
|
+ family::Family,
|
|
|
+ gauge::Gauge,
|
|
|
+ },
|
|
|
+ registry::Registry,
|
|
|
+ },
|
|
|
+ std::sync::{
|
|
|
+ atomic::AtomicU64,
|
|
|
+ Arc,
|
|
|
+ },
|
|
|
tokio::{
|
|
|
spawn,
|
|
|
- sync::mpsc,
|
|
|
+ sync::{
|
|
|
+ mpsc,
|
|
|
+ RwLock,
|
|
|
+ },
|
|
|
time::{
|
|
|
self,
|
|
|
Duration,
|
|
|
@@ -42,12 +65,6 @@ use {
|
|
|
},
|
|
|
};
|
|
|
|
|
|
-#[derive(Debug)]
|
|
|
-pub struct BlockRange {
|
|
|
- pub from: BlockNumber,
|
|
|
- pub to: BlockNumber,
|
|
|
-}
|
|
|
-
|
|
|
/// How much to wait before retrying in case of an RPC error
|
|
|
const RETRY_INTERVAL: Duration = Duration::from_secs(5);
|
|
|
/// How many blocks to look back for events that might be missed when starting the keeper
|
|
|
@@ -56,7 +73,89 @@ const BACKLOG_RANGE: u64 = 1000;
|
|
|
const BLOCK_BATCH_SIZE: u64 = 100;
|
|
|
/// How much to wait before polling the next latest block
|
|
|
const POLL_INTERVAL: Duration = Duration::from_secs(2);
|
|
|
+/// Track metrics in this interval
|
|
|
+const TRACK_INTERVAL: Duration = Duration::from_secs(10);
|
|
|
+
|
|
|
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
|
|
|
+pub struct AccountLabel {
|
|
|
+ pub chain_id: String,
|
|
|
+ pub address: String,
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Default)]
|
|
|
+pub struct KeeperMetrics {
|
|
|
+ pub current_sequence_number: Family<AccountLabel, Gauge>,
|
|
|
+ pub end_sequence_number: Family<AccountLabel, Gauge>,
|
|
|
+ pub balance: Family<AccountLabel, Gauge<f64, AtomicU64>>,
|
|
|
+ pub collected_fee: Family<AccountLabel, Gauge<f64, AtomicU64>>,
|
|
|
+ pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
|
|
|
+ pub requests: Family<AccountLabel, Counter>,
|
|
|
+ pub requests_processed: Family<AccountLabel, Counter>,
|
|
|
+ pub reveals: Family<AccountLabel, Counter>,
|
|
|
+}
|
|
|
+
|
|
|
+impl KeeperMetrics {
|
|
|
+ pub async fn new(registry: Arc<RwLock<Registry>>) -> Self {
|
|
|
+ let mut writable_registry = registry.write().await;
|
|
|
+ let keeper_metrics = KeeperMetrics::default();
|
|
|
+
|
|
|
+ writable_registry.register(
|
|
|
+ "current_sequence_number",
|
|
|
+ "The sequence number for a new request",
|
|
|
+ keeper_metrics.current_sequence_number.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ writable_registry.register(
|
|
|
+ "end_sequence_number",
|
|
|
+ "The sequence number for the end request",
|
|
|
+ keeper_metrics.end_sequence_number.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ writable_registry.register(
|
|
|
+ "requests",
|
|
|
+ "Number of requests received through events",
|
|
|
+ keeper_metrics.requests.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ writable_registry.register(
|
|
|
+ "requests_processed",
|
|
|
+ "Number of requests processed",
|
|
|
+ keeper_metrics.requests_processed.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ writable_registry.register(
|
|
|
+ "reveal",
|
|
|
+ "Number of reveals",
|
|
|
+ keeper_metrics.reveals.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ writable_registry.register(
|
|
|
+ "balance",
|
|
|
+ "Balance of the keeper",
|
|
|
+ keeper_metrics.balance.clone(),
|
|
|
+ );
|
|
|
|
|
|
+ writable_registry.register(
|
|
|
+ "collected_fee",
|
|
|
+ "Collected fee on the contract",
|
|
|
+ keeper_metrics.collected_fee.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ writable_registry.register(
|
|
|
+ "total_gas_spent",
|
|
|
+ "Total gas spent revealing requests",
|
|
|
+ keeper_metrics.total_gas_spent.clone(),
|
|
|
+ );
|
|
|
+
|
|
|
+ keeper_metrics
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct BlockRange {
|
|
|
+ pub from: BlockNumber,
|
|
|
+ pub to: BlockNumber,
|
|
|
+}
|
|
|
|
|
|
/// Get the latest safe block number for the chain. Retry internally if there is an error.
|
|
|
async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
|
|
|
@@ -88,7 +187,11 @@ pub async fn run_keeper_threads(
|
|
|
private_key: String,
|
|
|
chain_eth_config: EthereumConfig,
|
|
|
chain_state: BlockchainState,
|
|
|
+ metrics: Arc<RwLock<Registry>>,
|
|
|
) {
|
|
|
+ // Register metrics
|
|
|
+ let keeper_metrics = Arc::new(KeeperMetrics::new(metrics.clone()).await);
|
|
|
+
|
|
|
tracing::info!("starting keeper");
|
|
|
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
|
|
|
tracing::info!("latest safe block: {}", &latest_safe_block);
|
|
|
@@ -98,6 +201,7 @@ pub async fn run_keeper_threads(
|
|
|
.await
|
|
|
.expect("Chain config should be valid"),
|
|
|
);
|
|
|
+ let keeper_address = contract.client().inner().inner().signer().address();
|
|
|
|
|
|
// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
|
|
|
spawn(
|
|
|
@@ -109,6 +213,7 @@ pub async fn run_keeper_threads(
|
|
|
contract.clone(),
|
|
|
chain_eth_config.gas_limit,
|
|
|
chain_state.clone(),
|
|
|
+ keeper_metrics.clone(),
|
|
|
)
|
|
|
.in_current_span(),
|
|
|
);
|
|
|
@@ -131,9 +236,47 @@ pub async fn run_keeper_threads(
|
|
|
rx,
|
|
|
Arc::clone(&contract),
|
|
|
chain_eth_config.gas_limit,
|
|
|
+ keeper_metrics.clone(),
|
|
|
)
|
|
|
.in_current_span(),
|
|
|
);
|
|
|
+
|
|
|
+ // Spawn a thread to track the provider info and the balance of the keeper
|
|
|
+ spawn(
|
|
|
+ async move {
|
|
|
+ let chain_id = chain_state.id.clone();
|
|
|
+ let chain_config = chain_eth_config.clone();
|
|
|
+ let provider_address = chain_state.provider_address.clone();
|
|
|
+ let keeper_metrics = keeper_metrics.clone();
|
|
|
+
|
|
|
+ loop {
|
|
|
+ // There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
|
|
|
+ // If rpc start fails all of these threads will just exit, instead of retrying.
|
|
|
+ // We are tracking rpc failures elsewhere, so it's fine.
|
|
|
+ spawn(
|
|
|
+ track_provider(
|
|
|
+ chain_id.clone(),
|
|
|
+ chain_config.clone(),
|
|
|
+ provider_address.clone(),
|
|
|
+ keeper_metrics.clone(),
|
|
|
+ )
|
|
|
+ .in_current_span(),
|
|
|
+ );
|
|
|
+ spawn(
|
|
|
+ track_balance(
|
|
|
+ chain_id.clone(),
|
|
|
+ chain_config.clone(),
|
|
|
+ keeper_address.clone(),
|
|
|
+ keeper_metrics.clone(),
|
|
|
+ )
|
|
|
+ .in_current_span(),
|
|
|
+ );
|
|
|
+
|
|
|
+ time::sleep(TRACK_INTERVAL).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ .in_current_span(),
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -146,6 +289,7 @@ pub async fn process_event(
|
|
|
chain_config: &BlockchainState,
|
|
|
contract: &Arc<SignablePythContract>,
|
|
|
gas_limit: U256,
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
) -> Result<()> {
|
|
|
if chain_config.provider_address != event.provider_address {
|
|
|
return Ok(());
|
|
|
@@ -221,14 +365,53 @@ pub async fn process_event(
|
|
|
};
|
|
|
|
|
|
match pending_tx.await {
|
|
|
- Ok(res) => {
|
|
|
- tracing::info!(
|
|
|
- sequence_number = &event.sequence_number,
|
|
|
- "Revealed with res: {:?}",
|
|
|
- res
|
|
|
- );
|
|
|
- Ok(())
|
|
|
- }
|
|
|
+ Ok(res) => match res {
|
|
|
+ Some(res) => {
|
|
|
+ tracing::info!(
|
|
|
+ sequence_number = &event.sequence_number,
|
|
|
+ transaction_hash = &res.transaction_hash.to_string(),
|
|
|
+ gas_used = ?res.gas_used,
|
|
|
+ "Revealed with res: {:?}",
|
|
|
+ res
|
|
|
+ );
|
|
|
+
|
|
|
+ if let Some(gas_used) = res.gas_used {
|
|
|
+ let gas_used = gas_used.as_u128() as f64 / 1e18;
|
|
|
+ metrics
|
|
|
+ .total_gas_spent
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_config.id.clone(),
|
|
|
+ address: contract
|
|
|
+ .client()
|
|
|
+ .inner()
|
|
|
+ .inner()
|
|
|
+ .signer()
|
|
|
+ .address()
|
|
|
+ .to_string(),
|
|
|
+ })
|
|
|
+ .inc_by(gas_used);
|
|
|
+ }
|
|
|
+
|
|
|
+ metrics
|
|
|
+ .reveals
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_config.id.clone(),
|
|
|
+ address: chain_config.provider_address.to_string(),
|
|
|
+ })
|
|
|
+ .inc();
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+ None => {
|
|
|
+ tracing::error!(
|
|
|
+ sequence_number = &event.sequence_number,
|
|
|
+ "Can't verify the reveal"
|
|
|
+ );
|
|
|
+ // It is better to return an error in this scenario
|
|
|
+ // For the caller to retry
|
|
|
+ Err(anyhow!("Can't verify the reveal"))
|
|
|
+ }
|
|
|
+ },
|
|
|
+
|
|
|
Err(e) => {
|
|
|
tracing::error!(
|
|
|
sequence_number = &event.sequence_number,
|
|
|
@@ -266,6 +449,7 @@ pub async fn process_block_range(
|
|
|
contract: Arc<SignablePythContract>,
|
|
|
gas_limit: U256,
|
|
|
chain_state: api::BlockchainState,
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
) {
|
|
|
let BlockRange {
|
|
|
from: first_block,
|
|
|
@@ -286,6 +470,7 @@ pub async fn process_block_range(
|
|
|
contract.clone(),
|
|
|
gas_limit,
|
|
|
chain_state.clone(),
|
|
|
+ metrics.clone(),
|
|
|
)
|
|
|
.in_current_span()
|
|
|
.await;
|
|
|
@@ -302,6 +487,7 @@ pub async fn process_single_block_batch(
|
|
|
contract: Arc<SignablePythContract>,
|
|
|
gas_limit: U256,
|
|
|
chain_state: api::BlockchainState,
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
) {
|
|
|
loop {
|
|
|
let events_res = chain_state
|
|
|
@@ -313,11 +499,23 @@ pub async fn process_single_block_batch(
|
|
|
Ok(events) => {
|
|
|
tracing::info!(num_of_events = &events.len(), "Processing",);
|
|
|
for event in &events {
|
|
|
+ metrics
|
|
|
+ .requests
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_state.id.clone(),
|
|
|
+ address: chain_state.provider_address.to_string(),
|
|
|
+ })
|
|
|
+ .inc();
|
|
|
tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
|
|
|
- while let Err(e) =
|
|
|
- process_event(event.clone(), &chain_state, &contract, gas_limit)
|
|
|
- .in_current_span()
|
|
|
- .await
|
|
|
+ while let Err(e) = process_event(
|
|
|
+ event.clone(),
|
|
|
+ &chain_state,
|
|
|
+ &contract,
|
|
|
+ gas_limit,
|
|
|
+ metrics.clone(),
|
|
|
+ )
|
|
|
+ .in_current_span()
|
|
|
+ .await
|
|
|
{
|
|
|
tracing::error!(
|
|
|
sequence_number = &event.sequence_number,
|
|
|
@@ -328,6 +526,13 @@ pub async fn process_single_block_batch(
|
|
|
time::sleep(RETRY_INTERVAL).await;
|
|
|
}
|
|
|
tracing::info!(sequence_number = &event.sequence_number, "Processed event",);
|
|
|
+ metrics
|
|
|
+ .requests_processed
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_state.id.clone(),
|
|
|
+ address: chain_state.provider_address.to_string(),
|
|
|
+ })
|
|
|
+ .inc();
|
|
|
}
|
|
|
tracing::info!(num_of_events = &events.len(), "Processed",);
|
|
|
break;
|
|
|
@@ -455,6 +660,7 @@ pub async fn process_new_blocks(
|
|
|
mut rx: mpsc::Receiver<BlockRange>,
|
|
|
contract: Arc<SignablePythContract>,
|
|
|
gas_limit: U256,
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
) {
|
|
|
tracing::info!("Waiting for new block ranges to process");
|
|
|
loop {
|
|
|
@@ -464,6 +670,7 @@ pub async fn process_new_blocks(
|
|
|
Arc::clone(&contract),
|
|
|
gas_limit,
|
|
|
chain_state.clone(),
|
|
|
+ metrics.clone(),
|
|
|
)
|
|
|
.in_current_span()
|
|
|
.await;
|
|
|
@@ -478,10 +685,110 @@ pub async fn process_backlog(
|
|
|
contract: Arc<SignablePythContract>,
|
|
|
gas_limit: U256,
|
|
|
chain_state: BlockchainState,
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
) {
|
|
|
tracing::info!("Processing backlog");
|
|
|
- process_block_range(backlog_range, contract, gas_limit, chain_state)
|
|
|
+ process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
|
|
|
.in_current_span()
|
|
|
.await;
|
|
|
tracing::info!("Backlog processed");
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+/// tracks the balance of the given address on the given chain
|
|
|
+/// if there was an error, the function will just return
|
|
|
+#[tracing::instrument(skip_all)]
|
|
|
+pub async fn track_balance(
|
|
|
+ chain_id: String,
|
|
|
+ chain_config: EthereumConfig,
|
|
|
+ address: Address,
|
|
|
+ metrics_registry: Arc<KeeperMetrics>,
|
|
|
+) {
|
|
|
+ let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
|
|
|
+ Ok(r) => r,
|
|
|
+ Err(e) => {
|
|
|
+ tracing::error!("Error while connecting to geth rpc. error: {:?}", e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ let balance = match provider.get_balance(address, None).await {
|
|
|
+ // This conversion to u128 is fine as the total balance will never cross the limits
|
|
|
+ // of u128 practically.
|
|
|
+ Ok(r) => r.as_u128(),
|
|
|
+ Err(e) => {
|
|
|
+ tracing::error!("Error while getting balance. error: {:?}", e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
|
|
|
+ // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
|
|
|
+ let balance = balance as f64 / 1e18;
|
|
|
+
|
|
|
+ metrics_registry
|
|
|
+ .balance
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_id.clone(),
|
|
|
+ address: address.to_string(),
|
|
|
+ })
|
|
|
+ .set(balance);
|
|
|
+}
|
|
|
+
|
|
|
+/// tracks the collected fees and the hashchain data of the given provider address on the given chain
|
|
|
+/// if there is a error the function will just return
|
|
|
+#[tracing::instrument(skip_all)]
|
|
|
+pub async fn track_provider(
|
|
|
+ chain_id: String,
|
|
|
+ chain_config: EthereumConfig,
|
|
|
+ provider_address: Address,
|
|
|
+ metrics_registry: Arc<KeeperMetrics>,
|
|
|
+) {
|
|
|
+ let contract = match PythContract::from_config(&chain_config) {
|
|
|
+ Ok(r) => r,
|
|
|
+ Err(e) => {
|
|
|
+ tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ let provider_info = match contract.get_provider_info(provider_address).call().await {
|
|
|
+ Ok(info) => info,
|
|
|
+ Err(e) => {
|
|
|
+ tracing::error!("Error while getting provider info. error: {:?}", e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
|
|
|
+ // The fee is in wei, so we divide by 1e18 to convert it to eth.
|
|
|
+ let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18;
|
|
|
+
|
|
|
+ let current_sequence_number = provider_info.sequence_number;
|
|
|
+ let end_sequence_number = provider_info.end_sequence_number;
|
|
|
+
|
|
|
+ metrics_registry
|
|
|
+ .collected_fee
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_id.clone(),
|
|
|
+ address: provider_address.to_string(),
|
|
|
+ })
|
|
|
+ .set(collected_fee);
|
|
|
+
|
|
|
+ metrics_registry
|
|
|
+ .current_sequence_number
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_id.clone(),
|
|
|
+ address: provider_address.to_string(),
|
|
|
+ })
|
|
|
+ // sequence_number type on chain is u64 but practically it will take
|
|
|
+ // a long time for it to cross the limits of i64.
|
|
|
+ // currently prometheus only supports i64 for Gauge types
|
|
|
+ .set(current_sequence_number as i64);
|
|
|
+ metrics_registry
|
|
|
+ .end_sequence_number
|
|
|
+ .get_or_create(&AccountLabel {
|
|
|
+ chain_id: chain_id.clone(),
|
|
|
+ address: provider_address.to_string(),
|
|
|
+ })
|
|
|
+ .set(end_sequence_number as i64);
|
|
|
+}
|