Bläddra i källkod

feat(fortuna): Automated fair fee withdrawals with multiple keepers (#2827)

* feat(fortuna): separate fee manager and keeper wallets for multi-replica support

- Add fee_manager_private_key to KeeperConfig for fee manager operations
- Add known_keeper_addresses for balance comparison
- Modify withdrawal logic to use fee manager key for fee manager calls
- Only withdraw fees if current keeper has lowest balance among known keepers
- Maintain backward compatibility with existing single-key setup

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>

* fix: address PR feedback for fee manager/keeper separation

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>

* remove disable_withdrawal flag, improve control flow, update docs

* update config sample

* update docs

* update docs

* docs

* address PR feedback

* naming

* comment

* remove run config

* chore(fortuna): bump version

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Tejas Badadare <tejas@dourolabs.xyz>
Co-authored-by: Tejas Badadare <tejasbadadare@gmail.com>
devin-ai-integration[bot] 4 månader sedan
förälder
incheckning
8d349bb618

+ 1 - 1
Cargo.lock

@@ -3046,7 +3046,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "8.0.0"
+version = "8.1.0"
 dependencies = [
  "anyhow",
  "axum 0.6.20",

+ 1 - 1
apps/fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "8.0.0"
+version = "8.1.0"
 edition = "2021"
 
 [lib]

+ 23 - 37
apps/fortuna/README.md

@@ -58,58 +58,45 @@ Fortuna supports running multiple replica instances for high availability and re
 
 ### Fee Management with Multiple Instances
 
-When running multiple Fortuna instances with different keeper wallets but a single provider, only one instance should handle fee management. This instance needs to run using the same private key as the fee manager, because only the registerd fee manager wallet can adjust fees and withdraw funds.
+When running multiple Fortuna instances with different keeper wallets, the system uses a fair fee distribution strategy. Each keeper will withdraw fees from the contract to maintain a balanced distribution across all known keeper addresses and the fee manager address.
+
+The fee manager (configured in the provider section) can be a separate wallet from the keeper wallets. When fees are withdrawn from the contract, they go to the fee manager wallet first, then are automatically transferred to the requesting keeper wallet.
+
+**Key Configuration:**
+- All instances should have `keeper.private_key` and `keeper.fee_manager_private_key` provided so that each keeper can top itself up as fee manager from contract fees.
 
 ### Example Configurations
 
-**Two Replica Setup with Fee Management:**
 ```yaml
-# Replica 0 (fee manager wallet) - handles even sequence numbers + fee management
+# Replica 0 - handles even sequence numbers + fee management
 keeper:
   private_key:
+    value: 0x<keeper_0_private_key>
+  fee_manager_private_key:
     value: 0x<fee_manager_private_key>
+  other_keeper_addresses:
+    - 0x<keeper_0_address>  # This replica's address
+    - 0x<keeper_1_address>  # Other replica's address
   replica_config:
     replica_id: 0
     total_replicas: 2
-    backup_delay_seconds: 30
-  run_config:
-    disable_fee_adjustment: false  # Enable fee management (default)
-    disable_fee_withdrawal: false
+    backup_delay_seconds: 15
+
 
-# Replica 1 (non-fee-manager wallet) - handles odd sequence numbers only
+# Replica 1 - handles odd sequence numbers
 keeper:
   private_key:
-    value: 0x<other_keeper_private_key>
+    value: 0x<keeper_1_private_key>
+  fee_manager_private_key:
+    value: 0x<fee_manager_private_key>
+  other_keeper_addresses:
+    - 0x<keeper_0_address>  # Other replica's address
+    - 0x<keeper_1_address>  # This replica's address
   replica_config:
     replica_id: 1
     total_replicas: 2
-    backup_delay_seconds: 30
-  run_config:
-    disable_fee_adjustment: true   # Disable fee management
-    disable_fee_withdrawal: true
-```
-
-**Three Replica Setup:**
-```yaml
-# Replica 0 (fee manager wallet) - handles sequence numbers 0, 3, 6, 9, ... + fee management
-keeper:
-  replica_config:
-    replica_id: 0
-    total_replicas: 3
-    backup_delay_seconds: 30
-  run_config:
-    disable_fee_adjustment: false
-    disable_fee_withdrawal: false
+    backup_delay_seconds: 15
 
-# Replicas 1 & 2 (non-fee-manager wallets) - request processing only
-keeper:
-  replica_config:
-    replica_id: 1  # or 2
-    total_replicas: 3
-    backup_delay_seconds: 30
-  run_config:
-    disable_fee_adjustment: true
-    disable_fee_withdrawal: true
 ```
 
 ### Deployment Considerations
@@ -117,7 +104,7 @@ keeper:
 1. **Separate Wallets**: Each replica MUST use a different private key to avoid nonce conflicts
 2. **Fee Manager Assignment**: Set the provider's `fee_manager` address to match the primary instance's keeper wallet
 3. **Thread Configuration**: Only enable fee management threads on the instance using the fee manager wallet
-4. **Backup Delay**: Set `backup_delay_seconds` long enough to allow primary replica to process requests, but short enough for acceptable failover time (recommended: 30-60 seconds)
+4. **Backup Delay**: Set `backup_delay_seconds` long enough to allow primary replica to process requests, but short enough for acceptable failover time (recommended: 10-30 seconds)
 5. **Monitoring**: Monitor each replica's processing metrics to ensure proper load distribution
 6. **Gas Management**: Each replica needs sufficient ETH balance for gas fees
 
@@ -127,7 +114,6 @@ keeper:
 - Backup replicas wait for `backup_delay_seconds` before checking if request is still unfulfilled
 - If request is already fulfilled during the delay, backup replica skips processing
 - This prevents duplicate transactions and wasted gas while ensuring reliability
-- Fee management operations (adjustment/withdrawal) only occur on an instance where the keeper wallet is the fee manager wallet.
 
 ## Local Development
 

+ 19 - 20
apps/fortuna/config.sample.yaml

@@ -73,9 +73,9 @@ provider:
     # For production, you can store the private key in a file.
     # file: secret.txt
 
-  # Set this to the address of your keeper wallet if you would like the keeper wallet to
-  # be able to withdraw fees from the contract.
-  fee_manager: 0xADDRESS
+  # The address of the fee manager for the provider. Only used for syncing the fee manager address to the contract.
+  # Fee withdrawals are handled by the fee manager private key defined in the keeper config.
+  fee_manager: 0xfee
 keeper:
   # An ethereum wallet address and private key for running the keeper service.
   # This does not have to be the same key as the provider's key above.
@@ -87,25 +87,24 @@ keeper:
     # For production, you can store the private key in a file.
     # file: keeper-key.txt
 
-  # Runtime configuration for the keeper service
-  # Optional: Configure which keeper threads to disable. If running multiple replicas,
-  # only a single replica should have the fee adjustment and withdrawal threads enabled.
-  # run_config:
-  #   disable_fee_adjustment: false    # Set to true to disable automatic fee adjustment
-  #   disable_fee_withdrawal: false    # Set to true to disable automatic fee withdrawal
+  # Fee manager private key for fee manager operations (if not provided, fee withdrawals won't happen)
+  fee_manager_private_key:
+    value: 0xabcd
+    # file: fee-manager-key.txt
+
+  # List of other known keeper wallet addresses for balance comparison and fair fee withdrawals.
+  # Do not include this keeper's address.
+  other_keeper_addresses:
+    - 0x1234
+    - 0x5678
 
   # Multi-replica configuration
   # Optional: Multi-replica configuration for high availability and load distribution
   # Uncomment and configure for production deployments with multiple Fortuna instances
-  # replica_config:
-  #   replica_id: 0              # Unique identifier for this replica (0, 1, 2, ...)
-  #   total_replicas: 2          # Total number of replica instances running
-  #   backup_delay_seconds: 30   # Seconds to wait before processing other replicas' requests
-  #
-  # Example configurations:
-  #
-  # Two-replica setup (Blue/Green):
-  # - Replica 0: handles even sequence numbers (0, 2, 4, ...)
-  # - Replica 1: handles odd sequence numbers (1, 3, 5, ...)
-  #
+  # See the README for more details.
+  replica_config:
+    replica_id: 0              # Unique identifier for this replica (0, 1, 2, ...)
+    total_replicas: 2          # Total number of replica instances running
+    backup_delay_seconds: 30   # Seconds to wait before processing other replicas' requests
+
   # IMPORTANT: Each replica must use a different private_key to avoid nonce conflicts!

+ 11 - 19
apps/fortuna/src/command/run.rs

@@ -3,10 +3,7 @@ use {
         api::{self, ApiBlockChainState, BlockchainState, ChainId},
         chain::ethereum::InstrumentedPythContract,
         command::register_provider::CommitmentMetadata,
-        config::{
-            Commitment, Config, EthereumConfig, ProviderConfig, ReplicaConfig, RunConfig,
-            RunOptions,
-        },
+        config::{Commitment, Config, EthereumConfig, KeeperConfig, ProviderConfig, RunOptions},
         eth_utils::traced_client::RpcMetrics,
         history::History,
         keeper::{self, keeper_metrics::KeeperMetrics},
@@ -103,9 +100,6 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
     }
 
-    let keeper_replica_config = config.keeper.replica_config.clone();
-    let keeper_run_config = config.keeper.run_config.clone();
-
     let chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>> = Arc::new(RwLock::new(
         config
             .chains
@@ -118,23 +112,25 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
         let keeper_metrics = keeper_metrics.clone();
         let keeper_private_key_option = keeper_private_key_option.clone();
-        let keeper_replica_config = keeper_replica_config.clone();
-        let keeper_run_config = keeper_run_config.clone();
         let chains = chains.clone();
         let secret_copy = secret.clone();
         let rpc_metrics = rpc_metrics.clone();
         let provider_config = config.provider.clone();
         let history = history.clone();
+        let keeper_config_base = config.keeper.clone();
         spawn(async move {
             loop {
+                let keeper_config = if keeper_private_key_option.is_some() {
+                    Some(keeper_config_base.clone())
+                } else {
+                    None
+                };
                 let setup_result = setup_chain_and_run_keeper(
                     provider_config.clone(),
                     &chain_id,
                     chain_config.clone(),
                     keeper_metrics.clone(),
-                    keeper_private_key_option.clone(),
-                    keeper_replica_config.clone(),
-                    keeper_run_config.clone(),
+                    keeper_config,
                     chains.clone(),
                     &secret_copy,
                     history.clone(),
@@ -184,9 +180,7 @@ async fn setup_chain_and_run_keeper(
     chain_id: &ChainId,
     chain_config: EthereumConfig,
     keeper_metrics: Arc<KeeperMetrics>,
-    keeper_private_key_option: Option<String>,
-    keeper_replica_config: Option<ReplicaConfig>,
-    keeper_run_config: RunConfig,
+    keeper_config: Option<KeeperConfig>,
     chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
     secret_copy: &str,
     history: Arc<History>,
@@ -206,11 +200,9 @@ async fn setup_chain_and_run_keeper(
         chain_id.clone(),
         ApiBlockChainState::Initialized(state.clone()),
     );
-    if let Some(keeper_private_key) = keeper_private_key_option {
+    if let Some(keeper_config) = keeper_config {
         keeper::run_keeper_threads(
-            keeper_private_key,
-            keeper_replica_config,
-            keeper_run_config,
+            keeper_config,
             chain_config,
             state,
             keeper_metrics.clone(),

+ 12 - 9
apps/fortuna/src/config.rs

@@ -300,8 +300,8 @@ pub struct ProviderConfig {
     #[serde(default = "default_chain_sample_interval")]
     pub chain_sample_interval: u64,
 
-    /// The address of the fee manager for the provider. Set this value to the keeper wallet address to
-    /// enable keeper balance top-ups.
+    /// The address of the fee manager for the provider. Only used for syncing the fee manager address to the contract.
+    /// Fee withdrawals are handled by the fee manager private key defined in the keeper config.
     pub fee_manager: Option<Address>,
 }
 
@@ -314,10 +314,6 @@ pub struct RunConfig {
     /// Disable automatic fee adjustment threads
     #[serde(default)]
     pub disable_fee_adjustment: bool,
-
-    /// Disable automatic fee withdrawal threads
-    #[serde(default)]
-    pub disable_fee_withdrawal: bool,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
@@ -342,12 +338,19 @@ pub struct KeeperConfig {
     /// should ensure this is a different key in order to reduce the severity of security breaches.
     pub private_key: SecretString,
 
+    /// The fee manager's private key for fee manager operations.
+    /// This key is used to withdraw fees from the contract as the fee manager.
+    /// Multiple replicas can share the same fee manager private key but different keeper keys (`private_key`).
     #[serde(default)]
-    pub replica_config: Option<ReplicaConfig>,
+    pub fee_manager_private_key: Option<SecretString>,
 
-    /// Runtime configuration for the keeper service
+    /// The addresses of other keepers in the replica set (excluding the current keeper).
+    /// This is used to distribute fees fairly across all keepers.
     #[serde(default)]
-    pub run_config: RunConfig,
+    pub other_keeper_addresses: Vec<Address>,
+
+    #[serde(default)]
+    pub replica_config: Option<ReplicaConfig>,
 }
 
 // A secret is a string that can be provided either as a literal in the config,

+ 53 - 2
apps/fortuna/src/eth_utils/utils.rs

@@ -1,5 +1,7 @@
 use {
-    crate::eth_utils::nonce_manager::NonceManaged,
+    crate::{
+        chain::ethereum::InstrumentedSignablePythContract, eth_utils::nonce_manager::NonceManaged,
+    },
     anyhow::{anyhow, Result},
     backoff::ExponentialBackoff,
     ethabi::ethereum_types::U64,
@@ -7,7 +9,10 @@ use {
         contract::{ContractCall, ContractError},
         middleware::Middleware,
         providers::ProviderError,
-        types::{transaction::eip2718::TypedTransaction, TransactionReceipt, U256},
+        signers::Signer,
+        types::{
+            transaction::eip2718::TypedTransaction, TransactionReceipt, TransactionRequest, U256,
+        },
     },
     std::{
         fmt::Display,
@@ -306,3 +311,49 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
 
     Ok(receipt)
 }
+
+/// Transfer funds from the signing wallet to the destination address.
+pub async fn submit_transfer_tx(
+    contract: Arc<InstrumentedSignablePythContract>,
+    destination_address: ethers::types::Address,
+    transfer_amount: U256,
+) -> Result<ethers::types::H256> {
+    let source_wallet_address = contract.wallet().address();
+
+    tracing::info!(
+        "Transferring {:?} from {:?} to {:?}",
+        transfer_amount,
+        source_wallet_address,
+        destination_address
+    );
+
+    let tx = TransactionRequest::new()
+        .to(destination_address)
+        .value(transfer_amount)
+        .from(source_wallet_address);
+
+    let client = contract.client();
+    let pending_tx = client.send_transaction(tx, None).await?;
+
+    // Wait for confirmation with timeout
+    let tx_receipt = timeout(
+        Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS),
+        pending_tx,
+    )
+    .await
+    .map_err(|_| anyhow!("Transfer transaction confirmation timeout"))?
+    .map_err(|e| anyhow!("Transfer transaction confirmation error: {:?}", e))?
+    .ok_or_else(|| anyhow!("Transfer transaction, probably dropped from mempool"))?;
+
+    // Check if transaction was successful
+    if tx_receipt.status == Some(U64::from(0)) {
+        return Err(anyhow!(
+            "Transfer transaction failed on-chain. Receipt: {:?}",
+            tx_receipt
+        ));
+    }
+
+    let tx_hash = tx_receipt.transaction_hash;
+    tracing::info!("Transfer transaction confirmed: {:?}", tx_hash);
+    Ok(tx_hash)
+}

+ 34 - 15
apps/fortuna/src/keeper.rs

@@ -2,7 +2,7 @@ use {
     crate::{
         api::{BlockchainState, ChainId},
         chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
-        config::{EthereumConfig, ReplicaConfig, RunConfig},
+        config::{EthereumConfig, KeeperConfig},
         eth_utils::traced_client::RpcMetrics,
         history::History,
         keeper::{
@@ -17,6 +17,7 @@ use {
             },
         },
     },
+    anyhow,
     ethers::{signers::Signer, types::U256},
     keeper_metrics::{AccountLabel, KeeperMetrics},
     std::{collections::HashSet, sync::Arc},
@@ -57,9 +58,7 @@ pub enum RequestState {
 #[allow(clippy::too_many_arguments)] // Top level orchestration function that needs to configure several threads
 #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
 pub async fn run_keeper_threads(
-    keeper_private_key: String,
-    keeper_replica_config: Option<ReplicaConfig>,
-    keeper_run_config: RunConfig,
+    keeper_config: KeeperConfig,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
     metrics: Arc<KeeperMetrics>,
@@ -70,6 +69,11 @@ pub async fn run_keeper_threads(
     let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
     tracing::info!("Latest safe block: {}", &latest_safe_block);
 
+    let keeper_private_key = keeper_config.private_key.load()?.ok_or_else(|| {
+        anyhow::anyhow!("Keeper private key is required but not provided in config")
+    })?;
+
+    // Contract that uses the keeper wallet to send transactions
     let contract = Arc::new(InstrumentedSignablePythContract::from_config(
         &chain_eth_config,
         &keeper_private_key,
@@ -86,7 +90,7 @@ pub async fn run_keeper_threads(
         chain_state: chain_state.clone(),
         contract: contract.clone(),
         escalation_policy: chain_eth_config.escalation_policy.to_policy(),
-        replica_config: keeper_replica_config,
+        replica_config: keeper_config.replica_config.clone(),
         metrics: metrics.clone(),
         fulfilled_requests_cache,
         history,
@@ -117,26 +121,39 @@ pub async fn run_keeper_threads(
         .in_current_span(),
     );
 
-    // Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance.
-    if !keeper_run_config.disable_fee_withdrawal {
+    // If fee manager private key is provided, spawn fee withdrawal and adjustment threads
+    let fee_manager_private_key = if let Some(ref secret) = keeper_config.fee_manager_private_key {
+        secret.load()?
+    } else {
+        None
+    };
+
+    if let Some(fee_manager_private_key) = fee_manager_private_key {
+        let contract_as_fee_manager = Arc::new(InstrumentedSignablePythContract::from_config(
+            &chain_eth_config,
+            &fee_manager_private_key,
+            chain_state.id.clone(),
+            rpc_metrics.clone(),
+            chain_state.network_id,
+        )?);
+
+        // Spawn a thread that periodically withdraws fees to the fee manager and keeper.
         spawn(
             withdraw_fees_wrapper(
-                contract.clone(),
+                contract_as_fee_manager.clone(),
                 chain_state.provider_address,
                 WITHDRAW_INTERVAL,
                 U256::from(chain_eth_config.min_keeper_balance),
+                keeper_address,
+                keeper_config.other_keeper_addresses.clone(),
             )
             .in_current_span(),
         );
-    } else {
-        tracing::info!("Fee withdrawal thread disabled by configuration");
-    }
 
-    // Spawn a thread that periodically adjusts the provider fee.
-    if !keeper_run_config.disable_fee_adjustment {
+        // Spawn a thread that periodically adjusts the provider fee.
         spawn(
             adjust_fee_wrapper(
-                contract.clone(),
+                contract_as_fee_manager.clone(),
                 chain_state.clone(),
                 chain_state.provider_address,
                 ADJUST_FEE_INTERVAL,
@@ -154,7 +171,9 @@ pub async fn run_keeper_threads(
             .in_current_span(),
         );
     } else {
-        tracing::info!("Fee adjustment thread disabled by configuration");
+        tracing::warn!(
+            "Fee manager private key not provided - fee withdrawal and adjustment threads will not run."
+        );
     }
 
     spawn(update_commitments_loop(contract.clone(), chain_state.clone()).in_current_span());

+ 159 - 21
apps/fortuna/src/keeper/fee.rs

@@ -2,7 +2,7 @@ use {
     crate::{
         api::BlockchainState,
         chain::ethereum::InstrumentedSignablePythContract,
-        eth_utils::utils::{estimate_tx_cost, send_and_confirm},
+        eth_utils::utils::{estimate_tx_cost, send_and_confirm, submit_transfer_tx},
         keeper::{AccountLabel, ChainId, KeeperMetrics},
     },
     anyhow::{anyhow, Result},
@@ -16,57 +16,195 @@ use {
     tracing::{self, Instrument},
 };
 
+/// Determines the amount of fees to withdraw based on fair distribution.
+/// Each keeper will try to withdraw up to their fair share of the fees (T/N)
+/// where T is the total fees across all known keepers and the contract, and N is the
+/// number of known keepers.
+///
+/// `other_keeper_addresses` is expected to not include the `keeper_address`, and should
+/// include the fee manager so that the fee manager wallet stays funded.
+async fn calculate_fair_fee_withdrawal_amount<M: Middleware + 'static>(
+    provider: Arc<M>,
+    keeper_address: Address,
+    other_keeper_addresses: &[Address],
+    available_fees: U256,
+) -> Result<U256> {
+    // Early return if no fees available
+    if available_fees.is_zero() {
+        return Ok(U256::zero());
+    }
+
+    // If no other keepers, withdraw all available fees
+    if other_keeper_addresses.is_empty() {
+        return Ok(available_fees);
+    }
+
+    let current_balance = provider
+        .get_balance(keeper_address, None)
+        .await
+        .map_err(|e| anyhow!("Error while getting current keeper balance. error: {:?}", e))?;
+
+    // Calculate total funds across all keepers + available fees
+    let mut total_funds = current_balance + available_fees;
+
+    for &address in other_keeper_addresses {
+        let balance = provider.get_balance(address, None).await.map_err(|e| {
+            anyhow!(
+                "Error while getting keeper balance for {:?}. error: {:?}",
+                address,
+                e
+            )
+        })?;
+        total_funds += balance;
+    }
+
+    // Calculate fair share per keeper
+    let fair_share = total_funds / (other_keeper_addresses.len() + 1); // +1 for current keeper
+
+    // Calculate how much current keeper should withdraw to reach fair share
+    let withdrawal_amount = if current_balance < fair_share {
+        let deficit = fair_share - current_balance;
+        std::cmp::min(deficit, available_fees)
+    } else {
+        U256::zero()
+    };
+
+    tracing::info!(
+        "Fair share calculation: total_funds={:?}, fair_share={:?}, current_balance={:?}, withdrawal_amount={:?}",
+        total_funds,
+        fair_share,
+        current_balance,
+        withdrawal_amount
+    );
+
+    Ok(withdrawal_amount)
+}
+
 #[tracing::instrument(name = "withdraw_fees", skip_all, fields())]
 pub async fn withdraw_fees_wrapper(
-    contract: Arc<InstrumentedSignablePythContract>,
+    contract_as_fee_manager: Arc<InstrumentedSignablePythContract>,
     provider_address: Address,
     poll_interval: Duration,
     min_balance: U256,
+    keeper_address: Address,
+    other_keeper_addresses: Vec<Address>,
 ) {
+    let fee_manager_wallet = contract_as_fee_manager.wallet().address();
+
+    // Add the fee manager to the list of other keepers so that we can fairly distribute the fees
+    // across the fee manager and all the keepers.
+    let mut other_keepers_and_fee_mgr = other_keeper_addresses.clone();
+    other_keepers_and_fee_mgr.push(contract_as_fee_manager.wallet().address());
+
     loop {
-        if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance)
-            .in_current_span()
-            .await
+        // Top up the fee manager balance
+        // Do this before attempting to top up the keeper balance, since we need a funded
+        // fee manager to be able to withdraw & transfer funds to the keeper.
+        if let Err(e) = withdraw_fees_if_necessary(
+            contract_as_fee_manager.clone(),
+            provider_address,
+            fee_manager_wallet,
+            other_keepers_and_fee_mgr.clone(),
+            min_balance,
+        )
+        .in_current_span()
+        .await
+        {
+            tracing::error!("Withdrawing fees to fee manager. error: {:?}", e);
+        }
+
+        // Top up the keeper balance
+        if let Err(e) = withdraw_fees_if_necessary(
+            contract_as_fee_manager.clone(),
+            provider_address,
+            keeper_address,
+            other_keepers_and_fee_mgr.clone(),
+            min_balance,
+        )
+        .in_current_span()
+        .await
         {
-            tracing::error!("Withdrawing fees. error: {:?}", e);
+            tracing::error!("Withdrawing fees to keeper. error: {:?}", e);
         }
+
         time::sleep(poll_interval).await;
     }
 }
 
 /// Withdraws accumulated fees in the contract as needed to maintain the balance of the keeper wallet.
 pub async fn withdraw_fees_if_necessary(
-    contract: Arc<InstrumentedSignablePythContract>,
+    contract_as_fee_manager: Arc<InstrumentedSignablePythContract>,
     provider_address: Address,
+    keeper_address: Address,
+    other_keeper_addresses: Vec<Address>,
     min_balance: U256,
 ) -> Result<()> {
-    let provider = contract.provider();
-    let wallet = contract.wallet();
+    let provider = contract_as_fee_manager.provider();
+    let fee_manager_wallet = contract_as_fee_manager.wallet();
 
     let keeper_balance = provider
-        .get_balance(wallet.address(), None)
+        .get_balance(keeper_address, None)
         .await
         .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;
 
-    let provider_info = contract
+    // Only withdraw if our balance is below the minimum threshold
+    if keeper_balance >= min_balance {
+        return Ok(());
+    }
+
+    let provider_info = contract_as_fee_manager
         .get_provider_info_v2(provider_address)
         .call()
         .await
         .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
 
-    if provider_info.fee_manager != wallet.address() {
-        return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address()));
-    }
+    let available_fees = U256::from(provider_info.accrued_fees_in_wei);
 
-    let fees = provider_info.accrued_fees_in_wei;
+    // Determine how much we can fairly withdraw from the contract
+    let withdrawal_amount = calculate_fair_fee_withdrawal_amount(
+        Arc::new(provider.clone()),
+        keeper_address,
+        &other_keeper_addresses,
+        available_fees,
+    )
+    .await?;
 
-    if keeper_balance < min_balance && U256::from(fees) > min_balance {
-        tracing::info!("Claiming accrued fees...");
-        let contract_call = contract.withdraw_as_fee_manager(provider_address, fees);
-        send_and_confirm(contract_call).await?;
-    } else if keeper_balance < min_balance {
+    // Only withdraw as long as we are at least doubling our keeper balance (avoids repeated withdrawals of tiny amounts)
+    let min_withdrawal_amount = keeper_balance;
+    if withdrawal_amount < min_withdrawal_amount {
+        // We don't have enough to meaningfully top up the balance.
         // NOTE: This log message triggers a grafana alert. If you want to change the text, please change the alert also.
-        tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance)
+        tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance);
+        return Ok(());
+    }
+
+    tracing::info!(
+        "Keeper balance {:?} below minimum {:?}, claiming {:?} out of available {:?}",
+        keeper_balance,
+        min_balance,
+        withdrawal_amount,
+        available_fees
+    );
+
+    // Proceed with withdrawal
+    let contract_call = contract_as_fee_manager
+        .withdraw_as_fee_manager(provider_address, withdrawal_amount.as_u128());
+    send_and_confirm(contract_call).await?;
+
+    // Transfer the withdrawn funds from fee manager to keeper if fee manager is different from keeper
+    if fee_manager_wallet.address() != keeper_address {
+        submit_transfer_tx(
+            contract_as_fee_manager.clone(),
+            keeper_address,
+            withdrawal_amount,
+        )
+        .await
+        .map_err(|e| {
+            anyhow!(
+                "Failed to transfer fees from fee manager to keeper. error: {:?}",
+                e
+            )
+        })?;
     }
 
     Ok(())