Merge pull request #2474 from pyth-network/hermes/fix-bugs

fix(apps/hermes): fix duplicate events for processed slots
Daniel Chew 8 months ago
parent commit e7db34baa6

+ 1 - 1
apps/hermes/server/Cargo.lock

@@ -1868,7 +1868,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
 [[package]]
 name = "hermes"
-version = "0.8.2"
+version = "0.8.3"
 dependencies = [
  "anyhow",
  "async-trait",

+ 1 - 1
apps/hermes/server/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name        = "hermes"
-version     = "0.8.2"
+version     = "0.8.3"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition     = "2021"
 

+ 27 - 25
apps/hermes/server/src/api/rest/v2/sse.rs

@@ -93,34 +93,36 @@ where
     // Convert the broadcast receiver into a Stream
     let stream = BroadcastStream::new(update_rx);
 
-    let sse_stream = stream.then(move |message| {
-        let state_clone = state.clone(); // Clone again to use inside the async block
-        let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
-        async move {
-            match message {
-                Ok(event) => {
-                    match handle_aggregation_event(
-                        event,
-                        state_clone,
-                        price_ids_clone,
-                        params.encoding,
-                        params.parsed,
-                        params.benchmarks_only,
-                        params.allow_unordered,
-                    )
-                    .await
-                    {
-                        Ok(Some(update)) => Ok(Event::default()
-                            .json_data(update)
-                            .unwrap_or_else(error_event)),
-                        Ok(None) => Ok(Event::default().comment("No update available")),
-                        Err(e) => Ok(error_event(e)),
+    let sse_stream = stream
+        .then(move |message| {
+            let state_clone = state.clone(); // Clone again to use inside the async block
+            let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
+            async move {
+                match message {
+                    Ok(event) => {
+                        match handle_aggregation_event(
+                            event,
+                            state_clone,
+                            price_ids_clone,
+                            params.encoding,
+                            params.parsed,
+                            params.benchmarks_only,
+                            params.allow_unordered,
+                        )
+                        .await
+                        {
+                            Ok(Some(update)) => Some(Ok(Event::default()
+                                .json_data(update)
+                                .unwrap_or_else(error_event))),
+                            Ok(None) => None,
+                            Err(e) => Some(Ok(error_event(e))),
+                        }
                     }
+                    Err(e) => Some(Ok(error_event(e))),
                 }
-                Err(e) => Ok(error_event(e)),
             }
-        }
-    });
+        })
+        .filter_map(|x| x);
 
     Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
 }
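
The reshaped stream relies on tokio_stream's synchronous filter_map (BroadcastStream comes from the same crate): the async handler now maps each message to an Option, and filter_map(|x| x) drops the Nones, so clients no longer receive "No update available" comment events for slots that produce no update. A minimal, self-contained sketch of the then + filter_map shape, with a trivial stand-in for handle_aggregation_event:

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // The async handler maps each item to an Option; filter_map(|x| x) then
    // drops the Nones, so consumers never see placeholder items.
    let filtered: Vec<u32> = stream::iter(vec![Some(1), None, Some(3)])
        .then(|item| async move { item }) // stand-in for handle_aggregation_event
        .filter_map(|x| x)
        .collect()
        .await;
    assert_eq!(filtered, vec![1, 3]);
}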

+ 144 - 17
apps/hermes/server/src/state/aggregate.rs

@@ -282,13 +282,23 @@ where
                     WormholePayload::Merkle(proof) => {
                         tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
 
-                        store_wormhole_merkle_verified_message(
+                        // Store the wormhole merkle verified message and check if it was already stored
+                        let is_new = store_wormhole_merkle_verified_message(
                             self,
                             proof.clone(),
                             update_vaa.to_owned(),
                         )
                         .await?;
 
+                        // If the message was already stored, return early
+                        if !is_new {
+                            tracing::info!(
+                                slot = proof.slot,
+                                "VAA Merkle Proof already stored, skipping."
+                            );
+                            return Ok(());
+                        }
+
                         self.into()
                             .data
                             .write()
@@ -304,9 +314,22 @@ where
                 let slot = accumulator_messages.slot;
                 tracing::info!(slot = slot, "Storing Accumulator Messages.");
 
-                self.store_accumulator_messages(accumulator_messages)
+                // Store the accumulator messages and check if they were already stored in a single operation
+                // This avoids the race condition where multiple threads could check and find nothing
+                // but then both store the same messages
+                let is_new = self
+                    .store_accumulator_messages(accumulator_messages)
                     .await?;
 
+                // If the messages were already stored, return early
+                if !is_new {
+                    tracing::info!(
+                        slot = slot,
+                        "Accumulator Messages already stored, skipping."
+                    );
+                    return Ok(());
+                }
+
                 self.into()
                     .data
                     .write()
@@ -351,28 +374,23 @@ where
         // Update the aggregate state
         let mut aggregate_state = self.into().data.write().await;
 
-        // Send update event to subscribers. We are purposefully ignoring the result
-        // because there might be no subscribers.
-        let _ = match aggregate_state.latest_completed_slot {
+        // Atomic check and update
+        let event = match aggregate_state.latest_completed_slot {
             None => {
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
             Some(latest) if slot > latest => {
                 self.prune_removed_keys(message_state_keys).await;
-                aggregate_state.latest_completed_slot.replace(slot);
-                self.into()
-                    .api_update_tx
-                    .send(AggregationEvent::New { slot })
+                aggregate_state.latest_completed_slot = Some(slot);
+                AggregationEvent::New { slot }
             }
-            _ => self
-                .into()
-                .api_update_tx
-                .send(AggregationEvent::OutOfOrder { slot }),
+            _ => AggregationEvent::OutOfOrder { slot },
         };
 
+        // Only send the event after the state has been updated
+        let _ = self.into().api_update_tx.send(event);
+
         aggregate_state.latest_completed_slot = aggregate_state
             .latest_completed_slot
             .map(|latest| latest.max(slot))
@@ -1374,6 +1392,115 @@ mod test {
 
         assert_eq!(result.unwrap_err().to_string(), "Message not found");
     }
+
+    /// Test that verifies only one event is sent per slot, even when updates arrive out of order
+    /// or when a slot is processed multiple times.
+    #[tokio::test]
+    pub async fn test_out_of_order_updates_send_single_event_per_slot() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create price feed messages
+        let price_feed_100 = create_dummy_price_feed_message(100, 10, 9);
+        let price_feed_101 = create_dummy_price_feed_message(100, 11, 10);
+
+        // First, process slot 100
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 20),
+        )
+        .await;
+
+        // Check that we received the New event for slot 100
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 100 })
+        );
+
+        // Next, process slot 101
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_101)], 101, 21),
+        )
+        .await;
+
+        // Check that we received the New event for slot 101
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 101 })
+        );
+
+        // Now, process slot 100 again
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_100)], 100, 22),
+        )
+        .await;
+
+        // Try to receive another event with a timeout to ensure no more events were sent
+        // We should not receive an OutOfOrder event for slot 100 since we've already sent an event for it
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that the price feed was stored correctly; both updates share feed id 100
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
+
+    /// Test that verifies only one event is sent when multiple concurrent updates
+    /// for the same slot are processed.
+    #[tokio::test]
+    pub async fn test_concurrent_updates_same_slot_sends_single_event() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        // Create a single price feed message
+        let price_feed = create_dummy_price_feed_message(100, 10, 9);
+
+        // Generate 100 identical updates for the same slot but with different sequence numbers
+        let mut all_updates = Vec::new();
+        for seq in 0..100 {
+            let updates = generate_update(vec![Message::PriceFeedMessage(price_feed)], 10, seq);
+            all_updates.extend(updates);
+        }
+
+        // Process updates concurrently - we don't care if some fail due to the race condition
+        // The important thing is that only one event is sent
+        let state_arc = Arc::clone(&state);
+        let futures = all_updates.into_iter().map(move |u| {
+            let state_clone = Arc::clone(&state_arc);
+            async move {
+                let _ = state_clone.store_update(u).await;
+            }
+        });
+        futures::future::join_all(futures).await;
+
+        // Check that only one AggregationEvent::New is received
+        assert_eq!(
+            update_rx.recv().await,
+            Ok(AggregationEvent::New { slot: 10 })
+        );
+
+        // Try to receive another event with a timeout to ensure no more events were sent
+        let timeout_result =
+            tokio::time::timeout(std::time::Duration::from_millis(100), update_rx.recv()).await;
+
+        // The timeout should occur, indicating no more events were received
+        assert!(
+            timeout_result.is_err(),
+            "Received unexpected additional event"
+        );
+
+        // Verify that the price feed was stored correctly
+        let price_feed_ids = (*state).get_price_feed_ids().await;
+        assert_eq!(price_feed_ids.len(), 1);
+        assert!(price_feed_ids.contains(&PriceIdentifier::new([100; 32])));
+    }
 }
 #[cfg(test)]
 /// Unit tests for the core TWAP calculation logic in `calculate_twap`
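
Taken together, these changes make event emission idempotent per slot: the cache-level insert-if-absent reports whether a slot's data is new, store_update returns early for duplicates before any event can be sent, and the event itself is chosen and sent only after the aggregate state is updated under the write lock. A condensed, hypothetical sketch of that flow (types simplified; the BTreeSet and process_slot here stand in for the real caches and store_update):

use std::collections::BTreeSet;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};

// Condensed sketch of the dedup flow: the store step reports whether the
// slot was new, and the caller bails out before any event is sent for a
// duplicate.
async fn process_slot(seen: &Arc<Mutex<BTreeSet<u64>>>, tx: &broadcast::Sender<u64>, slot: u64) {
    // Insert-if-absent under a single lock; true only for the first writer.
    let is_new = seen.lock().await.insert(slot);
    if !is_new {
        return; // already processed: no second event for this slot
    }
    let _ = tx.send(slot); // ignore the result: there may be no subscribers
}

#[tokio::main]
async fn main() {
    let seen = Arc::new(Mutex::new(BTreeSet::new()));
    let (tx, mut rx) = broadcast::channel(16);
    // Two concurrent completions of the same slot...
    tokio::join!(process_slot(&seen, &tx, 100), process_slot(&seen, &tx, 100));
    // ...yield exactly one event.
    assert_eq!(rx.recv().await.unwrap(), 100);
    assert!(rx.try_recv().is_err());
}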

+ 5 - 3
apps/hermes/server/src/state/aggregate/wormhole_merkle.rs

@@ -55,14 +55,16 @@ pub async fn store_wormhole_merkle_verified_message<S>(
     state: &S,
     root: WormholeMerkleRoot,
     vaa: VaaBytes,
-) -> Result<()>
+) -> Result<bool>
 where
     S: Cache,
 {
+    // Store the state and check if it was already stored in a single operation
+    // This avoids the race condition where multiple threads could check and find nothing
+    // but then both store the same state
     state
         .store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
-        .await?;
-    Ok(())
+        .await
 }
 
 pub fn construct_message_states_proofs(

+ 28 - 36
apps/hermes/server/src/state/cache.rs

@@ -122,12 +122,12 @@ pub trait Cache {
     async fn store_accumulator_messages(
         &self,
         accumulator_messages: AccumulatorMessages,
-    ) -> Result<()>;
+    ) -> Result<bool>;
     async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>>;
     async fn store_wormhole_merkle_state(
         &self,
         wormhole_merkle_state: WormholeMerkleState,
-    ) -> Result<()>;
+    ) -> Result<bool>;
     async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
     async fn message_state_keys(&self) -> Vec<MessageStateKey>;
     async fn fetch_message_states(
@@ -226,13 +226,22 @@ where
     async fn store_accumulator_messages(
         &self,
         accumulator_messages: AccumulatorMessages,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut cache = self.into().accumulator_messages_cache.write().await;
-        cache.insert(accumulator_messages.slot, accumulator_messages);
+        let slot = accumulator_messages.slot;
+
+        // Check if we already have messages for this slot while holding the lock
+        if cache.contains_key(&slot) {
+            // Messages already exist, return false to indicate no insertion happened
+            return Ok(false);
+        }
+
+        // Messages don't exist, store them
+        cache.insert(slot, accumulator_messages);
         while cache.len() > self.into().cache_size as usize {
             cache.pop_first();
         }
-        Ok(())
+        Ok(true)
     }
 
     async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
@@ -243,13 +252,22 @@ where
     async fn store_wormhole_merkle_state(
         &self,
         wormhole_merkle_state: WormholeMerkleState,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut cache = self.into().wormhole_merkle_state_cache.write().await;
-        cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
+        let slot = wormhole_merkle_state.root.slot;
+
+        // Check if we already have a state for this slot while holding the lock
+        if cache.contains_key(&slot) {
+            // State already exists, return false to indicate no insertion happened
+            return Ok(false);
+        }
+
+        // State doesn't exist, store it
+        cache.insert(slot, wormhole_merkle_state);
         while cache.len() > self.into().cache_size as usize {
             cache.pop_first();
         }
-        Ok(())
+        Ok(true)
     }
 
     async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {
@@ -702,18 +720,7 @@ mod test {
         let (state, _) = setup_state(2).await;
 
         // Make sure the retrieved accumulator messages is what we store.
-        let mut accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
-        state
-            .store_accumulator_messages(accumulator_messages_at_10.clone())
-            .await
-            .unwrap();
-        assert_eq!(
-            state.fetch_accumulator_messages(10).await.unwrap().unwrap(),
-            accumulator_messages_at_10
-        );
-
-        // Make sure overwriting the accumulator messages works.
-        accumulator_messages_at_10.ring_size = 5; // Change the ring size from 3 to 5.
+        let accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
         state
             .store_accumulator_messages(accumulator_messages_at_10.clone())
             .await
@@ -764,22 +771,7 @@ mod test {
         let (state, _) = setup_state(2).await;
 
         // Make sure the retrieved wormhole merkle state is what we store
-        let mut wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);
-        state
-            .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
-            .await
-            .unwrap();
-        assert_eq!(
-            state
-                .fetch_wormhole_merkle_state(10)
-                .await
-                .unwrap()
-                .unwrap(),
-            wormhole_merkle_state_at_10
-        );
-
-        // Make sure overwriting the wormhole merkle state works.
-        wormhole_merkle_state_at_10.root.ring_size = 5; // Change the ring size from 3 to 5.
+        let wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);
         state
             .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
             .await
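
Because the caller holds the cache's write lock for the entire check-and-insert, the contains_key + insert pair above is race-free. The same insert-if-absent semantics could also be expressed with the map's entry API; a sketch, assuming the caches are BTreeMaps keyed by slot (as pop_first suggests) and using a hypothetical helper name:

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

// Insert-if-absent via the entry API: one lookup instead of a
// contains_key check followed by insert, with the same bool contract.
fn insert_if_absent<V>(cache: &mut BTreeMap<u64, V>, slot: u64, value: V) -> bool {
    match cache.entry(slot) {
        Entry::Vacant(e) => {
            e.insert(value);
            true // first write for this slot
        }
        Entry::Occupied(_) => false, // duplicate slot; keep the existing value
    }
}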