Jayant Krishnamurthy 2 tygodni temu
rodzic
commit
bbce038aea

+ 358 - 29
apps/fortuna/src/chain/ethereum.rs

@@ -8,25 +8,40 @@ use {
         eth_utils::{
             eth_gas_oracle::EthProviderOracle,
             legacy_tx_middleware::LegacyTxMiddleware,
-            nonce_manager::NonceManagerMiddleware,
+            nonce_manager::{NonceManaged, NonceManagerMiddleware},
             traced_client::{RpcMetrics, TracedClient},
+            utils::{submit_tx_with_backoff, EscalationPolicy, SubmitTxError, SubmitTxResult},
+        },
+        keeper::contract::{
+            KeeperProviderInfo, KeeperTxContract, KeeperTxError, KeeperTxResult,
+            RevealWithCallbackData, TxExecutionOutcome, TxHash, Wei,
         },
-        keeper::contract::{KeeperProviderInfo, KeeperTxContract},
     },
     anyhow::{anyhow, Error, Result},
     axum::async_trait,
+    backoff::Error as BackoffError,
+    ethers::abi::AbiDecode,
     ethers::{
         abi::RawLog,
-        contract::{abigen, ContractCall, EthLogDecode, LogMeta},
+        contract::{abigen, ContractCall, ContractError, EthLogDecode, LogMeta},
         core::types::Address,
         middleware::{gas_oracle::GasOracleMiddleware, SignerMiddleware},
         prelude::JsonRpcClient,
         providers::{Http, Middleware, Provider},
         signers::{LocalWallet, Signer},
-        types::{BlockNumber as EthersBlockNumber, U256},
+        types::{
+            transaction::eip2718::TypedTransaction, BlockNumber as EthersBlockNumber,
+            TransactionReceipt, TransactionRequest, U256, U64,
+        },
     },
+    hex,
     sha3::{Digest, Keccak256},
-    std::sync::Arc,
+    std::{
+        convert::TryFrom,
+        sync::Arc,
+        time::{Duration, Instant},
+    },
+    tokio::time::timeout,
 };
 
 // TODO: Programmatically generate this so we don't have to keep committed ABI in sync with the
