Ver Fonte

feat(keeper): refactor

Daniel Chew há 8 meses atrás
pai
commit
ae97d91c24

+ 3 - 3
apps/argus/src/chain/reader.rs

@@ -101,7 +101,7 @@ pub mod mock {
                             callback_gas_limit: c,
                             price_ids: p.clone(),
                             publish_time: t,
-                            num_price_ids: 0,
+                            num_price_ids: p.len().min(255) as u8,
                         })
                         .collect(),
                 ),
@@ -121,9 +121,9 @@ pub mod mock {
                 requester,
                 sequence_number: sequence,
                 callback_gas_limit,
-                price_ids,
+                price_ids: price_ids.clone(),
                 publish_time,
-                num_price_ids: 0,
+                num_price_ids: price_ids.len().min(255) as u8,
             });
             self
         }

+ 7 - 18
apps/argus/src/keeper/hermes.rs

@@ -94,17 +94,13 @@ impl HermesClient {
         &self,
         publish_time: u64,
         price_ids: &[[u8; 32]],
-        num_price_ids: u8,
     ) -> Result<Vec<Bytes>> {
-        let valid_price_ids_count = num_price_ids as usize;
-
-        let price_ids_hex: Vec<String> = price_ids[0..valid_price_ids_count]
+        let price_ids_hex: Vec<String> = price_ids
             .iter()
             .map(|id| format!("0x{}", hex::encode(id)))
             .collect();
 
         let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
-
         tracing::debug!(
             "Fetching price updates from Hermes for publish_time={} price_ids={:?}",
             publish_time,
@@ -116,13 +112,7 @@ impl HermesClient {
             query_params.push(("ids[]", id.as_str()));
         }
 
-        let response = self
-            .client
-            .get(&url)
-            .query(&query_params)
-            .send()
-            .await?;
-
+        let response = self.client.get(&url).query(&query_params).send().await?;
         println!("Full URL: {}", response.url());
 
         if !response.status().is_success() {
@@ -173,7 +163,6 @@ pub async fn fetch_price_updates_from_hermes(
     publish_time: u64,
     price_ids: &[[u8; 32]],
     hermes_base_url: String,
-    num_price_ids: u8,
 ) -> Result<Vec<Bytes>> {
     const MAX_RETRIES: usize = 3;
     const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(500);
@@ -185,7 +174,7 @@ pub async fn fetch_price_updates_from_hermes(
 
     for retry in 0..MAX_RETRIES {
         match hermes_client
-            .get_price_updates(publish_time, price_ids, num_price_ids)
+            .get_price_updates(publish_time, price_ids)
             .await
         {
             Ok(update_data) => {
@@ -267,7 +256,7 @@ mod tests {
         let client = HermesClient::with_base_url(url);
 
         // Call the method
-        let result = client.get_price_updates(1234567890, &[price_id], 2).await;
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
 
         // Verify the result
         assert!(result.is_ok());
@@ -315,7 +304,7 @@ mod tests {
         let client = HermesClient::with_base_url(url);
 
         // Call the method
-        let result = client.get_price_updates(1234567890, &[price_id], 2).await;
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
 
         // Verify the result is an error
         assert!(result.is_err());
@@ -363,7 +352,7 @@ mod tests {
         let client = HermesClient::with_base_url(url);
 
         // Call the method
-        let result = client.get_price_updates(1234567890, &[price_id], 2).await;
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
 
         // Verify the result
         assert!(result.is_ok());
@@ -411,7 +400,7 @@ mod tests {
         let client = HermesClient::with_base_url(url);
 
         // Call the method
-        let result = client.get_price_updates(1234567890, &[price_id], 2).await;
+        let result = client.get_price_updates(1234567890, &[price_id]).await;
 
         // Verify the result is an error about invalid hex
         assert!(result.is_err());

+ 44 - 14
apps/argus/src/keeper/request.rs

@@ -2,8 +2,8 @@ use {
     crate::{
         api::{self, BlockchainState},
         chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
-        keeper::keeper_metrics::{AccountLabel, KeeperMetrics},
         keeper::hermes::fetch_price_updates_from_hermes,
+        keeper::keeper_metrics::{AccountLabel, KeeperMetrics},
     },
     anyhow::Result,
     ethers::types::U256,
@@ -24,11 +24,7 @@ const RETRY_INTERVAL: Duration = Duration::from_secs(5);
 /// This is still needed for logging and initialization purposes.
 pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
     loop {
-        match chain_state
-            .contract
-            .get_block_number()
-            .await
-        {
+        match chain_state.contract.get_block_number().await {
             Ok(latest_confirmed_block) => {
                 tracing::info!("Fetched latest safe block {}", latest_confirmed_block);
                 return latest_confirmed_block;
@@ -60,7 +56,10 @@ pub async fn process_active_requests(
 
         match active_requests_res {
             Ok(requests) => {
-                tracing::info!(num_of_requests = &requests.len(), "Processing active requests");
+                tracing::info!(
+                    num_of_requests = &requests.len(),
+                    "Processing active requests"
+                );
 
                 for request in &requests {
                     // The write lock guarantees we spawn only one task per sequence number
@@ -85,7 +84,10 @@ pub async fn process_active_requests(
                     }
                 }
 
-                tracing::info!(num_of_requests = &requests.len(), "Processed active requests");
+                tracing::info!(
+                    num_of_requests = &requests.len(),
+                    "Processed active requests"
+                );
                 break;
             }
             Err(e) => {
@@ -123,7 +125,11 @@ pub async fn process_request_with_backoff(
     tracing::info!("Started processing request");
 
     // Get the request details from the contract to get the publish_time and num_price_ids
-    let request_details = match chain_state.contract.get_request(request.sequence_number).await? {
+    let request_details = match chain_state
+        .contract
+        .get_request(request.sequence_number)
+        .await?
+    {
         Some(req) => req,
         None => {
             tracing::warn!("Request not found on-chain, it may have been already fulfilled");
@@ -131,14 +137,34 @@ pub async fn process_request_with_backoff(
         }
     };
 
-    // Fetch price update data from Hermes for the requested price IDs
+    // Filter price_ids to only include the first num_price_ids elements
+    let filtered_price_ids = if request_details.num_price_ids as usize <= request.price_ids.len() {
+        request.price_ids[..request_details.num_price_ids as usize].to_vec()
+    } else {
+        tracing::warn!(
+            "num_price_ids ({}) exceeds price_ids length ({}), using all available price_ids",
+            request_details.num_price_ids,
+            request.price_ids.len()
+        );
+        request.price_ids.clone()
+    };
+
+    tracing::info!(
+        "Filtered price_ids length: {:?}, Request num_price_ids: {:?}",
+        filtered_price_ids.len(),
+        request_details.num_price_ids
+    );
+
+    // Fetch price update data from Hermes for the filtered price IDs
     let update_data = fetch_price_updates_from_hermes(
         request_details.publish_time.as_u64(),
-        &request.price_ids,
+        &filtered_price_ids,
         hermes_base_url,
-        request_details.num_price_ids,
-    ).await?;
+    )
+    .await?;
+    tracing::info!("Fetched price update data from Hermes");
 
+    tracing::info!("Executing callback");
     let contract_call = contract.execute_callback(
         request.sequence_number,
         update_data,
@@ -152,6 +178,7 @@ pub async fn process_request_with_backoff(
         escalation_policy,
     )
     .await;
+    tracing::info!("Submitted callback");
 
     metrics
         .requests_processed
@@ -205,7 +232,10 @@ pub async fn process_request_with_backoff(
                     }
                 }
             }
-            metrics.callbacks_executed.get_or_create(&account_label).inc();
+            metrics
+                .callbacks_executed
+                .get_or_create(&account_label)
+                .inc();
         }
         Err(e) => {
             // In case the callback did not succeed, we double-check that the request is still on-chain.