Bläddra i källkod

feat(hermes): add benchmarks for histroical data

Ali Behjati 2 år sedan
förälder
incheckning
7dc0cb80bf

+ 2 - 2
hermes/README.md

@@ -45,11 +45,11 @@ To set up and run a Hermes node, follow the steps below:
    Your Hermes node will now start and connect to the specified networks. You
    can interact with the node using the REST and Websocket APIs on port 33999.
 
-   For local development, you can also run the node with cargo watch to restart
+   For local development, you can also run the node with [cargo watch](https://crates.io/crates/cargo-watch) to restart
    it automatically when the code changes:
 
    ```bash
-   cargo watch -w src -x "run -- run --pythnet-http-endpoint https://pythnet.rpcpool.com --pythnet-ws-endpoint wss://pythnet.rpcpool.com"
+   cargo watch -w src -x "run -- run --pythnet-http-endpoint https://pythnet-rpc/ --pythnet-ws-endpoint wss://pythnet-rpc/"
    ```
 
 ## Architecture Overview

+ 12 - 5
hermes/src/api/rest/get_price_feed.rs

@@ -74,12 +74,19 @@ pub async fn get_price_feed(
         .await
         .map_err(|_| RestError::UpdateDataNotFound)?;
 
+    let mut price_feed = price_feeds_with_update_data
+        .price_feeds
+        .into_iter()
+        .next()
+        .ok_or(RestError::UpdateDataNotFound)?;
+
+    // Note: This is a hack to get around the fact that Benchmark doesn't give per price feed
+    // update data. Since we request only for a single feed then the whole prices update data
+    // is this price feed update data.
+    price_feed.update_data = price_feeds_with_update_data.update_data.into_iter().next();
+
     Ok(Json(RpcPriceFeed::from_price_feed_update(
-        price_feeds_with_update_data
-            .price_feeds
-            .into_iter()
-            .next()
-            .ok_or(RestError::UpdateDataNotFound)?,
+        price_feed,
         params.verbose,
         params.binary,
     )))

+ 2 - 1
hermes/src/api/rest/get_vaa.rs

@@ -82,7 +82,7 @@ pub async fn get_vaa(
         .map_err(|_| RestError::UpdateDataNotFound)?;
 
     let vaa = price_feeds_with_update_data
-        .wormhole_merkle_update_data
+        .update_data
         .get(0)
         .map(|bytes| base64_standard_engine.encode(bytes))
         .ok_or(RestError::UpdateDataNotFound)?;
@@ -92,6 +92,7 @@ pub async fn get_vaa(
         .get(0)
         .ok_or(RestError::UpdateDataNotFound)?
         .price_feed
+        .get_price_unchecked()
         .publish_time;
 
     Ok(Json(GetVaaResponse { vaa, publish_time }))

+ 1 - 1
hermes/src/api/rest/get_vaa_ccip.rs

@@ -75,7 +75,7 @@ pub async fn get_vaa_ccip(
         .map_err(|_| RestError::CcipUpdateDataNotFound)?;
 
     let bytes = price_feeds_with_update_data
-        .wormhole_merkle_update_data
+        .update_data
         .get(0) // One price feed has only a single VAA as proof.
         .ok_or(RestError::UpdateDataNotFound)?;
 

+ 1 - 1
hermes/src/api/rest/latest_vaas.rs

@@ -66,7 +66,7 @@ pub async fn latest_vaas(
 
     Ok(Json(
         price_feeds_with_update_data
-            .wormhole_merkle_update_data
+            .update_data
             .iter()
             .map(|bytes| base64_standard_engine.encode(bytes)) // TODO: Support multiple
             // encoding formats

+ 1 - 1
hermes/src/api/rest/price_feed_ids.rs

@@ -29,7 +29,7 @@ pub async fn price_feed_ids(
         .get_price_feed_ids()
         .await
         .iter()
-        .map(|id| RpcPriceIdentifier::from(&id))
+        .map(RpcPriceIdentifier::from)
         .collect();
 
     Ok(Json(price_feed_ids))

+ 21 - 18
hermes/src/api/types.rs

@@ -49,12 +49,12 @@ type Base64String = String;
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)]
 pub struct RpcPriceFeedMetadata {
-    #[schema(value_type = u64, example=85480034)]
-    pub slot:                       Slot,
+    #[schema(value_type = Option<u64>, example=85480034)]
+    pub slot:                       Option<Slot>,
     #[schema(example = 26)]
     pub emitter_chain:              u16,
-    #[schema(value_type = i64, example=doc_examples::timestamp_example)]
-    pub price_service_receive_time: UnixTimestamp,
+    #[schema(value_type = Option<i64>, example=doc_examples::timestamp_example)]
+    pub price_service_receive_time: Option<UnixTimestamp>,
 }
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)]
@@ -78,30 +78,33 @@ impl RpcPriceFeed {
         verbose: bool,
         binary: bool,
     ) -> Self {
-        let price_feed_message = price_feed_update.price_feed;
+        let price_feed = price_feed_update.price_feed;
 
         Self {
-            id:        RpcPriceIdentifier::new(price_feed_message.feed_id),
+            id:        RpcPriceIdentifier::new(price_feed.id.to_bytes()),
             price:     RpcPrice {
-                price:        price_feed_message.price,
-                conf:         price_feed_message.conf,
-                expo:         price_feed_message.exponent,
-                publish_time: price_feed_message.publish_time,
+                price:        price_feed.get_price_unchecked().price,
+                conf:         price_feed.get_price_unchecked().conf,
+                expo:         price_feed.get_price_unchecked().expo,
+                publish_time: price_feed.get_price_unchecked().publish_time,
             },
             ema_price: RpcPrice {
-                price:        price_feed_message.ema_price,
-                conf:         price_feed_message.ema_conf,
-                expo:         price_feed_message.exponent,
-                publish_time: price_feed_message.publish_time,
+                price:        price_feed.get_ema_price_unchecked().price,
+                conf:         price_feed.get_ema_price_unchecked().conf,
+                expo:         price_feed.get_ema_price_unchecked().expo,
+                publish_time: price_feed.get_ema_price_unchecked().publish_time,
             },
             metadata:  verbose.then_some(RpcPriceFeedMetadata {
                 emitter_chain:              Chain::Pythnet.into(),
                 price_service_receive_time: price_feed_update.received_at,
                 slot:                       price_feed_update.slot,
             }),
-            vaa:       binary.then_some(
-                base64_standard_engine.encode(price_feed_update.wormhole_merkle_update_data),
-            ),
+            vaa:       match binary {
+                false => None,
+                true => price_feed_update
+                    .update_data
+                    .map(|data| base64_standard_engine.encode(data)),
+            },
         }
     }
 }
@@ -171,6 +174,6 @@ impl RpcPriceIdentifier {
     }
 
     pub fn from(id: &PriceIdentifier) -> RpcPriceIdentifier {
-        RpcPriceIdentifier(id.to_bytes().clone())
+        RpcPriceIdentifier(id.to_bytes())
     }
 }

+ 1 - 1
hermes/src/api/ws.rs

@@ -157,7 +157,7 @@ impl Subscriber {
         {
             let config = self
                 .price_feeds_with_config
-                .get(&PriceIdentifier::new(update.price_feed.feed_id))
+                .get(&update.price_feed.id)
                 .ok_or(anyhow::anyhow!(
                     "Config missing, price feed list was poisoned during iteration."
                 ))?;

+ 6 - 0
hermes/src/config.rs

@@ -1,5 +1,6 @@
 use {
     libp2p::Multiaddr,
+    reqwest::Url,
     solana_sdk::pubkey::Pubkey,
     std::net::SocketAddr,
     structopt::StructOpt,
@@ -61,4 +62,9 @@ pub struct RunOptions {
     #[structopt(default_value = DEFAULT_NETWORK_ID)]
     #[structopt(env = "WORMHOLE_NETWORK_ID")]
     pub wh_network_id: String,
+
+    /// Benchmarks endpoint to retrieve historical update data from.
+    #[structopt(long)]
+    #[structopt(env = "BENCHMARKS_ENDPOINT")]
+    pub benchmarks_endpoint: Option<Url>,
 }

+ 3 - 3
hermes/src/doc_examples.rs

@@ -8,7 +8,7 @@ use crate::store::types::UnixTimestamp;
 
 /// Example value for a price feed id
 pub fn price_feed_id_example() -> &'static str {
-    return "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43";
+    "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"
 }
 
 /// Example value for a unix timestamp
@@ -23,10 +23,10 @@ pub fn timestamp_example() -> UnixTimestamp {
         .duration_since(UNIX_EPOCH)
         .expect("Time went backwards");
 
-    return since_the_epoch.as_secs() as UnixTimestamp;
+    since_the_epoch.as_secs() as UnixTimestamp
 }
 
 /// Example value for a VAA
 pub fn vaa_example() -> &'static str {
-    return "UE5BVQEAAAADuAEAAAADDQC1H7meY5fTed0FsykIb8dt+7nKpbuzfvU2DplDi+dcUl8MC+UIkS65+rkiq+zmNBxE2gaxkBkjdIicZ/fBo+X7AAEqp+WtlWb84np8jJfLpuQ2W+l5KXTigsdAhz5DyVgU3xs+EnaIZxBwcE7EKzjMam+V9rlRy0CGsiQ1kjqqLzfAAQLsoVO0Vu5gVmgc8XGQ7xYhoz36rsBgMjG+e3l/B01esQi/KzPuBf/Ar8Sg5aSEOvEU0muSDb+KIr6d8eEC+FtcAAPZEaBSt4ysXVL84LUcJemQD3SiG30kOfUpF8o7/wI2M2Jf/LyCsbKEQUyLtLbZqnJBSfZJR5AMsrnHDqngMLEGAAY4UDG9GCpRuPvg8hOlsrXuPP3zq7yVPqyG0SG+bNo8rEhP5b1vXlHdG4bZsutX47d5VZ6xnFROKudx3T3/fnWUAQgAU1+kUFc3e0ZZeX1dLRVEryNIVyxMQIcxWwdey+jlIAYowHRM0fJX3Scs80OnT/CERwh5LMlFyU1w578NqxW+AQl2E/9fxjgUTi8crOfDpwsUsmOWw0+Q5OUGhELv/2UZoHAjsaw9OinWUggKACo4SdpPlHYldoWF+J2yGWOW+F4iAQre4c+ocb6a9uSWOnTldFkioqhd9lhmV542+VonCvuy4Tu214NP+2UNd/4Kk3KJCf3iziQJrCBeLi1cLHdLUikgAQtvRFR/nepcF9legl+DywAkUHi5/1MNjlEQvlHyh2XbMiS85yu7/9LgM6Sr+0ukfZY5mSkOcvUkpHn+T+Nw/IrQAQ7lty5luvKUmBpI3ITxSmojJ1aJ0kj/dc0ZcQk+/qo0l0l3/eRLkYjw5j+MZKA8jEubrHzUCke98eSoj8l08+PGAA+DAKNtCwNZe4p6J1Ucod8Lo5RKFfA84CPLVyEzEPQFZ25U9grUK6ilF4GhEia/ndYXLBt3PGW3qa6CBBPM7rH3ABGAyYEtUwzB4CeVedA5o6cKpjRkIebqDNSOqltsr+w7kXdfFVtsK2FMGFZNt5rbpIR+ppztoJ6eOKHmKmi9nQ99ARKkTxRErOs9wJXNHaAuIRV38o1pxRrlQRzGsRuKBqxcQEpC8OPFpyKYcp6iD5l7cO/gRDTamLFyhiUBwKKMP07FAWTEJv8AAAAAABrhAfrtrFhR4yubI7X5QRqMK6xKrj7U3XuBHdGnLqSqcQAAAAAAGp0GAUFVV1YAAAAAAAUYUmIAACcQBsfKUtr4PgZbIXRxRESU79PjE4IBAFUA5i32yLSoX+GmfbRNwS3l2zMPesZrctxliv7fD0pBW0MAAAKqqMJFwAAAAAAqE/NX////+AAAAABkxCb7AAAAAGTEJvoAAAKqIcWxYAAAAAAlR5m4CP/mPsh1IezjYpDlJ4GRb5q4fTs2LjtyO6M0XgVimrIQ4kSh1qg7JKW4gbGkyRntVFR9JO/GNd3FPDit0BK6M+JzXh/h12YNCz9wxlZTvXrNtWNbzqT+91pvl5cphhSPMfAHyEzTPaGR9tKDy9KNu56pmhaY32d2vfEWQmKo22guegeR98oDxs67MmnUraco46a3zEnac2Bm80pasUgMO24=";
+    "UE5BVQEAAAADuAEAAAADDQC1H7meY5fTed0FsykIb8dt+7nKpbuzfvU2DplDi+dcUl8MC+UIkS65+rkiq+zmNBxE2gaxkBkjdIicZ/fBo+X7AAEqp+WtlWb84np8jJfLpuQ2W+l5KXTigsdAhz5DyVgU3xs+EnaIZxBwcE7EKzjMam+V9rlRy0CGsiQ1kjqqLzfAAQLsoVO0Vu5gVmgc8XGQ7xYhoz36rsBgMjG+e3l/B01esQi/KzPuBf/Ar8Sg5aSEOvEU0muSDb+KIr6d8eEC+FtcAAPZEaBSt4ysXVL84LUcJemQD3SiG30kOfUpF8o7/wI2M2Jf/LyCsbKEQUyLtLbZqnJBSfZJR5AMsrnHDqngMLEGAAY4UDG9GCpRuPvg8hOlsrXuPP3zq7yVPqyG0SG+bNo8rEhP5b1vXlHdG4bZsutX47d5VZ6xnFROKudx3T3/fnWUAQgAU1+kUFc3e0ZZeX1dLRVEryNIVyxMQIcxWwdey+jlIAYowHRM0fJX3Scs80OnT/CERwh5LMlFyU1w578NqxW+AQl2E/9fxjgUTi8crOfDpwsUsmOWw0+Q5OUGhELv/2UZoHAjsaw9OinWUggKACo4SdpPlHYldoWF+J2yGWOW+F4iAQre4c+ocb6a9uSWOnTldFkioqhd9lhmV542+VonCvuy4Tu214NP+2UNd/4Kk3KJCf3iziQJrCBeLi1cLHdLUikgAQtvRFR/nepcF9legl+DywAkUHi5/1MNjlEQvlHyh2XbMiS85yu7/9LgM6Sr+0ukfZY5mSkOcvUkpHn+T+Nw/IrQAQ7lty5luvKUmBpI3ITxSmojJ1aJ0kj/dc0ZcQk+/qo0l0l3/eRLkYjw5j+MZKA8jEubrHzUCke98eSoj8l08+PGAA+DAKNtCwNZe4p6J1Ucod8Lo5RKFfA84CPLVyEzEPQFZ25U9grUK6ilF4GhEia/ndYXLBt3PGW3qa6CBBPM7rH3ABGAyYEtUwzB4CeVedA5o6cKpjRkIebqDNSOqltsr+w7kXdfFVtsK2FMGFZNt5rbpIR+ppztoJ6eOKHmKmi9nQ99ARKkTxRErOs9wJXNHaAuIRV38o1pxRrlQRzGsRuKBqxcQEpC8OPFpyKYcp6iD5l7cO/gRDTamLFyhiUBwKKMP07FAWTEJv8AAAAAABrhAfrtrFhR4yubI7X5QRqMK6xKrj7U3XuBHdGnLqSqcQAAAAAAGp0GAUFVV1YAAAAAAAUYUmIAACcQBsfKUtr4PgZbIXRxRESU79PjE4IBAFUA5i32yLSoX+GmfbRNwS3l2zMPesZrctxliv7fD0pBW0MAAAKqqMJFwAAAAAAqE/NX////+AAAAABkxCb7AAAAAGTEJvoAAAKqIcWxYAAAAAAlR5m4CP/mPsh1IezjYpDlJ4GRb5q4fTs2LjtyO6M0XgVimrIQ4kSh1qg7JKW4gbGkyRntVFR9JO/GNd3FPDit0BK6M+JzXh/h12YNCz9wxlZTvXrNtWNbzqT+91pvl5cphhSPMfAHyEzTPaGR9tKDy9KNu56pmhaY32d2vfEWQmKo22guegeR98oDxs67MmnUraco46a3zEnac2Bm80pasUgMO24="
 }

+ 1 - 2
hermes/src/main.rs

@@ -1,5 +1,4 @@
 #![feature(never_type)]
-#![feature(slice_group_by)]
 #![feature(btree_cursors)]
 
 use {
@@ -44,7 +43,7 @@ async fn init() -> Result<()> {
             let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
 
             // Initialize a cache store with a 1000 element circular buffer.
-            let store = Store::new(update_tx.clone(), 1000);
+            let store = Store::new(update_tx.clone(), 1000, opts.benchmarks_endpoint.clone());
 
             // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We
             // also send off any notifications needed to close off any waiting tasks.

+ 6 - 42
hermes/src/network/p2p.rs

@@ -61,32 +61,7 @@ pub struct ObservationC {
     pub vaa_len: usize,
 }
 
-/// A wrapper around a VAA observed from Wormhole.
-///
-/// This wrapper tracks a Span that allows tracking the VAA through the system. This Span is
-/// expected to be tied to the `proxy` Span and so logging will always be traced back to the
-/// associated `proxy` Span regardless of where in the system it is being used.
-#[derive(Clone, Debug)]
-pub struct Vaa {
-    pub span: tracing::Span,
-    pub data: Vec<u8>,
-}
-
-// Allow PartialEq on Vaa that ignores the Span.
-impl PartialEq for Vaa {
-    fn eq(&self, other: &Self) -> bool {
-        self.data == other.data
-    }
-}
-
-/// Deref to &[u8] so we can ignore the wrapper when passing it to the store.
-impl std::ops::Deref for Vaa {
-    type Target = [u8];
-
-    fn deref(&self) -> &Self::Target {
-        &self.data
-    }
-}
+pub type Vaa = Vec<u8>;
 
 // A Static Channel to pipe the `Observation` from the callback into the local Rust handler for
 // observation messages. It has to be static for now because there's no way to capture state in
@@ -134,19 +109,11 @@ extern "C" fn proxy(o: ObservationC) {
         WormholePayload::Merkle(proof) => proof.slot,
     };
 
-    // Create a Span tied to the Span of the curent proxy.
-    let span = tracing::span!(
-        parent: tracing::Span::current(),
-        tracing::Level::INFO,
-        "Observation",
-        slot = slot,
-    );
-
     // Find the observation time for said VAA (which is a unix timestamp) and serialize as a ISO 8601 string.
-    let observed_time = deserialized_vaa.timestamp;
-    let observed_time = chrono::NaiveDateTime::from_timestamp_opt(observed_time as i64, 0).unwrap();
-    let observed_time = observed_time.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string();
-    span.in_scope(|| tracing::info!(vaa_timestamp = observed_time, "Observed VAA"));
+    let vaa_timestamp = deserialized_vaa.timestamp;
+    let vaa_timestamp = chrono::NaiveDateTime::from_timestamp_opt(vaa_timestamp as i64, 0).unwrap();
+    let vaa_timestamp = vaa_timestamp.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string();
+    tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");
 
     // The chances of the mutex getting poisioned is very low and if it happens there is no way for
     // us to recover from it.
@@ -154,7 +121,7 @@ extern "C" fn proxy(o: ObservationC) {
         .0
         .lock()
         .map_err(|_| ())
-        .and_then(|tx| tx.send(Vaa { span, data: vaa }).map_err(|_| ()))
+        .and_then(|tx| tx.send(vaa).map_err(|_| ()))
         .is_err()
     {
         crate::SHOULD_EXIT.store(true, Ordering::Release);
@@ -252,9 +219,6 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
             })
             .await??;
 
-            vaa.span
-                .in_scope(|| tracing::info!("Received VAA from P2P layer."));
-
             let store = store.clone();
             tokio::spawn(async move {
                 if let Err(e) = store.store_update(Update::Vaa(vaa)).await {

+ 100 - 52
hermes/src/store.rs

@@ -46,7 +46,11 @@ use {
         Result,
     },
     byteorder::BigEndian,
-    pyth_sdk::PriceIdentifier,
+    pyth_sdk::{
+        Price,
+        PriceFeed,
+        PriceIdentifier,
+    },
     pythnet_sdk::{
         messages::{
             Message,
@@ -60,6 +64,7 @@ use {
             },
         },
     },
+    reqwest::Url,
     std::{
         collections::{
             BTreeMap,
@@ -76,6 +81,7 @@ use {
     wormhole_sdk::Vaa,
 };
 
+pub mod benchmarks;
 pub mod proof;
 pub mod storage;
 pub mod types;
@@ -100,16 +106,23 @@ pub struct Store {
     /// Time of the last completed update. This is used for the health
     /// probes.
     pub last_completed_update_at: RwLock<Option<Instant>>,
+    /// Benchmarks endpoint
+    pub benchmarks_endpoint:      Option<Url>,
 }
 
 impl Store {
-    pub fn new(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
+    pub fn new(
+        update_tx: Sender<()>,
+        cache_size: u64,
+        benchmarks_endpoint: Option<Url>,
+    ) -> Arc<Self> {
         Arc::new(Self {
             storage: Storage::new(cache_size),
             observed_vaa_seqs: RwLock::new(Default::default()),
             guardian_set: RwLock::new(Default::default()),
             update_tx,
             last_completed_update_at: RwLock::new(None),
+            benchmarks_endpoint,
         })
     }
 
@@ -151,9 +164,7 @@ impl Store {
 
                 match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
                     WormholePayload::Merkle(proof) => {
-                        update_vaa.span.in_scope(|| {
-                            tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
-                        });
+                        tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
 
                         store_wormhole_merkle_verified_message(
                             self,
@@ -169,16 +180,7 @@ impl Store {
 
             Update::AccumulatorMessages(accumulator_messages) => {
                 let slot = accumulator_messages.slot;
-                if let Some(state) = self.storage.fetch_wormhole_merkle_state(slot).await? {
-                    state.vaa.span.in_scope(|| {
-                        tracing::info!(
-                            slot = slot,
-                            "Storing Accumulator Messages (existing Proof)."
-                        );
-                    });
-                } else {
-                    tracing::info!(slot = slot, "Storing Accumulator Messages.");
-                }
+                tracing::info!(slot = slot, "Storing Accumulator Messages.");
 
                 self.storage
                     .store_accumulator_messages(accumulator_messages)
@@ -198,9 +200,7 @@ impl Store {
                 _ => return Ok(()),
             };
 
-        wormhole_merkle_state.vaa.span.in_scope(|| {
-            tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update.");
-        });
+        tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update.");
 
         // Once the accumulator reaches a complete state for a specific slot
         // we can build the message states
@@ -262,7 +262,7 @@ impl Store {
         guardian_sets.insert(id, guardian_set);
     }
 
-    pub async fn get_price_feeds_with_update_data(
+    async fn get_price_feeds_with_update_data_from_storage(
         &self,
         price_ids: Vec<PriceIdentifier>,
         request_time: RequestTime,
@@ -283,26 +283,68 @@ impl Store {
             .iter()
             .map(|message_state| match message_state.message {
                 Message::PriceFeedMessage(price_feed) => Ok(PriceFeedUpdate {
-                    price_feed,
-                    received_at: message_state.received_at,
-                    slot: message_state.slot,
-                    wormhole_merkle_update_data: construct_update_data(vec![message_state])?
-                        .into_iter()
-                        .next()
-                        .ok_or(anyhow!("Missing update data for message"))?,
+                    price_feed:  PriceFeed::new(
+                        PriceIdentifier::new(price_feed.feed_id),
+                        Price {
+                            price:        price_feed.price,
+                            conf:         price_feed.conf,
+                            expo:         price_feed.exponent,
+                            publish_time: price_feed.publish_time,
+                        },
+                        Price {
+                            price:        price_feed.ema_price,
+                            conf:         price_feed.ema_conf,
+                            expo:         price_feed.exponent,
+                            publish_time: price_feed.publish_time,
+                        },
+                    ),
+                    received_at: Some(message_state.received_at),
+                    slot:        Some(message_state.slot),
+                    update_data: Some(
+                        construct_update_data(vec![message_state.clone().into()])?
+                            .into_iter()
+                            .next()
+                            .ok_or(anyhow!("Missing update data for message"))?,
+                    ),
                 }),
                 _ => Err(anyhow!("Invalid message state type")),
             })
             .collect::<Result<Vec<_>>>()?;
 
-        let update_data = construct_update_data(messages.iter().collect())?;
+        let update_data = construct_update_data(messages.into_iter().map(|m| m.into()).collect())?;
 
         Ok(PriceFeedsWithUpdateData {
             price_feeds,
-            wormhole_merkle_update_data: update_data,
+            update_data,
         })
     }
 
+    pub async fn get_price_feeds_with_update_data(
+        &self,
+        price_ids: Vec<PriceIdentifier>,
+        request_time: RequestTime,
+    ) -> Result<PriceFeedsWithUpdateData> {
+        match self
+            .get_price_feeds_with_update_data_from_storage(price_ids.clone(), request_time.clone())
+            .await
+        {
+            Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
+            Err(e) => {
+                if let RequestTime::FirstAfter(publish_time) = request_time {
+                    if let Some(endpoint) = &self.benchmarks_endpoint {
+                        return benchmarks::get_price_feeds_with_update_data_from_benchmarks(
+                            endpoint.clone(),
+                            price_ids,
+                            publish_time,
+                        )
+                        .await;
+                    }
+                }
+                Err(e)
+            }
+        }
+    }
+
     pub async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> {
         self.storage
             .message_state_keys()
@@ -354,6 +396,10 @@ mod test {
         rand::seq::SliceRandom,
         serde_wormhole::RawMessage,
         tokio::sync::mpsc::Receiver,
+        wormhole_sdk::{
+            Address,
+            Chain,
+        },
     };
 
     /// Generate list of updates for the given list of messages at a given slot with given sequence
@@ -403,10 +449,7 @@ mod test {
             payload: serde_wormhole::RawMessage::new(wormhole_message.as_ref()),
         };
 
-        updates.push(Update::Vaa(crate::network::p2p::Vaa {
-            span: tracing::Span::current(),
-            data: serde_wormhole::to_vec(&vaa).unwrap(),
-        }));
+        updates.push(Update::Vaa(serde_wormhole::to_vec(&vaa).unwrap()));
 
         updates
     }
@@ -432,7 +475,7 @@ mod test {
 
     pub async fn setup_store(cache_size: u64) -> (Arc<Store>, Receiver<()>) {
         let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
-        let store = Store::new(update_tx, cache_size);
+        let store = Store::new(update_tx, cache_size, None);
 
         // Add an initial guardian set with public key 0
         store
@@ -488,27 +531,32 @@ mod test {
         assert_eq!(
             price_feeds_with_update_data.price_feeds,
             vec![PriceFeedUpdate {
-                price_feed:                  price_feed_message,
-                slot:                        10,
-                received_at:                 price_feeds_with_update_data.price_feeds[0]
-                    .received_at, // Ignore checking this field.
-                wormhole_merkle_update_data: price_feeds_with_update_data.price_feeds[0]
-                    .wormhole_merkle_update_data
+                price_feed:  PriceFeed::new(
+                    PriceIdentifier::new(price_feed_message.feed_id),
+                    Price {
+                        price:        price_feed_message.price,
+                        conf:         price_feed_message.conf,
+                        expo:         price_feed_message.exponent,
+                        publish_time: price_feed_message.publish_time,
+                    },
+                    Price {
+                        price:        price_feed_message.ema_price,
+                        conf:         price_feed_message.ema_conf,
+                        expo:         price_feed_message.exponent,
+                        publish_time: price_feed_message.publish_time,
+                    }
+                ),
+                slot:        Some(10),
+                received_at: price_feeds_with_update_data.price_feeds[0].received_at, // Ignore checking this field.
+                update_data: price_feeds_with_update_data.price_feeds[0]
+                    .update_data
                     .clone(), // Ignore checking this field.
             }]
         );
 
         // Check the update data is correct.
-        assert_eq!(
-            price_feeds_with_update_data
-                .wormhole_merkle_update_data
-                .len(),
-            1
-        );
-        let update_data = price_feeds_with_update_data
-            .wormhole_merkle_update_data
-            .get(0)
-            .unwrap();
+        assert_eq!(price_feeds_with_update_data.update_data.len(), 1);
+        let update_data = price_feeds_with_update_data.update_data.get(0).unwrap();
         let update_data = AccumulatorUpdateData::try_from_slice(update_data.as_ref()).unwrap();
         match update_data.proof {
             Proof::WormholeMerkle { vaa, updates } => {
@@ -604,7 +652,7 @@ mod test {
         assert_eq!(price_feeds_with_update_data.price_feeds.len(), 1);
         assert_eq!(
             price_feeds_with_update_data.price_feeds[0].received_at,
-            unix_timestamp as i64
+            Some(unix_timestamp as i64)
         );
 
         // Check the store is ready
@@ -668,8 +716,8 @@ mod test {
                 .await
                 .unwrap();
             assert_eq!(price_feeds_with_update_data.price_feeds.len(), 2);
-            assert_eq!(price_feeds_with_update_data.price_feeds[0].slot, slot);
-            assert_eq!(price_feeds_with_update_data.price_feeds[1].slot, slot);
+            assert_eq!(price_feeds_with_update_data.price_feeds[0].slot, Some(slot));
+            assert_eq!(price_feeds_with_update_data.price_feeds[1].slot, Some(slot));
         }
 
         // Check nothing else is retained

+ 104 - 0
hermes/src/store/benchmarks.rs

@@ -0,0 +1,104 @@
+//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.
+
+use {
+    super::types::{
+        PriceFeedUpdate,
+        PriceFeedsWithUpdateData,
+        UnixTimestamp,
+    },
+    anyhow::Result,
+    base64::{
+        engine::general_purpose::STANDARD as base64_standard_engine,
+        Engine as _,
+    },
+    pyth_sdk::{
+        PriceFeed,
+        PriceIdentifier,
+    },
+    reqwest::Url,
+};
+
+const BENCHMARKS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
+#[derive(serde::Deserialize, Debug, Clone)]
+enum BlobEncoding {
+    #[serde(rename = "base64")]
+    Base64,
+    #[serde(rename = "hex")]
+    Hex,
+}
+
+#[derive(serde::Deserialize, Debug, Clone)]
+struct BinaryBlob {
+    pub encoding: BlobEncoding,
+    pub data:     Vec<String>,
+}
+
+#[derive(serde::Deserialize, Debug, Clone)]
+struct BenchmarkUpdates {
+    pub parsed: Vec<PriceFeed>,
+    pub binary: BinaryBlob,
+}
+
+impl TryFrom<BinaryBlob> for Vec<Vec<u8>> {
+    type Error = anyhow::Error;
+
+    fn try_from(binary_blob: BinaryBlob) -> Result<Self> {
+        binary_blob
+            .data
+            .iter()
+            .map(|datum| {
+                Ok(match binary_blob.encoding {
+                    BlobEncoding::Base64 => base64_standard_engine.decode(datum)?,
+                    BlobEncoding::Hex => hex::decode(datum)?,
+                })
+            })
+            .collect::<Result<_>>()
+    }
+}
+
+impl TryFrom<BenchmarkUpdates> for PriceFeedsWithUpdateData {
+    type Error = anyhow::Error;
+    fn try_from(benchmark_updates: BenchmarkUpdates) -> Result<Self> {
+        Ok(PriceFeedsWithUpdateData {
+            price_feeds: benchmark_updates
+                .parsed
+                .into_iter()
+                .map(|price_feed| PriceFeedUpdate {
+                    price_feed,
+                    slot: None,
+                    received_at: None,
+                    update_data: None,
+                })
+                .collect::<Vec<_>>(),
+            update_data: benchmark_updates.binary.try_into()?,
+        })
+    }
+}
+
+
+pub async fn get_price_feeds_with_update_data_from_benchmarks(
+    endpoint: Url,
+    price_ids: Vec<PriceIdentifier>,
+    publish_time: UnixTimestamp,
+) -> Result<PriceFeedsWithUpdateData> {
+    let endpoint = endpoint
+        .join(&format!("/v1/updates/price/{}", publish_time))
+        .unwrap();
+
+    let client = reqwest::Client::new();
+    let mut request = client
+        .get(endpoint)
+        .timeout(BENCHMARKS_REQUEST_TIMEOUT)
+        .query(&[("encoding", "hex")])
+        .query(&[("parsed", "true")]);
+
+    for price_id in price_ids {
+        request = request.query(&[("ids", price_id)])
+    }
+
+    let response = request.send().await?;
+
+    let benchmark_updates: BenchmarkUpdates = response.json().await?;
+    benchmark_updates.try_into()
+}

+ 94 - 81
hermes/src/store/proof/wormhole_merkle.rs

@@ -1,11 +1,12 @@
 use {
-    crate::{
-        network::p2p::Vaa,
-        store::{
-            storage::MessageState,
-            types::AccumulatorMessages,
-            Store,
+    crate::store::{
+        storage::MessageState,
+        types::{
+            AccumulatorMessages,
+            RawMessage,
+            Slot,
         },
+        Store,
     },
     anyhow::{
         anyhow,
@@ -36,6 +37,8 @@ use {
 // u8 in the wire format. So, we can't have more than 255 messages.
 pub const MAX_MESSAGE_IN_SINGLE_UPDATE_DATA: usize = 255;
 
+pub type Vaa = Vec<u8>;
+
 #[derive(Clone, PartialEq, Debug)]
 pub struct WormholeMerkleState {
     pub root: WormholeMerkleRoot,
@@ -48,6 +51,23 @@ pub struct WormholeMerkleMessageProof {
     pub vaa:   Vaa,
 }
 
+#[derive(Clone, PartialEq, Debug)]
+pub struct RawMessageWithMerkleProof {
+    pub slot:        Slot,
+    pub raw_message: RawMessage,
+    pub proof:       WormholeMerkleMessageProof,
+}
+
+impl From<MessageState> for RawMessageWithMerkleProof {
+    fn from(message_state: MessageState) -> Self {
+        Self {
+            slot:        message_state.slot,
+            raw_message: message_state.raw_message,
+            proof:       message_state.proof_set.wormhole_merkle_proof,
+        }
+    }
+}
+
 pub async fn store_wormhole_merkle_verified_message(
     store: &Store,
     root: WormholeMerkleRoot,
@@ -90,60 +110,54 @@ pub fn construct_message_states_proofs(
         .collect::<Result<Vec<WormholeMerkleMessageProof>>>()
 }
 
-pub fn construct_update_data(mut message_states: Vec<&MessageState>) -> Result<Vec<Vec<u8>>> {
-    message_states.sort_by_key(|m| m.slot);
-
-    message_states
-        .group_by(|a, b| a.slot == b.slot) // States on the same slot share the same merkle root
-        .flat_map(|messages| {
-            messages
-                // Group messages by the number of messages in a single update data
-                .chunks(MAX_MESSAGE_IN_SINGLE_UPDATE_DATA)
-                .map(|messages| {
-                    let vaa = messages
-                        .get(0)
-                        .ok_or(anyhow!("Empty message set"))?
-                        .proof_set
-                        .wormhole_merkle_proof
-                        .vaa
-                        .clone();
-
-                    vaa.span.in_scope(|| {
-                        tracing::info!("Constructing update data for {} Messages.", messages.len())
-                    });
-
-                    Ok(to_vec::<_, byteorder::BE>(&AccumulatorUpdateData::new(
-                        Proof::WormholeMerkle {
-                            vaa:     (*vaa).to_owned().into(),
-                            updates: messages
-                                .iter()
-                                .map(|message| {
-                                    Ok(MerklePriceUpdate {
-                                        message: to_vec::<_, byteorder::BE>(&message.message)
-                                            .map_err(|e| {
-                                                anyhow!("Failed to serialize message: {}", e)
-                                            })?
-                                            .into(),
-                                        proof:   message
-                                            .proof_set
-                                            .wormhole_merkle_proof
-                                            .proof
-                                            .clone(),
-                                    })
-                                })
-                                .collect::<Result<_>>()?,
-                        },
-                    ))?)
-                })
-        })
-        .collect::<Result<Vec<Vec<u8>>>>()
+pub fn construct_update_data(mut messages: Vec<RawMessageWithMerkleProof>) -> Result<Vec<Vec<u8>>> {
+    tracing::info!("Constructing update data for {} messages", messages.len());
+
+    messages.sort_by_key(|m| m.slot);
+
+    let mut iter = messages.into_iter().peekable();
+    let mut result: Vec<Vec<u8>> = vec![];
+
+    while let Some(message) = iter.next() {
+        let slot = message.slot;
+        let vaa = message.proof.vaa;
+        let mut updates = vec![MerklePriceUpdate {
+            message: message.raw_message.into(),
+            proof:   message.proof.proof,
+        }];
+
+        while updates.len() < MAX_MESSAGE_IN_SINGLE_UPDATE_DATA {
+            if let Some(message) = iter.next_if(|m| m.slot == slot) {
+                updates.push(MerklePriceUpdate {
+                    message: message.raw_message.into(),
+                    proof:   message.proof.proof,
+                });
+            } else {
+                break;
+            }
+        }
+
+        tracing::info!(
+            slot = slot,
+            "Combining {} messages in a single updateData",
+            updates.len()
+        );
+
+        result.push(to_vec::<_, byteorder::BE>(&AccumulatorUpdateData::new(
+            Proof::WormholeMerkle {
+                vaa: vaa.into(),
+                updates,
+            },
+        ))?);
+    }
+
+    Ok(result)
 }
 
 #[cfg(test)]
 mod test {
     use {
         super::*,
-        crate::store::types::ProofSet,
         pythnet_sdk::{
             messages::{
                 Message,
@@ -153,45 +167,44 @@ mod test {
         },
     };
 
-    fn create_dummy_message_state(slot_and_pubtime: u64) -> MessageState {
-        MessageState::new(
-            Message::PriceFeedMessage(PriceFeedMessage {
-                conf:              0,
-                price:             0,
-                feed_id:           [0; 32],
-                exponent:          0,
-                ema_conf:          0,
-                ema_price:         0,
-                publish_time:      slot_and_pubtime as i64,
-                prev_publish_time: 0,
-            }),
-            vec![],
-            ProofSet {
-                wormhole_merkle_proof: WormholeMerkleMessageProof {
-                    vaa:   vec![],
-                    proof: MerklePath::default(),
-                },
+    fn create_dummy_raw_message_with_merkle_proof(
+        slot_and_pubtime: u64,
+    ) -> RawMessageWithMerkleProof {
+        let price_feed_message = Message::PriceFeedMessage(PriceFeedMessage {
+            conf:              0,
+            price:             0,
+            feed_id:           [0; 32],
+            exponent:          0,
+            ema_conf:          0,
+            ema_price:         0,
+            publish_time:      slot_and_pubtime as i64,
+            prev_publish_time: 0,
+        });
+        RawMessageWithMerkleProof {
+            slot:        slot_and_pubtime,
+            proof:       WormholeMerkleMessageProof {
+                vaa:   vec![],
+                proof: MerklePath::default(),
             },
-            slot_and_pubtime,
-            0,
-        )
+            raw_message: to_vec::<_, byteorder::BE>(&price_feed_message).unwrap(),
+        }
     }
 
     #[test]
     fn test_construct_update_data_works_on_mixed_slot_and_big_size() {
-        let mut message_states = vec![];
+        let mut messages = vec![];
 
         // Messages slot and publish_time 11 share the same merkle root
-        for i in 0..MAX_MESSAGE_IN_SINGLE_UPDATE_DATA * 2 - 10 {
-            message_states.push(create_dummy_message_state(11));
+        for _ in 0..MAX_MESSAGE_IN_SINGLE_UPDATE_DATA * 2 - 10 {
+            messages.push(create_dummy_raw_message_with_merkle_proof(11));
         }
 
         // Messages on slot and publish_time 10 that share different root from the messages above
-        for i in 0..MAX_MESSAGE_IN_SINGLE_UPDATE_DATA * 2 - 10 {
-            message_states.push(create_dummy_message_state(10));
+        for _ in 0..MAX_MESSAGE_IN_SINGLE_UPDATE_DATA * 2 - 10 {
+            messages.push(create_dummy_raw_message_with_merkle_proof(10));
         }
 
-        let update_data = construct_update_data(message_states.iter().collect()).unwrap();
+        let update_data = construct_update_data(messages).unwrap();
 
         assert_eq!(update_data.len(), 4);
 

+ 7 - 10
hermes/src/store/types.rs

@@ -2,7 +2,7 @@ use {
     super::proof::wormhole_merkle::WormholeMerkleMessageProof,
     crate::network::p2p::Vaa,
     borsh::BorshDeserialize,
-    pythnet_sdk::messages::PriceFeedMessage,
+    pyth_sdk::PriceFeed,
 };
 
 #[derive(Clone, PartialEq, Debug)]
@@ -53,17 +53,14 @@ pub enum Update {
 
 #[derive(Debug, PartialEq)]
 pub struct PriceFeedUpdate {
-    pub price_feed:                  PriceFeedMessage,
-    pub slot:                        Slot,
-    pub received_at:                 UnixTimestamp,
-    /// Wormhole merkle update data for this single price feed update.
-    /// This field is available for backward compatibility and will be
-    /// removed in the future.
-    pub wormhole_merkle_update_data: Vec<u8>,
+    pub price_feed:  PriceFeed,
+    pub slot:        Option<Slot>,
+    pub received_at: Option<UnixTimestamp>,
+    pub update_data: Option<Vec<u8>>,
 }
 
 #[derive(Debug, PartialEq)]
 pub struct PriceFeedsWithUpdateData {
-    pub price_feeds:                 Vec<PriceFeedUpdate>,
-    pub wormhole_merkle_update_data: Vec<Vec<u8>>,
+    pub price_feeds: Vec<PriceFeedUpdate>,
+    pub update_data: Vec<Vec<u8>>,
 }

+ 6 - 0
pythnet/pythnet_sdk/src/wire/prefixed_vec.rs

@@ -120,6 +120,12 @@ impl<L, T> From<PrefixedVec<L, T>> for Vec<T> {
     }
 }
 
+impl<L, T> AsRef<Vec<T>> for PrefixedVec<L, T> {
+    fn as_ref(&self) -> &Vec<T> {
+        &self.data.inner
+    }
+}
+
 impl<L, T> IntoIterator for PrefixedVec<L, T> {
     type Item = T;
     type IntoIter = std::vec::IntoIter<Self::Item>;