Jelajahi Sumber

feat(hermes): add staleness histograms for publish→receive and receive→broadcast latencies

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>
Devin AI 3 bulan lalu
induk
melakukan
4f4be83674

+ 1 - 1
apps/hermes/server/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name        = "hermes"
-version     = "0.10.4"
+version     = "0.10.5"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition     = "2021"
 

+ 17 - 0
apps/hermes/server/src/api/metrics_middleware.rs

@@ -18,6 +18,7 @@ use {
 pub struct ApiMetrics {
     pub requests: Family<Labels, Counter>,
     pub latencies: Family<Labels, Histogram>,
+    pub sse_broadcast_latency: Histogram, // Exported as `sse_broadcast_latency_seconds`; observed on the SSE send path.
 }
 
 impl ApiMetrics {
@@ -36,6 +37,12 @@ impl ApiMetrics {
                     .into_iter(),
                 )
             }),
+            sse_broadcast_latency: Histogram::new(
+                [
+                    0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
+                ]
+                .into_iter(),
+            ),
         };
 
         {
@@ -58,6 +65,16 @@ impl ApiMetrics {
                     ),
                 )
                 .await;
+
+                Metrics::register(
+                    &*state,
+                    (
+                        "sse_broadcast_latency_seconds",
+                        "Latency from Hermes receive_time to SSE send in seconds",
+                        new.sse_broadcast_latency.clone(),
+                    ),
+                )
+                .await;
             });
         }
 

+ 14 - 0
apps/hermes/server/src/api/rest/v2/sse.rs

@@ -206,6 +206,20 @@ where
         return Ok(None);
     }
 
+    let now_secs = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|d| d.as_secs_f64())
+        .unwrap_or(0.0);
+    for pu in &parsed_price_updates {
+        if let Some(receive_time) = pu.metadata.proof_available_time {
+            let latency = now_secs - (receive_time as f64);
+            state
+                .metrics
+                .sse_broadcast_latency
+                .observe(latency.max(0.0));
+        }
+    }
+
     let price_update_data = price_feeds_with_update_data.update_data;
     let encoded_data: Vec<String> = price_update_data
         .into_iter()

+ 29 - 0
apps/hermes/server/src/api/ws.rs

@@ -85,6 +85,7 @@ pub struct Labels {
 
 pub struct WsMetrics {
     pub interactions: Family<Labels, Counter>,
+    pub broadcast_latency: prometheus_client::metrics::histogram::Histogram, // Exported as `ws_broadcast_latency_seconds` (receive_time -> WS send, in seconds).
 }
 
 impl WsMetrics {
@@ -95,6 +96,12 @@ impl WsMetrics {
     {
         let new = Self {
             interactions: Family::default(),
+            broadcast_latency: prometheus_client::metrics::histogram::Histogram::new(
+                [
+                    0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
+                ]
+                .into_iter(),
+            ),
         };
 
         {
@@ -110,6 +117,16 @@ impl WsMetrics {
                     ),
                 )
                 .await;
+
+                Metrics::register(
+                    &*state,
+                    (
+                        "ws_broadcast_latency_seconds",
+                        "Latency from Hermes receive_time to WS send in seconds",
+                        new.broadcast_latency.clone(),
+                    ),
+                )
+                .await;
             });
         }
 
@@ -415,6 +432,18 @@ where
                 }
             }
 
+            let now_secs = std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .map(|d| d.as_secs_f64())
+                .unwrap_or(0.0);
+            if let Some(received_at) = update.received_at {
+                let latency = now_secs - (received_at as f64);
+                self.ws_state
+                    .metrics
+                    .broadcast_latency
+                    .observe(latency.max(0.0));
+            }
+
             let message = serde_json::to_string(&ServerMessage::PriceUpdate {
                 price_feed: RpcPriceFeed::from_price_feed_update(
                     update,

+ 10 - 0
apps/hermes/server/src/state/aggregate.rs

@@ -367,6 +367,16 @@ where
         // we can build the message states
         let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;
 
+        {
+            let mut data = self.into().data.write().await;
+            for ms in &message_states {
+                let publish = ms.message.publish_time() as f64;
+                let receive = ms.received_at as f64;
+                let latency = receive - publish;
+                data.metrics.observe_publish_to_receive(latency);
+            }
+        }
+
         let message_state_keys = message_states
             .iter()
             .map(|message_state| message_state.key())

+ 19 - 0
apps/hermes/server/src/state/aggregate/metrics.rs

@@ -34,6 +34,7 @@ struct ObservedSlotLabels {
 pub struct Metrics {
     observed_slot: Family<ObservedSlotLabels, Counter>,
     observed_slot_latency: Family<ObservedSlotLabels, Histogram>, // Latency of observed slots, in seconds.
+    publish_to_receive_latency: Histogram, // Exported as `publish_to_receive_latency_seconds` (publish_time -> receive_time).
     first_observed_time_of_slot: BTreeMap<Slot, Instant>,
     newest_observed_slot: HashMap<Event, Slot>,
 }
@@ -50,6 +51,12 @@ impl Metrics {
                     .into_iter(),
                 )
             }),
+            publish_to_receive_latency: Histogram::new(
+                [
+                    0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
+                ]
+                .into_iter(),
+            ),
             first_observed_time_of_slot: BTreeMap::new(),
             newest_observed_slot: HashMap::new(),
         };
@@ -69,11 +76,23 @@ impl Metrics {
                 "Latency of observed slots in seconds",
                 observed_slot_latency,
             );
+
+            metrics_registry.register(
+                "publish_to_receive_latency_seconds",
+                "Latency from message publish_time to Hermes receive_time in seconds",
+                new.publish_to_receive_latency.clone(),
+            );
         }
 
         new
     }
 
+    pub fn observe_publish_to_receive(&mut self, latency_secs: f64) { // Records one publish -> receive latency sample, in seconds.
+        if latency_secs.is_finite() && latency_secs >= 0.0 { // NaN, infinite and negative samples are dropped, not clamped.
+            self.publish_to_receive_latency.observe(latency_secs);
+        }
+    }
+
     /// Observe a slot and event. An event at a slot should be observed only once.
     pub fn observe(&mut self, slot: Slot, event: Event) {
         let order = if self