瀏覽代碼

fix: avoid race conditions by checking and storing messages in a single operation

Daniel Chew 8 月之前
父節點
當前提交
7c4eeaa523

+ 9 - 5
apps/hermes/server/src/state/aggregate.rs

@@ -314,8 +314,15 @@ where
                 let slot = accumulator_messages.slot;
                 tracing::info!(slot = slot, "Storing Accumulator Messages.");
 
-                // Check if we already have accumulator messages for this slot
-                if (self.fetch_accumulator_messages(slot).await?).is_some() {
+                // Store the accumulator messages and check if they were already stored in a single operation
+                // This avoids the race condition where multiple threads could check and find nothing
+                // but then both store the same messages
+                let is_new = self
+                    .store_accumulator_messages(accumulator_messages)
+                    .await?;
+
+                // If the messages were already stored, return early
+                if !is_new {
                     tracing::info!(
                         slot = slot,
                         "Accumulator Messages already stored, skipping."
@@ -323,9 +330,6 @@ where
                     return Ok(());
                 }
 
-                self.store_accumulator_messages(accumulator_messages)
-                    .await?;
-
                 self.into()
                     .data
                     .write()

+ 4 - 9
apps/hermes/server/src/state/aggregate/wormhole_merkle.rs

@@ -59,17 +59,12 @@ pub async fn store_wormhole_merkle_verified_message<S>(
 where
     S: Cache,
 {
-    // Check if we already have a state for this slot
-    if (state.fetch_wormhole_merkle_state(root.slot).await?).is_some() {
-        // State already exists, return early
-        return Ok(false);
-    }
-
-    // State doesn't exist, store it
+    // Store the state and check if it was already stored in a single operation
+    // This avoids the race condition where multiple threads could check and find nothing
+    // but then both store the same state
     state
         .store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
-        .await?;
-    Ok(true)
+        .await
 }
 
 pub fn construct_message_states_proofs(

+ 26 - 8
apps/hermes/server/src/state/cache.rs

@@ -122,12 +122,12 @@ pub trait Cache {
     async fn store_accumulator_messages(
         &self,
         accumulator_messages: AccumulatorMessages,
-    ) -> Result<()>;
+    ) -> Result<bool>;
     async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>>;
     async fn store_wormhole_merkle_state(
         &self,
         wormhole_merkle_state: WormholeMerkleState,
-    ) -> Result<()>;
+    ) -> Result<bool>;
     async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
     async fn message_state_keys(&self) -> Vec<MessageStateKey>;
     async fn fetch_message_states(
@@ -226,13 +226,22 @@ where
     async fn store_accumulator_messages(
         &self,
         accumulator_messages: AccumulatorMessages,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut cache = self.into().accumulator_messages_cache.write().await;
-        cache.insert(accumulator_messages.slot, accumulator_messages);
+        let slot = accumulator_messages.slot;
+
+        // Check if we already have messages for this slot while holding the lock
+        if cache.contains_key(&slot) {
+            // Messages already exist, return false to indicate no insertion happened
+            return Ok(false);
+        }
+
+        // Messages don't exist, store them
+        cache.insert(slot, accumulator_messages);
         while cache.len() > self.into().cache_size as usize {
             cache.pop_first();
         }
-        Ok(())
+        Ok(true)
     }
 
     async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
@@ -243,13 +252,22 @@ where
     async fn store_wormhole_merkle_state(
         &self,
         wormhole_merkle_state: WormholeMerkleState,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut cache = self.into().wormhole_merkle_state_cache.write().await;
-        cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
+        let slot = wormhole_merkle_state.root.slot;
+
+        // Check if we already have a state for this slot while holding the lock
+        if cache.contains_key(&slot) {
+            // State already exists, return false to indicate no insertion happened
+            return Ok(false);
+        }
+
+        // State doesn't exist, store it
+        cache.insert(slot, wormhole_merkle_state);
         while cache.len() > self.into().cache_size as usize {
             cache.pop_first();
         }
-        Ok(())
+        Ok(true)
     }
 
     async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {