Преглед изворни кода

feat(hermes): integrate Hermes API for price updates and remove process_event module

Daniel Chew пре 8 месеци
родитељ
комит
ef2d9441e4

Разлика између датотеке није приказан због своје велике величине
+ 286 - 122
apps/argus/Cargo.lock


+ 1 - 0
apps/argus/Cargo.toml

@@ -50,3 +50,4 @@ fortuna = { path = "../fortuna" }
 
 [dev-dependencies]
 axum-test = "13.1.1"
+mockito = "1.2.0"

+ 15 - 0
apps/argus/README.md

@@ -17,6 +17,21 @@ Simply run `cargo build` and `cargo test` to build and test the project.
 The Argus binary has a command-line interface to perform useful operations on the contract, such as
 registering a new price provider, or requesting price updates. To see the available commands, simply run `cargo run`.
 
+## Hermes Integration
+
+Argus integrates with the Hermes API to fetch price updates for fulfilling requests. When a price update request is received, Argus:
+
+1. Retrieves the request details from the blockchain
+2. Fetches the required price updates from Hermes using the `/v2/updates/price/{publish_time}` endpoint
+3. Executes the callback on the Pulse contract with the fetched price updates
+
+The Hermes client is implemented in the `keeper/hermes.rs` module and handles:
+
+- Converting price IDs to the format expected by Hermes
+- Fetching price updates from the Hermes API
+- Parsing the response and converting it to the format expected by the Pulse contract
+- Error handling and retries
+
 ## Local Development
 
 To start an instance of the webserver for local testing, you first need to perform a few setup steps:

+ 6 - 0
apps/argus/config.sample.yaml

@@ -83,3 +83,9 @@ keeper:
     value: 0xabcd
     # For production, you can store the private key in a file.
     # file: keeper-key.txt
+
+# Hermes API configuration
+hermes:
+  # Base URL for the Hermes API
+  # This can be overridden by setting the HERMES_BASE_URL environment variable
+  base_url: https://hermes.pyth.network

+ 22 - 0
apps/argus/src/config.rs

@@ -76,6 +76,8 @@ pub struct Config {
     pub chains: HashMap<ChainId, EthereumConfig>,
     pub provider: ProviderConfig,
     pub keeper: KeeperConfig,
+    #[serde(default)]
+    pub hermes: HermesConfig,
 }
 
 impl Config {
@@ -354,3 +356,23 @@ impl SecretString {
         Ok(None)
     }
 }
+
+/// Configuration for the Hermes API integration
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct HermesConfig {
+    /// Base URL for the Hermes API
+    #[serde(default = "default_hermes_base_url")]
+    pub base_url: String,
+}
+
+impl Default for HermesConfig {
+    fn default() -> Self {
+        Self {
+            base_url: default_hermes_base_url(),
+        }
+    }
+}
+
+fn default_hermes_base_url() -> String {
+    "https://hermes.pyth.network".to_string()
+}

+ 1 - 1
apps/argus/src/keeper.rs

@@ -26,8 +26,8 @@ use {
 pub(crate) mod request;
 pub(crate) mod fee;
 pub(crate) mod keeper_metrics;
-pub(crate) mod process_event;
 pub(crate) mod track;
+pub(crate) mod hermes;
 
 /// Track metrics in this interval
 const TRACK_INTERVAL: Duration = Duration::from_secs(10);

+ 422 - 0
apps/argus/src/keeper/hermes.rs

@@ -0,0 +1,422 @@
+/// Hermes Integration
+///
+/// This module provides integration with the Hermes API for fetching price updates to fulfill Pulse requests.
+///
+/// # Overview
+///
+/// The Hermes API is used to fetch price updates for specific price IDs at a given publish time.
+/// These updates are then used to execute callbacks on the Pulse contract.
+///
+/// # API Endpoints
+///
+/// The main endpoint used is:
+///
+/// GET /v2/updates/price/{publish_time}
+///
+/// This endpoint returns price updates for the specified price IDs at the given publish time.
+///
+/// ## Parameters
+///
+/// - `publish_time`: The timestamp for which to fetch price updates
+/// - `ids`: Comma-separated list of price IDs (hex-encoded)
+///
+/// ## Response
+///
+/// The response contains binary data that can be passed directly to the `executeCallback` function on the Pulse contract:
+///
+/// {
+///   "binary": {
+///     "data": ["0x..."],
+///     "encoding": "hex"
+///   }
+/// }
+///
+use {
+    anyhow::{anyhow, Result},
+    ethers::types::Bytes,
+    reqwest::Client,
+    serde::{Deserialize, Serialize},
+    std::{env, time::Duration},
+    tracing,
+};
+
+const HERMES_BASE_URL: &str = "https://hermes.pyth.network";
+const HERMES_ENV_VAR: &str = "HERMES_BASE_URL";
+const HERMES_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Binary data response from Hermes API
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BinaryData {
+    pub data: Vec<String>,
+    pub encoding: String,
+}
+
+/// Response from Hermes API for price updates
+#[derive(Debug, Serialize, Deserialize)]
+pub struct HermesResponse {
+    pub binary: BinaryData,
+}
+
+/// Client for interacting with the Hermes API
+///
+/// This client provides a simple interface for fetching price updates from the Hermes API.
+/// It handles:
+///
+/// - Converting price IDs to the correct format
+/// - Making HTTP requests to the Hermes API
+/// - Parsing the response and converting it to the format expected by the Pulse contract
+/// - Error handling and retries
+pub struct HermesClient {
+    client: Client,
+    base_url: String,
+}
+
+impl HermesClient {
+    /// Create a new Hermes client with the base URL from environment variable or default
+    pub fn new() -> Self {
+        // Create a default config with the default base URL
+        let default_config = crate::config::HermesConfig {
+            base_url: HERMES_BASE_URL.to_string(),
+        };
+        Self::from_config(&default_config)
+    }
+
+    /// Create a new Hermes client with the base URL from the config
+    pub fn from_config(config: &crate::config::HermesConfig) -> Self {
+        // Environment variable takes precedence over config
+        let base_url = env::var(HERMES_ENV_VAR).unwrap_or_else(|_| config.base_url.clone());
+        Self::with_base_url(base_url)
+    }
+
+    /// Create a new Hermes client with a custom base URL
+    pub fn with_base_url(base_url: String) -> Self {
+        let client = Client::builder()
+            .timeout(HERMES_TIMEOUT)
+            .build()
+            .expect("Failed to build HTTP client");
+
+        Self { client, base_url }
+    }
+
+    /// Fetch price updates for the given price IDs at the specified publish time
+    ///
+    /// Returns the binary update data that can be passed to the executeCallback function
+    pub async fn get_price_updates(
+        &self,
+        publish_time: u64,
+        price_ids: &[[u8; 32]],
+    ) -> Result<Vec<Bytes>> {
+        let price_ids_hex: Vec<String> = price_ids
+            .iter()
+            .map(|id| format!("0x{}", hex::encode(id)))
+            .collect();
+
+        let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
+
+        tracing::debug!(
+            "Fetching price updates from Hermes for publish_time={} price_ids={:?}",
+            publish_time,
+            price_ids_hex
+        );
+
+        let response = self
+            .client
+            .get(&url)
+            .query(&[("ids", price_ids_hex.join(","))])
+            .send()
+            .await?;
+
+        if !response.status().is_success() {
+            let status = response.status();
+            let text = response.text().await?;
+            return Err(anyhow!(
+                "Failed to fetch price updates: status={}, body={}",
+                status,
+                text
+            ));
+        }
+
+        let hermes_response: HermesResponse = response.json().await?;
+
+        // Convert hex-encoded strings to bytes
+        let update_data = hermes_response
+            .binary
+            .data
+            .into_iter()
+            .map(|hex_str| {
+                let hex_str = hex_str.trim_start_matches("0x");
+                let bytes = hex::decode(hex_str)
+                    .map_err(|e| anyhow!("Failed to decode hex string: {}", e))?;
+                Ok(Bytes::from(bytes))
+            })
+            .collect::<Result<Vec<Bytes>>>()?;
+
+        tracing::debug!(
+            "Received {} update data entries from Hermes",
+            update_data.len()
+        );
+
+        Ok(update_data)
+    }
+}
+
+impl Default for HermesClient {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Fetch price updates from Hermes for the given price IDs at the specified publish time
+///
+/// This function handles retries and error handling for Hermes API requests
+#[tracing::instrument(skip_all, fields(publish_time))]
+pub async fn fetch_price_updates_from_hermes(
+    publish_time: u64,
+    price_ids: &[[u8; 32]],
+) -> Result<Vec<Bytes>> {
+    const MAX_RETRIES: usize = 3;
+    const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(500);
+
+    // Use HermesClient::new() which will read from environment variables if available
+    let hermes_client = HermesClient::new();
+    let mut last_error = None;
+
+    for retry in 0..MAX_RETRIES {
+        match hermes_client
+            .get_price_updates(publish_time, price_ids)
+            .await
+        {
+            Ok(update_data) => {
+                if update_data.is_empty() {
+                    tracing::warn!(
+                        "Hermes returned empty update data for publish_time={}",
+                        publish_time
+                    );
+                    return Err(anyhow!("Hermes returned empty update data"));
+                }
+
+                tracing::info!(
+                    "Successfully fetched price updates from Hermes: {} entries",
+                    update_data.len()
+                );
+                return Ok(update_data);
+            }
+            Err(e) => {
+                last_error = Some(e);
+                if retry < MAX_RETRIES - 1 {
+                    tracing::warn!(
+                        "Failed to fetch price updates from Hermes (retry {}/{}): {:?}",
+                        retry + 1,
+                        MAX_RETRIES,
+                        last_error
+                    );
+                    tokio::time::sleep(RETRY_DELAY).await;
+                }
+            }
+        }
+    }
+
+    Err(anyhow!(
+        "Failed to fetch price updates from Hermes after {} retries: {:?}",
+        MAX_RETRIES,
+        last_error
+    ))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use mockito::{self, Matcher};
+
+    #[tokio::test]
+    async fn test_get_price_updates_success() {
+        // Setup mock server
+        let mut mock_server = mockito::Server::new_async().await;
+        let url = mock_server.url();
+
+        // Create a test price ID
+        let price_id = [
+            0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab,
+            0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78,
+            0x90, 0xab, 0xcd, 0xef,
+        ];
+
+        // Create a mock for the Hermes API endpoint
+        let mock = mock_server
+            .mock("GET", "/v2/updates/price/1234567890")
+            .match_query(Matcher::UrlEncoded(
+                "ids".into(),
+                "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef".into(),
+            ))
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body(
+                r#"{
+                "binary": {
+                    "data": ["0xabcdef1234567890", "0x123456"],
+                    "encoding": "hex"
+                }
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Create a client with the mock server URL
+        let client = HermesClient::with_base_url(url);
+
+        // Call the method
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
+
+        // Verify the result
+        assert!(result.is_ok());
+        let updates = result.unwrap();
+        assert_eq!(updates.len(), 2);
+
+        // Check the first update data
+        let expected_data1 = hex::decode("abcdef1234567890").unwrap();
+        assert_eq!(updates[0].to_vec(), expected_data1);
+
+        // Check the second update data
+        let expected_data2 = hex::decode("123456").unwrap();
+        assert_eq!(updates[1].to_vec(), expected_data2);
+
+        // Verify the mock was called
+        mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_get_price_updates_error_response() {
+        // Setup mock server
+        let mut mock_server = mockito::Server::new_async().await;
+        let url = mock_server.url();
+
+        // Create a test price ID
+        let price_id = [
+            0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab,
+            0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78,
+            0x90, 0xab, 0xcd, 0xef,
+        ];
+
+        // Create a mock for the Hermes API endpoint with an error response
+        let mock = mock_server
+            .mock("GET", "/v2/updates/price/1234567890")
+            .match_query(Matcher::UrlEncoded(
+                "ids".into(),
+                "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef".into(),
+            ))
+            .with_status(404)
+            .with_body("Not found")
+            .create_async()
+            .await;
+
+        // Create a client with the mock server URL
+        let client = HermesClient::with_base_url(url);
+
+        // Call the method
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
+
+        // Verify the result is an error
+        assert!(result.is_err());
+        let error = result.unwrap_err();
+        assert!(error.to_string().contains("Failed to fetch price updates"));
+
+        // Verify the mock was called
+        mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_get_price_updates_empty_response() {
+        // Setup mock server
+        let mut mock_server = mockito::Server::new_async().await;
+        let url = mock_server.url();
+
+        // Create a test price ID
+        let price_id = [
+            0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab,
+            0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78,
+            0x90, 0xab, 0xcd, 0xef,
+        ];
+
+        // Create a mock for the Hermes API endpoint with an empty data array
+        let mock = mock_server
+            .mock("GET", "/v2/updates/price/1234567890")
+            .match_query(Matcher::UrlEncoded(
+                "ids".into(),
+                "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef".into(),
+            ))
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body(
+                r#"{
+                "binary": {
+                    "data": [],
+                    "encoding": "hex"
+                }
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Create a client with the mock server URL
+        let client = HermesClient::with_base_url(url);
+
+        // Call the method
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
+
+        // Verify the result
+        assert!(result.is_ok());
+        let updates = result.unwrap();
+        assert_eq!(updates.len(), 0);
+
+        // Verify the mock was called
+        mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_get_price_updates_invalid_hex() {
+        // Setup mock server
+        let mut mock_server = mockito::Server::new_async().await;
+        let url = mock_server.url();
+
+        // Create a test price ID
+        let price_id = [
+            0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab,
+            0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78,
+            0x90, 0xab, 0xcd, 0xef,
+        ];
+
+        // Create a mock for the Hermes API endpoint with invalid hex data
+        let mock = mock_server
+            .mock("GET", "/v2/updates/price/1234567890")
+            .match_query(Matcher::UrlEncoded(
+                "ids".into(),
+                "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef".into(),
+            ))
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body(
+                r#"{
+                "binary": {
+                    "data": ["0xZZZZ"],
+                    "encoding": "hex"
+                }
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Create a client with the mock server URL
+        let client = HermesClient::with_base_url(url);
+
+        // Call the method
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
+
+        // Verify the result is an error about invalid hex
+        assert!(result.is_err());
+        let error = result.unwrap_err();
+        assert!(error.to_string().contains("Failed to decode hex string"));
+
+        // Verify the mock was called
+        mock.assert_async().await;
+    }
+}

+ 0 - 130
apps/argus/src/keeper/process_event.rs

@@ -1,130 +0,0 @@
-use {
-    super::keeper_metrics::{AccountLabel, KeeperMetrics},
-    crate::{
-        api::BlockchainState,
-        chain::{ethereum::InstrumentedSignablePythContract, reader::RequestedWithCallbackEvent},
-    },
-    anyhow::Result,
-    ethers::types::{Bytes, U256},
-    fortuna::eth_utils::utils::{submit_tx_with_backoff, EscalationPolicy},
-    std::sync::Arc,
-    tracing,
-};
-
-/// Process an event with backoff. It will retry the callback execution on failure for 5 minutes.
-#[tracing::instrument(name = "process_event_with_backoff", skip_all, fields(
-    sequence_number = event.sequence_number
-))]
-pub async fn process_event_with_backoff(
-    event: RequestedWithCallbackEvent,
-    chain_state: BlockchainState,
-    contract: Arc<InstrumentedSignablePythContract>,
-    gas_limit: U256,
-    escalation_policy: EscalationPolicy,
-    metrics: Arc<KeeperMetrics>,
-) -> Result<()> {
-    // We process all price update requests for our provider
-    let account_label = AccountLabel {
-        chain_id: chain_state.id.clone(),
-        address: chain_state.provider_address.to_string(),
-    };
-
-    metrics.requests.get_or_create(&account_label).inc();
-    tracing::info!("Started processing event");
-
-    // Fetch price update data for the requested price IDs
-    // In a real implementation, this would fetch the actual price data from a source
-    // For now, we'll use empty update data as a placeholder
-    let update_data: Vec<Bytes> = vec![]; // This would be replaced with actual price data
-
-    let contract_call = contract.execute_callback(
-        event.sequence_number,
-        update_data,
-        event.price_ids.clone(),
-    );
-
-    let success = submit_tx_with_backoff(
-        contract.client(),
-        contract_call,
-        gas_limit,
-        escalation_policy,
-    )
-    .await;
-
-    metrics
-        .requests_processed
-        .get_or_create(&account_label)
-        .inc();
-
-    match success {
-        Ok(res) => {
-            tracing::info!("Processed event successfully in {:?}", res.duration);
-
-            metrics
-                .requests_processed_success
-                .get_or_create(&account_label)
-                .inc();
-
-            metrics
-                .request_duration_ms
-                .get_or_create(&account_label)
-                .observe(res.duration.as_millis() as f64);
-
-            // Track retry count, gas multiplier, and fee multiplier for successful transactions
-            metrics
-                .retry_count
-                .get_or_create(&account_label)
-                .observe(res.num_retries as f64);
-
-            metrics
-                .final_gas_multiplier
-                .get_or_create(&account_label)
-                .observe(res.gas_multiplier as f64);
-
-            metrics
-                .final_fee_multiplier
-                .get_or_create(&account_label)
-                .observe(res.fee_multiplier as f64);
-
-            if let Ok(receipt) = res.receipt {
-                if let Some(gas_used) = receipt.gas_used {
-                    let gas_used_float = gas_used.as_u128() as f64 / 1e18;
-                    metrics
-                        .total_gas_spent
-                        .get_or_create(&account_label)
-                        .inc_by(gas_used_float);
-
-                    if let Some(gas_price) = receipt.effective_gas_price {
-                        let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18;
-                        metrics
-                            .total_gas_fee_spent
-                            .get_or_create(&account_label)
-                            .inc_by(gas_fee);
-                    }
-                }
-            }
-            metrics.callbacks_executed.get_or_create(&account_label).inc();
-        }
-        Err(e) => {
-            // In case the callback did not succeed, we double-check that the request is still on-chain.
-            // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but
-            // the RPC gave us an error anyway.
-            let req = chain_state
-                .contract
-                .get_request(event.sequence_number)
-                .await;
-
-            tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req);
-
-            // We only count failures for cases where we are completely certain that the callback failed.
-            if req.is_ok_and(|x| x.is_some()) {
-                metrics
-                    .requests_processed_failure
-                    .get_or_create(&account_label)
-                    .inc();
-            }
-        }
-    }
-
-    Ok(())
-}

+ 133 - 7
apps/argus/src/keeper/request.rs

@@ -2,11 +2,12 @@ use {
     crate::{
         api::{self, BlockchainState},
         chain::{ethereum::InstrumentedSignablePythContract, reader::{BlockNumber, RequestedWithCallbackEvent}},
-        keeper::keeper_metrics::KeeperMetrics,
-        keeper::process_event::process_event_with_backoff,
+        keeper::keeper_metrics::{AccountLabel, KeeperMetrics},
+        keeper::hermes::fetch_price_updates_from_hermes,
     },
+    anyhow::Result,
     ethers::types::U256,
-    fortuna::eth_utils::utils::EscalationPolicy,
+    fortuna::eth_utils::utils::{submit_tx_with_backoff, EscalationPolicy},
     std::{collections::HashSet, sync::Arc},
     tokio::{
         spawn,
@@ -62,7 +63,7 @@ pub async fn process_active_requests(
 
                 for request in &requests {
                     // Convert Request to RequestedWithCallbackEvent format for compatibility
-                    let event = RequestedWithCallbackEvent {
+                    let request_event = RequestedWithCallbackEvent {
                         sequence_number: request.sequence_number,
                         requester: request.requester,
                         price_ids: request.price_ids.clone(),
@@ -73,12 +74,12 @@ pub async fn process_active_requests(
                     let newly_inserted = fulfilled_requests_cache
                         .write()
                         .await
-                        .insert(event.sequence_number);
+                        .insert(request_event.sequence_number);
 
                     if newly_inserted {
                         spawn(
-                            process_event_with_backoff(
-                                event,
+                            process_request_with_backoff(
+                                request_event,
                                 chain_state.clone(),
                                 contract.clone(),
                                 gas_limit,
@@ -104,3 +105,128 @@ pub async fn process_active_requests(
         }
     }
 }
+
+/// Process a request with backoff. It will retry the callback execution on failure for 5 minutes.
+#[tracing::instrument(name = "process_request_with_backoff", skip_all, fields(
+    sequence_number = request.sequence_number
+))]
+pub async fn process_request_with_backoff(
+    request: RequestedWithCallbackEvent,
+    chain_state: BlockchainState,
+    contract: Arc<InstrumentedSignablePythContract>,
+    gas_limit: U256,
+    escalation_policy: EscalationPolicy,
+    metrics: Arc<KeeperMetrics>,
+) -> Result<()> {
+    // We process all price update requests for our provider
+    let account_label = AccountLabel {
+        chain_id: chain_state.id.clone(),
+        address: chain_state.provider_address.to_string(),
+    };
+
+    metrics.requests.get_or_create(&account_label).inc();
+    tracing::info!("Started processing request");
+
+    // Get the request details from the contract to get the publish_time
+    let request_details = match chain_state.contract.get_request(request.sequence_number).await? {
+        Some(req) => req,
+        None => {
+            tracing::warn!("Request not found on-chain, it may have been already fulfilled");
+            return Ok(());
+        }
+    };
+
+    // Fetch price update data from Hermes for the requested price IDs
+    let update_data = fetch_price_updates_from_hermes(request_details.publish_time.as_u64(), &request.price_ids).await?;
+
+    let contract_call = contract.execute_callback(
+        request.sequence_number,
+        update_data,
+        request.price_ids.clone(),
+    );
+
+    let success = submit_tx_with_backoff(
+        contract.client(),
+        contract_call,
+        gas_limit,
+        escalation_policy,
+    )
+    .await;
+
+    metrics
+        .requests_processed
+        .get_or_create(&account_label)
+        .inc();
+
+    match success {
+        Ok(res) => {
+            tracing::info!("Processed request successfully in {:?}", res.duration);
+
+            metrics
+                .requests_processed_success
+                .get_or_create(&account_label)
+                .inc();
+
+            metrics
+                .request_duration_ms
+                .get_or_create(&account_label)
+                .observe(res.duration.as_millis() as f64);
+
+            // Track retry count, gas multiplier, and fee multiplier for successful transactions
+            metrics
+                .retry_count
+                .get_or_create(&account_label)
+                .observe(res.num_retries as f64);
+
+            metrics
+                .final_gas_multiplier
+                .get_or_create(&account_label)
+                .observe(res.gas_multiplier as f64);
+
+            metrics
+                .final_fee_multiplier
+                .get_or_create(&account_label)
+                .observe(res.fee_multiplier as f64);
+
+            if let Ok(receipt) = res.receipt {
+                if let Some(gas_used) = receipt.gas_used {
+                    let gas_used_float = gas_used.as_u128() as f64 / 1e18;
+                    metrics
+                        .total_gas_spent
+                        .get_or_create(&account_label)
+                        .inc_by(gas_used_float);
+
+                    if let Some(gas_price) = receipt.effective_gas_price {
+                        let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18;
+                        metrics
+                            .total_gas_fee_spent
+                            .get_or_create(&account_label)
+                            .inc_by(gas_fee);
+                    }
+                }
+            }
+            metrics.callbacks_executed.get_or_create(&account_label).inc();
+        }
+        Err(e) => {
+            // In case the callback did not succeed, we double-check that the request is still on-chain.
+            // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but
+            // the RPC gave us an error anyway.
+            let req = chain_state
+                .contract
+                .get_request(request.sequence_number)
+                .await;
+
+            tracing::error!("Failed to process request: {:?}. Request: {:?}", e, req);
+
+            // We only count failures for cases where we are completely certain that the callback failed.
+            if req.is_ok_and(|x| x.is_some()) {
+                metrics
+                    .requests_processed_failure
+                    .get_or_create(&account_label)
+                    .inc();
+            }
+        }
+    }
+
+    Ok(())
+}

Неке датотеке нису приказане због велике количине промена