Browse Source

[hermes] Add storage tests + refactor (#907)

* [hermes] Add storage tests + refactor

* Bump pythnet_sdk version + update cosmwasm

* Address review feedbacks
Ali Behjati 2 years ago
parent
commit
d07cc9d1ea

+ 2 - 202
hermes/Cargo.lock

@@ -674,12 +674,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "bytecount"
-version = "0.6.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
-
 [[package]]
 name = "bytemuck"
 version = "1.13.1"
@@ -712,15 +706,6 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
 
-[[package]]
-name = "camino"
-version = "1.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2"
-dependencies = [
- "serde",
-]
-
 [[package]]
 name = "caps"
 version = "0.5.5"
@@ -731,28 +716,6 @@ dependencies = [
  "thiserror",
 ]
 
-[[package]]
-name = "cargo-platform"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27"
-dependencies = [
- "serde",
-]
-
-[[package]]
-name = "cargo_metadata"
-version = "0.14.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
-dependencies = [
- "camino",
- "cargo-platform",
- "semver 1.0.17",
- "serde",
- "serde_json",
-]
-
 [[package]]
 name = "cc"
 version = "1.0.79"
@@ -1367,15 +1330,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "error-chain"
-version = "0.12.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
-dependencies = [
- "version_check",
-]
-
 [[package]]
 name = "event-listener"
 version = "2.5.3"
@@ -1639,12 +1593,6 @@ dependencies = [
  "polyval",
 ]
 
-[[package]]
-name = "glob"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
-
 [[package]]
 name = "gloo-timers"
 version = "0.2.6"
@@ -1711,7 +1659,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
 
 [[package]]
 name = "hermes"
-version = "0.1.2"
+version = "0.1.3"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -1730,7 +1678,6 @@ dependencies = [
  "libc",
  "libp2p",
  "log",
- "moka",
  "pyth-sdk",
  "pythnet-sdk",
  "rand 0.8.5",
@@ -2861,15 +2808,6 @@ dependencies = [
  "linked-hash-map",
 ]
 
-[[package]]
-name = "mach2"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8"
-dependencies = [
- "libc",
-]
-
 [[package]]
 name = "match_cfg"
 version = "0.1.0"
@@ -2966,32 +2904,6 @@ dependencies = [
  "windows-sys 0.45.0",
 ]
 
-[[package]]
-name = "moka"
-version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "934030d03f6191edbb4ba16835ccdb80d560788ac686570a8e2986a0fb59ded8"
-dependencies = [
- "async-io",
- "async-lock",
- "crossbeam-channel",
- "crossbeam-epoch",
- "crossbeam-utils",
- "futures-util",
- "num_cpus",
- "once_cell",
- "parking_lot 0.12.1",
- "quanta",
- "rustc_version 0.4.0",
- "scheduled-thread-pool",
- "skeptic",
- "smallvec",
- "tagptr",
- "thiserror",
- "triomphe",
- "uuid",
-]
-
 [[package]]
 name = "multiaddr"
 version = "0.13.0"
@@ -3776,17 +3688,6 @@ dependencies = [
  "prost",
 ]
 
-[[package]]
-name = "pulldown-cmark"
-version = "0.9.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63"
-dependencies = [
- "bitflags",
- "memchr",
- "unicase",
-]
-
 [[package]]
 name = "pyth-sdk"
 version = "0.7.0"
@@ -3802,7 +3703,7 @@ dependencies = [
 
 [[package]]
 name = "pythnet-sdk"
-version = "1.13.6"
+version = "2.0.0"
 dependencies = [
  "bincode",
  "borsh",
@@ -3827,22 +3728,6 @@ dependencies = [
  "percent-encoding",
 ]
 
-[[package]]
-name = "quanta"
-version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8cc73c42f9314c4bdce450c77e6f09ecbddefbeddb1b5979ded332a3913ded33"
-dependencies = [
- "crossbeam-utils",
- "libc",
- "mach2",
- "once_cell",
- "raw-cpuid",
- "wasi 0.11.0+wasi-snapshot-preview1",
- "web-sys",
- "winapi",
-]
-
 [[package]]
 name = "quick-error"
 version = "1.2.3"
@@ -4011,15 +3896,6 @@ dependencies = [
  "rand_core 0.6.4",
 ]
 
-[[package]]
-name = "raw-cpuid"
-version = "10.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332"
-dependencies = [
- "bitflags",
-]
-
 [[package]]
 name = "rayon"
 version = "1.7.0"
@@ -4323,15 +4199,6 @@ dependencies = [
  "cipher 0.4.4",
 ]
 
-[[package]]
-name = "same-file"
-version = "1.0.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
-dependencies = [
- "winapi-util",
-]
-
 [[package]]
 name = "schannel"
 version = "0.1.21"
@@ -4341,15 +4208,6 @@ dependencies = [
  "windows-sys 0.42.0",
 ]
 
-[[package]]
-name = "scheduled-thread-pool"
-version = "0.2.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
-dependencies = [
- "parking_lot 0.12.1",
-]
-
 [[package]]
 name = "schemars"
 version = "0.8.12"
@@ -4447,9 +4305,6 @@ name = "semver"
 version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
-dependencies = [
- "serde",
-]
 
 [[package]]
 name = "semver-parser"
@@ -4681,21 +4536,6 @@ dependencies = [
  "typenum",
 ]
 
-[[package]]
-name = "skeptic"
-version = "0.13.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
-dependencies = [
- "bytecount",
- "cargo_metadata",
- "error-chain",
- "glob",
- "pulldown-cmark",
- "tempfile",
- "walkdir",
-]
-
 [[package]]
 name = "slab"
 version = "0.4.8"
@@ -5543,12 +5383,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "tagptr"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
-
 [[package]]
 name = "tempfile"
 version = "3.5.0"
@@ -5881,12 +5715,6 @@ dependencies = [
  "once_cell",
 ]
 
-[[package]]
-name = "triomphe"
-version = "0.1.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db"
-
 [[package]]
 name = "trust-dns-proto"
 version = "0.20.4"
@@ -6001,15 +5829,6 @@ dependencies = [
  "static_assertions",
 ]
 
-[[package]]
-name = "unicase"
-version = "2.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
-dependencies = [
- "version_check",
-]
-
 [[package]]
 name = "unicode-bidi"
 version = "0.3.13"
@@ -6110,15 +5929,6 @@ version = "0.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
 
-[[package]]
-name = "uuid"
-version = "1.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2"
-dependencies = [
- "getrandom 0.2.9",
-]
-
 [[package]]
 name = "value-bag"
 version = "1.0.0-alpha.9"
@@ -6159,16 +5969,6 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
 
-[[package]]
-name = "walkdir"
-version = "2.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698"
-dependencies = [
- "same-file",
- "winapi-util",
-]
-
 [[package]]
 name = "want"
 version = "0.3.0"

+ 2 - 3
hermes/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name                   = "hermes"
-version                = "0.1.2"
+version                = "0.1.3"
 edition                = "2021"
 
 [dependencies]
@@ -35,11 +35,10 @@ libp2p                 = { version = "0.42.2", features = [
 ]}
 
 log                    = { version = "0.4.17" }
-moka                   = { version = "0.11.0", features = ["future"] }
 pyth-sdk               = { version = "0.7.0" }
 
 # Parse Wormhole attester price attestations.
-pythnet-sdk            = { path = "../pythnet/pythnet_sdk/", version = "=1.13.6", features = ["strum"] }
+pythnet-sdk            = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] }
 
 rand                   = { version = "0.8.5" }
 reqwest                = { version = "0.11.14", features = ["blocking", "json"] }

+ 1 - 1
hermes/src/api/types.rs

@@ -70,7 +70,7 @@ impl RpcPriceFeed {
         let price_feed_message = price_feed_update.price_feed;
 
         Self {
-            id:        PriceIdentifier::new(price_feed_message.id),
+            id:        PriceIdentifier::new(price_feed_message.feed_id),
             price:     Price {
                 price:        price_feed_message.price,
                 conf:         price_feed_message.conf,

+ 1 - 1
hermes/src/api/ws.rs

@@ -165,7 +165,7 @@ impl Subscriber {
         {
             let config = self
                 .price_feeds_with_config
-                .get(&PriceIdentifier::new(update.price_feed.id))
+                .get(&PriceIdentifier::new(update.price_feed.feed_id))
                 .ok_or(anyhow::anyhow!(
                     "Config missing, price feed list was poisoned during iteration."
                 ))?;

+ 18 - 10
hermes/src/store.rs

@@ -33,7 +33,6 @@ use {
         anyhow,
         Result,
     },
-    moka::future::Cache,
     pyth_sdk::PriceIdentifier,
     pythnet_sdk::{
         messages::{
@@ -48,6 +47,7 @@ use {
     std::{
         collections::{
             BTreeMap,
+            BTreeSet,
             HashSet,
         },
         sync::Arc,
@@ -78,9 +78,11 @@ pub mod storage;
 pub mod types;
 pub mod wormhole;
 
+const OBSERVED_CACHE_SIZE: usize = 1000;
+
 pub struct Store {
     pub storage:                  StorageInstance,
-    pub observed_vaa_seqs:        Cache<u64, bool>,
+    pub observed_vaa_seqs:        RwLock<BTreeSet<u64>>,
     pub guardian_set:             RwLock<BTreeMap<u32, GuardianSet>>,
     pub update_tx:                Sender<()>,
     pub last_completed_update_at: RwLock<Option<Instant>>,
@@ -90,10 +92,7 @@ impl Store {
     pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
         Arc::new(Self {
             storage: storage::local_storage::LocalStorage::new_instance(cache_size),
-            observed_vaa_seqs: Cache::builder()
-                .max_capacity(cache_size)
-                .time_to_live(Duration::from_secs(60 * 5))
-                .build(),
+            observed_vaa_seqs: RwLock::new(Default::default()),
             guardian_set: RwLock::new(Default::default()),
             update_tx,
             last_completed_update_at: RwLock::new(None),
@@ -113,7 +112,7 @@ impl Store {
                     return Ok(()); // Ignore VAA from other emitters
                 }
 
-                if self.observed_vaa_seqs.get(&vaa.sequence).is_some() {
+                if self.observed_vaa_seqs.read().await.contains(&vaa.sequence) {
                     return Ok(()); // Ignore VAA if we have already seen it
                 }
 
@@ -127,7 +126,13 @@ impl Store {
                     }
                 };
 
-                self.observed_vaa_seqs.insert(vaa.sequence, true).await;
+                {
+                    let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await;
+                    observed_vaa_seqs.insert(vaa.sequence);
+                    while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
+                        observed_vaa_seqs.pop_first();
+                    }
+                }
 
                 match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
                     WormholePayload::Merkle(proof) => {
@@ -232,7 +237,10 @@ impl Store {
         let messages = self
             .storage
             .fetch_message_states(
-                price_ids,
+                price_ids
+                    .iter()
+                    .map(|price_id| price_id.to_bytes())
+                    .collect(),
                 request_time,
                 MessageStateFilter::Only(MessageType::PriceFeedMessage),
             )
@@ -267,7 +275,7 @@ impl Store {
             .message_state_keys()
             .await
             .iter()
-            .map(|key| PriceIdentifier::new(key.id))
+            .map(|key| PriceIdentifier::new(key.feed_id))
             .collect()
     }
 

+ 93 - 7
hermes/src/store/storage.rs

@@ -14,8 +14,8 @@ use {
         Result,
     },
     async_trait::async_trait,
-    pyth_sdk::PriceIdentifier,
     pythnet_sdk::messages::{
+        FeedId,
         Message,
         MessageType,
     },
@@ -57,8 +57,8 @@ impl TryFrom<AccumulatorState> for CompletedAccumulatorState {
 
 #[derive(Clone, PartialEq, Eq, Debug, Hash)]
 pub struct MessageStateKey {
-    pub id:    [u8; 32],
-    pub type_: MessageType,
+    pub feed_id: FeedId,
+    pub type_:   MessageType,
 }
 
 #[derive(Clone, PartialEq, Eq, Debug, PartialOrd, Ord)]
@@ -85,8 +85,8 @@ impl MessageState {
 
     pub fn key(&self) -> MessageStateKey {
         MessageStateKey {
-            id:    self.message.id(),
-            type_: self.message.into(),
+            feed_id: self.message.feed_id(),
+            type_:   self.message.into(),
         }
     }
 
@@ -125,13 +125,99 @@ pub trait Storage: Send + Sync {
     async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
     async fn fetch_message_states(
         &self,
-        ids: Vec<PriceIdentifier>,
+        ids: Vec<FeedId>,
         request_time: RequestTime,
         filter: MessageStateFilter,
     ) -> Result<Vec<MessageState>>;
 
     async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>;
-    async fn fetch_accumulator_state(&self, slot: u64) -> Result<Option<AccumulatorState>>;
+    async fn fetch_accumulator_state(&self, slot: Slot) -> Result<Option<AccumulatorState>>;
 }
 
 pub type StorageInstance = Box<dyn Storage>;
+
+#[cfg(test)]
+mod test {
+    use {
+        super::*,
+        pythnet_sdk::wire::v1::WormholeMerkleRoot,
+    };
+
+    #[test]
+    pub fn test_complete_accumulator_state_try_from_accumulator_state_works() {
+        let accumulator_state = AccumulatorState {
+            slot:                  1,
+            accumulator_messages:  None,
+            wormhole_merkle_state: None,
+        };
+
+        assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
+
+        let accumulator_state = AccumulatorState {
+            slot:                  1,
+            accumulator_messages:  Some(AccumulatorMessages {
+                slot:      1,
+                magic:     [0; 4],
+                ring_size: 10,
+                messages:  vec![],
+            }),
+            wormhole_merkle_state: None,
+        };
+
+        assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
+
+        let accumulator_state = AccumulatorState {
+            slot:                  1,
+            accumulator_messages:  None,
+            wormhole_merkle_state: Some(WormholeMerkleState {
+                vaa:  vec![],
+                root: WormholeMerkleRoot {
+                    slot:      1,
+                    ring_size: 10,
+                    root:      [0; 20],
+                },
+            }),
+        };
+
+        assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
+
+        let accumulator_state = AccumulatorState {
+            slot:                  1,
+            accumulator_messages:  Some(AccumulatorMessages {
+                slot:      1,
+                magic:     [0; 4],
+                ring_size: 10,
+                messages:  vec![],
+            }),
+            wormhole_merkle_state: Some(WormholeMerkleState {
+                vaa:  vec![],
+                root: WormholeMerkleRoot {
+                    slot:      1,
+                    ring_size: 10,
+                    root:      [0; 20],
+                },
+            }),
+        };
+
+        assert_eq!(
+            CompletedAccumulatorState::try_from(accumulator_state.clone()).unwrap(),
+            CompletedAccumulatorState {
+                slot:                  1,
+                accumulator_messages:  AccumulatorMessages {
+                    slot:      1,
+                    magic:     [0; 4],
+                    ring_size: 10,
+                    messages:  vec![],
+                },
+                wormhole_merkle_state: WormholeMerkleState {
+                    vaa:  vec![],
+                    root: WormholeMerkleRoot {
+                        slot:      1,
+                        ring_size: 10,
+                        root:      [0; 20],
+                    },
+                },
+            }
+        );
+    }
+}

+ 489 - 11
hermes/src/store/storage/local_storage.rs

@@ -16,20 +16,22 @@ use {
     },
     async_trait::async_trait,
     dashmap::DashMap,
-    moka::sync::Cache,
-    pyth_sdk::PriceIdentifier,
-    pythnet_sdk::messages::MessageType,
+    pythnet_sdk::messages::{
+        FeedId,
+        MessageType,
+    },
     std::{
         collections::VecDeque,
         sync::Arc,
     },
     strum::IntoEnumIterator,
+    tokio::sync::RwLock,
 };
 
 #[derive(Clone)]
 pub struct LocalStorage {
     message_cache:     Arc<DashMap<MessageStateKey, VecDeque<MessageState>>>,
-    accumulator_cache: Cache<Slot, AccumulatorState>,
+    accumulator_cache: Arc<RwLock<VecDeque<AccumulatorState>>>,
     cache_size:        u64,
 }
 
@@ -37,7 +39,7 @@ impl LocalStorage {
     pub fn new_instance(cache_size: u64) -> StorageInstance {
         Box::new(Self {
             message_cache: Arc::new(DashMap::new()),
-            accumulator_cache: Cache::builder().max_capacity(cache_size).build(),
+            accumulator_cache: Arc::new(RwLock::new(VecDeque::new())),
             cache_size,
         })
     }
@@ -123,7 +125,7 @@ impl Storage for LocalStorage {
 
     async fn fetch_message_states(
         &self,
-        ids: Vec<PriceIdentifier>,
+        ids: Vec<FeedId>,
         request_time: RequestTime,
         filter: MessageStateFilter,
     ) -> Result<Vec<MessageState>> {
@@ -137,8 +139,8 @@ impl Storage for LocalStorage {
 
                 message_types.into_iter().map(move |message_type| {
                     let key = MessageStateKey {
-                        id:    id.to_bytes(),
-                        type_: message_type,
+                        feed_id: id,
+                        type_:   message_type,
                     };
                     self.retrieve_message_state(key, request_time.clone())
                         .ok_or(anyhow!("Message not found"))
@@ -155,12 +157,488 @@ impl Storage for LocalStorage {
     }
 
     async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> {
-        let key = state.slot;
-        self.accumulator_cache.insert(key, state);
+        let mut accumulator_cache = self.accumulator_cache.write().await;
+        accumulator_cache.push_back(state);
+
+        let mut i = accumulator_cache.len().saturating_sub(1);
+        while i > 0 && accumulator_cache[i - 1].slot > accumulator_cache[i].slot {
+            accumulator_cache.swap(i - 1, i);
+            i -= 1;
+        }
+
+        if accumulator_cache.len() > self.cache_size as usize {
+            accumulator_cache.pop_front();
+        }
+
         Ok(())
     }
 
     async fn fetch_accumulator_state(&self, slot: Slot) -> Result<Option<super::AccumulatorState>> {
-        Ok(self.accumulator_cache.get(&slot))
+        let accumulator_cache = self.accumulator_cache.read().await;
+        match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) {
+            Ok(idx) => Ok(accumulator_cache.get(idx).cloned()),
+            Err(_) => Ok(None),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use {
+        super::*,
+        crate::store::{
+            proof::wormhole_merkle::WormholeMerkleMessageProof,
+            types::{
+                AccumulatorMessages,
+                ProofSet,
+            },
+        },
+        pyth_sdk::UnixTimestamp,
+        pythnet_sdk::{
+            accumulators::merkle::MerklePath,
+            hashers::keccak256_160::Keccak160,
+            messages::{
+                Message,
+                PriceFeedMessage,
+            },
+        },
+    };
+
+    pub fn create_dummy_price_feed_message_state(
+        feed_id: FeedId,
+        publish_time: i64,
+        slot: Slot,
+    ) -> MessageState {
+        MessageState {
+            slot,
+            message: Message::PriceFeedMessage(PriceFeedMessage {
+                feed_id,
+                publish_time,
+                price: 1,
+                conf: 2,
+                exponent: 3,
+                ema_price: 4,
+                ema_conf: 5,
+                prev_publish_time: 6,
+            }),
+            received_at: publish_time,
+            proof_set: ProofSet {
+                wormhole_merkle_proof: WormholeMerkleMessageProof {
+                    vaa:   vec![],
+                    proof: MerklePath::<Keccak160>::new(vec![]),
+                },
+            },
+        }
+    }
+
+    pub async fn create_and_store_dummy_price_feed_message_state(
+        storage: &StorageInstance,
+        feed_id: FeedId,
+        publish_time: UnixTimestamp,
+        slot: Slot,
+    ) -> MessageState {
+        let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot);
+        storage
+            .store_message_states(vec![message_state.clone()])
+            .await
+            .unwrap();
+        message_state
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // The latest message state should be the one we just stored.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![message_state]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_with_multiple_update_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let _old_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 20 at slot 10.
+        let new_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await;
+
+        // The latest message state should be the one with publish time 20.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                )
+                .await
+                .unwrap(),
+            vec![new_message_state]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_with_out_of_order_update_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 20 at slot 10.
+        let new_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await;
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let _old_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // The latest message state should be the one with publish time 20.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                )
+                .await
+                .unwrap(),
+            vec![new_message_state]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_first_after_message_state_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let old_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        let new_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // The first message state after time 10 should be the old message state.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::FirstAfter(10),
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                )
+                .await
+                .unwrap(),
+            vec![old_message_state]
+        );
+
+        // Querying the first after pub time 11, 12, 13 should all return the new message state.
+        for request_time in 11..14 {
+            assert_eq!(
+                storage
+                    .fetch_message_states(
+                        vec![[1; 32]],
+                        RequestTime::FirstAfter(request_time),
+                        MessageStateFilter::Only(MessageType::PriceFeedMessage)
+                    )
+                    .await
+                    .unwrap(),
+                vec![new_message_state.clone()]
+            );
+        }
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_latest_message_state_with_same_pubtime_works() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let slightly_older_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 7.
+        let slightly_newer_message_state =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 7).await;
+
+        // The latest message state should be the one with the higher slot.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![slightly_newer_message_state]
+        );
+
+        // Querying the first message state after time 10 should return the one with the lower slot.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32]],
+                    RequestTime::FirstAfter(10),
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![slightly_older_message_state]
+        );
+    }
+
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // Query the message state before the available times should return an error.
+        // This is because we are not sure that the first available message is really the first.
+        assert!(storage
+            .fetch_message_states(
+                vec![[1; 32]],
+                RequestTime::FirstAfter(9),
+                MessageStateFilter::Only(MessageType::PriceFeedMessage)
+            )
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_retrieve_first_after_message_state_fails_for_future_time() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // Query the message state after the available times should return an error.
+        assert!(storage
+            .fetch_message_states(
+                vec![[1; 32]],
+                RequestTime::FirstAfter(14),
+                MessageStateFilter::Only(MessageType::PriceFeedMessage)
+            )
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    pub async fn test_store_more_message_states_than_cache_size_evicts_old_messages() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [1....] and publish time 13 at slot 10.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await;
+
+        // Create and store a message state with feed id [1....] and publish time 20 at slot 14.
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 14).await;
+
+        // The message at time 10 should be evicted and querying for it should return an error.
+        assert!(storage
+            .fetch_message_states(
+                vec![[1; 32]],
+                RequestTime::FirstAfter(10),
+                MessageStateFilter::Only(MessageType::PriceFeedMessage)
+            )
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_receive_multiple_message_feed_ids_works() {
+        // Initialize a storage with a cache size of 1 per key.
+        let storage = LocalStorage::new_instance(1);
+
+        // Create and store a message state with feed id [1....] and publish time 10 at slot 5.
+        let message_state_1 =
+            create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Create and store a message state with feed id [2....] and publish time 13 at slot 10.
+        let message_state_2 =
+            create_and_store_dummy_price_feed_message_state(&storage, [2; 32], 10, 5).await;
+
+        // Check both message states can be retrieved.
+        assert_eq!(
+            storage
+                .fetch_message_states(
+                    vec![[1; 32], [2; 32]],
+                    RequestTime::Latest,
+                    MessageStateFilter::Only(MessageType::PriceFeedMessage),
+                )
+                .await
+                .unwrap(),
+            vec![message_state_1, message_state_2]
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_receive_not_existent_message_fails() {
+        // Initialize a storage with a cache size of 2 per key.
+        let storage = LocalStorage::new_instance(2);
+
+        create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
+
+        // Check both message states can be retrieved.
+        assert!(storage
+            .fetch_message_states(
+                vec![[2; 32]],
+                RequestTime::Latest,
+                MessageStateFilter::Only(MessageType::PriceFeedMessage),
+            )
+            .await
+            .is_err());
+    }
+
+    pub fn create_empty_accumulator_state_at_slot(slot: Slot) -> AccumulatorState {
+        AccumulatorState {
+            slot,
+            accumulator_messages: None,
+            wormhole_merkle_state: None,
+        }
+    }
+
+    pub async fn create_and_store_empty_accumulator_state_at_slot(
+        storage: &StorageInstance,
+        slot: Slot,
+    ) -> AccumulatorState {
+        let accumulator_state = create_empty_accumulator_state_at_slot(slot);
+        storage
+            .store_accumulator_state(accumulator_state.clone())
+            .await
+            .unwrap();
+        accumulator_state
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_receive_accumulator_state_works() {
+        // Initialize a storage with a cache size of 2 per key and the accumulator state.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store an accumulator state with slot 10.
+        let accumulator_state =
+            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
+
+        // Make sure the retrieved accumulator state is what we stored.
+        assert_eq!(
+            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
+            accumulator_state
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_receive_accumulator_state_works_on_overwrite() {
+        // Initialize a storage with a cache size of 2 per key and the accumulator state.
+        let storage = LocalStorage::new_instance(2);
+
+        // Create and store an accumulator state with slot 10.
+        let mut accumulator_state =
+            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
+
+        // Retrieve the accumulator state and make sure it is what we stored.
+        assert_eq!(
+            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
+            accumulator_state
+        );
+
+        // Change the state to have accumulator messages
+        // We mutate the existing state because the normal flow is like this.
+        accumulator_state.accumulator_messages = Some(AccumulatorMessages {
+            magic:     [0; 4],
+            slot:      10,
+            ring_size: 3,
+            messages:  vec![],
+        });
+
+        // Store the accumulator state again.
+        storage
+            .store_accumulator_state(accumulator_state.clone())
+            .await
+            .unwrap();
+
+        // Make sure the retrieved accumulator state is what we stored.
+        assert_eq!(
+            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
+            accumulator_state
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_receive_multiple_accumulator_state_works() {
+        // Initialize a storage with a cache size of 2 per key and the accumulator state.
+        let storage = LocalStorage::new_instance(2);
+
+        let accumulator_state_at_slot_10 =
+            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
+        let accumulator_state_at_slot_20 =
+            create_and_store_empty_accumulator_state_at_slot(&storage, 20).await;
+
+        // Retrieve the accumulator states and make sure it is what we stored.
+        assert_eq!(
+            storage.fetch_accumulator_state(10).await.unwrap().unwrap(),
+            accumulator_state_at_slot_10
+        );
+
+        assert_eq!(
+            storage.fetch_accumulator_state(20).await.unwrap().unwrap(),
+            accumulator_state_at_slot_20
+        );
+    }
+
+    #[tokio::test]
+    pub async fn test_store_and_receive_accumulator_state_evicts_cache() {
+        // Initialize a storage with a cache size of 2 per key and the accumulator state.
+        let storage = LocalStorage::new_instance(2);
+
+        let _accumulator_state_at_slot_10 =
+            create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
+        let accumulator_state_at_slot_20 =
+            create_and_store_empty_accumulator_state_at_slot(&storage, 20).await;
+        let accumulator_state_at_slot_30 =
+            create_and_store_empty_accumulator_state_at_slot(&storage, 30).await;
+
+        // The accumulator state at slot 10 should be evicted from the cache.
+        assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None);
+
+        // Retrieve the rest of accumulator states and make sure it is what we stored.
+        assert_eq!(
+            storage.fetch_accumulator_state(20).await.unwrap().unwrap(),
+            accumulator_state_at_slot_20
+        );
+
+        assert_eq!(
+            storage.fetch_accumulator_state(30).await.unwrap().unwrap(),
+            accumulator_state_at_slot_30
+        );
     }
 }

+ 1 - 1
pythnet/pythnet_sdk/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "pythnet-sdk"
-version = "1.13.6"
+version = "2.0.0"
 description = "Pyth Runtime for Solana"
 authors = ["Pyth Data Association"]
 repository = "https://github.com/pyth-network/pythnet"

+ 7 - 6
pythnet/pythnet_sdk/src/messages.rs

@@ -17,7 +17,6 @@ use serde::{
 /// some of the methods for PriceFeedMessage and TwapMessage are not used by the oracle
 /// for the same reason. Rust compiler doesn't include the unused methods in the contract.
 /// Once we start using the unused structs and methods, the contract size will increase.
-
 #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
 #[cfg_attr(
     feature = "strum",
@@ -47,10 +46,10 @@ impl Message {
         }
     }
 
-    pub fn id(&self) -> [u8; 32] {
+    pub fn feed_id(&self) -> FeedId {
         match self {
-            Self::PriceFeedMessage(msg) => msg.id,
-            Self::TwapMessage(msg) => msg.id,
+            Self::PriceFeedMessage(msg) => msg.feed_id,
+            Self::TwapMessage(msg) => msg.feed_id,
         }
     }
 }
@@ -65,11 +64,13 @@ impl Arbitrary for Message {
     }
 }
 
+/// Id of a feed producing the message. One feed produces one or more messages.
+pub type FeedId = [u8; 32];
 
 #[repr(C)]
 #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
 pub struct PriceFeedMessage {
-    pub id:                [u8; 32],
+    pub feed_id:           FeedId,
     pub price:             i64,
     pub conf:              u64,
     pub exponent:          i32,
@@ -119,7 +120,7 @@ impl Arbitrary for PriceFeedMessage {
 #[repr(C)]
 #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
 pub struct TwapMessage {
-    pub id:                [u8; 32],
+    pub feed_id:           FeedId,
     pub cumulative_price:  i128,
     pub cumulative_conf:   u128,
     pub num_down_slots:    u64,

+ 1 - 1
target_chains/cosmwasm/Cargo.lock

@@ -1490,7 +1490,7 @@ dependencies = [
 
 [[package]]
 name = "pythnet-sdk"
-version = "1.13.6"
+version = "2.0.0"
 dependencies = [
  "bincode",
  "borsh",

+ 4 - 4
target_chains/cosmwasm/contracts/pyth/src/contract.rs

@@ -542,7 +542,7 @@ fn parse_accumulator(deps: &Deps, env: &Env, data: &[u8]) -> StdResult<Vec<Price
                 match msg {
                     Message::PriceFeedMessage(price_feed_message) => {
                         let price_feed = PriceFeed::new(
-                            PriceIdentifier::new(price_feed_message.id),
+                            PriceIdentifier::new(price_feed_message.feed_id),
                             Price {
                                 price:        price_feed_message.price,
                                 conf:         price_feed_message.conf,
@@ -1186,7 +1186,7 @@ mod test {
         let mut dummy_id = [0; 32];
         dummy_id[0] = value as u8;
         let msg = PriceFeedMessage {
-            id:                dummy_id,
+            feed_id:           dummy_id,
             price:             value,
             conf:              value as u64,
             exponent:          value as i32,
@@ -1265,7 +1265,7 @@ mod test {
         match msg {
             Message::PriceFeedMessage(feed_msg) => {
                 let feed = price_feed_read_bucket(&deps.storage)
-                    .load(&feed_msg.id)
+                    .load(&feed_msg.feed_id)
                     .unwrap();
                 let price = feed.get_price_unchecked();
                 let ema_price = feed.get_ema_price_unchecked();
@@ -1517,7 +1517,7 @@ mod test {
         // Although Twap Message is a valid message but it won't get stored on-chain via
         // `update_price_feeds` and (will be) used in other methods
         let feed1 = Message::TwapMessage(TwapMessage {
-            id:                [0; 32],
+            feed_id:           [0; 32],
             cumulative_price:  0,
             cumulative_conf:   0,
             num_down_slots:    0,