
feat: replace block with request

Daniel Chew 8 months ago
Commit
2cd09cf2a1

+ 26 - 0
apps/argus/src/chain/ethereum.rs

@@ -227,6 +227,32 @@ impl<T: JsonRpcClient + 'static> PulseReader for Pulse<Provider<T>> {
             .as_u64())
     }
 
+    async fn get_active_requests(&self, count: usize) -> Result<Vec<reader::Request>> {
+        let (requests, actual_count) = self
+            .get_first_active_requests(count.into())
+            .call()
+            .await?;
+
+        // Convert actual_count (U256) to usize, clamped to the returned array
+        // length so an inconsistent contract response cannot index out of bounds
+        let actual_count_usize = (actual_count.low_u64() as usize).min(requests.len());
+        let mut result = Vec::with_capacity(actual_count_usize);
+
+        for r in requests.iter().take(actual_count_usize) {
+            // A sequence number of zero marks an unused slot in the fixed-size array
+            if r.sequence_number != 0 {
+                result.push(reader::Request {
+                    requester: r.requester,
+                    sequence_number: r.sequence_number,
+                    callback_gas_limit: r.callback_gas_limit,
+                    price_ids: r.price_ids.to_vec(),
+                    publish_time: r.publish_time,
+                });
+            }
+        }
+
+        Ok(result)
+    }
+
     async fn get_price_update_requested_events(
         &self,
         from_block: BlockNumber,
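
The shape of the contract call is worth spelling out: get_first_active_requests is assumed to return a fixed-size array padded with zeroed entries plus the number of real ones, which is why the loop above skips requests whose sequence number is zero. A minimal standalone sketch of that filtering, with RawRequest as a hypothetical stand-in for the abigen-generated struct:

    #[derive(Clone, Default)]
    struct RawRequest {
        sequence_number: u64,
    }

    fn filter_active(requests: &[RawRequest], actual_count: usize) -> Vec<RawRequest> {
        requests
            .iter()
            .take(actual_count.min(requests.len())) // never read past the array
            .filter(|r| r.sequence_number != 0) // a zero sequence number is an unused slot
            .cloned()
            .collect()
    }

    fn main() {
        // Simulate a contract response: two real requests in a padded array of four.
        let mut raw = vec![
            RawRequest { sequence_number: 7 },
            RawRequest { sequence_number: 9 },
        ];
        raw.resize(4, RawRequest::default());
        assert_eq!(filter_active(&raw, 2).len(), 2);
        assert_eq!(filter_active(&raw, 4).len(), 2); // padding is skipped either way
    }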

+ 13 - 6
apps/argus/src/chain/reader.rs

@@ -60,6 +60,10 @@ pub trait PulseReader: Send + Sync {
         self.get_price_update_requested_events(from_block, to_block).await
     }
 
+    /// Get up to `count` active requests directly from contract storage.
+    /// This is more efficient than scanning historical blocks for request events.
+    async fn get_active_requests(&self, count: usize) -> Result<Vec<Request>>;
+
     /// Estimate the gas required to execute a callback for price updates.
     async fn estimate_execute_callback_gas(
         &self,
@@ -153,13 +157,11 @@ pub mod mock {
     #[async_trait]
     impl PulseReader for MockPulseReader {
         async fn get_request(&self, sequence_number: u64) -> Result<Option<Request>> {
-            Ok(self
-                .requests
-                .read()
-                .unwrap()
+            let requests = self.requests.read().unwrap();
+            Ok(requests
                 .iter()
-                .find(|&r| r.sequence_number == sequence_number)
-                .map(|r| (*r).clone()))
+                .find(|r| r.sequence_number == sequence_number)
+                .cloned())
         }
 
         async fn get_block_number(
@@ -169,6 +171,11 @@ pub mod mock {
             Ok(*self.block_number.read().unwrap())
         }
 
+        async fn get_active_requests(&self, count: usize) -> Result<Vec<Request>> {
+            let requests = self.requests.read().unwrap();
+            Ok(requests.iter().take(count).cloned().collect())
+        }
+
         async fn get_price_update_requested_events(
             &self,
             _from_block: BlockNumber,
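
The mock mirrors the trait without the zero-sequence filtering, since its request list never contains padding. A self-contained sketch of the same read-lock-then-take pattern, compilable outside the crate (the Mock type and its constructor are invented for illustration, though the fields mirror MockPulseReader):

    use std::sync::RwLock;

    #[derive(Clone)]
    struct Request {
        sequence_number: u64,
    }

    struct Mock {
        requests: RwLock<Vec<Request>>,
    }

    impl Mock {
        fn get_active_requests(&self, count: usize) -> Vec<Request> {
            // Hold one read guard for the whole operation rather than chaining
            // off a temporary, matching the get_request refactor above.
            let requests = self.requests.read().unwrap();
            requests.iter().take(count).cloned().collect()
        }
    }

    fn main() {
        let mock = Mock {
            requests: RwLock::new(vec![
                Request { sequence_number: 1 },
                Request { sequence_number: 2 },
                Request { sequence_number: 3 },
            ]),
        };
        assert_eq!(mock.get_active_requests(2).len(), 2);
        assert_eq!(mock.get_active_requests(10).len(), 3); // take() caps at what exists
    }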

+ 58 - 46
apps/argus/src/keeper.rs

@@ -3,9 +3,8 @@ use {
         api::{BlockchainState, ChainId},
         chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
         config::EthereumConfig,
-        keeper::block::{
-            get_latest_safe_block, process_backlog, process_new_blocks, watch_blocks_wrapper,
-            BlockRange,
+        keeper::request::{
+            get_latest_safe_block, process_active_requests,
         },
         keeper::fee::adjust_fee_wrapper,
         keeper::fee::withdraw_fees_wrapper,
@@ -18,26 +17,28 @@ use {
     std::{collections::HashSet, sync::Arc},
     tokio::{
         spawn,
-        sync::{mpsc, RwLock},
+        sync::RwLock,
         time::{self, Duration},
     },
     tracing::{self, Instrument},
 };
 
-pub(crate) mod block;
+pub(crate) mod request;
 pub(crate) mod fee;
 pub(crate) mod keeper_metrics;
 pub(crate) mod process_event;
 pub(crate) mod track;
 
-/// How many blocks to look back for events that might be missed when starting the keeper
-const BACKLOG_RANGE: u64 = 1000;
 /// Track metrics in this interval
 const TRACK_INTERVAL: Duration = Duration::from_secs(10);
 /// Check whether we need to conduct a withdrawal at this interval.
 const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300);
 /// Check whether we need to adjust the fee at this interval.
 const ADJUST_FEE_INTERVAL: Duration = Duration::from_secs(30);
+/// Check for active requests at this interval
+const ACTIVE_REQUESTS_INTERVAL: Duration = Duration::from_secs(2);
+/// Maximum number of active requests to process in a single batch
+const MAX_ACTIVE_REQUESTS: usize = 100;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum RequestState {
@@ -49,8 +50,8 @@ pub enum RequestState {
     Processed,
 }
 
-/// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and
-/// handle any events for the new blocks.
+/// Run threads to handle active requests, periodically check for new requests,
+/// and manage fees and balance.
 #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
 pub async fn run_keeper_threads(
     private_key: String,
@@ -77,14 +78,11 @@ pub async fn run_keeper_threads(
 
     let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));
 
-    // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
+    // Spawn a thread to process the requests that are already active at startup
     let gas_limit: U256 = chain_eth_config.gas_limit.into();
     spawn(
-        process_backlog(
-            BlockRange {
-                from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
-                to: latest_safe_block,
-            },
+        process_active_requests(
+            MAX_ACTIVE_REQUESTS,
             contract.clone(),
             gas_limit,
             chain_eth_config.escalation_policy.to_policy(),
@@ -95,30 +93,32 @@ pub async fn run_keeper_threads(
         .in_current_span(),
     );
 
-    let (tx, rx) = mpsc::channel::<BlockRange>(1000);
-    // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
-    spawn(
-        watch_blocks_wrapper(
-            chain_state.clone(),
-            latest_safe_block,
-            tx,
-            chain_eth_config.geth_rpc_wss.clone(),
-        )
-        .in_current_span(),
-    );
+    // Clone values needed for the periodic request-checking thread
+    let request_check_contract = contract.clone();
+    let request_check_chain_state = chain_state.clone();
+    let request_check_metrics = metrics.clone();
+    let request_check_escalation_policy = chain_eth_config.escalation_policy.to_policy();
+    let request_check_fulfilled_requests_cache = fulfilled_requests_cache.clone();
 
-    // Spawn a thread for block processing with configured delays
+    // Spawn a thread to periodically check for active requests
     spawn(
-        process_new_blocks(
-            chain_state.clone(),
-            rx,
-            Arc::clone(&contract),
-            gas_limit,
-            chain_eth_config.escalation_policy.to_policy(),
-            metrics.clone(),
-            fulfilled_requests_cache.clone(),
-            chain_eth_config.block_delays.clone(),
-        )
+        async move {
+            loop {
+                time::sleep(ACTIVE_REQUESTS_INTERVAL).await;
+
+                process_active_requests(
+                    MAX_ACTIVE_REQUESTS,
+                    request_check_contract.clone(),
+                    gas_limit,
+                    request_check_escalation_policy.clone(),
+                    request_check_chain_state.clone(),
+                    request_check_metrics.clone(),
+                    request_check_fulfilled_requests_cache.clone(),
+                )
+                .in_current_span()
+                .await;
+            }
+        }
         .in_current_span(),
     );
 
@@ -133,12 +133,17 @@ pub async fn run_keeper_threads(
         .in_current_span(),
     );
 
+    // Clone values needed for the fee adjustment thread
+    let fee_adjust_contract = contract.clone();
+    let fee_adjust_chain_state = chain_state.clone();
+    let fee_adjust_metrics = metrics.clone();
+
     // Spawn a thread that periodically adjusts the provider fee.
     spawn(
         adjust_fee_wrapper(
-            contract.clone(),
-            chain_state.clone(),
-            chain_state.provider_address,
+            fee_adjust_contract,
+            fee_adjust_chain_state.clone(),
+            fee_adjust_chain_state.provider_address,
             ADJUST_FEE_INTERVAL,
             chain_eth_config.legacy_tx,
             // NOTE: we are adjusting the fees based on the maximum configured gas for user transactions.
@@ -157,22 +162,29 @@ pub async fn run_keeper_threads(
             u64::try_from(100 + chain_eth_config.max_profit_pct)
                 .expect("max_profit_pct must be >= -100"),
             chain_eth_config.fee,
-            metrics.clone(),
+            fee_adjust_metrics,
         )
         .in_current_span(),
     );
 
+    // Clone values needed for the tracking thread
+    let track_chain_id = chain_state.id.clone();
+    let track_chain_config = chain_eth_config.clone();
+    let track_provider_address = chain_state.provider_address;
+    let track_keeper_metrics = metrics.clone();
+    let track_rpc_metrics = rpc_metrics.clone();
+
     // Spawn a thread to track the provider info and the balance of the keeper
     spawn(
         async move {
-            let chain_id = chain_state.id.clone();
-            let chain_config = chain_eth_config.clone();
-            let provider_address = chain_state.provider_address;
-            let keeper_metrics = metrics.clone();
+            let chain_id = track_chain_id;
+            let chain_config = track_chain_config;
+            let provider_address = track_provider_address;
+            let keeper_metrics = track_keeper_metrics;
             let contract = match InstrumentedPythContract::from_config(
                 &chain_config,
                 chain_id.clone(),
-                rpc_metrics,
+                track_rpc_metrics,
             ) {
                 Ok(r) => r,
                 Err(e) => {
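
One design note on the polling loop above: sleeping at the top of the loop means the effective period is the two-second interval plus however long each pass takes. A sketch of an equivalent loop built on tokio::time::interval, which ticks on a fixed schedule instead (poll_once is a hypothetical stand-in for process_active_requests with its arguments bound):

    use std::time::Duration;
    use tokio::time::{interval, MissedTickBehavior};

    async fn poll_once() {
        // Stand-in for process_active_requests(MAX_ACTIVE_REQUESTS, ...).
    }

    #[tokio::main]
    async fn main() {
        let mut ticker = interval(Duration::from_secs(2)); // ACTIVE_REQUESTS_INTERVAL
        // If one pass overruns the interval, skip the missed ticks rather than bursting.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        for _ in 0..3 {
            // The first tick completes immediately, which would also cover the
            // separate startup pass the commit spawns before this loop.
            ticker.tick().await;
            poll_once().await;
        }
    }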

+ 0 - 357
apps/argus/src/keeper/block.rs

@@ -1,357 +0,0 @@
-use {
-    crate::{
-        api::{self, BlockchainState},
-        chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
-        keeper::keeper_metrics::KeeperMetrics,
-        keeper::process_event::process_event_with_backoff,
-    },
-    anyhow::{anyhow, Result},
-    ethers::{
-        providers::{Middleware, Provider, Ws},
-        types::U256,
-    },
-    fortuna::eth_utils::utils::EscalationPolicy,
-    futures::StreamExt,
-    std::{collections::HashSet, sync::Arc},
-    tokio::{
-        spawn,
-        sync::{mpsc, RwLock},
-        time::{self, Duration},
-    },
-    tracing::{self, Instrument},
-};
-
-/// How much to wait before retrying in case of an RPC error
-const RETRY_INTERVAL: Duration = Duration::from_secs(5);
-/// How many blocks to fetch events for in a single rpc call
-const BLOCK_BATCH_SIZE: u64 = 100;
-/// How much to wait before polling the next latest block
-const POLL_INTERVAL: Duration = Duration::from_secs(2);
-/// Retry last N blocks
-const RETRY_PREVIOUS_BLOCKS: u64 = 100;
-
-#[derive(Debug, Clone)]
-pub struct BlockRange {
-    pub from: BlockNumber,
-    pub to: BlockNumber,
-}
-
-/// Get the latest safe block number for the chain. Retry internally if there is an error.
-pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
-    loop {
-        match chain_state
-            .contract
-            .get_block_number(chain_state.confirmed_block_status)
-            .await
-        {
-            Ok(latest_confirmed_block) => {
-                tracing::info!("Fetched latest safe block {}", latest_confirmed_block);
-                return latest_confirmed_block;
-            }
-            Err(e) => {
-                tracing::error!("Error while getting block number. error: {:?}", e);
-                time::sleep(RETRY_INTERVAL).await;
-            }
-        }
-    }
-}
-
-/// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch.
-#[tracing::instrument(skip_all, fields(
-    range_from_block = block_range.from, range_to_block = block_range.to
-))]
-pub async fn process_block_range(
-    block_range: BlockRange,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    chain_state: api::BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
-) {
-    let BlockRange {
-        from: first_block,
-        to: last_block,
-    } = block_range;
-    let mut current_block = first_block;
-    while current_block <= last_block {
-        let mut to_block = current_block + BLOCK_BATCH_SIZE;
-        if to_block > last_block {
-            to_block = last_block;
-        }
-
-        // TODO: this is handling all blocks sequentially we might want to handle them in parallel in future.
-        process_single_block_batch(
-            BlockRange {
-                from: current_block,
-                to: to_block,
-            },
-            contract.clone(),
-            gas_limit,
-            escalation_policy.clone(),
-            chain_state.clone(),
-            metrics.clone(),
-            fulfilled_requests_cache.clone(),
-        )
-        .in_current_span()
-        .await;
-
-        current_block = to_block + 1;
-    }
-}
-
-/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
-/// and then try to process them one by one. It checks the `fulfilled_request_cache`. If the request was already fulfilled.
-/// It won't reprocess it. If the request was already processed, it will reprocess it.
-/// If the process fails, it will retry indefinitely.
-#[tracing::instrument(name = "batch", skip_all, fields(
-    batch_from_block = block_range.from, batch_to_block = block_range.to
-))]
-pub async fn process_single_block_batch(
-    block_range: BlockRange,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    chain_state: api::BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
-) {
-    loop {
-        let events_res = chain_state
-            .contract
-            .get_request_with_callback_events(block_range.from, block_range.to)
-            .await;
-
-        match events_res {
-            Ok(events) => {
-                tracing::info!(num_of_events = &events.len(), "Processing",);
-                for event in &events {
-                    // the write lock guarantees we spawn only one task per sequence number
-                    let newly_inserted = fulfilled_requests_cache
-                        .write()
-                        .await
-                        .insert(event.sequence_number);
-                    if newly_inserted {
-                        spawn(
-                            process_event_with_backoff(
-                                event.clone(),
-                                chain_state.clone(),
-                                contract.clone(),
-                                gas_limit,
-                                escalation_policy.clone(),
-                                metrics.clone(),
-                            )
-                            .in_current_span(),
-                        );
-                    }
-                }
-                tracing::info!(num_of_events = &events.len(), "Processed",);
-                break;
-            }
-            Err(e) => {
-                tracing::error!(
-                    "Error while getting events. Waiting for {} seconds before retry. error: {:?}",
-                    RETRY_INTERVAL.as_secs(),
-                    e
-                );
-                time::sleep(RETRY_INTERVAL).await;
-            }
-        }
-    }
-}
-
-/// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay.
-/// It retries indefinitely.
-#[tracing::instrument(name = "watch_blocks", skip_all, fields(
-    initial_safe_block = latest_safe_block
-))]
-pub async fn watch_blocks_wrapper(
-    chain_state: BlockchainState,
-    latest_safe_block: BlockNumber,
-    tx: mpsc::Sender<BlockRange>,
-    geth_rpc_wss: Option<String>,
-) {
-    let mut last_safe_block_processed = latest_safe_block;
-    loop {
-        if let Err(e) = watch_blocks(
-            chain_state.clone(),
-            &mut last_safe_block_processed,
-            tx.clone(),
-            geth_rpc_wss.clone(),
-        )
-        .in_current_span()
-        .await
-        {
-            tracing::error!("watching blocks. error: {:?}", e);
-            time::sleep(RETRY_INTERVAL).await;
-        }
-    }
-}
-
-/// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel.
-/// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending
-/// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even
-/// know about it.
-pub async fn watch_blocks(
-    chain_state: BlockchainState,
-    last_safe_block_processed: &mut BlockNumber,
-    tx: mpsc::Sender<BlockRange>,
-    geth_rpc_wss: Option<String>,
-) -> Result<()> {
-    tracing::info!("Watching blocks to handle new events");
-
-    let provider_option = match geth_rpc_wss {
-        Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
-            Ok(provider) => provider,
-            Err(e) => {
-                tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e);
-                return Err(e.into());
-            }
-        }),
-        None => {
-            tracing::info!("No wss provided");
-            None
-        }
-    };
-
-    let mut stream_option = match provider_option {
-        Some(ref provider) => Some(match provider.subscribe_blocks().await {
-            Ok(client) => client,
-            Err(e) => {
-                tracing::error!("Error while subscribing to blocks. error {:?}", e);
-                return Err(e.into());
-            }
-        }),
-        None => None,
-    };
-
-    loop {
-        match stream_option {
-            Some(ref mut stream) => {
-                if stream.next().await.is_none() {
-                    tracing::error!("Error blocks subscription stream ended");
-                    return Err(anyhow!("Error blocks subscription stream ended"));
-                }
-            }
-            None => {
-                time::sleep(POLL_INTERVAL).await;
-            }
-        }
-
-        let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
-        if latest_safe_block > *last_safe_block_processed {
-            let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS);
-
-            // In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10)
-            // TODO: add a metric for this in separate PR. We need alerts
-            // But in extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and
-            // last_safe_block_processed can grow. It is fine to not have the retry mechanisms for those earliest blocks as we expect the rpc
-            // to be in consistency after this much time.
-            if from > *last_safe_block_processed {
-                from = *last_safe_block_processed;
-            }
-            match tx
-                .send(BlockRange {
-                    from,
-                    to: latest_safe_block,
-                })
-                .await
-            {
-                Ok(_) => {
-                    tracing::info!(
-                        from_block = from,
-                        to_block = &latest_safe_block,
-                        "Block range sent to handle events",
-                    );
-                    *last_safe_block_processed = latest_safe_block;
-                }
-                Err(e) => {
-                    tracing::error!(
-                        from_block = from,
-                        to_block = &latest_safe_block,
-                        "Error while sending block range to handle events. These will be handled in next call. error: {:?}",
-                        e
-                    );
-                }
-            };
-        }
-    }
-}
-
-/// It waits on rx channel to receive block ranges and then calls process_block_range to process them
-/// for each configured block delay.
-#[tracing::instrument(skip_all)]
-#[allow(clippy::too_many_arguments)]
-pub async fn process_new_blocks(
-    chain_state: BlockchainState,
-    mut rx: mpsc::Receiver<BlockRange>,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
-    block_delays: Vec<u64>,
-) {
-    tracing::info!("Waiting for new block ranges to process");
-    loop {
-        if let Some(block_range) = rx.recv().await {
-            // Process blocks immediately first
-            process_block_range(
-                block_range.clone(),
-                Arc::clone(&contract),
-                gas_limit,
-                escalation_policy.clone(),
-                chain_state.clone(),
-                metrics.clone(),
-                fulfilled_requests_cache.clone(),
-            )
-            .in_current_span()
-            .await;
-
-            // Then process with each configured delay
-            for delay in &block_delays {
-                let adjusted_range = BlockRange {
-                    from: block_range.from.saturating_sub(*delay),
-                    to: block_range.to.saturating_sub(*delay),
-                };
-                process_block_range(
-                    adjusted_range,
-                    Arc::clone(&contract),
-                    gas_limit,
-                    escalation_policy.clone(),
-                    chain_state.clone(),
-                    metrics.clone(),
-                    fulfilled_requests_cache.clone(),
-                )
-                .in_current_span()
-                .await;
-            }
-        }
-    }
-}
-
-/// Processes the backlog_range for a chain.
-#[tracing::instrument(skip_all)]
-pub async fn process_backlog(
-    backlog_range: BlockRange,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    chain_state: BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
-) {
-    tracing::info!("Processing backlog");
-    process_block_range(
-        backlog_range,
-        contract,
-        gas_limit,
-        escalation_policy,
-        chain_state,
-        metrics,
-        fulfilled_requests_cache,
-    )
-    .in_current_span()
-    .await;
-    tracing::info!("Backlog processed");
-}
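
For reference, the heart of the deleted scanner was the batching arithmetic in process_block_range: step an inclusive block range in fixed-size chunks, clamping the final chunk. A standalone sketch of that bookkeeping, which the storage-polling design makes unnecessary (batch size shrunk from 100 to 10 for readability):

    const BLOCK_BATCH_SIZE: u64 = 10;

    fn batches(first_block: u64, last_block: u64) -> Vec<(u64, u64)> {
        let mut out = Vec::new();
        let mut current = first_block;
        while current <= last_block {
            // Clamp the batch end to the final block, as process_block_range did.
            let to = (current + BLOCK_BATCH_SIZE).min(last_block);
            out.push((current, to));
            current = to + 1;
        }
        out
    }

    fn main() {
        // 26 blocks starting at 100, split into inclusive batches.
        assert_eq!(batches(100, 125), vec![(100, 110), (111, 121), (122, 125)]);
    }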

+ 106 - 0
apps/argus/src/keeper/request.rs

@@ -0,0 +1,106 @@
+use {
+    crate::{
+        api::{self, BlockchainState},
+        chain::{ethereum::InstrumentedSignablePythContract, reader::{BlockNumber, RequestedWithCallbackEvent}},
+        keeper::keeper_metrics::KeeperMetrics,
+        keeper::process_event::process_event_with_backoff,
+    },
+    ethers::types::U256,
+    fortuna::eth_utils::utils::EscalationPolicy,
+    std::{collections::HashSet, sync::Arc},
+    tokio::{
+        spawn,
+        sync::RwLock,
+        time::{self, Duration},
+    },
+    tracing::{self, Instrument},
+};
+
+/// How long to wait before retrying after an RPC error
+const RETRY_INTERVAL: Duration = Duration::from_secs(5);
+
+/// Get the latest safe block number for the chain. Retry internally if there is an error.
+/// This is still needed for logging and initialization purposes.
+pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
+    loop {
+        match chain_state
+            .contract
+            .get_block_number(chain_state.confirmed_block_status)
+            .await
+        {
+            Ok(latest_confirmed_block) => {
+                tracing::info!("Fetched latest safe block {}", latest_confirmed_block);
+                return latest_confirmed_block;
+            }
+            Err(e) => {
+                tracing::error!("Error while getting block number. error: {:?}", e);
+                time::sleep(RETRY_INTERVAL).await;
+            }
+        }
+    }
+}
+
+/// Process active requests fetched directly from contract storage
+#[tracing::instrument(name = "process_active_requests", skip_all)]
+pub async fn process_active_requests(
+    max_requests: usize,
+    contract: Arc<InstrumentedSignablePythContract>,
+    gas_limit: U256,
+    escalation_policy: EscalationPolicy,
+    chain_state: api::BlockchainState,
+    metrics: Arc<KeeperMetrics>,
+    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+) {
+    tracing::info!("Processing active requests from contract storage");
+
+    loop {
+        let active_requests_res = chain_state.contract.get_active_requests(max_requests).await;
+
+        match active_requests_res {
+            Ok(requests) => {
+                tracing::info!(num_of_requests = &requests.len(), "Processing active requests");
+
+                for request in &requests {
+                    // Convert Request to RequestedWithCallbackEvent format for compatibility
+                    let event = RequestedWithCallbackEvent {
+                        sequence_number: request.sequence_number,
+                        requester: request.requester,
+                        price_ids: request.price_ids.clone(),
+                        callback_gas_limit: request.callback_gas_limit,
+                    };
+
+                    // The write lock guarantees we spawn only one task per sequence number
+                    let newly_inserted = fulfilled_requests_cache
+                        .write()
+                        .await
+                        .insert(event.sequence_number);
+
+                    if newly_inserted {
+                        spawn(
+                            process_event_with_backoff(
+                                event,
+                                chain_state.clone(),
+                                contract.clone(),
+                                gas_limit,
+                                escalation_policy.clone(),
+                                metrics.clone(),
+                            )
+                            .in_current_span(),
+                        );
+                    }
+                }
+
+                tracing::info!(num_of_requests = &requests.len(), "Processed active requests");
+                break;
+            }
+            Err(e) => {
+                tracing::error!(
+                    "Error while getting active requests. Waiting for {} seconds before retry. error: {:?}",
+                    RETRY_INTERVAL.as_secs(),
+                    e
+                );
+                time::sleep(RETRY_INTERVAL).await;
+            }
+        }
+    }
+}