@@ -354,58 +369,133 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
 
 #[async_trait]
 impl KeeperTxContract for InstrumentedSignablePythContract {
-    type Middleware = MiddlewaresWrapper<TracedClient>;
-    type ProviderClient = TracedClient;
-
-    fn client(&self) -> Arc<Self::Middleware> {
-        self.client()
+    fn keeper_address(&self) -> Address {
+        self.wallet().address()
     }
 
-    fn wallet(&self) -> LocalWallet {
-        self.wallet()
+    async fn get_balance(&self, address: Address) -> KeeperTxResult<Wei> {
+        let balance = self
+            .provider()
+            .get_balance(address, None)
+            .await
+            .map_err(|e| KeeperTxError::Provider {
+                reason: format!("failed to fetch balance for {address:?}: {e:?}"),
+            })?;
+
+        wei_from_u256(balance, "balance")
     }
 
-    fn provider(&self) -> Provider<Self::ProviderClient> {
-        self.provider()
+    async fn estimate_tx_cost(&self, legacy_tx: bool, gas_limit: u64) -> KeeperTxResult<Wei> {
+        crate::eth_utils::utils::estimate_tx_cost(self.client(), legacy_tx, gas_limit as u128)
+            .await
+            .map(Wei)
+            .map_err(|e| KeeperTxError::Other {
+                reason: format!("failed to estimate transaction cost: {e:?}"),
+            })
     }
 
-    fn reveal_with_callback(
+    async fn reveal_with_callback(
         &self,
         provider_address: Address,
         sequence_number: u64,
         user_random_number: [u8; 32],
         provider_revelation: [u8; 32],
-    ) -> ContractCall<Self::Middleware, ()> {
-        self.reveal_with_callback(
+        escalation_policy: EscalationPolicy,
+    ) -> KeeperTxResult<TxExecutionOutcome<RevealWithCallbackData>> {
+        let contract_call = self.reveal_with_callback(
             provider_address,
             sequence_number,
             user_random_number,
             provider_revelation,
+        );
+
+        let error_mapper =
+            |num_retries: u64,
+             error: BackoffError<SubmitTxError<MiddlewaresWrapper<TracedClient>>>| {
+                if let BackoffError::Transient {
+                    err: SubmitTxError::GasUsageEstimateError(tx, ContractError::Revert(revert)),
+                    ..
+                } = &error
+                {
+                    if let Ok(PythRandomErrorsErrors::NoSuchRequest(_)) =
+                        PythRandomErrorsErrors::decode(revert)
+                    {
+                        let mapped_error = SubmitTxError::GasUsageEstimateError(
+                            tx.clone(),
+                            ContractError::Revert(revert.clone()),
+                        );
+
+                        if num_retries >= 5 {
+                            return BackoffError::Permanent(mapped_error);
+                        }
+
+                        let retry_after_seconds = match num_retries {
+                            0 => 5,
+                            1 => 10,
+                            _ => 60,
+                        };
+
+                        return BackoffError::Transient {
+                            err: mapped_error,
+                            retry_after: Some(Duration::from_secs(retry_after_seconds)),
+                        };
+                    }
+                }
+
+                error
+            };
+
+        let submit_result = submit_tx_with_backoff(
+            self.client(),
+            contract_call,
+            escalation_policy,
+            Some(error_mapper),
         )
+        .await
+        .map_err(map_submit_error)?;
+
+        convert_submit_result(submit_result)
     }
 
-    fn withdraw_as_fee_manager(
+    async fn withdraw_as_fee_manager(
         &self,
         provider_address: Address,
-        amount: u128,
-    ) -> ContractCall<Self::Middleware, ()> {
-        self.withdraw_as_fee_manager(provider_address, amount)
+        amount: Wei,
+    ) -> KeeperTxResult<TxExecutionOutcome<()>> {
+        execute_simple_call(self.withdraw_as_fee_manager(provider_address, amount.0)).await
     }
 
-    fn set_provider_fee_as_fee_manager(
+    async fn set_provider_fee_as_fee_manager(
         &self,
         provider_address: Address,
-        fee: u128,
-    ) -> ContractCall<Self::Middleware, ()> {
-        self.set_provider_fee_as_fee_manager(provider_address, fee)
+        fee: Wei,
+    ) -> KeeperTxResult<TxExecutionOutcome<()>> {
+        execute_simple_call(self.set_provider_fee_as_fee_manager(provider_address, fee.0)).await
+    }
+
+    async fn transfer(
+        &self,
+        destination: Address,
+        amount: Wei,
+    ) -> KeeperTxResult<TxExecutionOutcome<()>> {
+        transfer_funds(self.client(), self.keeper_address(), destination, amount).await
     }
 
-    async fn get_provider_info(&self, provider_address: Address) -> Result<KeeperProviderInfo> {
-        let info = self.get_provider_info_v2(provider_address).call().await?;
+    async fn get_provider_info(
+        &self,
+        provider_address: Address,
+    ) -> KeeperTxResult<KeeperProviderInfo> {
+        let info = self
+            .get_provider_info_v2(provider_address)
+            .call()
+            .await
+            .map_err(|e| KeeperTxError::Provider {
+                reason: format!("failed to fetch provider info: {e:?}"),
+            })?;
 
         Ok(KeeperProviderInfo {
-            accrued_fees_in_wei: U256::from(info.accrued_fees_in_wei),
-            fee_in_wei: U256::from(info.fee_in_wei),
+            accrued_fees_in_wei: wei_from_u256(info.accrued_fees_in_wei.into(), "accrued_fees")?,
+            fee_in_wei: wei_from_u256(info.fee_in_wei.into(), "fee")?,
             sequence_number: info.sequence_number,
             end_sequence_number: info.end_sequence_number,
             current_commitment_sequence_number: info.current_commitment_sequence_number,
@@ -414,3 +504,242 @@ impl KeeperTxContract for InstrumentedSignablePythContract {
         })
     }
 }
+
+const SIMPLE_TX_NUM_RETRIES: u64 = 0;
+const SIMPLE_TX_FEE_MULTIPLIER: u64 = 100;
+const TRANSFER_CONFIRMATION_TIMEOUT_SECS: u64 = 30;
+
+fn convert_submit_result(
+    submit_result: SubmitTxResult,
+) -> KeeperTxResult<TxExecutionOutcome<RevealWithCallbackData>> {
+    let SubmitTxResult {
+        num_retries,
+        fee_multiplier,
+        duration,
+        receipt,
+        revealed_event,
+    } = submit_result;
+
+    let outcome = receipt_to_outcome(receipt, duration, num_retries, fee_multiplier, ())?;
+
+    let callback_gas_used =
+        u256_to_u128(revealed_event.callback_gas_used.into(), "callback_gas_used")?;
+
+    let callback_return_value = revealed_event.callback_return_value.to_vec();
+
+    Ok(TxExecutionOutcome {
+        tx_hash: outcome.tx_hash,
+        block_number: outcome.block_number,
+        gas_used: outcome.gas_used,
+        effective_gas_price: outcome.effective_gas_price,
+        duration: outcome.duration,
+        num_retries: outcome.num_retries,
+        fee_multiplier: outcome.fee_multiplier,
+        result: RevealWithCallbackData {
+            callback_failed: revealed_event.callback_failed,
+            callback_return_value,
+            callback_gas_used,
+        },
+    })
+}
+
+fn receipt_to_outcome<T>(
+    receipt: TransactionReceipt,
+    duration: Duration,
+    num_retries: u64,
+    fee_multiplier: u64,
+    result: T,
+) -> KeeperTxResult<TxExecutionOutcome<T>> {
+    let tx_hash = TxHash(receipt.transaction_hash.to_fixed_bytes());
+    let block_number = receipt.block_number.map(|b| b.as_u64());
+    let gas_used = optional_u256_to_u128(receipt.gas_used, "gas_used")?;
+    let effective_gas_price =
+        optional_u256_to_u128(receipt.effective_gas_price, "effective_gas_price")?;
+
+    Ok(TxExecutionOutcome {
+        tx_hash,
+        block_number,
+        gas_used,
+        effective_gas_price,
+        duration,
+        num_retries,
+        fee_multiplier,
+        result,
+    })
+}
+
+fn optional_u256_to_u128(value: Option<U256>, context: &str) -> KeeperTxResult<Option<u128>> {
+    match value {
+        Some(v) => Ok(Some(u256_to_u128(v, context)?)),
+        None => Ok(None),
+    }
+}
+
+fn u256_to_u128(value: U256, context: &str) -> KeeperTxResult<u128> {
+    value.try_into().map_err(|_| KeeperTxError::Other {
+        reason: format!("{context} {value} exceeds supported range"),
+    })
+}
+
+fn wei_from_u256(value: U256, context: &str) -> KeeperTxResult<Wei> {
+    u256_to_u128(value, context).map(Wei)
+}
+
+fn map_submit_error<T>(error: SubmitTxError<T>) -> KeeperTxError
+where
+    T: Middleware + NonceManaged + 'static,
+{
+    match error {
+        SubmitTxError::GasUsageEstimateError(_, ContractError::Revert(revert)) => {
+            KeeperTxError::Reverted {
+                reason: format!("0x{}", hex::encode(revert)),
+            }
+        }
+        SubmitTxError::GasUsageEstimateError(_, other) => KeeperTxError::Other {
+            reason: format!("gas usage estimate error: {other:?}"),
+        },
+        SubmitTxError::GasLimitExceeded { estimate, limit } => KeeperTxError::GasLimit {
+            estimate: u256_to_u128(estimate, "gas_limit_estimate").unwrap_or(u128::MAX),
+            limit: u256_to_u128(limit, "gas_limit_limit").unwrap_or(u128::MAX),
+        },
+        SubmitTxError::GasPriceEstimateError(_) => KeeperTxError::Provider {
+            reason: "gas_price_estimate".to_string(),
+        },
+        SubmitTxError::SubmissionError(_, _) => KeeperTxError::Submission {
+            reason: "Error submitting the transaction on-chain".to_string(),
+        },
+        SubmitTxError::ConfirmationTimeout(tx) => KeeperTxError::ConfirmationTimeout {
+            reason: format!(
+                "Transaction was submitted, but never confirmed. Hash: {}",
+                format_typed_tx_hash(&tx)
+            ),
+        },
+        SubmitTxError::ConfirmationError(tx, _) => KeeperTxError::Confirmation {
+            reason: format!(
+                "Transaction was submitted, but never confirmed. Hash: {}",
+                format_typed_tx_hash(&tx)
+            ),
+        },
+        SubmitTxError::ReceiptError(tx, _) => KeeperTxError::Receipt {
+            reason: format!(
+                "Reveal transaction failed on-chain. Hash: {}",
+                format_typed_tx_hash(&tx)
+            ),
+        },
+    }
+}
+
+fn format_typed_tx_hash(tx: &TypedTransaction) -> String {
+    format!("{:#x}", tx.sighash())
+}
+
+async fn execute_simple_call(
+    contract_call: ContractCall<MiddlewaresWrapper<TracedClient>, ()>,
+) -> KeeperTxResult<TxExecutionOutcome<()>> {
+    let call_name = contract_call.function.name.clone();
+    let start = Instant::now();
+
+    let pending_tx = contract_call
+        .send()
+        .await
+        .map_err(|e| KeeperTxError::Submission {
+            reason: format!("Error submitting transaction({call_name}) {e:?}"),
+        })?;
+
+    let receipt = pending_tx
+        .await
+        .map_err(|e| KeeperTxError::Confirmation {
+            reason: format!("Error waiting for transaction({call_name}) receipt: {e:?}"),
+        })?
+        .ok_or_else(|| KeeperTxError::Other {
+            reason: format!(
+                "Can't verify the transaction({call_name}), probably dropped from mempool"
+            ),
+        })?;
+
+    tracing::info!(
+        transaction_hash = &receipt.transaction_hash.to_string(),
+        "Confirmed transaction({call_name}). Receipt: {:?}",
+        receipt
+    );
+
+    if receipt.status == Some(U64::from(0u64)) {
+        return Err(KeeperTxError::Receipt {
+            reason: format!(
+                "Transaction({call_name}) reverted on-chain. Receipt: {:?}",
+                receipt
+            ),
+        });
+    }
+
+    receipt_to_outcome(
+        receipt,
+        start.elapsed(),
+        SIMPLE_TX_NUM_RETRIES,
+        SIMPLE_TX_FEE_MULTIPLIER,
+        (),
+    )
+}
+
+async fn transfer_funds(
+    client: Arc<MiddlewaresWrapper<TracedClient>>,
+    source_wallet_address: Address,
+    destination_address: Address,
+    transfer_amount: Wei,
+) -> KeeperTxResult<TxExecutionOutcome<()>> {
+    let transfer_amount_u256 = U256::from(transfer_amount.0);
+    tracing::info!(
+        "Transferring {:?} from {:?} to {:?}",
+        transfer_amount_u256,
+        source_wallet_address,
+        destination_address
+    );
+
+    let tx = TransactionRequest::new()
+        .to(destination_address)
+        .value(U256::from(transfer_amount.0))
+        .from(source_wallet_address);
+
+    let start = Instant::now();
+
+    let pending_tx = client
+        .send_transaction(tx.clone(), None)
+        .await
+        .map_err(|e| KeeperTxError::Submission {
+            reason: format!("failed to submit transfer transaction: {e:?}"),
+        })?;
+
+    let receipt = timeout(
+        Duration::from_secs(TRANSFER_CONFIRMATION_TIMEOUT_SECS),
+        pending_tx,
+    )
+    .await
+    .map_err(|_| KeeperTxError::ConfirmationTimeout {
+        reason: "Transfer transaction confirmation timeout".to_string(),
+    })?
+    .map_err(|e| KeeperTxError::Confirmation {
+        reason: format!("transfer transaction confirmation error: {e:?}"),
+    })?
+    .ok_or_else(|| KeeperTxError::Other {
+        reason: "transfer transaction, probably dropped from mempool".to_string(),
+    })?;
+
+    if receipt.status == Some(U64::from(0u64)) {
+        return Err(KeeperTxError::Receipt {
+            reason: format!("Transfer transaction failed on-chain. Receipt: {receipt:?}"),
+        });
+    }
+
+    tracing::info!(
+        "Transfer transaction confirmed: {:?}",
+        receipt.transaction_hash
+    );
+
+    receipt_to_outcome(
+        receipt,
+        start.elapsed(),
+        SIMPLE_TX_NUM_RETRIES,
+        SIMPLE_TX_FEE_MULTIPLIER,
+        (),
+    )
+}

+ 1 - 55
apps/fortuna/src/eth_utils/utils.rs

@@ -2,7 +2,6 @@ use {
     crate::{
         chain::ethereum::{PythRandomEvents, Revealed2Filter},
         eth_utils::nonce_manager::NonceManaged,
-        keeper::contract::KeeperTxContract,
     },
     anyhow::{anyhow, Result},
     backoff::ExponentialBackoff,
@@ -11,10 +10,7 @@ use {
         contract::{ContractCall, ContractError, EthLogDecode},
         middleware::Middleware,
         providers::{MiddlewareError, ProviderError},
-        signers::Signer,
-        types::{
-            transaction::eip2718::TypedTransaction, TransactionReceipt, TransactionRequest, U256,
-        },
+        types::{transaction::eip2718::TypedTransaction, TransactionReceipt, U256},
     },
     std::{
         fmt::Display,
@@ -351,53 +347,3 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
 
     Ok(receipt)
 }
-
-/// Transfer funds from the signing wallet to the destination address.
-pub async fn submit_transfer_tx<C>(
-    contract: Arc<C>,
-    destination_address: ethers::types::Address,
-    transfer_amount: U256,
-) -> Result<ethers::types::H256>
-where
-    C: KeeperTxContract + 'static,
-{
-    let wallet = contract.wallet();
-    let source_wallet_address = wallet.address();
-
-    tracing::info!(
-        "Transferring {:?} from {:?} to {:?}",
-        transfer_amount,
-        source_wallet_address,
-        destination_address
-    );
-
-    let tx = TransactionRequest::new()
-        .to(destination_address)
-        .value(transfer_amount)
-        .from(source_wallet_address);
-
-    let client = contract.client();
-    let pending_tx = client.send_transaction(tx, None).await?;
-
-    // Wait for confirmation with timeout
-    let tx_receipt = timeout(
-        Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS),
-        pending_tx,
-    )
-    .await
-    .map_err(|_| anyhow!("Transfer transaction confirmation timeout"))?
-    .map_err(|e| anyhow!("Transfer transaction confirmation error: {:?}", e))?
-    .ok_or_else(|| anyhow!("Transfer transaction, probably dropped from mempool"))?;
-
-    // Check if transaction was successful
-    if tx_receipt.status == Some(U64::from(0)) {
-        return Err(anyhow!(
-            "Transfer transaction failed on-chain. Receipt: {:?}",
-            tx_receipt
-        ));
-    }
-
-    let tx_hash = tx_receipt.transaction_hash;
-    tracing::info!("Transfer transaction confirmed: {:?}", tx_hash);
-    Ok(tx_hash)
-}

+ 3 - 5
apps/fortuna/src/keeper.rs

@@ -1,6 +1,6 @@
 use {
     crate::{
-        api::{BlockchainState, ChainId},
+        api::BlockchainState,
         chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
         config::{EthereumConfig, KeeperConfig},
         eth_utils::traced_client::RpcMetrics,
@@ -22,7 +22,7 @@ use {
         signers::{LocalWallet, Signer},
         types::U256,
     },
-    keeper_metrics::{AccountLabel, KeeperMetrics},
+    keeper_metrics::KeeperMetrics,
     std::{collections::HashSet, str::FromStr, sync::Arc},
     tokio::{
         spawn,
@@ -113,9 +113,7 @@ pub async fn run_keeper_threads(
 
     let (tx, rx) = mpsc::channel::<BlockRange>(1000);
     // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
-    spawn(
-        watch_blocks_wrapper(chain_state.clone(), latest_safe_block, tx).in_current_span(),
-    );
+    spawn(watch_blocks_wrapper(chain_state.clone(), latest_safe_block, tx).in_current_span());
 
     // Spawn a thread for block processing with configured delays
     spawn(

+ 71 - 68
apps/fortuna/src/keeper/fee.rs

@@ -1,19 +1,13 @@
 use {
     crate::{
-        api::BlockchainState,
-        api::ChainId,
-        eth_utils::utils::{estimate_tx_cost, send_and_confirm, submit_transfer_tx},
+        api::{BlockchainState, ChainId},
         keeper::{
-            contract::KeeperTxContract,
+            contract::{KeeperTxContract, Wei},
             keeper_metrics::{AccountLabel, KeeperMetrics},
         },
     },
     anyhow::{anyhow, Result},
-    ethers::{
-        middleware::Middleware,
-        signers::Signer,
-        types::{Address, U256},
-    },
+    ethers::types::{Address, U256},
     std::sync::Arc,
     tokio::time::{self, Duration},
     tracing::{self, Instrument},
@@ -26,15 +20,16 @@ use {
 ///
 /// `other_keeper_addresses` is expected to not include the `keeper_address`, and should
 /// include the fee manager so that the fee manager wallet stays funded.
-async fn calculate_fair_fee_withdrawal_amount<M: Middleware + 'static>(
-    provider: Arc<M>,
+async fn calculate_fair_fee_withdrawal_amount<C: KeeperTxContract + ?Sized>(
+    contract: &C,
     keeper_address: Address,
     other_keeper_addresses: &[Address],
-    available_fees: U256,
-) -> Result<U256> {
+    available_fees: Wei,
+) -> Result<Wei> {
+    let available_fees_u256 = U256::from(available_fees.0);
     // Early return if no fees available
-    if available_fees.is_zero() {
-        return Ok(U256::zero());
+    if available_fees_u256.is_zero() {
+        return Ok(Wei(0));
     }
 
     // If no other keepers, withdraw all available fees
@@ -42,40 +37,46 @@ async fn calculate_fair_fee_withdrawal_amount<M: Middleware + 'static>(
         return Ok(available_fees);
     }
 
-    let current_balance = provider
-        .get_balance(keeper_address, None)
+    let current_balance = contract
+        .get_balance(keeper_address)
         .await
         .map_err(|e| anyhow!("Error while getting current keeper balance. error: {:?}", e))?;
+    let current_balance_u256 = U256::from(current_balance.0);
 
     tracing::info!(
         "Contract has available fees: {:?}, current keeper ({:?}) has balance: {:?}",
-        available_fees,
+        available_fees_u256,
         keeper_address,
-        current_balance
+        current_balance_u256
     );
 
     // Calculate total funds across all keepers + available fees
-    let mut total_funds = current_balance + available_fees;
+    let mut total_funds = current_balance_u256 + available_fees_u256;
 
     for &address in other_keeper_addresses {
-        let balance = provider.get_balance(address, None).await.map_err(|e| {
+        let balance = contract.get_balance(address).await.map_err(|e| {
             anyhow!(
                 "Error while getting keeper balance for {:?}. error: {:?}",
                 address,
                 e
             )
         })?;
-        tracing::info!("Keeper address {:?} has balance: {:?}", address, balance);
-        total_funds += balance;
+        let balance_u256 = U256::from(balance.0);
+        tracing::info!(
+            "Keeper address {:?} has balance: {:?}",
+            address,
+            balance_u256
+        );
+        total_funds += balance_u256;
     }
 
     // Calculate fair share per keeper
     let fair_share = total_funds / (other_keeper_addresses.len() + 1); // +1 for current keeper
 
     // Calculate how much current keeper should withdraw to reach fair share
-    let withdrawal_amount = if current_balance < fair_share {
-        let deficit = fair_share - current_balance;
-        std::cmp::min(deficit, available_fees)
+    let withdrawal_amount = if current_balance_u256 < fair_share {
+        let deficit = fair_share - current_balance_u256;
+        std::cmp::min(deficit, available_fees_u256)
     } else {
         U256::zero()
     };
@@ -84,11 +85,13 @@ async fn calculate_fair_fee_withdrawal_amount<M: Middleware + 'static>(
         "Fair share calculation: total_funds={:?}, fair_share={:?}, current_balance={:?}, withdrawal_amount={:?}",
         total_funds,
         fair_share,
-        current_balance,
+        current_balance_u256,
         withdrawal_amount
     );
 
-    Ok(withdrawal_amount)
+    Ok(Wei(u128::try_from(withdrawal_amount).map_err(|_| {
+        anyhow!("Withdrawal amount exceeds supported range")
+    })?))
 }
 
 #[tracing::instrument(name = "withdraw_fees", skip_all, fields())]
@@ -102,7 +105,7 @@ pub async fn withdraw_fees_wrapper<C>(
 ) where
     C: KeeperTxContract + 'static,
 {
-    let fee_manager_wallet_address = contract_as_fee_manager.wallet().address();
+    let fee_manager_wallet_address = contract_as_fee_manager.keeper_address();
 
     // Add the fee manager to the list of other keepers so that we can fairly distribute the fees
     // across the fee manager and all the keepers.
@@ -152,13 +155,11 @@ pub async fn withdraw_fees_if_necessary(
     other_keeper_addresses: Vec<Address>,
     min_balance: U256,
 ) -> Result<()> {
-    let provider = contract_as_fee_manager.provider();
-    let fee_manager_wallet = contract_as_fee_manager.wallet();
-
-    let keeper_balance = provider
-        .get_balance(keeper_address, None)
+    let keeper_balance_wei = contract_as_fee_manager
+        .get_balance(keeper_address)
         .await
         .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;
+    let keeper_balance = U256::from(keeper_balance_wei.0);
 
     // Only withdraw if our balance is below the minimum threshold
     if keeper_balance >= min_balance {
@@ -170,23 +171,25 @@ pub async fn withdraw_fees_if_necessary(
         .await
         .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
 
-    let available_fees = provider_info.accrued_fees_in_wei;
+    let available_fees = U256::from(provider_info.accrued_fees_in_wei.0);
 
     // Determine how much we can fairly withdraw from the contract
     let withdrawal_amount = calculate_fair_fee_withdrawal_amount(
-        Arc::new(provider.clone()),
+        contract_as_fee_manager.as_ref(),
         keeper_address,
         &other_keeper_addresses,
-        available_fees,
+        provider_info.accrued_fees_in_wei,
     )
     .await?;
 
+    let withdrawal_amount_u256 = U256::from(withdrawal_amount.0);
+
     // Only withdraw as long as we are at least doubling our keeper balance (avoids repeated withdrawals of tiny amounts)
     let min_withdrawal_amount = keeper_balance;
-    if withdrawal_amount < min_withdrawal_amount {
+    if withdrawal_amount_u256 < min_withdrawal_amount {
         // We don't have enough to meaningfully top up the balance.
         // NOTE: This log message triggers a grafana alert. If you want to change the text, please change the alert also.
-        tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up. (withdrawal_amount={:?} < min_withdrawal_amount={:?})", keeper_balance, min_balance, withdrawal_amount, min_withdrawal_amount);
+        tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up. (withdrawal_amount={:?} < min_withdrawal_amount={:?})", keeper_balance, min_balance, withdrawal_amount_u256, min_withdrawal_amount);
         return Ok(());
     }
 
@@ -194,29 +197,27 @@ pub async fn withdraw_fees_if_necessary(
         "Keeper balance {:?} below minimum {:?}, claiming {:?} out of available {:?}",
         keeper_balance,
         min_balance,
-        withdrawal_amount,
+        withdrawal_amount_u256,
         available_fees
     );
 
     // Proceed with withdrawal
-    let contract_call = contract_as_fee_manager
-        .withdraw_as_fee_manager(provider_address, withdrawal_amount.as_u128());
-    send_and_confirm(contract_call).await?;
+    contract_as_fee_manager
+        .withdraw_as_fee_manager(provider_address, withdrawal_amount)
+        .await
+        .map_err(|e| anyhow!("Failed to withdraw fees. error: {:?}", e))?;
 
     // Transfer the withdrawn funds from fee manager to keeper if fee manager is different from keeper
-    if fee_manager_wallet.address() != keeper_address {
-        submit_transfer_tx(
-            contract_as_fee_manager.clone(),
-            keeper_address,
-            withdrawal_amount,
-        )
-        .await
-        .map_err(|e| {
-            anyhow!(
-                "Failed to transfer fees from fee manager to keeper. error: {:?}",
-                e
-            )
-        })?;
+    if contract_as_fee_manager.keeper_address() != keeper_address {
+        contract_as_fee_manager
+            .transfer(keeper_address, withdrawal_amount)
+            .await
+            .map_err(|e| {
+                anyhow!(
+                    "Failed to transfer fees from fee manager to keeper. error: {:?}",
+                    e
+                )
+            })?;
     }
 
     Ok(())
@@ -300,22 +301,22 @@ where
         .await
         .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
 
-    let wallet = contract.wallet();
-    let wallet_address = wallet.address();
+    let wallet_address = contract.keeper_address();
     if provider_info.fee_manager != wallet_address {
         return Err(anyhow!(
             "Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}",
-            contract.provider(),
+            provider_address,
             provider_info.fee_manager,
             wallet_address
         ));
     }
 
     // Calculate target window for the on-chain fee.
-    let middleware = contract.client();
     let gas_limit: u128 = u128::from(provider_info.default_gas_limit);
-    let max_callback_cost: u128 = estimate_tx_cost(middleware, legacy_tx, gas_limit)
+    let max_callback_cost: u128 = contract
+        .estimate_tx_cost(legacy_tx, provider_info.default_gas_limit)
         .await
+        .map(|wei| wei.0)
         .map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?;
 
     let account_label = AccountLabel {
@@ -347,12 +348,12 @@ where
     );
 
     // Calculate current P&L to determine if we can reduce fees.
-    let provider_client = contract.provider();
-    let current_keeper_balance = provider_client
-        .get_balance(wallet_address, None)
+    let current_keeper_balance = contract
+        .get_balance(wallet_address)
         .await
         .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;
-    let current_keeper_fees = provider_info.accrued_fees_in_wei;
+    let current_keeper_balance = U256::from(current_keeper_balance.0);
+    let current_keeper_fees = U256::from(provider_info.accrued_fees_in_wei.0);
     let current_pnl = current_keeper_balance + current_keeper_fees;
 
     let can_reduce_fees = match high_water_pnl {
@@ -369,7 +370,7 @@ where
         }
     };
 
-    let provider_fee: u128 = provider_info.fee_in_wei.as_u128();
+    let provider_fee: u128 = provider_info.fee_in_wei.0;
     if is_chain_active
         && ((provider_fee > target_fee_max && can_reduce_fees) || provider_fee < target_fee_min)
     {
@@ -381,8 +382,10 @@ where
             provider_fee,
             target_fee
         );
-        let contract_call = contract.set_provider_fee_as_fee_manager(provider_address, target_fee);
-        send_and_confirm(contract_call).await?;
+        contract
+            .set_provider_fee_as_fee_manager(provider_address, Wei(target_fee))
+            .await
+            .map_err(|e| anyhow!("Failed to set provider fee. error: {:?}", e))?;
 
         *sequence_number_of_last_fee_update = Some(provider_info.sequence_number);
     } else {

+ 47 - 86
apps/fortuna/src/keeper/process_event.rs

@@ -1,17 +1,15 @@
 use {
     super::keeper_metrics::AccountLabel,
     crate::{
-        chain::{
-            ethereum::PythRandomErrorsErrors,
-            reader::{RequestCallbackStatus, RequestedV2Event},
-        },
-        eth_utils::utils::{submit_tx_with_backoff, SubmitTxError},
+        chain::reader::{RequestCallbackStatus, RequestedV2Event},
         history::{RequestEntryState, RequestStatus},
-        keeper::{block::ProcessParams, contract::KeeperTxContract},
+        keeper::{
+            block::ProcessParams,
+            contract::{KeeperTxContract, KeeperTxError},
+        },
     },
     anyhow::{anyhow, Result},
-    ethers::{abi::AbiDecode, contract::ContractError},
-    std::time::Duration,
+    ethers::types::{Bytes, H256, U256},
     tracing,
 };
 
@@ -141,52 +139,15 @@ where
             anyhow!("Error revealing: {:?}", e)
         })?;
 
-    let contract_call = contract.reveal_with_callback(
-        event.provider_address,
-        event.sequence_number,
-        event.user_random_number,
-        provider_revelation,
-    );
-    let error_mapper = |num_retries, e| {
-        if let backoff::Error::Transient {
-            err: SubmitTxError::GasUsageEstimateError(tx, ContractError::Revert(revert)),
-            ..
-        } = &e
-        {
-            if let Ok(PythRandomErrorsErrors::NoSuchRequest(_)) =
-                PythRandomErrorsErrors::decode(revert)
-            {
-                let err = SubmitTxError::GasUsageEstimateError(
-                    tx.clone(),
-                    ContractError::Revert(revert.clone()),
-                );
-                // Slow down the retries if the request is not found.
-                // This probably means that the request is already fulfilled via another process.
-                // After 5 retries, we return the error permanently.
-                if num_retries >= 5 {
-                    return backoff::Error::Permanent(err);
-                }
-                let retry_after_seconds = match num_retries {
-                    0 => 5,
-                    1 => 10,
-                    _ => 60,
-                };
-                return backoff::Error::Transient {
-                    err,
-                    retry_after: Some(Duration::from_secs(retry_after_seconds)),
-                };
-            }
-        }
-        e
-    };
-
-    let success = submit_tx_with_backoff(
-        contract.client(),
-        contract_call,
-        escalation_policy,
-        Some(error_mapper),
-    )
-    .await;
+    let reveal_result = contract
+        .reveal_with_callback(
+            event.provider_address,
+            event.sequence_number,
+            event.user_random_number,
+            provider_revelation,
+            escalation_policy,
+        )
+        .await;
 
     metrics
         .requests_processed
@@ -194,27 +155,28 @@ where
         .inc();
 
     status.last_updated_at = chrono::Utc::now();
-    match success {
+    match reveal_result {
         Ok(result) => {
             status.state = RequestEntryState::Completed {
-                reveal_block_number: result.receipt.block_number.unwrap_or_default().as_u64(),
-                reveal_tx_hash: result.receipt.transaction_hash,
+                reveal_block_number: result.block_number.unwrap_or_default(),
+                reveal_tx_hash: H256::from_slice(&result.tx_hash.0),
                 provider_random_number: provider_revelation,
-                gas_used: result.receipt.gas_used.unwrap_or_default(),
+                gas_used: U256::from(result.gas_used.unwrap_or_default()),
                 combined_random_number: RequestStatus::generate_combined_random_number(
                     &event.user_random_number,
                     &provider_revelation,
                 ),
-                callback_failed: result.revealed_event.callback_failed,
-                callback_return_value: result.revealed_event.callback_return_value,
-                callback_gas_used: result.revealed_event.callback_gas_used,
+                callback_failed: result.result.callback_failed,
+                callback_return_value: Bytes::from(result.result.callback_return_value.clone()),
+                callback_gas_used: u32::try_from(result.result.callback_gas_used)
+                    .unwrap_or(u32::MAX),
             };
             history.add(&status);
             tracing::info!(
                 "Processed event successfully in {:?} after {} retries. Receipt: {:?}",
                 result.duration,
                 result.num_retries,
-                result.receipt
+                result.tx_hash
             );
 
             metrics
@@ -238,15 +200,18 @@ where
                 .get_or_create(&account_label)
                 .observe(result.fee_multiplier as f64);
 
-            if let Some(gas_used) = result.receipt.gas_used {
-                let gas_used_float = gas_used.as_u128() as f64 / 1e18;
+            if let Some(gas_used) = result.gas_used {
+                let gas_used_float = gas_used as f64 / 1e18;
                 metrics
                     .total_gas_spent
                     .get_or_create(&account_label)
                     .inc_by(gas_used_float);
 
-                if let Some(gas_price) = result.receipt.effective_gas_price {
-                    let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18;
+                if let Some(gas_price) = result.effective_gas_price {
+                    let gas_fee = U256::from(gas_used)
+                        .saturating_mul(U256::from(gas_price))
+                        .as_u128() as f64
+                        / 1e18;
                     metrics
                         .total_gas_fee_spent
                         .get_or_create(&account_label)
@@ -272,32 +237,28 @@ where
                     .get_or_create(&account_label)
                     .inc();
                 // Do not display the internal error, it might include RPC details.
-                let reason = match e {
-                    SubmitTxError::GasUsageEstimateError(_, ContractError::Revert(revert)) => {
-                        format!("Reverted: {revert}")
-                    }
-                    SubmitTxError::GasLimitExceeded { limit, estimate } => {
+                let reason = match &e {
+                    KeeperTxError::Reverted { reason } => format!("Reverted: {reason}"),
+                    KeeperTxError::GasLimit { limit, estimate } => {
                         format!("Gas limit exceeded: limit = {limit}, estimate = {estimate}")
                     }
-                    SubmitTxError::GasUsageEstimateError(_, _) => {
-                        "Unable to estimate gas usage".to_string()
-                    }
-                    SubmitTxError::GasPriceEstimateError(_) => {
+                    KeeperTxError::Provider { reason } if reason == "gas_price_estimate" => {
                         "Unable to estimate gas price".to_string()
                     }
-                    SubmitTxError::SubmissionError(_, _) => {
+                    KeeperTxError::Other { reason } if reason == "gas_usage_estimate" => {
+                        "Unable to estimate gas usage".to_string()
+                    }
+                    KeeperTxError::Submission { .. } => {
                         "Error submitting the transaction on-chain".to_string()
                     }
-                    SubmitTxError::ConfirmationTimeout(tx) => format!(
-                        "Transaction was submitted, but never confirmed. Hash: {}",
-                        tx.sighash()
-                    ),
-                    SubmitTxError::ConfirmationError(tx, _) => format!(
-                        "Transaction was submitted, but never confirmed. Hash: {}",
-                        tx.sighash()
-                    ),
-                    SubmitTxError::ReceiptError(tx, _) => {
-                        format!("Reveal transaction failed on-chain. Hash: {}", tx.sighash())
+                    KeeperTxError::ConfirmationTimeout { reason } => reason.clone(),
+                    KeeperTxError::Confirmation { reason } => reason.clone(),
+                    KeeperTxError::Receipt { reason } => reason.clone(),
+                    KeeperTxError::Provider { reason } => {
+                        format!("Provider error: {reason}")
+                    }
+                    KeeperTxError::Other { reason } => {
+                        format!("Unexpected error: {reason}")
                     }
                 };
                 status.state = RequestEntryState::Failed {