Explorar o código

refactor(fortuna): Move more utilities into evm_utils (#2376)

* mostly fixed

* submit tx with retry

* move escalation policy

* stuff

* cleanup

* hm
Jayant Krishnamurthy hai 9 meses
pai
achega
77c23323b8

+ 114 - 5
apps/fortuna/Cargo.lock

@@ -393,29 +393,63 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "borsh"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "15bf3650200d8bffa99015595e10f1fbd17de07abbc25bb067da79e769939bfa"
+dependencies = [
+ "borsh-derive 0.9.3",
+ "hashbrown 0.11.2",
+]
+
 [[package]]
 name = "borsh"
 version = "0.10.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b"
 dependencies = [
- "borsh-derive",
+ "borsh-derive 0.10.3",
  "hashbrown 0.12.3",
 ]
 
+[[package]]
+name = "borsh-derive"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6441c552f230375d18e3cc377677914d2ca2b0d36e52129fe15450a2dce46775"
+dependencies = [
+ "borsh-derive-internal 0.9.3",
+ "borsh-schema-derive-internal 0.9.3",
+ "proc-macro-crate 0.1.5",
+ "proc-macro2",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "borsh-derive"
 version = "0.10.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0754613691538d51f329cce9af41d7b7ca150bc973056f1156611489475f54f7"
 dependencies = [
- "borsh-derive-internal",
- "borsh-schema-derive-internal",
+ "borsh-derive-internal 0.10.3",
+ "borsh-schema-derive-internal 0.10.3",
  "proc-macro-crate 0.1.5",
  "proc-macro2",
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "borsh-derive-internal"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5449c28a7b352f2d1e592a8a28bf139bc71afb0764a14f3c02500935d8c44065"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "borsh-derive-internal"
 version = "0.10.3"
@@ -427,6 +461,17 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "borsh-schema-derive-internal"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdbd5696d8bfa21d53d9fe39a714a18538bad11492a42d066dbbc395fb1951c0"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "borsh-schema-derive-internal"
 version = "0.10.3"
@@ -987,6 +1032,12 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b"
 
+[[package]]
+name = "dyn-clone"
+version = "1.0.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "feeef44e73baff3a26d371801df019877a9866a8c493d315ab00177843314f35"
+
 [[package]]
 name = "ecdsa"
 version = "0.16.8"
@@ -1503,7 +1554,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "7.4.1"
+version = "7.4.2"
 dependencies = [
  "anyhow",
  "axum",
@@ -1753,6 +1804,15 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "hashbrown"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
+dependencies = [
+ "ahash",
+]
+
 [[package]]
 name = "hashbrown"
 version = "0.12.3"
@@ -2852,16 +2912,30 @@ dependencies = [
  "syn 2.0.66",
 ]
 
+[[package]]
+name = "pyth-sdk"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f5c805ba3dfb5b7ed6a8ffa62ec38391f485a79c7cf6b3b11d3bd44fb0325824"
+dependencies = [
+ "borsh 0.9.3",
+ "borsh-derive 0.9.3",
+ "hex",
+ "schemars",
+ "serde",
+]
+
 [[package]]
 name = "pythnet-sdk"
 version = "2.3.1"
 dependencies = [
  "bincode",
- "borsh",
+ "borsh 0.10.3",
  "bytemuck",
  "byteorder",
  "fast-math",
  "hex",
+ "pyth-sdk",
  "rustc_version",
  "serde",
  "sha3",
@@ -3280,6 +3354,30 @@ dependencies = [
  "windows-sys",
 ]
 
+[[package]]
+name = "schemars"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92"
+dependencies = [
+ "dyn-clone",
+ "schemars_derive",
+ "serde",
+ "serde_json",
+]
+
+[[package]]
+name = "schemars_derive"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1eee588578aff73f856ab961cd2f79e36bc45d7ded33a7562adba4667aecc0e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde_derive_internals",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.2.0"
@@ -3386,6 +3484,17 @@ dependencies = [
  "syn 2.0.66",
 ]
 
+[[package]]
+name = "serde_derive_internals"
+version = "0.29.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "serde_json"
 version = "1.0.107"

+ 1 - 1
apps/fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "7.4.1"
+version = "7.4.2"
 edition = "2021"
 
 [lib]

+ 1 - 0
apps/fortuna/src/chain/ethereum.rs

@@ -41,6 +41,7 @@ pub type MiddlewaresWrapper<T> = LegacyTxMiddleware<
         EthProviderOracle<Provider<T>>,
     >,
 >;
+
 pub type SignablePythContractInner<T> = PythRandom<MiddlewaresWrapper<T>>;
 pub type SignablePythContract = SignablePythContractInner<Http>;
 pub type InstrumentedSignablePythContract = SignablePythContractInner<TracedClient>;

+ 9 - 32
apps/fortuna/src/config.rs

@@ -2,6 +2,7 @@ use {
     crate::{
         api::ChainId,
         chain::reader::{BlockNumber, BlockStatus},
+        eth_utils::utils::EscalationPolicy,
     },
     anyhow::{anyhow, Result},
     clap::{crate_authors, crate_description, crate_name, crate_version, Args, Parser},
@@ -259,39 +260,15 @@ impl Default for EscalationPolicyConfig {
 }
 
 impl EscalationPolicyConfig {
-    pub fn get_gas_multiplier_pct(&self, num_retries: u64) -> u64 {
-        self.apply_escalation_policy(
-            num_retries,
-            self.initial_gas_multiplier_pct,
-            self.gas_multiplier_pct,
-            self.gas_multiplier_cap_pct,
-        )
-    }
-
-    pub fn get_fee_multiplier_pct(&self, num_retries: u64) -> u64 {
-        self.apply_escalation_policy(
-            num_retries,
-            100,
-            self.fee_multiplier_pct,
-            self.fee_multiplier_cap_pct,
-        )
-    }
-
-    fn apply_escalation_policy(
-        &self,
-        num_retries: u64,
-        initial: u64,
-        multiplier: u64,
-        cap: u64,
-    ) -> u64 {
-        let mut current = initial;
-        let mut i = 0;
-        while i < num_retries && current < cap {
-            current = current.saturating_mul(multiplier) / 100;
-            i += 1;
+    pub fn to_policy(&self) -> EscalationPolicy {
+        EscalationPolicy {
+            gas_limit_tolerance_pct: self.gas_limit_tolerance_pct,
+            initial_gas_multiplier_pct: self.initial_gas_multiplier_pct,
+            gas_multiplier_pct: self.gas_multiplier_pct,
+            gas_multiplier_cap_pct: self.gas_multiplier_cap_pct,
+            fee_multiplier_pct: self.fee_multiplier_pct,
+            fee_multiplier_cap_pct: self.fee_multiplier_cap_pct,
         }
-
-        current.min(cap)
     }
 }
 

+ 30 - 9
apps/fortuna/src/eth_utils/nonce_manager.rs

@@ -2,8 +2,11 @@
 // Copied from: https://github.com/gakonst/ethers-rs/blob/34ed9e372e66235aed7074bc3f5c14922b139242/ethers-middleware/src/nonce_manager.rs
 
 use {
+    super::legacy_tx_middleware::LegacyTxMiddleware,
     axum::async_trait,
+    ethers::prelude::GasOracle,
     ethers::{
+        middleware::gas_oracle::GasOracleMiddleware,
         providers::{Middleware, MiddlewareError, PendingTransaction},
         types::{transaction::eip2718::TypedTransaction, *},
     },
@@ -72,15 +75,6 @@ where
         Ok(nonce)
     } // guard dropped here
 
-    /// Resets the initialized flag so the next usage of the manager will reinitialize the nonce
-    /// based on the chain state.
-    /// This is useful when the RPC does not return an error if the transaction is submitted with
-    /// an incorrect nonce.
-    /// This is the only new method compared to the original NonceManagerMiddleware.
-    pub fn reset(&self) {
-        self.initialized.store(false, Ordering::SeqCst);
-    }
-
     async fn get_transaction_count_with_manager(
         &self,
         block: Option<BlockId>,
@@ -100,6 +94,33 @@ where
     }
 }
 
+pub trait NonceManaged {
+    fn reset(&self);
+}
+
+impl<M: Middleware> NonceManaged for NonceManagerMiddleware<M> {
+    /// Resets the initialized flag so the next usage of the manager will reinitialize the nonce
+    /// based on the chain state.
+    /// This is useful when the RPC does not return an error if the transaction is submitted with
+    /// an incorrect nonce.
+    /// This is the only new method compared to the original NonceManagerMiddleware.
+    fn reset(&self) {
+        self.initialized.store(false, Ordering::SeqCst);
+    }
+}
+
+impl<M: NonceManaged + Middleware, G: GasOracle> NonceManaged for GasOracleMiddleware<M, G> {
+    fn reset(&self) {
+        self.inner().reset();
+    }
+}
+
+impl<T: NonceManaged + Middleware> NonceManaged for LegacyTxMiddleware<T> {
+    fn reset(&self) {
+        self.inner().reset();
+    }
+}
+
 #[derive(Error, Debug)]
 /// Thrown when an error happens at the Nonce Manager
 pub enum NonceManagerError<M: Middleware> {

+ 248 - 0
apps/fortuna/src/eth_utils/utils.rs

@@ -1,10 +1,87 @@
 use {
+    crate::eth_utils::nonce_manager::NonceManaged,
     anyhow::{anyhow, Result},
+    backoff::ExponentialBackoff,
+    ethers::types::TransactionReceipt,
+    ethers::types::U256,
     ethers::{contract::ContractCall, middleware::Middleware},
+    std::sync::atomic::AtomicU64,
     std::sync::Arc,
+    tokio::time::{timeout, Duration},
     tracing,
 };
 
+const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30;
+
+#[derive(Debug)]
+pub struct SubmitTxResult {
+    pub num_retries: u64,
+    pub gas_multiplier: u64,
+    pub fee_multiplier: u64,
+    pub duration: Duration,
+    pub receipt: Result<TransactionReceipt, anyhow::Error>,
+}
+
+#[derive(Clone, Debug)]
+pub struct EscalationPolicy {
+    // The keeper will perform the callback as long as the tx is within this percentage of the configured gas limit.
+    // Default value is 110, meaning a 10% tolerance over the configured value.
+    pub gas_limit_tolerance_pct: u64,
+
+    /// The initial gas multiplier to apply to the tx gas estimate
+    pub initial_gas_multiplier_pct: u64,
+
+    /// The gas multiplier to apply to the tx gas estimate during backoff retries.
+    /// The gas on each successive retry is multiplied by this value, with the maximum multiplier capped at `gas_multiplier_cap_pct`.
+    pub gas_multiplier_pct: u64,
+    /// The maximum gas multiplier to apply to the tx gas estimate during backoff retries.
+    pub gas_multiplier_cap_pct: u64,
+
+    /// The fee multiplier to apply to the fee during backoff retries.
+    /// The initial fee is 100% of the estimate (which itself may be padded based on our chain configuration)
+    /// The fee on each successive retry is multiplied by this value, with the maximum multiplier capped at `fee_multiplier_cap_pct`.
+    pub fee_multiplier_pct: u64,
+    pub fee_multiplier_cap_pct: u64,
+}
+
+impl EscalationPolicy {
+    pub fn get_gas_multiplier_pct(&self, num_retries: u64) -> u64 {
+        self.apply_escalation_policy(
+            num_retries,
+            self.initial_gas_multiplier_pct,
+            self.gas_multiplier_pct,
+            self.gas_multiplier_cap_pct,
+        )
+    }
+
+    pub fn get_fee_multiplier_pct(&self, num_retries: u64) -> u64 {
+        self.apply_escalation_policy(
+            num_retries,
+            100,
+            self.fee_multiplier_pct,
+            self.fee_multiplier_cap_pct,
+        )
+    }
+
+    fn apply_escalation_policy(
+        &self,
+        num_retries: u64,
+        initial: u64,
+        multiplier: u64,
+        cap: u64,
+    ) -> u64 {
+        let mut current = initial;
+        let mut i = 0;
+        while i < num_retries && current < cap {
+            current = current.saturating_mul(multiplier) / 100;
+            i += 1;
+        }
+
+        current.min(cap)
+    }
+}
+
+/// Send a transaction and wait for the receipt to ensure that it was confirmed on chain.
 pub async fn send_and_confirm<A: Middleware>(contract_call: ContractCall<A, ()>) -> Result<()> {
     let call_name = contract_call.function.name.as_str();
     let pending_tx = contract_call
@@ -64,3 +141,174 @@ pub async fn estimate_tx_cost<T: Middleware + 'static>(
 
     Ok(gas_price * gas_used)
 }
+
+/// Submit a transaction, retrying on failure according to a configurable backoff policy.
+/// The transaction is retried with exponentially increasing delay between retries, and
+/// similarly escalating gas and fee multipliers.
+/// The gas_limit parameter is the maximum gas that we expect the transaction to use -- if the gas estimate for
+/// the transaction exceeds this limit, the transaction is not submitted.
+/// Note however that any gas_escalation policy is applied to the estimate, so the actual gas used may exceed the limit.
+/// The transaction is retried until it is confirmed on chain or the maximum number of retries is reached.
+pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
+    middleware: Arc<T>,
+    call: ContractCall<T, ()>,
+    gas_limit: U256,
+    escalation_policy: EscalationPolicy,
+) -> Result<SubmitTxResult> {
+    let start_time = std::time::Instant::now();
+
+    tracing::info!("Started processing event");
+    let backoff = ExponentialBackoff {
+        max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes
+        ..Default::default()
+    };
+
+    let num_retries = Arc::new(AtomicU64::new(0));
+
+    let success = backoff::future::retry_notify(
+        backoff,
+        || async {
+            let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed);
+
+            let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries);
+            let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries);
+            submit_tx(
+                middleware.clone(),
+                &call,
+                gas_limit,
+                gas_multiplier_pct,
+                fee_multiplier_pct,
+            )
+            .await
+        },
+        |e, dur| {
+            let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed);
+            tracing::error!(
+                "Error on retry {} at duration {:?}: {}",
+                retry_number,
+                dur,
+                e
+            );
+            num_retries.store(retry_number + 1, std::sync::atomic::Ordering::Relaxed);
+        },
+    )
+    .await;
+
+    let duration = start_time.elapsed();
+    let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed);
+
+    Ok(SubmitTxResult {
+        num_retries,
+        gas_multiplier: escalation_policy.get_gas_multiplier_pct(num_retries),
+        fee_multiplier: escalation_policy.get_fee_multiplier_pct(num_retries),
+        duration,
+        receipt: success,
+    })
+}
+
+/// Submit a transaction to the blockchain. It estimates the gas for the transaction,
+/// pads both the gas and fee estimates using the provided multipliers, and submits the transaction.
+/// It will return a permanent or transient error depending on the error type and whether
+/// retry is possible or not.
+pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
+    client: Arc<T>,
+    call: &ContractCall<T, ()>,
+    gas_limit: U256,
+    // A value of 100 submits the tx with the same gas/fee as the estimate.
+    gas_estimate_multiplier_pct: u64,
+    fee_estimate_multiplier_pct: u64,
+) -> Result<TransactionReceipt, backoff::Error<anyhow::Error>> {
+    let gas_estimate_res = call.estimate_gas().await;
+
+    let gas_estimate = gas_estimate_res.map_err(|e| {
+        // we consider the error transient even if it is a contract revert since
+        // it can be because of routing to a lagging RPC node. Retrying such errors will
+        // incur a few additional RPC calls, but it is fine.
+        backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e))
+    })?;
+
+    // The gas limit on the simulated transaction is the maximum expected tx gas estimate,
+    // but we are willing to pad the gas a bit to ensure reliable submission.
+    if gas_estimate > gas_limit {
+        return Err(backoff::Error::permanent(anyhow!(
+            "Gas estimate for reveal with callback is higher than the gas limit {} > {}",
+            gas_estimate,
+            gas_limit
+        )));
+    }
+
+    // Pad the gas estimate after checking it against the simulation gas limit.
+    let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100;
+
+    let call = call.clone().gas(gas_estimate);
+    let mut transaction = call.tx.clone();
+
+    // manually fill the tx with the gas info, so we can log the details in case of error
+    client
+        .fill_transaction(&mut transaction, None)
+        .await
+        .map_err(|e| {
+            backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e))
+        })?;
+
+    // Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle
+    // in the client that sets the gas price.
+    transaction.set_gas_price(
+        transaction
+            .gas_price()
+            .unwrap_or_default()
+            .saturating_mul(fee_estimate_multiplier_pct.into())
+            / 100,
+    );
+
+    let pending_tx = client
+        .send_transaction(transaction.clone(), None)
+        .await
+        .map_err(|e| {
+            backoff::Error::transient(anyhow!(
+                "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
+                transaction,
+                e
+            ))
+        })?;
+
+    let reset_nonce = || {
+        client.reset();
+    };
+
+    let pending_receipt = timeout(
+        Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS),
+        pending_tx,
+    )
+    .await
+    .map_err(|_| {
+        // Tx can get stuck in mempool without any progress if the nonce is too high
+        // in this case ethers internal polling will not reduce the number of retries
+        // and keep retrying indefinitely. So we set a manual timeout here and reset the nonce.
+        reset_nonce();
+        backoff::Error::transient(anyhow!(
+            "Tx stuck in mempool. Resetting nonce. Tx:{:?}",
+            transaction
+        ))
+    })?;
+
+    let receipt = pending_receipt
+        .map_err(|e| {
+            backoff::Error::transient(anyhow!(
+                "Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
+                transaction,
+                e
+            ))
+        })?
+        .ok_or_else(|| {
+            // RPC may not return an error on tx submission if the nonce is too high.
+            // But we will never get a receipt. So we reset the nonce manager to get the correct nonce.
+            reset_nonce();
+            backoff::Error::transient(anyhow!(
+                "Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}",
+                transaction
+            ))
+        })?;
+
+    Ok(receipt)
+}

