|
|
@@ -6,12 +6,8 @@ use {
|
|
|
keeper::keeper_metrics::KeeperMetrics,
|
|
|
keeper::process_event::process_event_with_backoff,
|
|
|
},
|
|
|
- anyhow::{anyhow, Result},
|
|
|
- ethers::{
|
|
|
- providers::{Middleware, Provider, Ws},
|
|
|
- types::U256,
|
|
|
- },
|
|
|
- futures::StreamExt,
|
|
|
+ anyhow::Result,
|
|
|
+ ethers::types::U256,
|
|
|
std::{collections::HashSet, sync::Arc},
|
|
|
tokio::{
|
|
|
spawn,
|
|
|
@@ -176,7 +172,6 @@ pub async fn watch_blocks_wrapper(
|
|
|
chain_state: BlockchainState,
|
|
|
latest_safe_block: BlockNumber,
|
|
|
tx: mpsc::Sender<BlockRange>,
|
|
|
- geth_rpc_wss: Option<String>,
|
|
|
) {
|
|
|
let mut last_safe_block_processed = latest_safe_block;
|
|
|
loop {
|
|
|
@@ -184,7 +179,6 @@ pub async fn watch_blocks_wrapper(
|
|
|
chain_state.clone(),
|
|
|
&mut last_safe_block_processed,
|
|
|
tx.clone(),
|
|
|
- geth_rpc_wss.clone(),
|
|
|
)
|
|
|
.in_current_span()
|
|
|
.await
|
|
|
@@ -203,47 +197,11 @@ pub async fn watch_blocks(
|
|
|
chain_state: BlockchainState,
|
|
|
last_safe_block_processed: &mut BlockNumber,
|
|
|
tx: mpsc::Sender<BlockRange>,
|
|
|
- geth_rpc_wss: Option<String>,
|
|
|
) -> Result<()> {
|
|
|
tracing::info!("Watching blocks to handle new events");
|
|
|
|
|
|
- let provider_option = match geth_rpc_wss {
|
|
|
- Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
|
|
|
- Ok(provider) => provider,
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e);
|
|
|
- return Err(e.into());
|
|
|
- }
|
|
|
- }),
|
|
|
- None => {
|
|
|
- tracing::info!("No wss provided");
|
|
|
- None
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- let mut stream_option = match provider_option {
|
|
|
- Some(ref provider) => Some(match provider.subscribe_blocks().await {
|
|
|
- Ok(client) => client,
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("Error while subscribing to blocks. error {:?}", e);
|
|
|
- return Err(e.into());
|
|
|
- }
|
|
|
- }),
|
|
|
- None => None,
|
|
|
- };
|
|
|
-
|
|
|
loop {
|
|
|
- match stream_option {
|
|
|
- Some(ref mut stream) => {
|
|
|
- if stream.next().await.is_none() {
|
|
|
- tracing::error!("Error blocks subscription stream ended");
|
|
|
- return Err(anyhow!("Error blocks subscription stream ended"));
|
|
|
- }
|
|
|
- }
|
|
|
- None => {
|
|
|
- time::sleep(POLL_INTERVAL).await;
|
|
|
- }
|
|
|
- }
|
|
|
+ time::sleep(POLL_INTERVAL).await;
|
|
|
|
|
|
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
|
|
|
if latest_safe_block > *last_safe_block_processed {
|