Bladeren bron

feat(fortuna): retry processing backlog using configured block_delays (#2436)

* feat(fortuna): retry processing backlog using configured block_delays

Co-Authored-By: Jayant Krishnamurthy <jayant@dourolabs.xyz>

* chore(fortuna): update Cargo.lock for version bump

Co-Authored-By: Jayant Krishnamurthy <jayant@dourolabs.xyz>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Jayant Krishnamurthy <jayant@dourolabs.xyz>
devin-ai-integration[bot] 8 maanden geleden
bovenliggende
commit
c70c7f87bc
4 gewijzigde bestanden met toevoegingen van 32 en 8 verwijderingen
  1. 1 1
      apps/fortuna/Cargo.lock
  2. 1 1
      apps/fortuna/Cargo.toml
  3. 1 0
      apps/fortuna/src/keeper.rs
  4. 29 6
      apps/fortuna/src/keeper/block.rs

+ 1 - 1
apps/fortuna/Cargo.lock

@@ -1554,7 +1554,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "7.4.2"
+version = "7.4.3"
 dependencies = [
  "anyhow",
  "axum",

+ 1 - 1
apps/fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "7.4.2"
+version = "7.4.3"
 edition = "2021"
 
 [lib]

+ 1 - 0
apps/fortuna/src/keeper.rs

@@ -93,6 +93,7 @@ pub async fn run_keeper_threads(
             chain_state.clone(),
             metrics.clone(),
             fulfilled_requests_cache.clone(),
+            chain_eth_config.block_delays.clone(),
         )
         .in_current_span(),
     );

+ 29 - 6
apps/fortuna/src/keeper/block.rs

@@ -334,6 +334,8 @@ pub async fn process_new_blocks(
 }
 
 /// Processes the backlog_range for a chain.
+/// It processes the backlog range for each configured block delay.
+#[allow(clippy::too_many_arguments)]
 #[tracing::instrument(skip_all)]
 pub async fn process_backlog(
     backlog_range: BlockRange,
@@ -343,18 +345,39 @@ pub async fn process_backlog(
     chain_state: BlockchainState,
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+    block_delays: Vec<u64>,
 ) {
     tracing::info!("Processing backlog");
+    // Process blocks immediately first
     process_block_range(
-        backlog_range,
-        contract,
+        backlog_range.clone(),
+        Arc::clone(&contract),
         gas_limit,
-        escalation_policy,
-        chain_state,
-        metrics,
-        fulfilled_requests_cache,
+        escalation_policy.clone(),
+        chain_state.clone(),
+        metrics.clone(),
+        fulfilled_requests_cache.clone(),
     )
     .in_current_span()
     .await;
+
+    // Then process with each configured delay
+    for delay in &block_delays {
+        let adjusted_range = BlockRange {
+            from: backlog_range.from.saturating_sub(*delay),
+            to: backlog_range.to.saturating_sub(*delay),
+        };
+        process_block_range(
+            adjusted_range,
+            Arc::clone(&contract),
+            gas_limit,
+            escalation_policy.clone(),
+            chain_state.clone(),
+            metrics.clone(),
+            fulfilled_requests_cache.clone(),
+        )
+        .in_current_span()
+        .await;
+    }
     tracing::info!("Backlog processed");
 }