+ 2 - 2
apps/fortuna/src/keeper.rs

@@ -89,7 +89,7 @@ pub async fn run_keeper_threads(
             },
             contract.clone(),
             gas_limit,
-            chain_eth_config.escalation_policy.clone(),
+            chain_eth_config.escalation_policy.to_policy(),
             chain_state.clone(),
             metrics.clone(),
             fulfilled_requests_cache.clone(),
@@ -116,7 +116,7 @@ pub async fn run_keeper_threads(
             rx,
             Arc::clone(&contract),
             gas_limit,
-            chain_eth_config.escalation_policy.clone(),
+            chain_eth_config.escalation_policy.to_policy(),
             metrics.clone(),
             fulfilled_requests_cache.clone(),
             chain_eth_config.block_delays.clone(),

+ 5 - 5
apps/fortuna/src/keeper/block.rs

@@ -2,7 +2,7 @@ use {
     crate::{
         api::{self, BlockchainState},
         chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
-        config::EscalationPolicyConfig,
+        eth_utils::utils::EscalationPolicy,
         keeper::keeper_metrics::KeeperMetrics,
         keeper::process_event::process_event_with_backoff,
     },
@@ -67,7 +67,7 @@ pub async fn process_block_range(
     block_range: BlockRange,
     contract: Arc<InstrumentedSignablePythContract>,
     gas_limit: U256,
-    escalation_policy: EscalationPolicyConfig,
+    escalation_policy: EscalationPolicy,
     chain_state: api::BlockchainState,
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
@@ -114,7 +114,7 @@ pub async fn process_single_block_batch(
     block_range: BlockRange,
     contract: Arc<InstrumentedSignablePythContract>,
     gas_limit: U256,
-    escalation_policy: EscalationPolicyConfig,
+    escalation_policy: EscalationPolicy,
     chain_state: api::BlockchainState,
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
@@ -290,7 +290,7 @@ pub async fn process_new_blocks(
     mut rx: mpsc::Receiver<BlockRange>,
     contract: Arc<InstrumentedSignablePythContract>,
     gas_limit: U256,
-    escalation_policy: EscalationPolicyConfig,
+    escalation_policy: EscalationPolicy,
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
     block_delays: Vec<u64>,
@@ -339,7 +339,7 @@ pub async fn process_backlog(
     backlog_range: BlockRange,
     contract: Arc<InstrumentedSignablePythContract>,
     gas_limit: U256,
-    escalation_policy: EscalationPolicyConfig,
+    escalation_policy: EscalationPolicy,
     chain_state: BlockchainState,
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,

+ 50 - 223
apps/fortuna/src/keeper/process_event.rs

@@ -3,16 +3,12 @@ use {
     crate::{
         api::BlockchainState,
         chain::{ethereum::InstrumentedSignablePythContract, reader::RequestedWithCallbackEvent},
-        config::EscalationPolicyConfig,
+        eth_utils::utils::{submit_tx_with_backoff, EscalationPolicy},
     },
     anyhow::{anyhow, Result},
-    backoff::ExponentialBackoff,
-    ethers::middleware::Middleware,
-    ethers::signers::Signer,
     ethers::types::U256,
-    std::sync::{atomic::AtomicU64, Arc},
-    tokio::time::{timeout, Duration},
-    tracing::{self, Instrument},
+    std::sync::Arc,
+    tracing,
 };
 
 /// Process an event with backoff. It will retry the reveal on failure for 5 minutes.
@@ -24,10 +20,14 @@ pub async fn process_event_with_backoff(
     chain_state: BlockchainState,
     contract: Arc<InstrumentedSignablePythContract>,
     gas_limit: U256,
-    escalation_policy: EscalationPolicyConfig,
+    escalation_policy: EscalationPolicy,
     metrics: Arc<KeeperMetrics>,
-) {
-    let start_time = std::time::Instant::now();
+) -> Result<()> {
+    // ignore requests that are not for the configured provider
+    if chain_state.provider_address != event.provider_address {
+        return Ok(());
+    }
+
     let account_label = AccountLabel {
         chain_id: chain_state.id.clone(),
         address: chain_state.provider_address.to_string(),
@@ -35,54 +35,35 @@ pub async fn process_event_with_backoff(
 
     metrics.requests.get_or_create(&account_label).inc();
     tracing::info!("Started processing event");
-    let backoff = ExponentialBackoff {
-        max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes
-        ..Default::default()
-    };
 
-    let num_retries = Arc::new(AtomicU64::new(0));
+    let provider_revelation = chain_state
+        .state
+        .reveal(event.sequence_number)
+        .map_err(|e| anyhow!("Error revealing: {:?}", e))?;
 
-    let success = backoff::future::retry_notify(
-        backoff,
-        || async {
-            let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed);
+    let contract_call = contract.reveal_with_callback(
+        event.provider_address,
+        event.sequence_number,
+        event.user_random_number,
+        provider_revelation,
+    );
 
-            let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries);
-            let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries);
-            process_event(
-                &event,
-                &chain_state,
-                &contract,
-                gas_limit.saturating_mul(escalation_policy.gas_limit_tolerance_pct.into()) / 100,
-                gas_multiplier_pct,
-                fee_multiplier_pct,
-                metrics.clone(),
-            )
-            .await
-        },
-        |e, dur| {
-            let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed);
-            tracing::error!(
-                "Error on retry {} at duration {:?}: {}",
-                retry_number,
-                dur,
-                e
-            );
-            num_retries.store(retry_number + 1, std::sync::atomic::Ordering::Relaxed);
-        },
+    let success = submit_tx_with_backoff(
+        contract.client(),
+        contract_call,
+        gas_limit,
+        escalation_policy,
     )
     .await;
 
-    let duration = start_time.elapsed();
-
     metrics
         .requests_processed
         .get_or_create(&account_label)
         .inc();
 
     match success {
-        Ok(()) => {
-            tracing::info!("Processed event successfully in {:?}", duration);
+        Ok(res) => {
+            tracing::info!("Processed event successfully in {:?}", res.duration);
 
             metrics
                 .requests_processed_success
@@ -92,26 +73,42 @@ pub async fn process_event_with_backoff(
             metrics
                 .request_duration_ms
                 .get_or_create(&account_label)
-                .observe(duration.as_millis() as f64);
+                .observe(res.duration.as_millis() as f64);
 
             // Track retry count, gas multiplier, and fee multiplier for successful transactions
-            let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed);
             metrics
                 .retry_count
                 .get_or_create(&account_label)
-                .observe(num_retries as f64);
+                .observe(res.num_retries as f64);
 
-            let gas_multiplier = escalation_policy.get_gas_multiplier_pct(num_retries);
             metrics
                 .final_gas_multiplier
                 .get_or_create(&account_label)
-                .observe(gas_multiplier as f64);
+                .observe(res.gas_multiplier as f64);
 
-            let fee_multiplier = escalation_policy.get_fee_multiplier_pct(num_retries);
             metrics
                 .final_fee_multiplier
                 .get_or_create(&account_label)
-                .observe(fee_multiplier as f64);
+                .observe(res.fee_multiplier as f64);
+
+            if let Ok(receipt) = res.receipt {
+                if let Some(gas_used) = receipt.gas_used {
+                    let gas_used_float = gas_used.as_u128() as f64 / 1e18;
+                    metrics
+                        .total_gas_spent
+                        .get_or_create(&account_label)
+                        .inc_by(gas_used_float);
+
+                    if let Some(gas_price) = receipt.effective_gas_price {
+                        let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18;
+                        metrics
+                            .total_gas_fee_spent
+                            .get_or_create(&account_label)
+                            .inc_by(gas_fee);
+                    }
+                }
+            }
+            metrics.reveals.get_or_create(&account_label).inc();
         }
         Err(e) => {
             // In case the callback did not succeed, we double-check that the request is still on-chain.
@@ -133,176 +130,6 @@ pub async fn process_event_with_backoff(
             }
         }
     }
-}
-
-const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30;
-
-/// Process a callback on a chain. It estimates the gas for the reveal with callback and
-/// submits the transaction if the gas estimate is below the gas limit.
-/// It will return a permanent or transient error depending on the error type and whether
-/// retry is possible or not.
-pub async fn process_event(
-    event: &RequestedWithCallbackEvent,
-    chain_config: &BlockchainState,
-    contract: &InstrumentedSignablePythContract,
-    gas_limit: U256,
-    // A value of 100 submits the tx with the same gas/fee as the estimate.
-    gas_estimate_multiplier_pct: u64,
-    fee_estimate_multiplier_pct: u64,
-    metrics: Arc<KeeperMetrics>,
-) -> Result<(), backoff::Error<anyhow::Error>> {
-    // ignore requests that are not for the configured provider
-    if chain_config.provider_address != event.provider_address {
-        return Ok(());
-    }
-    let provider_revelation = chain_config
-        .state
-        .reveal(event.sequence_number)
-        .map_err(|e| backoff::Error::permanent(anyhow!("Error revealing: {:?}", e)))?;
-
-    let gas_estimate_res = chain_config
-        .contract
-        .estimate_reveal_with_callback_gas(
-            contract.wallet().address(),
-            event.provider_address,
-            event.sequence_number,
-            event.user_random_number,
-            provider_revelation,
-        )
-        .in_current_span()
-        .await;
-
-    let gas_estimate = gas_estimate_res.map_err(|e| {
-        // we consider the error transient even if it is a contract revert since
-        // it can be because of routing to a lagging RPC node. Retrying such errors will
-        // incur a few additional RPC calls, but it is fine.
-        backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e))
-    })?;
-
-    // The gas limit on the simulated transaction is the configured gas limit on the chain,
-    // but we are willing to pad the gas a bit to ensure reliable submission.
-    if gas_estimate > gas_limit {
-        return Err(backoff::Error::permanent(anyhow!(
-            "Gas estimate for reveal with callback is higher than the gas limit {} > {}",
-            gas_estimate,
-            gas_limit
-        )));
-    }
-
-    // Pad the gas estimate after checking it against the simulation gas limit, ensuring that
-    // the padded gas estimate doesn't exceed the maximum amount of gas we are willing to use.
-    let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100;
-
-    let contract_call = contract
-        .reveal_with_callback(
-            event.provider_address,
-            event.sequence_number,
-            event.user_random_number,
-            provider_revelation,
-        )
-        .gas(gas_estimate);
-
-    let client = contract.client();
-    let mut transaction = contract_call.tx.clone();
-
-    // manually fill the tx with the gas info, so we can log the details in case of error
-    client
-        .fill_transaction(&mut transaction, None)
-        .await
-        .map_err(|e| {
-            backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e))
-        })?;
-
-    // Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle
-    // in the client that sets the gas price.
-    transaction.set_gas_price(
-        transaction
-            .gas_price()
-            .unwrap_or_default()
-            .saturating_mul(fee_estimate_multiplier_pct.into())
-            / 100,
-    );
-
-    let pending_tx = client
-        .send_transaction(transaction.clone(), None)
-        .await
-        .map_err(|e| {
-            backoff::Error::transient(anyhow!(
-                "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
-                transaction,
-                e
-            ))
-        })?;
-
-    let reset_nonce = || {
-        let nonce_manager = contract.client_ref().inner().inner();
-        nonce_manager.reset();
-    };
-
-    let pending_receipt = timeout(
-        Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS),
-        pending_tx,
-    )
-    .await
-    .map_err(|_| {
-        // Tx can get stuck in mempool without any progress if the nonce is too high
-        // in this case ethers internal polling will not reduce the number of retries
-        // and keep retrying indefinitely. So we set a manual timeout here and reset the nonce.
-        reset_nonce();
-        backoff::Error::transient(anyhow!(
-            "Tx stuck in mempool. Resetting nonce. Tx:{:?}",
-            transaction
-        ))
-    })?;
-
-    let receipt = pending_receipt
-        .map_err(|e| {
-            backoff::Error::transient(anyhow!(
-                "Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
-                transaction,
-                e
-            ))
-        })?
-        .ok_or_else(|| {
-            // RPC may not return an error on tx submission if the nonce is too high.
-            // But we will never get a receipt. So we reset the nonce manager to get the correct nonce.
-            reset_nonce();
-            backoff::Error::transient(anyhow!(
-                "Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}",
-                transaction
-            ))
-        })?;
-
-    tracing::info!(
-        sequence_number = &event.sequence_number,
-        transaction_hash = &receipt.transaction_hash.to_string(),
-        gas_used = ?receipt.gas_used,
-        "Revealed with res: {:?}",
-        receipt
-    );
-
-    let account_label = AccountLabel {
-        chain_id: chain_config.id.clone(),
-        address: chain_config.provider_address.to_string(),
-    };
-
-    if let Some(gas_used) = receipt.gas_used {
-        let gas_used_float = gas_used.as_u128() as f64 / 1e18;
-        metrics
-            .total_gas_spent
-            .get_or_create(&account_label)
-            .inc_by(gas_used_float);
-
-        if let Some(gas_price) = receipt.effective_gas_price {
-            let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18;
-            metrics
-                .total_gas_fee_spent
-                .get_or_create(&account_label)
-                .inc_by(gas_fee);
-        }
-    }
-
-    metrics.reveals.get_or_create(&account_label).inc();
 
     Ok(())
 }