Parcourir la source

feat: Delayed block processing for Fortuna (#2307)

* feat: add delayed block processing and bump minor version

Co-Authored-By: Jayant Krishnamurthy <jayant@dourolabs.xyz>

* fix: add clippy allow attribute for too many arguments

Co-Authored-By: Jayant Krishnamurthy <jayant@dourolabs.xyz>

* fix: remove trailing whitespace in config.sample.yaml

Co-Authored-By: Jayant Krishnamurthy <jayant@dourolabs.xyz>

* refactor: move block delays to EthereumConfig and support multiple delays

Co-Authored-By: Jayant Krishnamurthy <jayant@dourolabs.xyz>

* fix: pass only block delays param and subtract delays

Co-Authored-By: Jayant Krishnamurthy <jayant@dourolabs.xyz>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Jayant Krishnamurthy <jayant@dourolabs.xyz>
devin-ai-integration[bot] il y a 9 mois
Parent
commit
de934f524b

+ 1 - 1
apps/fortuna/Cargo.lock

@@ -1503,7 +1503,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "7.3.0"
+version = "7.4.0"
 dependencies = [
  "anyhow",
  "axum",

+ 1 - 1
apps/fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "7.3.0"
+version = "7.4.0"
 edition = "2021"
 
 [dependencies]

+ 5 - 0
apps/fortuna/config.sample.yaml

@@ -39,6 +39,11 @@ chains:
     target_profit_pct: 20
     max_profit_pct: 100
 
+    # A list of block delays for processing blocks multiple times. Each number represents
+    # how many blocks to wait before processing. For example, [5, 10, 20] means process
+    # blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks.
+    block_delays: [5, 10, 20]
+
     # Historical commitments -- delete this block for local development purposes
     commitments:
       # prettier-ignore

+ 10 - 0
apps/fortuna/src/config.rs

@@ -177,6 +177,16 @@ pub struct EthereumConfig {
     /// Maximum number of hashes to record in a request.
     /// This should be set according to the maximum gas limit the provider supports for callbacks.
     pub max_num_hashes: Option<u32>,
+
+    /// A list of delays (in blocks) that indicates how many blocks should be delayed
+    /// before we process a block. For retry logic, we can process blocks multiple times
+    /// at each specified delay. For example: [5, 10, 20].
+    #[serde(default = "default_block_delays")]
+    pub block_delays: Vec<u64>,
+}
+
+fn default_block_delays() -> Vec<u64> {
+    vec![5]
 }
 
 fn default_priority_fee_multiplier_pct() -> u64 {

+ 29 - 4
apps/fortuna/src/keeper.rs

@@ -250,7 +250,7 @@ impl KeeperMetrics {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BlockRange {
     pub from: BlockNumber,
     pub to: BlockNumber,
@@ -346,7 +346,8 @@ pub async fn run_keeper_threads(
         )
         .in_current_span(),
     );
-    // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
+
+    // Spawn a thread for block processing with configured delays
     spawn(
         process_new_blocks(
             chain_state.clone(),
@@ -356,6 +357,7 @@ pub async fn run_keeper_threads(
             chain_eth_config.escalation_policy.clone(),
             metrics.clone(),
             fulfilled_requests_cache.clone(),
+            chain_eth_config.block_delays.clone(),
         )
         .in_current_span(),
     );
@@ -965,8 +967,10 @@ pub async fn watch_blocks(
     }
 }
 
-/// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
+/// It waits on rx channel to receive block ranges and then calls process_block_range to process them
+/// for each configured block delay.
 #[tracing::instrument(skip_all)]
+#[allow(clippy::too_many_arguments)]
 pub async fn process_new_blocks(
     chain_state: BlockchainState,
     mut rx: mpsc::Receiver<BlockRange>,
@@ -975,12 +979,14 @@ pub async fn process_new_blocks(
     escalation_policy: EscalationPolicyConfig,
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+    block_delays: Vec<u64>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
     loop {
         if let Some(block_range) = rx.recv().await {
+            // Process blocks immediately first
             process_block_range(
-                block_range,
+                block_range.clone(),
                 Arc::clone(&contract),
                 gas_limit,
                 escalation_policy.clone(),
@@ -990,6 +996,25 @@ pub async fn process_new_blocks(
             )
             .in_current_span()
             .await;
+
+            // Then process with each configured delay
+            for delay in &block_delays {
+                let adjusted_range = BlockRange {
+                    from: block_range.from.saturating_sub(*delay),
+                    to: block_range.to.saturating_sub(*delay),
+                };
+                process_block_range(
+                    adjusted_range,
+                    Arc::clone(&contract),
+                    gas_limit,
+                    escalation_policy.clone(),
+                    chain_state.clone(),
+                    metrics.clone(),
+                    fulfilled_requests_cache.clone(),
+                )
+                .in_current_span()
+                .await;
+            }
         }
     }
 }