|
|
@@ -6,28 +6,18 @@ use {
|
|
|
ChainWriter,
|
|
|
RevealError,
|
|
|
},
|
|
|
- ethereum::{
|
|
|
- PythContract,
|
|
|
- SignablePythContract,
|
|
|
- },
|
|
|
reader::BlockNumber,
|
|
|
},
|
|
|
- config::EthereumConfig,
|
|
|
state::HashChainState,
|
|
|
},
|
|
|
anyhow::{
|
|
|
anyhow,
|
|
|
Result,
|
|
|
},
|
|
|
- ethers::{
|
|
|
- providers::{
|
|
|
- Http,
|
|
|
- Middleware,
|
|
|
- Provider,
|
|
|
- Ws,
|
|
|
- },
|
|
|
- signers::Signer,
|
|
|
- types::Address,
|
|
|
+ ethers::providers::{
|
|
|
+ Middleware,
|
|
|
+ Provider,
|
|
|
+ Ws,
|
|
|
},
|
|
|
futures::StreamExt,
|
|
|
prometheus_client::{
|
|
|
@@ -270,8 +260,7 @@ async fn get_latest_safe_block(chain_writer: Arc<dyn ChainWriter>) -> BlockNumbe
|
|
|
#[tracing::instrument(name="keeper", skip_all, fields(chain_id=chain_state.id))]
|
|
|
pub async fn run_keeper_threads(
|
|
|
chain_writer: Arc<dyn ChainWriter>,
|
|
|
- private_key: String,
|
|
|
- chain_eth_config: EthereumConfig,
|
|
|
+ geth_rpc_wss: Option<String>,
|
|
|
chain_state: BlockchainState,
|
|
|
metrics: Arc<RwLock<Registry>>,
|
|
|
) {
|
|
|
@@ -284,13 +273,6 @@ pub async fn run_keeper_threads(
|
|
|
.await;
|
|
|
tracing::info!("latest safe block: {}", &latest_safe_block);
|
|
|
|
|
|
- let contract = Arc::new(
|
|
|
- SignablePythContract::from_config(&chain_eth_config, &private_key)
|
|
|
- .await
|
|
|
- .expect("Chain config should be valid"),
|
|
|
- );
|
|
|
- let keeper_address = contract.client().inner().inner().signer().address();
|
|
|
-
|
|
|
let request_cache = Arc::new(RequestCache {
|
|
|
cache: RwLock::new(HashMap::<u64, RequestState>::new()),
|
|
|
});
|
|
|
@@ -316,19 +298,12 @@ pub async fn run_keeper_threads(
|
|
|
chain_writer.clone(),
|
|
|
latest_safe_block,
|
|
|
tx,
|
|
|
- chain_eth_config.geth_rpc_wss.clone(),
|
|
|
+ geth_rpc_wss.clone(),
|
|
|
)
|
|
|
.in_current_span(),
|
|
|
);
|
|
|
- // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
|
|
|
-
|
|
|
- // chain_writer: Arc<dyn ChainWriter>,
|
|
|
- // hash_chain: Arc<HashChainState>,
|
|
|
- // mut rx: mpsc::Receiver<BlockRange>,
|
|
|
- // metrics: Arc<KeeperMetrics>,
|
|
|
- // request_cache: Arc<RequestCache>,
|
|
|
-
|
|
|
|
|
|
+ // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
|
|
|
spawn(
|
|
|
process_new_blocks(
|
|
|
chain_writer.clone(),
|
|
|
@@ -344,8 +319,6 @@ pub async fn run_keeper_threads(
|
|
|
spawn(
|
|
|
async move {
|
|
|
let chain_id = chain_state.id.clone();
|
|
|
- let chain_config = chain_eth_config.clone();
|
|
|
- let provider_address = chain_state.provider_address.clone();
|
|
|
let keeper_metrics = keeper_metrics.clone();
|
|
|
|
|
|
loop {
|
|
|
@@ -355,8 +328,7 @@ pub async fn run_keeper_threads(
|
|
|
spawn(
|
|
|
track_provider(
|
|
|
chain_id.clone(),
|
|
|
- chain_config.clone(),
|
|
|
- provider_address.clone(),
|
|
|
+ chain_writer.clone(),
|
|
|
keeper_metrics.clone(),
|
|
|
)
|
|
|
.in_current_span(),
|
|
|
@@ -364,8 +336,7 @@ pub async fn run_keeper_threads(
|
|
|
spawn(
|
|
|
track_balance(
|
|
|
chain_id.clone(),
|
|
|
- chain_config.clone(),
|
|
|
- keeper_address.clone(),
|
|
|
+ chain_writer.clone(),
|
|
|
keeper_metrics.clone(),
|
|
|
)
|
|
|
.in_current_span(),
|
|
|
@@ -698,36 +669,23 @@ pub async fn process_backlog(
|
|
|
#[tracing::instrument(skip_all)]
|
|
|
pub async fn track_balance(
|
|
|
chain_id: String,
|
|
|
- chain_config: EthereumConfig,
|
|
|
- address: Address,
|
|
|
+ chain_writer: Arc<dyn ChainWriter>,
|
|
|
metrics_registry: Arc<KeeperMetrics>,
|
|
|
) {
|
|
|
- let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
|
|
|
+ let writer_address = chain_writer.get_writer_address();
|
|
|
+ let balance = match chain_writer.get_writer_balance().await {
|
|
|
Ok(r) => r,
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Error while connecting to geth rpc. error: {:?}", e);
|
|
|
- return;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- let balance = match provider.get_balance(address, None).await {
|
|
|
- // This conversion to u128 is fine as the total balance will never cross the limits
|
|
|
- // of u128 practically.
|
|
|
- Ok(r) => r.as_u128(),
|
|
|
Err(e) => {
|
|
|
tracing::error!("Error while getting balance. error: {:?}", e);
|
|
|
return;
|
|
|
}
|
|
|
};
|
|
|
- // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
|
|
|
- // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
|
|
|
- let balance = balance as f64 / 1e18;
|
|
|
|
|
|
metrics_registry
|
|
|
.balance
|
|
|
.get_or_create(&AccountLabel {
|
|
|
chain_id: chain_id.clone(),
|
|
|
- address: address.to_string(),
|
|
|
+ address: writer_address,
|
|
|
})
|
|
|
.set(balance);
|
|
|
}
|
|
|
@@ -737,19 +695,11 @@ pub async fn track_balance(
|
|
|
#[tracing::instrument(skip_all)]
|
|
|
pub async fn track_provider(
|
|
|
chain_id: String,
|
|
|
- chain_config: EthereumConfig,
|
|
|
- provider_address: Address,
|
|
|
+ chain_writer: Arc<dyn ChainWriter>,
|
|
|
metrics_registry: Arc<KeeperMetrics>,
|
|
|
) {
|
|
|
- let contract = match PythContract::from_config(&chain_config) {
|
|
|
- Ok(r) => r,
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
|
|
|
- return;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- let provider_info = match contract.get_provider_info(provider_address).call().await {
|
|
|
+ let provider_address = chain_writer.get_provider_address();
|
|
|
+ let provider_info = match chain_writer.get_provider_info().await {
|
|
|
Ok(info) => info,
|
|
|
Err(e) => {
|
|
|
tracing::error!("Error while getting provider info. error: {:?}", e);
|
|
|
@@ -757,11 +707,8 @@ pub async fn track_provider(
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
|
|
|
- // The fee is in wei, so we divide by 1e18 to convert it to eth.
|
|
|
- let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18;
|
|
|
-
|
|
|
- let current_sequence_number = provider_info.sequence_number;
|
|
|
+ let collected_fee = provider_info.accrued_fee;
|
|
|
+ let current_sequence_number = provider_info.current_sequence_number;
|
|
|
let end_sequence_number = provider_info.end_sequence_number;
|
|
|
|
|
|
metrics_registry
|