Переглянути джерело

feat(argus): Implement Keeper state and main request processing loop (#2542)

* chore: remove fortuna-specific keeper code

* feat: first pass at keeper fulfillment state and logic

* refactor: replace EscalationPolicy with a stub for now

* fix: address pr comments
Tejas Badadare 7 місяців тому
батько
коміт
501b04f360

+ 74 - 2
apps/argus/Cargo.lock

@@ -132,6 +132,7 @@ name = "argus"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "axum",
  "axum-macros",
  "axum-test",
@@ -148,6 +149,7 @@ dependencies = [
  "futures-locks",
  "hex",
  "lazy_static",
+ "mockall",
  "once_cell",
  "prometheus-client",
  "pythnet-sdk",
@@ -184,9 +186,9 @@ dependencies = [
 
 [[package]]
 name = "async-trait"
-version = "0.1.74"
+version = "0.1.88"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
+checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1060,6 +1062,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
 [[package]]
 name = "dtoa"
 version = "1.0.9"
@@ -1632,6 +1640,12 @@ dependencies = [
  "utoipa-swagger-ui",
 ]
 
+[[package]]
+name = "fragile"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
+
 [[package]]
 name = "fs2"
 version = "0.4.3"
@@ -2369,6 +2383,32 @@ dependencies = [
  "windows-sys",
 ]
 
+[[package]]
+name = "mockall"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "native-tls"
 version = "0.2.11"
@@ -2852,6 +2892,32 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
 
+[[package]]
+name = "predicates"
+version = "3.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573"
+dependencies = [
+ "anstyle",
+ "predicates-core",
+]
+
+[[package]]
+name = "predicates-core"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa"
+
+[[package]]
+name = "predicates-tree"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c"
+dependencies = [
+ "predicates-core",
+ "termtree",
+]
+
 [[package]]
 name = "prettyplease"
 version = "0.2.15"
@@ -3966,6 +4032,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "termtree"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
+
 [[package]]
 name = "thiserror"
 version = "1.0.61"

+ 2 - 0
apps/argus/Cargo.toml

@@ -40,7 +40,9 @@ chrono = { version = "0.4.38", features = [
 backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 thiserror = "1.0.61"
 futures-locks = "0.7.1"
+async-trait = "0.1.88"
 
 
 [dev-dependencies]
+mockall = "0.13.1"
 axum-test = "13.1.1"

+ 2 - 0
apps/argus/src/keeper.rs

@@ -21,7 +21,9 @@ use {
 };
 
 pub(crate) mod fee;
+pub(crate) mod fulfillment_task;
 pub(crate) mod keeper_metrics;
+pub(crate) mod state;
 pub(crate) mod track;
 
 /// Track metrics in this interval

+ 1 - 1
apps/argus/src/keeper/fee.rs

@@ -3,13 +3,13 @@ use {
         api::BlockchainState, chain::ethereum::InstrumentedSignablePythContract,
         keeper::AccountLabel, keeper::ChainId, keeper::KeeperMetrics,
     },
-    fortuna::eth_utils::utils::{estimate_tx_cost, send_and_confirm},
     anyhow::{anyhow, Result},
     ethers::{
         middleware::Middleware,
         signers::Signer,
         types::{Address, U256},
     },
+    fortuna::eth_utils::utils::{estimate_tx_cost, send_and_confirm},
     std::sync::Arc,
     tokio::time::{self, Duration},
     tracing::{self, Instrument},

+ 51 - 0
apps/argus/src/keeper/fulfillment_task.rs

@@ -0,0 +1,51 @@
+use anyhow::Result;
+use tokio::task::JoinHandle;
+
+use super::state::{EscalationPolicy, PulseRequest};
+use async_trait::async_trait;
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct RequestFulfillmentTask {
+    /// If None, the task hasn't been spawned. If Some(fut), task is in flight or completed.
+    pub task: Option<JoinHandle<Result<()>>>,
+    pub retries: u32,
+    pub success: bool,
+
+    // The error received during fulfillment if `success` is false.
+    // We don't consider the consumer callback reverting as a failure since we catch those
+    // in the Pulse contract. Thus, this should only happen if there's a transient RPC error
+    // (tx failed to land, out of gas, etc)
+    pub error: Option<String>,
+}
+
+#[async_trait]
+pub trait RequestFulfiller: Send + Sync + 'static {
+    #[allow(dead_code)]
+    async fn fulfill_request(
+        &self,
+        request: PulseRequest,
+        hermes_url: &str,
+        escalation_policy: EscalationPolicy,
+    ) -> Result<()>;
+}
+
+#[allow(dead_code)]
+pub struct DefaultRequestFulfiller;
+
+#[async_trait]
+impl RequestFulfiller for DefaultRequestFulfiller {
+    /// Core logic of fulfilling a Pulse request
+    async fn fulfill_request(
+        &self,
+        _request: PulseRequest,
+        _hermes_url: &str,
+        _escalation_policy: EscalationPolicy,
+    ) -> Result<()> {
+        // TODO:
+        // 1. get price update by calling hermes
+        // 2. create contract call and submit it with escalation policy
+        // 3. validate receipt from tx
+        Ok(())
+    }
+}

+ 1 - 0
apps/argus/src/keeper/keeper_metrics.rs

@@ -22,6 +22,7 @@ pub struct ChainIdLabel {
 }
 
 pub struct KeeperMetrics {
+    // TODO: reevaluate what metrics are useful for argus
     pub current_sequence_number: Family<AccountLabel, Gauge>,
     pub end_sequence_number: Family<AccountLabel, Gauge>,
     pub balance: Family<AccountLabel, Gauge<f64, AtomicU64>>,

+ 583 - 0
apps/argus/src/keeper/state.rs

@@ -0,0 +1,583 @@
+//! Keeper state management module.
+//!
+//! This module provides the state layer for the keeper, responsible for tracking
+//! and managing on-chain price update requests. It maintains the current state of
+//! pending requests and their fulfillment status.
+
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
+
+use super::{
+    fulfillment_task::{RequestFulfiller, RequestFulfillmentTask},
+    keeper_metrics::KeeperMetrics,
+};
+use ethers::types::Address;
+use tokio::sync::RwLock;
+use tracing::{error, info};
+use url::Url;
+
+/// The price request from the Pulse contract (only fields useful in Argus are present here.)
+// TODO: Get this from somewhere else, SDK perhaps?
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+pub struct PulseRequest {
+    pub sequence_number: u64,
+    pub feed_id_prefixes: Vec<[u8; 8]>,
+
+    // The timestamp at which the callback should be fulfilled
+    pub publish_time: u64,
+
+    // Max gas the user's callback can consume. Passed as a parameter
+    // to the user callback by Pulse.executeCallback.
+    pub callback_gas_limit: u32,
+
+    // Provider's address
+    pub provider: Address,
+}
+// FIXME: Stub EscalationPolicy until we need it. At that point we should
+// refactor it out of Fortuna into a common SDK.
+#[derive(Debug, Clone)]
+pub struct EscalationPolicy;
+
+impl Default for EscalationPolicy {
+    fn default() -> Self {
+        Self {}
+    }
+}
+
+#[allow(dead_code)]
+pub struct KeeperState {
+    /// All currently fulfillable requests from the Pulse contract
+    pub pending_requests: Arc<RwLock<HashMap<PulseRequest, RequestFulfillmentTask>>>,
+
+    /// Map from a prefix feed ID prefix (the values stored in the on-chain requests)
+    /// to the actual price feed ID, which is queryable in Hermes.
+    ///
+    /// NOTE: Maybe support querying by prefix in Hermes? that way we don't have to keep
+    /// an up-to-date map in Argus since that's a little clunky, and we can avoid
+    /// failing to recognize freshly listed IDs if our map is stale.
+    /// OR, we fetch all price ids from Hermes every time and find the prefix.
+    pub prefix_to_price_ids: Arc<HashMap<String, String>>,
+
+    /// The time period after a request's publish_time during which only the requested provider
+    /// can fulfill the request.
+    /// After this period lapses, any provider can fulfill it (TODO: for an extra reward?)
+    pub exclusivity_period_seconds: u32,
+
+    /// The amount of time a request can retry until it's considered unfulfillable and is ignored.
+    pub failure_timeout_seconds: u64,
+
+    /// Policy that defines the internal retries for landing the callback execution tx.
+    /// Increases gas and fees until the tx lands.
+    pub escalation_policy: EscalationPolicy,
+
+    /// The Hermes endpoint to fetch price data from
+    pub hermes_url: Url,
+
+    /// The public key of the provider whose requests this keeper will respond to.
+    pub provider_address: Address,
+
+    /// RequestFulfiller implementor that can execute the callback request
+    pub request_fulfiller: Arc<dyn RequestFulfiller>,
+
+    /// Metrics for tracking keeper performance
+    /// TODO: emit metrics
+    pub metrics: Arc<KeeperMetrics>,
+}
+
+impl KeeperState {
+    #[allow(dead_code)]
+    /// Update the set of pending requests. Add any new requests to the set,
+    /// remove any missing requests (these have been fulfilled/disappeared.)
+    pub async fn update(&mut self, incoming: Vec<PulseRequest>) {
+        let mut pending_requests = self.pending_requests.write().await;
+
+        // Create a set of sequence numbers from the new requests
+        let incoming_sequence_numbers: HashSet<u64> =
+            incoming.iter().map(|req| req.sequence_number).collect();
+
+        // Remove requests that are no longer present
+        pending_requests.retain(|req, _| incoming_sequence_numbers.contains(&req.sequence_number));
+
+        // Add new requests that aren't already being tracked
+        for request in incoming {
+            if !pending_requests.contains_key(&request) {
+                pending_requests.insert(
+                    request,
+                    RequestFulfillmentTask {
+                        task: None,
+                        retries: 0,
+                        success: false,
+                        error: None,
+                    },
+                );
+            }
+        }
+    }
+
+    #[allow(dead_code)]
+    /// Spawns fulfillment tasks and retries for requests that are ready to be fulfilled.
+    /// Intended to be called in a loop. High level flow:
+    /// - Loop over pending_requests and spawn tasks to fulfill.
+    /// - Only spawn tasks for requests that we think we can fulfill at the current time.
+    /// - Check status.task:
+    ///   - None -> Spawnable task
+    ///   - Some(JoinHandle) -> Running or finished task
+    ///     - Retry if the result was failure
+    /// - Keep Pulse requests around for a long time and keep retrying over that time. If any
+    ///   request has been around longer than failure_timeout_seconds, consider it unfulfillable
+    ///   and ignore it. TODO: implement cleaning these up on-chain.
+    pub async fn process_pending_requests(
+        &self,
+        current_time: u64, // Unix timestamp in seconds
+    ) {
+        // TODO: if we see issues with high contention on pending_requests, we can refactor this to use a read lock, and only take the write lock when needed
+        let mut pending_requests = self.pending_requests.write().await;
+
+        for (request, fulfillment_task) in pending_requests.iter_mut() {
+            // Skip requests that aren't fulfillable yet
+            if !self.is_request_fulfillable(request, fulfillment_task, current_time) {
+                continue;
+            }
+
+            // Handle task based on its current state
+            match &fulfillment_task.task {
+                None => {
+                    // Task doesn't exist yet, spawn it
+                    let req_clone = request.clone();
+                    let hermes_url = self.hermes_url.to_string().clone();
+                    let escalation_policy = self.escalation_policy.clone();
+                    let fulfiller = self.request_fulfiller.clone();
+
+                    let handle = tokio::spawn(async move {
+                        info!("Executing task...");
+                        match fulfiller
+                            .fulfill_request(req_clone, &hermes_url, escalation_policy)
+                            .await
+                        {
+                            Ok(()) => Ok(()),
+                            Err(e) => {
+                                error!("Error fulfilling request: {}", e);
+                                Err(e)
+                            }
+                        }
+                    });
+
+                    fulfillment_task.task = Some(handle);
+                    info!(
+                        sequence_number = request.sequence_number,
+                        "Spawned new fulfillment task for request {}", request.sequence_number
+                    );
+                }
+                // Task exists and is completed
+                Some(handle) if handle.is_finished() => {
+                    // Take ownership of the handle and consume the result
+                    let handle = fulfillment_task.task.take().unwrap();
+                    match handle.await {
+                        Ok(Ok(())) => {
+                            // Task completed successfully
+                            fulfillment_task.success = true;
+                            info!(
+                                sequence_number = request.sequence_number,
+                                "Successfully fulfilled request {}", request.sequence_number
+                            );
+                        }
+                        Ok(Err(e)) => {
+                            // Task failed with an error
+                            fulfillment_task.success = false;
+                            fulfillment_task.retries += 1;
+                            let err = e.to_string();
+                            error!(
+                                sequence_number = request.sequence_number,
+                                error = err,
+                                "Request {} fulfillment failed on attempt {} with error '{}'",
+                                request.sequence_number,
+                                fulfillment_task.retries,
+                                err,
+                            );
+
+                            // Reset the task handle so we retry next loop
+                            fulfillment_task.task = None;
+                        }
+                        Err(e) => {
+                            // Task panicked
+                            fulfillment_task.success = false;
+                            fulfillment_task.retries += 1;
+                            let err = e.to_string();
+                            error!(
+                                sequence_number = request.sequence_number,
+                                error = err,
+                                "Request {} fulfillment panicked on attempt {} with error '{}'",
+                                request.sequence_number,
+                                fulfillment_task.retries,
+                                err,
+                            );
+
+                            // Reset the task handle so we retry next loop
+                            fulfillment_task.task = None;
+                        }
+                    }
+                }
+
+                // Task exists and is still running - leave it alone
+                Some(_) => {}
+            }
+
+            // Check if request has been around too long without success
+            let request_age_seconds = current_time - request.publish_time;
+            if !fulfillment_task.success && request_age_seconds > self.failure_timeout_seconds {
+                error!(
+                    "Request #{} has exceeded timeout of {} minutes without successful fulfillment",
+                    request.sequence_number, self.failure_timeout_seconds
+                );
+
+                // TODO: Emit metrics here for monitoring/alerting
+            }
+        }
+    }
+
+    /// Determines if a request is currently fulfillable by this provider
+    fn is_request_fulfillable(
+        &self,
+        request: &PulseRequest,
+        fulfillment_task: &RequestFulfillmentTask,
+        current_time: u64,
+    ) -> bool {
+        // Check if the request's publish time has been reached, or if we've already responded
+        if fulfillment_task.success || current_time < request.publish_time {
+            return false;
+        }
+
+        // Check exclusivity period constraints
+        let is_exclusive_period =
+            current_time < request.publish_time + self.exclusivity_period_seconds as u64;
+        let is_designated_provider = &request.provider == &self.provider_address;
+
+        if is_exclusive_period && !is_designated_provider {
+            return false;
+        }
+
+        // Request is fulfillable
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use anyhow::Result;
+    use async_trait::async_trait;
+    use lazy_static::lazy_static;
+    use mockall::predicate::*;
+    use mockall::*;
+    use std::str::FromStr;
+    use std::sync::Arc;
+    use std::sync::Once;
+    use tokio::sync::RwLock;
+    use tracing_subscriber::fmt::format::FmtSpan;
+
+    lazy_static! {
+        static ref INIT: Once = Once::new();
+    }
+
+    #[allow(dead_code)]
+    /// Call this in a test to enable logs
+    fn init_test_logging() {
+        INIT.call_once(|| {
+            let _ = tracing_subscriber::fmt()
+                .with_env_filter("info,keeper=debug")
+                .with_span_events(FmtSpan::CLOSE)
+                .try_init();
+        });
+    }
+
+    const MOCK_PROVIDER_ADDRESS: &str = "0x0000000000000000000000000000000000000001";
+    const MOCK_HERMES_URL: &str = "https://hermes.pyth.mock";
+
+    // Create a mock fulfiller that lets us control whether
+    // or not the fulfillment task succeeds
+    mock! {
+        pub Fulfiller {}
+
+        #[async_trait]
+        impl RequestFulfiller for Fulfiller {
+            async fn fulfill_request(
+                &self,
+                request: PulseRequest,
+                hermes_url: &str,
+                escalation_policy: EscalationPolicy,
+            ) -> Result<()>;
+        }
+    }
+
+    /// Helper function to create a test KeeperState with default values and a MockFulfiller
+    /// that we can control the behavior of to simulate callback success and/or failure.
+    fn create_test_keeper_state(mock_fulfiller: Option<MockFulfiller>) -> KeeperState {
+        let provider_address = Address::from_str(MOCK_PROVIDER_ADDRESS).unwrap();
+        let metrics = KeeperMetrics::default();
+
+        // Create a mock fulfiller if one wasn't provided
+        let mock_fulfiller = match mock_fulfiller {
+            Some(fulfiller) => fulfiller,
+            None => {
+                let mut fulfiller = MockFulfiller::new();
+                // Default behavior - succeed on fulfillment
+                fulfiller
+                    .expect_fulfill_request()
+                    .returning(|_, _, _| Ok(()));
+                fulfiller
+            }
+        };
+
+        KeeperState {
+            pending_requests: Arc::new(RwLock::new(HashMap::new())),
+            prefix_to_price_ids: Arc::new(HashMap::new()),
+            exclusivity_period_seconds: 300,
+            failure_timeout_seconds: 3600,
+            escalation_policy: EscalationPolicy::default(),
+            hermes_url: Url::parse(MOCK_HERMES_URL).unwrap(),
+            provider_address,
+            metrics: Arc::new(metrics),
+            request_fulfiller: Arc::new(mock_fulfiller),
+        }
+    }
+
+    // Helper to create a test PulseRequest
+    fn create_test_request(
+        sequence_number: u64,
+        publish_time: u64,
+        provider: &str,
+    ) -> PulseRequest {
+        PulseRequest {
+            sequence_number,
+            feed_id_prefixes: vec![[0xff, 0x61, 0x49, 0x1a, 0x00, 0x00, 0x00, 0x00]],
+            publish_time,
+            callback_gas_limit: 100000,
+            provider: Address::from_str(provider).unwrap_or_default(),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_is_request_fulfillable() {
+        let keeper = create_test_keeper_state(None);
+        let current_time = 1000u64; // Base time for tests
+
+        // Case 1: Request with future publish time should not be fulfillable
+        let future_request = create_test_request(1, current_time + 100, MOCK_PROVIDER_ADDRESS);
+        let task = RequestFulfillmentTask {
+            task: None,
+            retries: 0,
+            success: false,
+            error: None,
+        };
+
+        assert!(!keeper.is_request_fulfillable(&future_request, &task, current_time));
+
+        // Case 2: Already fulfilled request should not be fulfillable
+        let past_request = create_test_request(2, current_time - 100, MOCK_PROVIDER_ADDRESS);
+        let successful_task = RequestFulfillmentTask {
+            task: None,
+            retries: 1,
+            success: true,
+            error: None,
+        };
+
+        assert!(!keeper.is_request_fulfillable(&past_request, &successful_task, current_time));
+
+        // Case 3: Request in exclusivity period for a different provider
+        let other_provider_request = create_test_request(
+            3,
+            current_time - 100,
+            "0x0000000000000000000000000000000000000002", // Different provider
+        );
+        let task = RequestFulfillmentTask {
+            task: None,
+            retries: 0,
+            success: false,
+            error: None,
+        };
+
+        // Should not be fulfillable if in exclusivity period and we're not the provider
+        assert!(!keeper.is_request_fulfillable(&other_provider_request, &task, current_time));
+
+        // Case 4: Request in exclusivity period for our provider
+        let our_provider_request = create_test_request(
+            4,
+            current_time - 100,
+            MOCK_PROVIDER_ADDRESS, // Our provider
+        );
+
+        // Should be fulfillable if we're the requested provider
+        assert!(keeper.is_request_fulfillable(&our_provider_request, &task, current_time));
+
+        // Case 5: Request after exclusivity period
+        let after_exclusivity_time = current_time + keeper.exclusivity_period_seconds as u64 + 100;
+
+        // Any provider can fulfill after exclusivity period
+        assert!(keeper.is_request_fulfillable(
+            &other_provider_request,
+            &task,
+            after_exclusivity_time
+        ));
+    }
+
+    #[tokio::test]
+    async fn test_update() {
+        let mut keeper = create_test_keeper_state(None);
+
+        // Add initial requests
+        let request1 = create_test_request(1, 1000, MOCK_PROVIDER_ADDRESS);
+        let request2 = create_test_request(2, 1000, MOCK_PROVIDER_ADDRESS);
+
+        keeper
+            .update(vec![request1.clone(), request2.clone()])
+            .await;
+
+        // Verify both requests are in the state
+        {
+            let pending = keeper.pending_requests.read().await;
+            assert_eq!(pending.len(), 2);
+            assert!(pending.contains_key(&request1));
+            assert!(pending.contains_key(&request2));
+        }
+
+        // Update with only one request - should remove the other
+        let request3 = create_test_request(3, 1000, MOCK_PROVIDER_ADDRESS);
+        keeper
+            .update(vec![request1.clone(), request3.clone()])
+            .await;
+
+        let pending = keeper.pending_requests.read().await;
+        assert_eq!(pending.len(), 2);
+        assert!(pending.contains_key(&request1));
+        assert!(!pending.contains_key(&request2));
+        assert!(pending.contains_key(&request3));
+    }
+
+    #[tokio::test]
+    async fn test_process_pending_requests() {
+        // Create a test keeper state with a mock fulfiller that we can control
+        let mut mock_fulfiller = MockFulfiller::new();
+        let current_time = 1000u64;
+        let request = create_test_request(1, current_time - 100, MOCK_PROVIDER_ADDRESS);
+
+        // Setup expectations for the mock
+        mock_fulfiller
+            .expect_fulfill_request()
+            .times(1)
+            .returning(|_, _, _| Ok(()));
+
+        let mut keeper = create_test_keeper_state(Some(mock_fulfiller));
+        keeper.update(vec![request.clone()]).await;
+
+        // Code under test
+        keeper.process_pending_requests(current_time).await;
+
+        // Verify that a task was spawned
+        {
+            let pending = keeper.pending_requests.read().await;
+            let task = pending.get(&request).unwrap();
+            assert!(task.task.is_some(), "Expected a task to be spawned");
+        }
+
+        // Wait and poll again, the task should have completed successfully
+        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+        keeper.process_pending_requests(current_time).await;
+        {
+            let pending = keeper.pending_requests.read().await;
+            let task = pending.get(&request).unwrap();
+            assert!(task.success, "Task should have completed successfully");
+            assert_eq!(task.retries, 0, "No retries should have occurred");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_process_pending_requests_failure_and_retry() {
+        let mut mock_fulfiller = MockFulfiller::new();
+        let current_time = 1000u64;
+        let request = create_test_request(1, current_time - 100, MOCK_PROVIDER_ADDRESS);
+
+        // First fulfillment call fails, second call succeeds
+        mock_fulfiller
+            .expect_fulfill_request()
+            .times(1)
+            .returning(|_, _, _| anyhow::bail!("Simulated failure"));
+
+        mock_fulfiller
+            .expect_fulfill_request()
+            .times(1)
+            .returning(|_, _, _| Ok(()));
+
+        let mut keeper = create_test_keeper_state(Some(mock_fulfiller));
+        keeper.update(vec![request.clone()]).await;
+
+        // First attempt - should fail
+        keeper.process_pending_requests(current_time).await;
+
+        // Wait for first task to complete, check that it failed and is ready for retry
+        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+        keeper.process_pending_requests(current_time).await;
+        {
+            let pending = keeper.pending_requests.read().await;
+            let task = pending.get(&request).unwrap();
+            assert!(!task.success, "Task should have failed");
+            assert_eq!(task.retries, 1, "One retry should have been recorded");
+            assert!(task.task.is_none(), "Task should be reset for retry");
+        }
+
+        // Second attempt - should succeed
+        keeper.process_pending_requests(current_time).await;
+
+        // Wait for task to complete, check that it succeeded on retry
+        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+        keeper.process_pending_requests(current_time).await;
+        {
+            let pending = keeper.pending_requests.read().await;
+            let task = pending.get(&request).unwrap();
+            assert!(task.success, "Task should have succeeded on retry");
+            assert_eq!(task.retries, 1, "Retry count should remain at 1");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_process_pending_requests_timeout() {
+        let mut mock_fulfiller = MockFulfiller::new();
+        let start_time = 1000u64;
+        let request = create_test_request(1, start_time - 100, MOCK_PROVIDER_ADDRESS);
+
+        // Setup fulfillment to always fail
+        mock_fulfiller
+            .expect_fulfill_request()
+            .returning(|_, _, _| anyhow::bail!("Simulated persistent failure"));
+
+        let mut keeper = create_test_keeper_state(Some(mock_fulfiller));
+        keeper.update(vec![request.clone()]).await;
+
+        // Process with current time
+        keeper.process_pending_requests(start_time).await;
+
+        // Verify task failed but is still eligible for retry
+        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+        keeper.process_pending_requests(start_time).await;
+        {
+            let pending = keeper.pending_requests.read().await;
+            let task = pending.get(&request).unwrap();
+            assert!(!task.success);
+            assert_eq!(task.retries, 1);
+        }
+
+        // Now process with a time that exceeds the timeout
+        let timeout_time = start_time + keeper.failure_timeout_seconds + 10;
+        keeper.process_pending_requests(timeout_time).await;
+
+        // Task should not be retried due to timeout, but should still be in the map
+        {
+            let pending = keeper.pending_requests.read().await;
+            let task = pending.get(&request).unwrap();
+            assert!(!task.success);
+            // Retries should still be 1 since no new attempt was made due to timeout
+            assert_eq!(task.retries, 1);
+        }
+    }
+}