ソースを参照

refactor(hermes): remove Storage trait indirection

- Remove Storage trait and LocalStorage and implement everything on Storage struct
- Remove update_accumulator_state and break it down to accumulator_messages and
wormhole_merkle_state
Ali Behjati 2 年 前
コミット
6a4b812ea3

+ 2 - 1
hermes/src/main.rs

@@ -1,5 +1,6 @@
 #![feature(never_type)]
 #![feature(slice_group_by)]
+#![feature(btree_cursors)]
 
 use {
     crate::store::Store,
@@ -33,7 +34,7 @@ async fn init() -> Result<()> {
             let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
 
             log::info!("Running Hermes...");
-            let store = Store::new_with_local_cache(update_tx, 1000);
+            let store = Store::new(update_tx, 1000);
 
             // Spawn the P2P layer.
             log::info!("Starting P2P server on {:?}", wh_listen_addrs);

+ 39 - 30
hermes/src/store.rs

@@ -1,12 +1,16 @@
 use {
     self::{
-        proof::wormhole_merkle::construct_update_data,
+        proof::wormhole_merkle::{
+            construct_update_data,
+            WormholeMerkleState,
+        },
         storage::{
             MessageState,
             MessageStateFilter,
-            StorageInstance,
+            Storage,
         },
         types::{
+            AccumulatorMessages,
             PriceFeedUpdate,
             PriceFeedsWithUpdateData,
             RequestTime,
@@ -19,7 +23,6 @@ use {
             construct_message_states_proofs,
             store_wormhole_merkle_verified_message,
         },
-        storage::CompletedAccumulatorState,
         types::{
             ProofSet,
             UnixTimestamp,
@@ -82,17 +85,27 @@ pub mod wormhole;
 const OBSERVED_CACHE_SIZE: usize = 1000;
 
 pub struct Store {
-    pub storage:                  StorageInstance,
+    /// Storage is a short-lived cache of the state of all the updates
+    /// that have been passed to the store.
+    pub storage:                  Storage,
+    /// Sequence numbers of lately observed Vaas. Store uses this set
+    /// to ignore the previously observed Vaas as a performance boost.
     pub observed_vaa_seqs:        RwLock<BTreeSet<u64>>,
+    /// Wormhole guardian sets. It is used to verify Vaas before using
+    /// them.
     pub guardian_set:             RwLock<BTreeMap<u32, GuardianSet>>,
+    /// The sender to the channel between Store and Api to notify
+    /// completed updates.
     pub update_tx:                Sender<()>,
+    /// Time of the last completed update. This is used for the health
+    /// probes.
     pub last_completed_update_at: RwLock<Option<Instant>>,
 }
 
 impl Store {
-    pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
+    pub fn new(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
         Arc::new(Self {
-            storage: storage::local_storage::LocalStorage::new_instance(cache_size),
+            storage: Storage::new(cache_size),
             observed_vaa_seqs: RwLock::new(Default::default()),
             guardian_set: RwLock::new(Default::default()),
             update_tx,
@@ -148,34 +161,27 @@ impl Store {
                 let slot = accumulator_messages.slot;
                 log::info!("Storing accumulator messages for slot {:?}.", slot,);
                 self.storage
-                    .update_accumulator_state(
-                        slot,
-                        Box::new(|mut state| {
-                            state.accumulator_messages = Some(accumulator_messages);
-                            state
-                        }),
-                    )
+                    .store_accumulator_messages(accumulator_messages)
                     .await?;
                 slot
             }
         };
 
-        let state = match self.storage.fetch_accumulator_state(slot).await? {
-            Some(state) => state,
-            None => return Ok(()),
-        };
+        let accumulator_messages = self.storage.fetch_accumulator_messages(slot).await?;
+        let wormhole_merkle_state = self.storage.fetch_wormhole_merkle_state(slot).await?;
 
-        let completed_state = state.try_into();
-        let completed_state: CompletedAccumulatorState = match completed_state {
-            Ok(completed_state) => completed_state,
-            Err(_) => {
-                return Ok(());
-            }
-        };
+        let (accumulator_messages, wormhole_merkle_state) =
+            match (accumulator_messages, wormhole_merkle_state) {
+                (Some(accumulator_messages), Some(wormhole_merkle_state)) => {
+                    (accumulator_messages, wormhole_merkle_state)
+                }
+                _ => return Ok(()),
+            };
 
         // Once the accumulator reaches a complete state for a specific slot
         // we can build the message states
-        self.build_message_states(completed_state).await?;
+        self.build_message_states(accumulator_messages, wormhole_merkle_state)
+            .await?;
 
         self.update_tx.send(()).await?;
 
@@ -187,15 +193,18 @@ impl Store {
         Ok(())
     }
 
-    async fn build_message_states(&self, completed_state: CompletedAccumulatorState) -> Result<()> {
+    async fn build_message_states(
+        &self,
+        accumulator_messages: AccumulatorMessages,
+        wormhole_merkle_state: WormholeMerkleState,
+    ) -> Result<()> {
         let wormhole_merkle_message_states_proofs =
-            construct_message_states_proofs(&completed_state)?;
+            construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_state)?;
 
         let current_time: UnixTimestamp =
             SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;
 
-        let message_states = completed_state
-            .accumulator_messages
+        let message_states = accumulator_messages
             .raw_messages
             .into_iter()
             .enumerate()
@@ -210,7 +219,7 @@ impl Store {
                             .ok_or(anyhow!("Missing proof for message"))?
                             .clone(),
                     },
-                    completed_state.slot,
+                    accumulator_messages.slot,
                     current_time,
                 ))
             })

+ 8 - 18
hermes/src/store/proof/wormhole_merkle.rs

@@ -1,9 +1,7 @@
 use {
     crate::store::{
-        storage::{
-            CompletedAccumulatorState,
-            MessageState,
-        },
+        storage::MessageState,
+        types::AccumulatorMessages,
         Store,
     },
     anyhow::{
@@ -50,26 +48,18 @@ pub async fn store_wormhole_merkle_verified_message(
 ) -> Result<()> {
     store
         .storage
-        .update_accumulator_state(
-            root.slot,
-            Box::new(|mut state| {
-                state.wormhole_merkle_state = Some(WormholeMerkleState {
-                    root,
-                    vaa: vaa_bytes,
-                });
-                state
-            }),
-        )
+        .store_wormhole_merkle_state(WormholeMerkleState {
+            root,
+            vaa: vaa_bytes,
+        })
         .await?;
     Ok(())
 }
 
 pub fn construct_message_states_proofs(
-    completed_accumulator_state: &CompletedAccumulatorState,
+    accumulator_messages: &AccumulatorMessages,
+    wormhole_merkle_state: &WormholeMerkleState,
 ) -> Result<Vec<WormholeMerkleMessageProof>> {
-    let accumulator_messages = &completed_accumulator_state.accumulator_messages;
-    let wormhole_merkle_state = &completed_accumulator_state.wormhole_merkle_state;
-
     // Check whether the state is valid
     let merkle_acc = match MerkleTree::<Keccak160>::from_set(
         accumulator_messages.raw_messages.iter().map(|m| m.as_ref()),

+ 638 - 133
hermes/src/store/storage.rs

@@ -14,48 +14,21 @@ use {
         anyhow,
         Result,
     },
-    async_trait::async_trait,
+    dashmap::DashMap,
     pythnet_sdk::messages::{
         FeedId,
         Message,
         MessageType,
     },
+    std::{
+        collections::BTreeMap,
+        ops::Bound,
+        sync::Arc,
+    },
+    strum::IntoEnumIterator,
+    tokio::sync::RwLock,
 };
 
-pub mod local_storage;
-
-#[derive(Clone, PartialEq, Debug)]
-pub struct AccumulatorState {
-    pub slot:                  Slot,
-    pub accumulator_messages:  Option<AccumulatorMessages>,
-    pub wormhole_merkle_state: Option<WormholeMerkleState>,
-}
-
-#[derive(Clone, PartialEq, Debug)]
-pub struct CompletedAccumulatorState {
-    pub slot:                  Slot,
-    pub accumulator_messages:  AccumulatorMessages,
-    pub wormhole_merkle_state: WormholeMerkleState,
-}
-
-impl TryFrom<AccumulatorState> for CompletedAccumulatorState {
-    type Error = anyhow::Error;
-
-    fn try_from(state: AccumulatorState) -> Result<Self> {
-        let accumulator_messages = state
-            .accumulator_messages
-            .ok_or_else(|| anyhow!("missing accumulator messages"))?;
-        let wormhole_merkle_state = state
-            .wormhole_merkle_state
-            .ok_or_else(|| anyhow!("missing wormhole merkle state"))?;
-        Ok(Self {
-            slot: state.slot,
-            accumulator_messages,
-            wormhole_merkle_state,
-        })
-    }
-}
-
 #[derive(Clone, PartialEq, Eq, Debug, Hash)]
 pub struct MessageStateKey {
     pub feed_id: FeedId,
@@ -116,124 +89,656 @@ pub enum MessageStateFilter {
     Only(MessageType),
 }
 
-/// This trait defines the interface for update data storage
-///
-/// Price update data for Pyth can come in multiple formats, for example VAA's and
-/// Merkle proofs. The abstraction therefore allows storing these as binary
-/// data to abstract the details of the update data, and so each update data is stored
-/// under a separate key. The caller is responsible for specifying the right
-/// key for the update data they wish to access.
-#[async_trait]
-pub trait Storage: Send + Sync {
-    async fn message_state_keys(&self) -> Vec<MessageStateKey>;
-    async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
-    async fn fetch_message_states(
+pub struct Storage {
+    message_cache: Arc<DashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>,
+    /// Accumulator messages cache
+    ///
+    /// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
+    accumulator_messages_cache:  Arc<RwLock<BTreeMap<Slot, AccumulatorMessages>>>,
+    /// Wormhole merkle state cache
+    ///
+    /// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
+    wormhole_merkle_state_cache: Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>,
+    cache_size:                  u64,
+}
+
+impl Storage {
+    pub fn new(cache_size: u64) -> Self {
+        Self {
+            message_cache: Arc::new(DashMap::new()),
+            accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
+            wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
+            cache_size,
+        }
+    }
+
+    pub async fn message_state_keys(&self) -> Vec<MessageStateKey> {
+        self.message_cache
+            .iter()
+            .map(|entry| entry.key().clone())
+            .collect::<Vec<_>>()
+    }
+
+    pub async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
+        for message_state in message_states {
+            let key = message_state.key();
+            let time = message_state.time();
+            let mut cache = self.message_cache.entry(key).or_insert_with(BTreeMap::new);
+
+            cache.insert(time, message_state);
+
+            // Remove the earliest message states if the cache size is exceeded
+            while cache.len() > self.cache_size as usize {
+                cache.pop_first();
+            }
+        }
+        Ok(())
+    }
+
+    fn retrieve_message_state(
+        &self,
+        key: MessageStateKey,
+        request_time: RequestTime,
+    ) -> Option<MessageState> {
+        match self.message_cache.get(&key) {
+            Some(key_cache) => {
+                match request_time {
+                    RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
+                    RequestTime::FirstAfter(time) => {
+                        // If the requested time is before the first element in the vector, we are
+                        // not sure that the first element is the closest one.
+                        if let Some((_, oldest_record_value)) = key_cache.first_key_value() {
+                            if time < oldest_record_value.time().publish_time {
+                                return None;
+                            }
+                        }
+
+                        let lookup_time = MessageStateTime {
+                            publish_time: time,
+                            slot:         0,
+                        };
+
+                        // Get the first element that is greater than or equal to the lookup time.
+                        key_cache
+                            .lower_bound(Bound::Included(&lookup_time))
+                            .value()
+                            .cloned()
+                    }
+                }
+            }
+            None => None,
+        }
+    }
+
+    pub async fn fetch_message_states(
         &self,
         ids: Vec<FeedId>,
         request_time: RequestTime,
         filter: MessageStateFilter,
-    ) -> Result<Vec<MessageState>>;
-
-    /// Store the accumulator state. Please note that this call will replace the
-    /// existing accumulator state for the given state's slot. If you wish to
-    /// update the accumulator state, use `update_accumulator_state` instead.
-    async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>;
-    async fn fetch_accumulator_state(&self, slot: Slot) -> Result<Option<AccumulatorState>>;
-
-    /// Update the accumulator state inplace using the provided callback. The callback
-    /// takes the current state and returns the new state. If there is no accumulator
-    /// state for the given slot, the callback will be called with an empty accumulator state.
-    async fn update_accumulator_state(
+    ) -> Result<Vec<MessageState>> {
+        ids.into_iter()
+            .flat_map(|id| {
+                let request_time = request_time.clone();
+                let message_types: Vec<MessageType> = match filter {
+                    MessageStateFilter::All => MessageType::iter().collect(),
+                    MessageStateFilter::Only(t) => vec![t],
+                };
+
+                message_types.into_iter().map(move |message_type| {
+                    let key = MessageStateKey {
+                        feed_id: id,
+                        type_:   message_type,
+                    };
+                    self.retrieve_message_state(key, request_time.clone())
+                        .ok_or(anyhow!("Message not found"))
+                })
+            })
+            .collect()
+    }
+
+    pub async fn store_accumulator_messages(
+        &self,
+        accumulator_messages: AccumulatorMessages,
+    ) -> Result<()> {
+        let mut cache = self.accumulator_messages_cache.write().await;
+        cache.insert(accumulator_messages.slot, accumulator_messages);
+        while cache.len() > self.cache_size as usize {
+            cache.pop_first();
+        }
+        Ok(())
+    }
+
+    pub async fn fetch_accumulator_messages(
         &self,
         slot: Slot,
-        callback: Box<dyn (FnOnce(AccumulatorState) -> AccumulatorState) + Send>,
-    ) -> Result<()>;
-}
+    ) -> Result<Option<AccumulatorMessages>> {
+        let cache = self.accumulator_messages_cache.read().await;
+        Ok(cache.get(&slot).cloned())
+    }
+
+    pub async fn store_wormhole_merkle_state(
+        &self,
+        wormhole_merkle_state: WormholeMerkleState,
+    ) -> Result<()> {
+        let mut cache = self.wormhole_merkle_state_cache.write().await;
+        cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
+        while cache.len() > self.cache_size as usize {
+            cache.pop_first();
+        }
+        Ok(())
+    }
 
-pub type StorageInstance = Box<dyn Storage>;
+    pub async fn fetch_wormhole_merkle_state(
+        &self,
+        slot: Slot,
+    ) -> Result<Option<WormholeMerkleState>> {
+        let cache = self.wormhole_merkle_state_cache.read().await;
+        Ok(cache.get(&slot).cloned())
+    }
+}
 
 #[cfg(test)]
 mod test {
     use {
         super::*,
-        pythnet_sdk::wire::v1::WormholeMerkleRoot,
+        crate::store::{
+            proof::wormhole_merkle::{
+                WormholeMerkleMessageProof,
+                WormholeMerkleState,
+            },
+            types::{
+                AccumulatorMessages,
+                ProofSet,
+            },
+        },
+        pyth_sdk::UnixTimestamp,
+        pythnet_sdk::{
+            accumulators::merkle::MerklePath,
+            hashers::keccak256_160::Keccak160,
+            messages::{
+                Message,
+                PriceFeedMessage,
+            },
+            wire::v1::WormholeMerkleRoot,
+        },
     };
 
-    #[test]
-    pub fn test_complete_accumulator_state_try_from_accumulator_state_works() {
-        let accumulator_state = AccumulatorState {
-            slot:                  1,
-            accumulator_messages:  None,
-            wormhole_merkle_state: None,
-        };
-
-        assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());
-
-        let accumulator_state = AccumulatorState {
-            slot:                  1,
-            accumulator_messages:  Some(AccumulatorMessages {
-                slot:         1,
-                magic:        [0; 4],
-                ring_size:    10,
-                raw_messages: vec![],
+    pub fn create_dummy_price_feed_message_state(
+        feed_id: FeedId,
+        publish_time: i64,
+        slot: Slot,
+    ) -> MessageState {
+        MessageState {
+            slot,
+            raw_message: vec![],
+            message: Message::PriceFeedMessage(PriceFeedMessage {
+                feed_id,
+                publish_time,
+                price: 1,
+                conf: 2,
+                exponent: 3,
+                ema_price: 4,
+                ema_conf: 5,
+                prev_publish_time: 6,
             }),
-            wormhole_merkle_state: None,
-        };
-
-        assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());
-
-        let accumulator_state = AccumulatorState {
-            slot:                  1,
-            accumulator_messages:  None,
-            wormhole_merkle_state: Some(WormholeMerkleState {
-                vaa:  vec![],
-                root: WormholeMerkleRoot {
-                    slot:      1,
-                    ring_size: 10,
-                    root:      [0; 20],
+            received_at: publish_time,
+            proof_set: ProofSet {
+                wormhole_merkle_proof: WormholeMerkleMessageProof {
+                    vaa:   vec![],
+                    proof: MerklePath::<Keccak160>::new(vec![]),
                 },
-            }),
-        };
+            },
+        }
+    }
 
-        assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());
+    pub async fn create_and_store_dummy_price_feed_message_state(
+        storage: &Storage,
+        feed_id: FeedId,
+        publish_time: UnixTimestamp,
+        slot: Slot,
+    ) -> MessageState {
+        let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot);
+        storage
+            .store_message_states(vec![message_state.clone()])
+            .await
+            .unwrap();
+        message_state
+    }
 
-        let accumulator_state = AccumulatorState {
-            slot:                  1,
-            accumulator_messages:  Some(AccumulatorMessages {
-                slot:         1,
-                magic:        [0; 4],
-                ring_size:    10,
-                raw_messages: vec![],
-            }),
-            wormhole_merkle_state: Some(WormholeMerkleState {
-                vaa:  vec![],
-                root: WormholeMerkleRoot {
-                    slot:      1,
-                    ring_size: 10,
-                    root:      [0; 20],
-                },
-            }),
-        };
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
 
+        // The latest message state should be the one we just stored.
         assert_eq!(
-            CompletedAccumulatorState::try_from(accumulator_state).unwrap(),
-            CompletedAccumulatorState {
-                slot:                  1,
-                accumulator_messages:  AccumulatorMessages {
-                    slot:         1,
-                    magic:        [0; 4],
-                    ring_size:    10,
-                    raw_messages: vec![],
-                },
-                wormhole_merkle_state: WormholeMerkleState {
-                    vaa:  vec![],
-                    root: WormholeMerkleRoot {
-                        slot:      1,
-                        ring_size: 10,
-                        root:      [0; 20],
-                    },
-                },
-            }
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![message_state]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_with_multiple_update_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let _old_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 20 at slot 10.
+        let new_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await;
+
+        // The latest message state should be the one with publish time 20.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                )
+                .await
+                .unwrap(),
+            vec![new_message_state]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_with_out_of_order_update_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 20 at slot 10.
+        let new_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await;
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let _old_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // The latest message state should be the one with publish time 20.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                )
+                .await
+                .unwrap(),
+            vec![new_message_state]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_first_after_message_state_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let old_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        let new_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // The first message state after time 10 should be the old message state.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::FirstAfter(10),
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                )
+                .await
+                .unwrap(),
+            vec![old_message_state]
+        );
+
+        // Querying the first after pub time 11, 12, 13 should all return the new message state.
+        for request_time in 11..14 {
+            assert_eq!(
+                storage
+                    .fetch_message_states(
+                        vec![[1; 32]],
+                        RequestTime::FirstAfter(request_time),
+                        MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                    )
+                    .await
+                    .unwrap(),
+                vec![new_message_state.clone()]
+            );
+        }
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_with_same_pubtime_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let slightly_older_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 7.
+        let slightly_newer_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 7).await;
+
+        // The latest message state should be the one with the higher slot.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![slightly_newer_message_state]
+        );
+
+        // Querying the first message state after time 10 should return the one with the lower slot.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::FirstAfter(10),
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![slightly_older_message_state]
+        );
+    }
+
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // Query the message state before the available times should return an error.
+        // This is because we are not sure that the first available message is really the first.
+        assert!(storage
+            .fetch_message_states(
+                vec![[1; 32]],
+                RequestTime::FirstAfter(9),
+                MessageStateFilter::Only(MessageType::PriceFeedMessage)
+            )
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_first_after_message_state_fails_for_future_time() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // Query the message state after the available times should return an error.
+        assert!(storage
+            .fetch_message_states(
+                vec![[1; 32]],
+                RequestTime::FirstAfter(14),
+                MessageStateFilter::Only(MessageType::PriceFeedMessage)
+            )
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    pub async fn test_store_more_message_states_than_cache_size_evicts_old_messages() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // Create and store a message state with feed id [1....] and publish time 20 at slot 14.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 14).await;
+
+        // The message at time 10 should be evicted and querying for it should return an error.
+        assert!(storage
+            .fetch_message_states(
+                vec![[1; 32]],
+                RequestTime::FirstAfter(10),
+                MessageStateFilter::Only(MessageType::PriceFeedMessage)
+            )
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_fetch_multiple_message_feed_ids_works() {
+        // Initialize a storage with a cache size of 1 per key.
+        let storage = Storage::new(1);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let message_state_1 =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [2....] and publish time 13 at slot 10.
+        let message_state_2 =
+            create_and_store_dummy_price_feed_message_state(&storage, [2; 32], 10, 5).await;
+
+        // Check both message states can be retrieved.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32], [2; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![message_state_1, message_state_2]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_fetch_not_existent_message_fails() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = Storage::new(2);
+
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Check both message states can be retrieved.
+        assert!(storage
+            .fetch_message_states(
+                vec![[2; 32]],
+                RequestTime::Latest,
+                MessageStateFilter::Only(MessageType::PriceFeedMessage),
+            )
+            .await
+            .is_err());
+    }
+
+    pub fn create_empty_accumulator_messages_at_slot(slot: Slot) -> AccumulatorMessages {
+        AccumulatorMessages {
+            magic: [0; 4],
+            slot,
+            ring_size: 3,
+            raw_messages: vec![],
+        }
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_fetch_accumulator_messages_works() {
+        // Initialize a storage with a cache size of 2 per key and the accumulator state.
+        let storage = Storage::new(2);
+
+        // Make sure the retrieved accumulator messages is what we store.
+        let mut accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
+        storage
+            .store_accumulator_messages(accumulator_messages_at_10.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_accumulator_messages(10)
+                .await
+                .unwrap()
+                .unwrap(),
+            accumulator_messages_at_10
+        );
+
+        // Make sure overwriting the accumulator messages works.
+        accumulator_messages_at_10.ring_size = 5; // Change the ring size from 3 to 5.
+        storage
+            .store_accumulator_messages(accumulator_messages_at_10.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_accumulator_messages(10)
+                .await
+                .unwrap()
+                .unwrap(),
+            accumulator_messages_at_10
+        );
+
+        // Create and store an accumulator messages with slot 5 and check it's stored.
+        let accumulator_messages_at_5 = create_empty_accumulator_messages_at_slot(5);
+        storage
+            .store_accumulator_messages(accumulator_messages_at_5.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_accumulator_messages(5)
+                .await
+                .unwrap()
+                .unwrap(),
+            accumulator_messages_at_5
+        );
+
+        // Add a newer accumulator messages with slot 15 to exceed cache size and make sure the earliest is evicted.
+        let accumulator_messages_at_15 = create_empty_accumulator_messages_at_slot(15);
+        storage
+            .store_accumulator_messages(accumulator_messages_at_15.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_accumulator_messages(15)
+                .await
+                .unwrap()
+                .unwrap(),
+            accumulator_messages_at_15
+        );
+        assert!(storage
+            .fetch_accumulator_messages(5)
+            .await
+            .unwrap()
+            .is_none());
+    }
+
+    pub fn create_empty_wormhole_merkle_state_at_slot(slot: Slot) -> WormholeMerkleState {
+        WormholeMerkleState {
+            vaa:  vec![],
+            root: WormholeMerkleRoot {
+                slot,
+                root: [0; 20],
+                ring_size: 3,
+            },
+        }
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_fetch_wormhole_merkle_state_works() {
+        // Initialize a storage with a cache size of 2 per key and the accumulator state.
+        let storage = Storage::new(2);
+
+        // Make sure the retrieved wormhole merkle state is what we store
+        let mut wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);
+        storage
+            .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_wormhole_merkle_state(10)
+                .await
+                .unwrap()
+                .unwrap(),
+            wormhole_merkle_state_at_10
+        );
+
+        // Make sure overwriting the wormhole merkle state works.
+        wormhole_merkle_state_at_10.root.ring_size = 5; // Change the ring size from 3 to 5.
+        storage
+            .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_wormhole_merkle_state(10)
+                .await
+                .unwrap()
+                .unwrap(),
+            wormhole_merkle_state_at_10
+        );
+
+        // Create and store an wormhole merkle state with slot 5 and check it's stored.
+        let wormhole_merkle_state_at_5 = create_empty_wormhole_merkle_state_at_slot(5);
+        storage
+            .store_wormhole_merkle_state(wormhole_merkle_state_at_5.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_wormhole_merkle_state(5)
+                .await
+                .unwrap()
+                .unwrap(),
+            wormhole_merkle_state_at_5
+        );
+
+        // Add a newer wormhole merkle state with slot 15 to exceed cache size and make sure the earliest is evicted.
+        let wormhole_merkle_state_at_15 = create_empty_wormhole_merkle_state_at_slot(15);
+        storage
+            .store_wormhole_merkle_state(wormhole_merkle_state_at_15.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage
+                .fetch_wormhole_merkle_state(15)
+                .await
+                .unwrap()
+                .unwrap(),
+            wormhole_merkle_state_at_15
         );
+        assert!(storage
+            .fetch_wormhole_merkle_state(5)
+            .await
+            .unwrap()
+            .is_none());
     }
 }

+ 0 - 862
hermes/src/store/storage/local_storage.rs

@@ -1,862 +0,0 @@
-use {
-    super::{
-        AccumulatorState,
-        MessageState,
-        MessageStateFilter,
-        MessageStateKey,
-        MessageStateTime,
-        RequestTime,
-        Storage,
-        StorageInstance,
-    },
-    crate::store::types::Slot,
-    anyhow::{
-        anyhow,
-        Result,
-    },
-    async_trait::async_trait,
-    dashmap::DashMap,
-    pythnet_sdk::messages::{
-        FeedId,
-        MessageType,
-    },
-    std::{
-        collections::VecDeque,
-        sync::Arc,
-    },
-    strum::IntoEnumIterator,
-    tokio::sync::RwLock,
-};
-
-#[derive(Clone)]
-pub struct LocalStorage {
-    message_cache:     Arc<DashMap<MessageStateKey, VecDeque<MessageState>>>,
-    accumulator_cache: Arc<RwLock<VecDeque<AccumulatorState>>>,
-    cache_size:        u64,
-}
-
-impl LocalStorage {
-    pub fn new_instance(cache_size: u64) -> StorageInstance {
-        Box::new(Self {
-            message_cache: Arc::new(DashMap::new()),
-            accumulator_cache: Arc::new(RwLock::new(VecDeque::new())),
-            cache_size,
-        })
-    }
-
-    fn retrieve_message_state(
-        &self,
-        key: MessageStateKey,
-        request_time: RequestTime,
-    ) -> Option<MessageState> {
-        match self.message_cache.get(&key) {
-            Some(key_cache) => {
-                match request_time {
-                    RequestTime::Latest => key_cache.back().cloned(),
-                    RequestTime::FirstAfter(time) => {
-                        // If the requested time is before the first element in the vector, we are
-                        // not sure that the first element is the closest one.
-                        if let Some(oldest_record) = key_cache.front() {
-                            if time < oldest_record.time().publish_time {
-                                return None;
-                            }
-                        }
-
-                        let lookup_time = MessageStateTime {
-                            publish_time: time,
-                            slot:         0,
-                        };
-
-                        // Binary search returns Ok(idx) if the element is found at index idx or Err(idx) if it's not
-                        // found which idx is the index where the element should be inserted to keep the vector sorted.
-                        // Getting idx within any of the match arms will give us the index of the element that is
-                        // closest after or equal to the requested time.
-                        let idx = match key_cache
-                            .binary_search_by_key(&lookup_time, |record| record.time())
-                        {
-                            Ok(idx) => idx,
-                            Err(idx) => idx,
-                        };
-
-                        // We are using `get` to handle out of bound idx. This happens if the
-                        // requested time is after the last element in the vector.
-                        key_cache.get(idx).cloned()
-                    }
-                }
-            }
-            None => None,
-        }
-    }
-
-    /// Store the accumulator state in the cache assuming that the lock is already acquired.
-    fn store_accumulator_state_impl(
-        &self,
-        state: AccumulatorState,
-        cache: &mut VecDeque<AccumulatorState>,
-    ) {
-        cache.push_back(state);
-
-        let mut i = cache.len().saturating_sub(1);
-        while i > 0 && cache[i - 1].slot > cache[i].slot {
-            cache.swap(i - 1, i);
-            i -= 1;
-        }
-
-        if cache.len() > self.cache_size as usize {
-            cache.pop_front();
-        }
-    }
-}
-
-#[async_trait]
-impl Storage for LocalStorage {
-    /// Add a new db entry to the cache.
-    ///
-    /// This method keeps the backed store sorted for efficiency, and removes
-    /// the oldest record in the cache if the max_size is reached. Entries are
-    /// usually added in increasing order and likely to be inserted near the
-    /// end of the deque. The function is optimized for this specific case.
-    async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
-        for message_state in message_states {
-            let key = message_state.key();
-
-            let mut key_cache = self.message_cache.entry(key).or_insert_with(VecDeque::new);
-
-            key_cache.push_back(message_state);
-
-            // Shift the pushed record until it's in the right place.
-            let mut i = key_cache.len().saturating_sub(1);
-            while i > 0 && key_cache[i - 1].time() > key_cache[i].time() {
-                key_cache.swap(i - 1, i);
-                i -= 1;
-            }
-
-            // FIXME remove equal elements by key and time
-
-            // Remove the oldest record if the max size is reached.
-            if key_cache.len() > self.cache_size as usize {
-                key_cache.pop_front();
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn fetch_message_states(
-        &self,
-        ids: Vec<FeedId>,
-        request_time: RequestTime,
-        filter: MessageStateFilter,
-    ) -> Result<Vec<MessageState>> {
-        ids.into_iter()
-            .flat_map(|id| {
-                let request_time = request_time.clone();
-                let message_types: Vec<MessageType> = match filter {
-                    MessageStateFilter::All => MessageType::iter().collect(),
-                    MessageStateFilter::Only(t) => vec![t],
-                };
-
-                message_types.into_iter().map(move |message_type| {
-                    let key = MessageStateKey {
-                        feed_id: id,
-                        type_:   message_type,
-                    };
-                    self.retrieve_message_state(key, request_time.clone())
-                        .ok_or(anyhow!("Message not found"))
-                })
-            })
-            .collect()
-    }
-
-    async fn message_state_keys(&self) -> Vec<MessageStateKey> {
-        self.message_cache
-            .iter()
-            .map(|entry| entry.key().clone())
-            .collect()
-    }
-
-    async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> {
-        let mut accumulator_cache = self.accumulator_cache.write().await;
-        self.store_accumulator_state_impl(state, &mut accumulator_cache);
-        Ok(())
-    }
-
-    async fn fetch_accumulator_state(&self, slot: Slot) -> Result<Option<super::AccumulatorState>> {
-        let accumulator_cache = self.accumulator_cache.read().await;
-        match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) {
-            Ok(idx) => Ok(accumulator_cache.get(idx).cloned()),
-            Err(_) => Ok(None),
-        }
-    }
-
-    async fn update_accumulator_state(
-        &self,
-        slot: Slot,
-        callback: Box<dyn (FnOnce(AccumulatorState) -> AccumulatorState) + Send>,
-    ) -> Result<()> {
-        let mut accumulator_cache = self.accumulator_cache.write().await;
-        match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) {
-            Ok(idx) => {
-                let state = accumulator_cache.get_mut(idx).unwrap();
-                *state = callback(state.clone());
-            }
-            Err(_) => {
-                let state = callback(AccumulatorState {
-                    slot,
-                    accumulator_messages: None,
-                    wormhole_merkle_state: None,
-                });
-                self.store_accumulator_state_impl(state, &mut accumulator_cache);
-            }
-        }
-
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use {
-        super::*,
-        crate::store::{
-            proof::wormhole_merkle::{
-                WormholeMerkleMessageProof,
-                WormholeMerkleState,
-            },
-            types::{
-                AccumulatorMessages,
-                ProofSet,
-            },
-        },
-        futures::future::join_all,
-        pyth_sdk::UnixTimestamp,
-        pythnet_sdk::{
-            accumulators::merkle::MerklePath,
-            hashers::keccak256_160::Keccak160,
-            messages::{
-                Message,
-                PriceFeedMessage,
-            },
-            wire::v1::WormholeMerkleRoot,
-        },
-    };
-
-    pub fn create_dummy_price_feed_message_state(
-        feed_id: FeedId,
-        publish_time: i64,
-        slot: Slot,
-    ) -> MessageState {
-        MessageState {
-            slot,
-            raw_message: vec![],
-            message: Message::PriceFeedMessage(PriceFeedMessage {
-                feed_id,
-                publish_time,
-                price: 1,
-                conf: 2,
-                exponent: 3,
-                ema_price: 4,
-                ema_conf: 5,
-                prev_publish_time: 6,
-            }),
-            received_at: publish_time,
-            proof_set: ProofSet {
-                wormhole_merkle_proof: WormholeMerkleMessageProof {
-                    vaa:   vec![],
-                    proof: MerklePath::<Keccak160>::new(vec![]),
-                },
-            },
-        }
-    }
-
-    pub async fn create_and_store_dummy_price_feed_message_state(
-        storage: &StorageInstance,
-        feed_id: FeedId,
-        publish_time: UnixTimestamp,
-        slot: Slot,
-    ) -> MessageState {
-        let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot);
-        storage
-            .store_message_states(vec![message_state.clone()])
-            .await
-            .unwrap();
-        message_state
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_retrieve_latest_message_state_works() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        let message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // The latest message state should be the one we just stored.
-        assert_eq!(
-            storage
-                .fetch_message_states(
-                    vec![[1; 32]],
-                    RequestTime::Latest,
-                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
-                )
-                .await
-                .unwrap(),
-            vec![message_state]
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_retrieve_latest_message_state_with_multiple_update_works() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        let _old_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Create and store a message state with feed id [1....] and publish time 20 at slot 10.
-        let new_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await;
-
-        // The latest message state should be the one with publish time 20.
-        assert_eq!(
-            storage
-                .fetch_message_states(
-                    vec![[1; 32]],
-                    RequestTime::Latest,
-                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
-                )
-                .await
-                .unwrap(),
-            vec![new_message_state]
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_retrieve_latest_message_state_with_out_of_order_update_works() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 20 at slot 10.
-        let new_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await;
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        let _old_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // The latest message state should be the one with publish time 20.
-        assert_eq!(
-            storage
-                .fetch_message_states(
-                    vec![[1; 32]],
-                    RequestTime::Latest,
-                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
-                )
-                .await
-                .unwrap(),
-            vec![new_message_state]
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_retrieve_first_after_message_state_works() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        let old_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
-        let new_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
-
-        // The first message state after time 10 should be the old message state.
-        assert_eq!(
-            storage
-                .fetch_message_states(
-                    vec![[1; 32]],
-                    RequestTime::FirstAfter(10),
-                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
-                )
-                .await
-                .unwrap(),
-            vec![old_message_state]
-        );
-
-        // Querying the first after pub time 11, 12, 13 should all return the new message state.
-        for request_time in 11..14 {
-            assert_eq!(
-                storage
-                    .fetch_message_states(
-                        vec![[1; 32]],
-                        RequestTime::FirstAfter(request_time),
-                        MessageStateFilter::Only(MessageType::PriceFeedMessage)
-                    )
-                    .await
-                    .unwrap(),
-                vec![new_message_state.clone()]
-            );
-        }
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_retrieve_latest_message_state_with_same_pubtime_works() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        let slightly_older_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 7.
-        let slightly_newer_message_state =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 7).await;
-
-        // The latest message state should be the one with the higher slot.
-        assert_eq!(
-            storage
-                .fetch_message_states(
-                    vec![[1; 32]],
-                    RequestTime::Latest,
-                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
-                )
-                .await
-                .unwrap(),
-            vec![slightly_newer_message_state]
-        );
-
-        // Querying the first message state after time 10 should return the one with the lower slot.
-        assert_eq!(
-            storage
-                .fetch_message_states(
-                    vec![[1; 32]],
-                    RequestTime::FirstAfter(10),
-                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
-                )
-                .await
-                .unwrap(),
-            vec![slightly_older_message_state]
-        );
-    }
-
-
-    #[tokio::test]
-    pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
-
-        // Query the message state before the available times should return an error.
-        // This is because we are not sure that the first available message is really the first.
-        assert!(storage
-            .fetch_message_states(
-                vec![[1; 32]],
-                RequestTime::FirstAfter(9),
-                MessageStateFilter::Only(MessageType::PriceFeedMessage)
-            )
-            .await
-            .is_err());
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_retrieve_first_after_message_state_fails_for_future_time() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
-
-        // Query the message state after the available times should return an error.
-        assert!(storage
-            .fetch_message_states(
-                vec![[1; 32]],
-                RequestTime::FirstAfter(14),
-                MessageStateFilter::Only(MessageType::PriceFeedMessage)
-            )
-            .await
-            .is_err());
-    }
-
-    #[tokio::test]
-    pub async fn test_store_more_message_states_than_cache_size_evicts_old_messages() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
-
-        // Create and store a message state with feed id [1....] and publish time 20 at slot 14.
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 14).await;
-
-        // The message at time 10 should be evicted and querying for it should return an error.
-        assert!(storage
-            .fetch_message_states(
-                vec![[1; 32]],
-                RequestTime::FirstAfter(10),
-                MessageStateFilter::Only(MessageType::PriceFeedMessage)
-            )
-            .await
-            .is_err());
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_receive_multiple_message_feed_ids_works() {
-        // Initialize a storage with a cache size of 1 per key.
-        let storage = LocalStorage::new_instance(1);
-
-        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
-        let message_state_1 =
-            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Create and store a message state with feed id [2....] and publish time 13 at slot 10.
-        let message_state_2 =
-            create_and_store_dummy_price_feed_message_state(&storage, [2; 32], 10, 5).await;
-
-        // Check both message states can be retrieved.
-        assert_eq!(
-            storage
-                .fetch_message_states(
-                    vec![[1; 32], [2; 32]],
-                    RequestTime::Latest,
-                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
-                )
-                .await
-                .unwrap(),
-            vec![message_state_1, message_state_2]
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_receive_not_existent_message_fails() {
-        // Initialize a storage with a cache size of 2 per key.
-        let storage = LocalStorage::new_instance(2);
-
-        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
-
-        // Check both message states can be retrieved.
-        assert!(storage
-            .fetch_message_states(
-                vec![[2; 32]],
-                RequestTime::Latest,
-                MessageStateFilter::Only(MessageType::PriceFeedMessage),
-            )
-            .await
-            .is_err());
-    }
-
-    pub fn create_empty_accumulator_state_at_slot(slot: Slot) -> AccumulatorState {
-        AccumulatorState {
-            slot,
-            accumulator_messages: None,
-            wormhole_merkle_state: None,
-        }
-    }
-
-    pub async fn create_and_store_empty_accumulator_state_at_slot(
-        storage: &StorageInstance,
-        slot: Slot,
-    ) -> AccumulatorState {
-        let accumulator_state = create_empty_accumulator_state_at_slot(slot);
-        storage
-            .store_accumulator_state(accumulator_state.clone())
-            .await
-            .unwrap();
-        accumulator_state
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_receive_accumulator_state_works() {
-        // Initialize a storage with a cache size of 2 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store an accumulator state with slot 10.
-        let accumulator_state =
-            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
-
-        // Make sure the retrieved accumulator state is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
-            accumulator_state
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_receive_accumulator_state_works_on_overwrite() {
-        // Initialize a storage with a cache size of 2 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create and store an accumulator state with slot 10.
-        let mut accumulator_state =
-            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
-
-        // Retrieve the accumulator state and make sure it is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
-            accumulator_state
-        );
-
-        // Change the state to have accumulator messages
-        // We mutate the existing state because the normal flow is like this.
-        accumulator_state.accumulator_messages = Some(AccumulatorMessages {
-            magic:        [0; 4],
-            slot:         10,
-            ring_size:    3,
-            raw_messages: vec![],
-        });
-
-        // Store the accumulator state again.
-        storage
-            .store_accumulator_state(accumulator_state.clone())
-            .await
-            .unwrap();
-
-        // Make sure the retrieved accumulator state is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
-            accumulator_state
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_receive_multiple_accumulator_state_works() {
-        // Initialize a storage with a cache size of 2 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(2);
-
-        let accumulator_state_at_slot_10 =
-            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
-        let accumulator_state_at_slot_20 =
-            create_and_store_empty_accumulator_state_at_slot(&storage, 20).await;
-
-        // Retrieve the accumulator states and make sure it is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
-            accumulator_state_at_slot_10
-        );
-
-        assert_eq!(
-            storage.fetch_accumulator_state(20).await.unwrap().unwrap(),
-            accumulator_state_at_slot_20
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_store_and_receive_accumulator_state_evicts_cache() {
-        // Initialize a storage with a cache size of 2 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(2);
-
-        let _accumulator_state_at_slot_10 =
-            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
-        let accumulator_state_at_slot_20 =
-            create_and_store_empty_accumulator_state_at_slot(&storage, 20).await;
-        let accumulator_state_at_slot_30 =
-            create_and_store_empty_accumulator_state_at_slot(&storage, 30).await;
-
-        // The accumulator state at slot 10 should be evicted from the cache.
-        assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None);
-
-        // Retrieve the rest of accumulator states and make sure it is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(20).await.unwrap().unwrap(),
-            accumulator_state_at_slot_20
-        );
-
-        assert_eq!(
-            storage.fetch_accumulator_state(30).await.unwrap().unwrap(),
-            accumulator_state_at_slot_30
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_update_accumulator_state_works() {
-        // Initialize a storage with a cache size of 2 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(2);
-
-        // Create an empty accumulator state at slot 10.
-        create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
-
-        // Update the accumulator state with slot 10.
-        let accumulator_messages = AccumulatorMessages {
-            magic:        [0; 4],
-            slot:         10,
-            ring_size:    3,
-            raw_messages: vec![],
-        };
-
-        let accumulator_messages_clone = accumulator_messages.clone();
-
-        storage
-            .update_accumulator_state(
-                10,
-                Box::new(|mut accumulator_state| {
-                    accumulator_state.accumulator_messages = Some(accumulator_messages_clone);
-                    accumulator_state
-                }),
-            )
-            .await
-            .unwrap();
-
-        // Make sure the retrieved accumulator state is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(10).await.unwrap(),
-            Some(AccumulatorState {
-                slot:                  10,
-                accumulator_messages:  Some(accumulator_messages),
-                wormhole_merkle_state: None,
-            })
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_update_accumulator_state_works_on_nonexistent_state() {
-        // Initialize an empty storage with a cache size of 2 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(2);
-
-        // Update the accumulator state with slot 10.
-        let accumulator_messages = AccumulatorMessages {
-            magic:        [0; 4],
-            slot:         10,
-            ring_size:    3,
-            raw_messages: vec![],
-        };
-        let accumulator_messages_clone = accumulator_messages.clone();
-        storage
-            .update_accumulator_state(
-                10,
-                Box::new(|mut accumulator_state| {
-                    accumulator_state.accumulator_messages = Some(accumulator_messages_clone);
-                    accumulator_state
-                }),
-            )
-            .await
-            .unwrap();
-
-        // Make sure the retrieved accumulator state is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(10).await.unwrap(),
-            Some(AccumulatorState {
-                slot:                  10,
-                accumulator_messages:  Some(accumulator_messages),
-                wormhole_merkle_state: None,
-            })
-        );
-    }
-
-    #[tokio::test]
-    pub async fn test_update_accumulator_state_works_on_concurrent_updates() {
-        // Initialize an empty storage with a cache size of 20 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(20);
-
-
-        // Run this check 10 times to make sure the concurrent updates work.
-        let mut futures = vec![];
-
-        for slot in 1..10 {
-            futures.push(storage.update_accumulator_state(
-                slot,
-                Box::new(|mut accumulator_state| {
-                    accumulator_state.accumulator_messages = Some(AccumulatorMessages {
-                        magic:        [0; 4],
-                        slot:         123,
-                        ring_size:    3,
-                        raw_messages: vec![],
-                    });
-                    accumulator_state
-                }),
-            ));
-            futures.push(storage.update_accumulator_state(
-                slot,
-                Box::new(|mut accumulator_state| {
-                    accumulator_state.wormhole_merkle_state = Some(WormholeMerkleState {
-                        root: WormholeMerkleRoot {
-                            root:      [0; 20],
-                            slot:      123,
-                            ring_size: 3,
-                        },
-                        vaa:  vec![],
-                    });
-                    accumulator_state
-                }),
-            ));
-        }
-
-        join_all(futures).await;
-
-        for slot in 1..10 {
-            assert_eq!(
-                storage.fetch_accumulator_state(slot).await.unwrap(),
-                Some(AccumulatorState {
-                    slot,
-                    accumulator_messages: Some(AccumulatorMessages {
-                        magic:        [0; 4],
-                        slot:         123,
-                        ring_size:    3,
-                        raw_messages: vec![],
-                    }),
-                    wormhole_merkle_state: Some(WormholeMerkleState {
-                        root: WormholeMerkleRoot {
-                            root:      [0; 20],
-                            slot:      123,
-                            ring_size: 3,
-                        },
-                        vaa:  vec![],
-                    }),
-                })
-            )
-        }
-    }
-
-    #[tokio::test]
-    pub async fn test_update_accumulator_state_evicts_cache() {
-        // Initialize a storage with a cache size of 2 per key and the accumulator state.
-        let storage = LocalStorage::new_instance(2);
-
-        storage
-            .update_accumulator_state(10, Box::new(|accumulator_state| accumulator_state))
-            .await
-            .unwrap();
-        storage
-            .update_accumulator_state(20, Box::new(|accumulator_state| accumulator_state))
-            .await
-            .unwrap();
-        storage
-            .update_accumulator_state(30, Box::new(|accumulator_state| accumulator_state))
-            .await
-            .unwrap();
-
-        // The accumulator state at slot 10 should be evicted from the cache.
-        assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None);
-
-        // Retrieve the rest of accumulator states and make sure it is what we stored.
-        assert_eq!(
-            storage.fetch_accumulator_state(20).await.unwrap().unwrap(),
-            AccumulatorState {
-                slot:                  20,
-                accumulator_messages:  None,
-                wormhole_merkle_state: None,
-            }
-        );
-
-        assert_eq!(
-            storage.fetch_accumulator_state(30).await.unwrap().unwrap(),
-            AccumulatorState {
-                slot:                  30,
-                accumulator_messages:  None,
-                wormhole_merkle_state: None,
-            }
-        );
-    }
-}