|
@@ -30,8 +30,6 @@ const RETRY_INTERVAL: Duration = Duration::from_secs(5);
|
|
|
const BLOCK_BATCH_SIZE: u64 = 100;
|
|
const BLOCK_BATCH_SIZE: u64 = 100;
|
|
|
/// How much to wait before polling the next latest block
|
|
/// How much to wait before polling the next latest block
|
|
|
const POLL_INTERVAL: Duration = Duration::from_secs(2);
|
|
const POLL_INTERVAL: Duration = Duration::from_secs(2);
|
|
|
-/// Retry last N blocks
|
|
|
|
|
-const RETRY_PREVIOUS_BLOCKS: u64 = 100;
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
#[derive(Debug, Clone)]
|
|
|
pub struct BlockRange {
|
|
pub struct BlockRange {
|
|
@@ -196,6 +194,7 @@ pub async fn watch_blocks_wrapper(
|
|
|
chain_state: BlockchainState,
|
|
chain_state: BlockchainState,
|
|
|
latest_safe_block: BlockNumber,
|
|
latest_safe_block: BlockNumber,
|
|
|
tx: mpsc::Sender<BlockRange>,
|
|
tx: mpsc::Sender<BlockRange>,
|
|
|
|
|
+ retry_previous_blocks: u64,
|
|
|
) {
|
|
) {
|
|
|
let mut last_safe_block_processed = latest_safe_block;
|
|
let mut last_safe_block_processed = latest_safe_block;
|
|
|
loop {
|
|
loop {
|
|
@@ -203,6 +202,7 @@ pub async fn watch_blocks_wrapper(
|
|
|
chain_state.clone(),
|
|
chain_state.clone(),
|
|
|
&mut last_safe_block_processed,
|
|
&mut last_safe_block_processed,
|
|
|
tx.clone(),
|
|
tx.clone(),
|
|
|
|
|
+ retry_previous_blocks,
|
|
|
)
|
|
)
|
|
|
.in_current_span()
|
|
.in_current_span()
|
|
|
.await
|
|
.await
|
|
@@ -221,6 +221,7 @@ pub async fn watch_blocks(
|
|
|
chain_state: BlockchainState,
|
|
chain_state: BlockchainState,
|
|
|
last_safe_block_processed: &mut BlockNumber,
|
|
last_safe_block_processed: &mut BlockNumber,
|
|
|
tx: mpsc::Sender<BlockRange>,
|
|
tx: mpsc::Sender<BlockRange>,
|
|
|
|
|
+ retry_previous_blocks: u64,
|
|
|
) -> Result<()> {
|
|
) -> Result<()> {
|
|
|
tracing::info!("Watching blocks to handle new events");
|
|
tracing::info!("Watching blocks to handle new events");
|
|
|
|
|
|
|
@@ -229,7 +230,7 @@ pub async fn watch_blocks(
|
|
|
|
|
|
|
|
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
|
|
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
|
|
|
if latest_safe_block > *last_safe_block_processed {
|
|
if latest_safe_block > *last_safe_block_processed {
|
|
|
- let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS);
|
|
|
|
|
|
|
+ let mut from = latest_safe_block.saturating_sub(retry_previous_blocks);
|
|
|
|
|
|
|
|
// In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10)
|
|
// In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10)
|
|
|
// TODO: add a metric for this in separate PR. We need alerts
|
|
// TODO: add a metric for this in separate PR. We need alerts
|