浏览代码

feat(hermes): observe WS latency after flush; observe SSE latency post-encoding

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>
Devin AI 3 月之前
父节点
当前提交
6f86bc9288
共有 2 个文件被更改,包括 50 次插入25 次删除
  1. 12 12
      apps/hermes/server/src/api/rest/v2/sse.rs
  2. 38 13
      apps/hermes/server/src/api/ws.rs

+ 12 - 12
apps/hermes/server/src/api/rest/v2/sse.rs

@@ -206,30 +206,30 @@ where
         return Ok(None);
     }
 
+
+    let price_update_data = price_feeds_with_update_data.update_data;
+    let encoded_data: Vec<String> = price_update_data
+        .into_iter()
+        .map(|data| encoding.encode_str(&data))
+        .collect();
+    let binary_price_update = BinaryUpdate {
+        encoding,
+        data: encoded_data,
+    };
+
     let now_secs = std::time::SystemTime::now()
         .duration_since(std::time::UNIX_EPOCH)
         .map(|d| d.as_secs_f64())
         .unwrap_or(0.0);
     for pu in &parsed_price_updates {
         if let Some(receive_time) = pu.metadata.proof_available_time {
-            let latency = now_secs - (receive_time as f64);
             state
                 .metrics
                 .sse_broadcast_latency
-                .observe(latency.max(0.0));
+                .observe((now_secs - (receive_time as f64)).max(0.0));
         }
     }
 
-    let price_update_data = price_feeds_with_update_data.update_data;
-    let encoded_data: Vec<String> = price_update_data
-        .into_iter()
-        .map(|data| encoding.encode_str(&data))
-        .collect();
-    let binary_price_update = BinaryUpdate {
-        encoding,
-        data: encoded_data,
-    };
-
     Ok(Some(PriceUpdate {
         binary: binary_price_update,
         parsed: if parsed {

+ 38 - 13
apps/hermes/server/src/api/ws.rs

@@ -419,6 +419,28 @@ where
             }
         };
 
+        let batch_min_received_at = updates
+            .price_feeds
+            .iter()
+            .filter_map(|u| u.received_at)
+            .min();
+        let batch_min_publish_time = updates
+            .price_feeds
+            .iter()
+            .map(|u| u.price_feed.get_price_unchecked().publish_time)
+            .min();
+
+        let batch_min_received_at = updates
+            .price_feeds
+            .iter()
+            .filter_map(|u| u.received_at)
+            .min();
+        let batch_min_publish_time = updates
+            .price_feeds
+            .iter()
+            .map(|u| u.price_feed.get_price_unchecked().publish_time)
+            .min();
+
         for update in updates.price_feeds {
             let config = self
                 .price_feeds_with_config
@@ -433,17 +455,6 @@ where
                 }
             }
 
-            let now_secs = std::time::SystemTime::now()
-                .duration_since(std::time::UNIX_EPOCH)
-                .map(|d| d.as_secs_f64())
-                .unwrap_or(0.0);
-            if let Some(received_at) = update.received_at {
-                let latency = now_secs - (received_at as f64);
-                self.ws_state
-                    .metrics
-                    .broadcast_latency
-                    .observe(latency.max(0.0));
-            }
 
             let message = serde_json::to_string(&ServerMessage::PriceUpdate {
                 price_feed: RpcPriceFeed::from_price_feed_update(
@@ -510,6 +521,21 @@ where
         }
 
         self.sender.flush().await?;
+        let now_secs = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .map(|d| d.as_secs_f64())
+            .unwrap_or(0.0);
+        if let Some(min_recv) = batch_min_received_at {
+            self.ws_state
+                .metrics
+                .broadcast_latency
+                .observe((now_secs - (min_recv as f64)).max(0.0));
+        } else if let Some(min_pub) = batch_min_publish_time {
+            self.ws_state
+                .metrics
+                .broadcast_latency
+                .observe((now_secs - (min_pub as f64)).max(0.0));
+        }
         Ok(())
     }
 
@@ -518,8 +544,7 @@ where
         let maybe_client_message = match message {
             Message::Close(_) => {
                 // Closing the connection. We don't remove it from the subscribers
-                // list, instead when the Subscriber struct is dropped the channel
-                // to subscribers list will be closed and it will eventually get
+
                 // removed.
                 tracing::trace!(id = self.id, "Subscriber Closed Connection.");
                 self.ws_state