|
@@ -3,16 +3,21 @@ use {
|
|
|
api::{
|
|
api::{
|
|
|
self,
|
|
self,
|
|
|
BlockchainState,
|
|
BlockchainState,
|
|
|
|
|
+ ChainId,
|
|
|
},
|
|
},
|
|
|
chain::{
|
|
chain::{
|
|
|
ethereum::{
|
|
ethereum::{
|
|
|
- PythContract,
|
|
|
|
|
- SignablePythContract,
|
|
|
|
|
|
|
+ InstrumentedPythContract,
|
|
|
|
|
+ InstrumentedSignablePythContract,
|
|
|
},
|
|
},
|
|
|
reader::{
|
|
reader::{
|
|
|
BlockNumber,
|
|
BlockNumber,
|
|
|
RequestedWithCallbackEvent,
|
|
RequestedWithCallbackEvent,
|
|
|
},
|
|
},
|
|
|
|
|
+ traced_client::{
|
|
|
|
|
+ RpcMetrics,
|
|
|
|
|
+ TracedClient,
|
|
|
|
|
+ },
|
|
|
},
|
|
},
|
|
|
config::EthereumConfig,
|
|
config::EthereumConfig,
|
|
|
},
|
|
},
|
|
@@ -23,7 +28,6 @@ use {
|
|
|
backoff::ExponentialBackoff,
|
|
backoff::ExponentialBackoff,
|
|
|
ethers::{
|
|
ethers::{
|
|
|
providers::{
|
|
providers::{
|
|
|
- Http,
|
|
|
|
|
Middleware,
|
|
Middleware,
|
|
|
Provider,
|
|
Provider,
|
|
|
Ws,
|
|
Ws,
|
|
@@ -209,19 +213,22 @@ pub async fn run_keeper_threads(
|
|
|
private_key: String,
|
|
private_key: String,
|
|
|
chain_eth_config: EthereumConfig,
|
|
chain_eth_config: EthereumConfig,
|
|
|
chain_state: BlockchainState,
|
|
chain_state: BlockchainState,
|
|
|
- metrics: Arc<RwLock<Registry>>,
|
|
|
|
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
|
|
+ rpc_metrics: Arc<RpcMetrics>,
|
|
|
) {
|
|
) {
|
|
|
- // Register metrics
|
|
|
|
|
- let keeper_metrics = Arc::new(KeeperMetrics::new(metrics.clone()).await);
|
|
|
|
|
-
|
|
|
|
|
tracing::info!("starting keeper");
|
|
tracing::info!("starting keeper");
|
|
|
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
|
|
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
|
|
|
tracing::info!("latest safe block: {}", &latest_safe_block);
|
|
tracing::info!("latest safe block: {}", &latest_safe_block);
|
|
|
|
|
|
|
|
let contract = Arc::new(
|
|
let contract = Arc::new(
|
|
|
- SignablePythContract::from_config(&chain_eth_config, &private_key)
|
|
|
|
|
- .await
|
|
|
|
|
- .expect("Chain config should be valid"),
|
|
|
|
|
|
|
+ InstrumentedSignablePythContract::from_config(
|
|
|
|
|
+ &chain_eth_config,
|
|
|
|
|
+ &private_key,
|
|
|
|
|
+ chain_state.id.clone(),
|
|
|
|
|
+ rpc_metrics.clone(),
|
|
|
|
|
+ )
|
|
|
|
|
+ .await
|
|
|
|
|
+ .expect("Chain config should be valid"),
|
|
|
);
|
|
);
|
|
|
let keeper_address = contract.client().inner().inner().inner().signer().address();
|
|
let keeper_address = contract.client().inner().inner().inner().signer().address();
|
|
|
|
|
|
|
@@ -238,7 +245,7 @@ pub async fn run_keeper_threads(
|
|
|
contract.clone(),
|
|
contract.clone(),
|
|
|
gas_limit,
|
|
gas_limit,
|
|
|
chain_state.clone(),
|
|
chain_state.clone(),
|
|
|
- keeper_metrics.clone(),
|
|
|
|
|
|
|
+ metrics.clone(),
|
|
|
fulfilled_requests_cache.clone(),
|
|
fulfilled_requests_cache.clone(),
|
|
|
)
|
|
)
|
|
|
.in_current_span(),
|
|
.in_current_span(),
|
|
@@ -262,7 +269,7 @@ pub async fn run_keeper_threads(
|
|
|
rx,
|
|
rx,
|
|
|
Arc::clone(&contract),
|
|
Arc::clone(&contract),
|
|
|
gas_limit,
|
|
gas_limit,
|
|
|
- keeper_metrics.clone(),
|
|
|
|
|
|
|
+ metrics.clone(),
|
|
|
fulfilled_requests_cache.clone(),
|
|
fulfilled_requests_cache.clone(),
|
|
|
)
|
|
)
|
|
|
.in_current_span(),
|
|
.in_current_span(),
|
|
@@ -274,7 +281,18 @@ pub async fn run_keeper_threads(
|
|
|
let chain_id = chain_state.id.clone();
|
|
let chain_id = chain_state.id.clone();
|
|
|
let chain_config = chain_eth_config.clone();
|
|
let chain_config = chain_eth_config.clone();
|
|
|
let provider_address = chain_state.provider_address.clone();
|
|
let provider_address = chain_state.provider_address.clone();
|
|
|
- let keeper_metrics = keeper_metrics.clone();
|
|
|
|
|
|
|
+ let keeper_metrics = metrics.clone();
|
|
|
|
|
+ let contract = match InstrumentedPythContract::from_config(
|
|
|
|
|
+ &chain_config,
|
|
|
|
|
+ chain_id.clone(),
|
|
|
|
|
+ rpc_metrics,
|
|
|
|
|
+ ) {
|
|
|
|
|
+ Ok(r) => r,
|
|
|
|
|
+ Err(e) => {
|
|
|
|
|
+ tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ };
|
|
|
|
|
|
|
|
loop {
|
|
loop {
|
|
|
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
|
|
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
|
|
@@ -283,7 +301,7 @@ pub async fn run_keeper_threads(
|
|
|
spawn(
|
|
spawn(
|
|
|
track_provider(
|
|
track_provider(
|
|
|
chain_id.clone(),
|
|
chain_id.clone(),
|
|
|
- chain_config.clone(),
|
|
|
|
|
|
|
+ contract.clone(),
|
|
|
provider_address.clone(),
|
|
provider_address.clone(),
|
|
|
keeper_metrics.clone(),
|
|
keeper_metrics.clone(),
|
|
|
)
|
|
)
|
|
@@ -292,7 +310,7 @@ pub async fn run_keeper_threads(
|
|
|
spawn(
|
|
spawn(
|
|
|
track_balance(
|
|
track_balance(
|
|
|
chain_id.clone(),
|
|
chain_id.clone(),
|
|
|
- chain_config.clone(),
|
|
|
|
|
|
|
+ contract.client(),
|
|
|
keeper_address.clone(),
|
|
keeper_address.clone(),
|
|
|
keeper_metrics.clone(),
|
|
keeper_metrics.clone(),
|
|
|
)
|
|
)
|
|
@@ -314,7 +332,7 @@ pub async fn run_keeper_threads(
|
|
|
pub async fn process_event_with_backoff(
|
|
pub async fn process_event_with_backoff(
|
|
|
event: RequestedWithCallbackEvent,
|
|
event: RequestedWithCallbackEvent,
|
|
|
chain_state: BlockchainState,
|
|
chain_state: BlockchainState,
|
|
|
- contract: Arc<SignablePythContract>,
|
|
|
|
|
|
|
+ contract: Arc<InstrumentedSignablePythContract>,
|
|
|
gas_limit: U256,
|
|
gas_limit: U256,
|
|
|
metrics: Arc<KeeperMetrics>,
|
|
metrics: Arc<KeeperMetrics>,
|
|
|
) {
|
|
) {
|
|
@@ -363,7 +381,7 @@ pub async fn process_event_with_backoff(
|
|
|
pub async fn process_event(
|
|
pub async fn process_event(
|
|
|
event: &RequestedWithCallbackEvent,
|
|
event: &RequestedWithCallbackEvent,
|
|
|
chain_config: &BlockchainState,
|
|
chain_config: &BlockchainState,
|
|
|
- contract: &Arc<SignablePythContract>,
|
|
|
|
|
|
|
+ contract: &InstrumentedSignablePythContract,
|
|
|
gas_limit: U256,
|
|
gas_limit: U256,
|
|
|
metrics: Arc<KeeperMetrics>,
|
|
metrics: Arc<KeeperMetrics>,
|
|
|
) -> Result<(), backoff::Error<anyhow::Error>> {
|
|
) -> Result<(), backoff::Error<anyhow::Error>> {
|
|
@@ -493,7 +511,7 @@ pub async fn process_event(
|
|
|
))]
|
|
))]
|
|
|
pub async fn process_block_range(
|
|
pub async fn process_block_range(
|
|
|
block_range: BlockRange,
|
|
block_range: BlockRange,
|
|
|
- contract: Arc<SignablePythContract>,
|
|
|
|
|
|
|
+ contract: Arc<InstrumentedSignablePythContract>,
|
|
|
gas_limit: U256,
|
|
gas_limit: U256,
|
|
|
chain_state: api::BlockchainState,
|
|
chain_state: api::BlockchainState,
|
|
|
metrics: Arc<KeeperMetrics>,
|
|
metrics: Arc<KeeperMetrics>,
|
|
@@ -538,7 +556,7 @@ pub async fn process_block_range(
|
|
|
))]
|
|
))]
|
|
|
pub async fn process_single_block_batch(
|
|
pub async fn process_single_block_batch(
|
|
|
block_range: BlockRange,
|
|
block_range: BlockRange,
|
|
|
- contract: Arc<SignablePythContract>,
|
|
|
|
|
|
|
+ contract: Arc<InstrumentedSignablePythContract>,
|
|
|
gas_limit: U256,
|
|
gas_limit: U256,
|
|
|
chain_state: api::BlockchainState,
|
|
chain_state: api::BlockchainState,
|
|
|
metrics: Arc<KeeperMetrics>,
|
|
metrics: Arc<KeeperMetrics>,
|
|
@@ -712,7 +730,7 @@ pub async fn watch_blocks(
|
|
|
pub async fn process_new_blocks(
|
|
pub async fn process_new_blocks(
|
|
|
chain_state: BlockchainState,
|
|
chain_state: BlockchainState,
|
|
|
mut rx: mpsc::Receiver<BlockRange>,
|
|
mut rx: mpsc::Receiver<BlockRange>,
|
|
|
- contract: Arc<SignablePythContract>,
|
|
|
|
|
|
|
+ contract: Arc<InstrumentedSignablePythContract>,
|
|
|
gas_limit: U256,
|
|
gas_limit: U256,
|
|
|
metrics: Arc<KeeperMetrics>,
|
|
metrics: Arc<KeeperMetrics>,
|
|
|
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
|
|
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
|
|
@@ -738,7 +756,7 @@ pub async fn process_new_blocks(
|
|
|
#[tracing::instrument(skip_all)]
|
|
#[tracing::instrument(skip_all)]
|
|
|
pub async fn process_backlog(
|
|
pub async fn process_backlog(
|
|
|
backlog_range: BlockRange,
|
|
backlog_range: BlockRange,
|
|
|
- contract: Arc<SignablePythContract>,
|
|
|
|
|
|
|
+ contract: Arc<InstrumentedSignablePythContract>,
|
|
|
gas_limit: U256,
|
|
gas_limit: U256,
|
|
|
chain_state: BlockchainState,
|
|
chain_state: BlockchainState,
|
|
|
metrics: Arc<KeeperMetrics>,
|
|
metrics: Arc<KeeperMetrics>,
|
|
@@ -764,18 +782,10 @@ pub async fn process_backlog(
|
|
|
#[tracing::instrument(skip_all)]
|
|
#[tracing::instrument(skip_all)]
|
|
|
pub async fn track_balance(
|
|
pub async fn track_balance(
|
|
|
chain_id: String,
|
|
chain_id: String,
|
|
|
- chain_config: EthereumConfig,
|
|
|
|
|
|
|
+ provider: Arc<Provider<TracedClient>>,
|
|
|
address: Address,
|
|
address: Address,
|
|
|
- metrics_registry: Arc<KeeperMetrics>,
|
|
|
|
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
) {
|
|
) {
|
|
|
- let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
|
|
|
|
|
- Ok(r) => r,
|
|
|
|
|
- Err(e) => {
|
|
|
|
|
- tracing::error!("Error while connecting to geth rpc. error: {:?}", e);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
let balance = match provider.get_balance(address, None).await {
|
|
let balance = match provider.get_balance(address, None).await {
|
|
|
// This conversion to u128 is fine as the total balance will never cross the limits
|
|
// This conversion to u128 is fine as the total balance will never cross the limits
|
|
|
// of u128 practically.
|
|
// of u128 practically.
|
|
@@ -789,7 +799,7 @@ pub async fn track_balance(
|
|
|
// The balance is in wei, so we need to divide by 1e18 to convert it to eth.
|
|
// The balance is in wei, so we need to divide by 1e18 to convert it to eth.
|
|
|
let balance = balance as f64 / 1e18;
|
|
let balance = balance as f64 / 1e18;
|
|
|
|
|
|
|
|
- metrics_registry
|
|
|
|
|
|
|
+ metrics
|
|
|
.balance
|
|
.balance
|
|
|
.get_or_create(&AccountLabel {
|
|
.get_or_create(&AccountLabel {
|
|
|
chain_id: chain_id.clone(),
|
|
chain_id: chain_id.clone(),
|
|
@@ -802,19 +812,11 @@ pub async fn track_balance(
|
|
|
/// if there is a error the function will just return
|
|
/// if there is a error the function will just return
|
|
|
#[tracing::instrument(skip_all)]
|
|
#[tracing::instrument(skip_all)]
|
|
|
pub async fn track_provider(
|
|
pub async fn track_provider(
|
|
|
- chain_id: String,
|
|
|
|
|
- chain_config: EthereumConfig,
|
|
|
|
|
|
|
+ chain_id: ChainId,
|
|
|
|
|
+ contract: InstrumentedPythContract,
|
|
|
provider_address: Address,
|
|
provider_address: Address,
|
|
|
- metrics_registry: Arc<KeeperMetrics>,
|
|
|
|
|
|
|
+ metrics: Arc<KeeperMetrics>,
|
|
|
) {
|
|
) {
|
|
|
- let contract = match PythContract::from_config(&chain_config) {
|
|
|
|
|
- Ok(r) => r,
|
|
|
|
|
- Err(e) => {
|
|
|
|
|
- tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
let provider_info = match contract.get_provider_info(provider_address).call().await {
|
|
let provider_info = match contract.get_provider_info(provider_address).call().await {
|
|
|
Ok(info) => info,
|
|
Ok(info) => info,
|
|
|
Err(e) => {
|
|
Err(e) => {
|
|
@@ -830,7 +832,7 @@ pub async fn track_provider(
|
|
|
let current_sequence_number = provider_info.sequence_number;
|
|
let current_sequence_number = provider_info.sequence_number;
|
|
|
let end_sequence_number = provider_info.end_sequence_number;
|
|
let end_sequence_number = provider_info.end_sequence_number;
|
|
|
|
|
|
|
|
- metrics_registry
|
|
|
|
|
|
|
+ metrics
|
|
|
.collected_fee
|
|
.collected_fee
|
|
|
.get_or_create(&AccountLabel {
|
|
.get_or_create(&AccountLabel {
|
|
|
chain_id: chain_id.clone(),
|
|
chain_id: chain_id.clone(),
|
|
@@ -838,7 +840,7 @@ pub async fn track_provider(
|
|
|
})
|
|
})
|
|
|
.set(collected_fee);
|
|
.set(collected_fee);
|
|
|
|
|
|
|
|
- metrics_registry
|
|
|
|
|
|
|
+ metrics
|
|
|
.current_sequence_number
|
|
.current_sequence_number
|
|
|
.get_or_create(&AccountLabel {
|
|
.get_or_create(&AccountLabel {
|
|
|
chain_id: chain_id.clone(),
|
|
chain_id: chain_id.clone(),
|
|
@@ -848,7 +850,7 @@ pub async fn track_provider(
|
|
|
// a long time for it to cross the limits of i64.
|
|
// a long time for it to cross the limits of i64.
|
|
|
// currently prometheus only supports i64 for Gauge types
|
|
// currently prometheus only supports i64 for Gauge types
|
|
|
.set(current_sequence_number as i64);
|
|
.set(current_sequence_number as i64);
|
|
|
- metrics_registry
|
|
|
|
|
|
|
+ metrics
|
|
|
.end_sequence_number
|
|
.end_sequence_number
|
|
|
.get_or_create(&AccountLabel {
|
|
.get_or_create(&AccountLabel {
|
|
|
chain_id: chain_id.clone(),
|
|
chain_id: chain_id.clone(),
|