ソースを参照

Add initial version

Ali Behjati 2 年 前
コミット
866eace744

ファイルの差分が大きいため隠しています
+ 499 - 222
hermes/Cargo.lock


+ 15 - 10
hermes/Cargo.toml

@@ -5,7 +5,6 @@ edition                        = "2021"
 
 [dependencies]
 axum                           = { version = "0.6.9", features = ["json", "ws", "macros"] }
-axum-extra                     = { version = "0.7.2", features = ["query"] }
 axum-macros                    = { version = "0.3.4" }
 anyhow                         = { version = "1.0.69" }
 base64                         = { version = "0.21.0" }
@@ -15,7 +14,7 @@ dashmap                        = { version = "5.4.0" }
 der                            = { version = "0.7.0" }
 derive_more                    = { version = "0.99.17" }
 env_logger                     = { version = "0.10.0" }
-futures                        = { version = "0.3.26" }
+futures                        = { version = "0.3.28" }
 hex                            = { version = "0.4.3" }
 rand                           = { version = "0.8.5" }
 reqwest                        = { version = "0.11.14", features = ["blocking", "json"] }
@@ -39,24 +38,30 @@ log                            = { version = "0.4.17" }
 wormhole-core                  = { git = "https://github.com/guibescos/wormhole", branch = "reisen/sdk-solana"}
 
 # Parse Wormhole attester price attestations.
-pyth-wormhole-attester-sdk     = { path = "../wormhole_attester/sdk/rust/", version = "0.1.2" }
+pythnet-sdk                    = { path = "../pythnet/pythnet_sdk/", version = "=1.13.6" }
 
 # Setup LibP2P. Unfortunately the dependencies required by libp2p are shared
 # with the dependencies required by solana's geyser plugin. This means that we
 # would have to use the same version of libp2p as solana. Luckily we don't need
 # to do this yet but it's something to keep in mind.
-libp2p                         = { version = "0.51.1", features = [
-    "dns",
+libp2p                         = { version = "0.42.2", features = [
     "gossipsub",
     "identify",
-    "macros",
     "mplex",
     "noise",
-    "quic",
     "secp256k1",
-    "tcp",
-    "tls",
-    "tokio",
     "websocket",
     "yamux",
 ]}
+
+async-trait = "0.1.68"
+solana-client = "=1.15.2"
+solana-sdk = "=1.15.2"
+solana-account-decoder = "=1.15.2"
+moka = { version = "0.11.0", features = ["future"] }
+derive_builder = "0.12.0"
+byteorder = "1.4.3"
+serde_qs = { version = "0.12.0", features = ["axum"] }
+
+[patch.crates-io]
+serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole" }

+ 0 - 2
hermes/build.rs

@@ -71,8 +71,6 @@ fn main() {
     // Tell Rust to link our Go library at compile time.
     println!("cargo:rustc-link-search=native={out_var}");
     println!("cargo:rustc-link-lib=static=pythnet");
-
-    #[cfg(target_arch = "aarch64")]
     println!("cargo:rustc-link-lib=resolv");
 
     let status = cmd.status().unwrap();

+ 17 - 23
hermes/src/network/rpc.rs → hermes/src/api.rs

@@ -1,12 +1,6 @@
 use {
     self::ws::dispatch_updates,
-    crate::{
-        network::p2p::OBSERVATIONS,
-        store::{
-            Store,
-            Update,
-        },
-    },
+    crate::store::Store,
     anyhow::Result,
     axum::{
         routing::get,
@@ -36,7 +30,7 @@ impl State {
 
 /// This method provides a background service that responds to REST requests
 ///
-/// Currently this is based on Axum due to the simplicity and strong ecosyjtem support for the
+/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
 /// packages they are based on (tokio & hyper).
 pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
     let state = State::new(store);
@@ -55,25 +49,25 @@ pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
         .route("/api/price_feed_ids", get(rest::price_feed_ids))
         .with_state(state.clone());
 
-    // Listen in the background for new VAA's from the Wormhole RPC.
+
+    // Binds the axum's server to the configured address and port. This is a blocking call and will
+    // not return until the server is shutdown.
+    tokio::spawn(async move {
+        // FIXME handle errors properly
+        axum::Server::bind(&rpc_addr.parse().unwrap())
+            .serve(app.into_make_service())
+            .await
+            .unwrap();
+    });
+
+    // Call dispatch updates to websocket every 1 seconds
+    // FIXME use a channel to get updates from the store
     tokio::spawn(async move {
         loop {
-            if let Ok(observation) = OBSERVATIONS.1.lock().unwrap().recv() {
-                match state.store.store_update(Update::Vaa(observation)) {
-                    Ok(updated_feed_ids) => {
-                        tokio::spawn(dispatch_updates(updated_feed_ids, state.clone()));
-                    }
-                    Err(e) => log::error!("Failed to process VAA: {:?}", e),
-                }
-            }
+            dispatch_updates(state.store.get_price_feed_ids(), state.clone()).await;
+            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
         }
     });
 
-    // Binds the axum's server to the configured address and port. This is a blocking call and will
-    // not return until the server is shutdown.
-    axum::Server::bind(&rpc_addr.parse()?)
-        .serve(app.into_make_service())
-        .await?;
-
     Ok(())
 }

+ 28 - 29
hermes/src/network/rpc/rest.rs → hermes/src/api/rest.rs

@@ -1,10 +1,14 @@
-use super::types::PriceIdInput;
 use {
-    super::types::RpcPriceFeed,
-    crate::store::RequestTime,
+    super::types::{
+        PriceIdInput,
+        RpcPriceFeed,
+    },
     crate::{
         impl_deserialize_for_hex_string_wrapper,
-        store::UnixTimestamp,
+        store::types::{
+            RequestTime,
+            UnixTimestamp,
+        },
     },
     anyhow::Result,
     axum::{
@@ -16,7 +20,6 @@ use {
         },
         Json,
     },
-    axum_extra::extract::Query, // Axum extra Query allows us to parse multi-value query parameters.
     base64::{
         engine::general_purpose::STANDARD as base64_standard_engine,
         Engine as _,
@@ -26,6 +29,7 @@ use {
         DerefMut,
     },
     pyth_sdk::PriceIdentifier,
+    serde_qs::axum::QsQuery,
 };
 
 pub enum RestError {
@@ -68,7 +72,7 @@ pub struct LatestVaasQueryParams {
 
 pub async fn latest_vaas(
     State(state): State<super::State>,
-    Query(params): Query<LatestVaasQueryParams>,
+    QsQuery(params): QsQuery<LatestVaasQueryParams>,
 ) -> Result<Json<Vec<String>>, RestError> {
     let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
     let price_feeds_with_update_data = state
@@ -77,10 +81,9 @@ pub async fn latest_vaas(
         .map_err(|_| RestError::UpdateDataNotFound)?;
     Ok(Json(
         price_feeds_with_update_data
-            .batch_vaa
-            .update_data
+            .wormhole_merkle_update_data
             .iter()
-            .map(|vaa_bytes| base64_standard_engine.encode(vaa_bytes)) // TODO: Support multiple
+            .map(|bytes| base64_standard_engine.encode(bytes)) // TODO: Support multiple
             // encoding formats
             .collect(),
     ))
@@ -97,7 +100,7 @@ pub struct LatestPriceFeedsQueryParams {
 
 pub async fn latest_price_feeds(
     State(state): State<super::State>,
-    Query(params): Query<LatestPriceFeedsQueryParams>,
+    QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>,
 ) -> Result<Json<Vec<RpcPriceFeed>>, RestError> {
     let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
     let price_feeds_with_update_data = state
@@ -106,11 +109,10 @@ pub async fn latest_price_feeds(
         .map_err(|_| RestError::UpdateDataNotFound)?;
     Ok(Json(
         price_feeds_with_update_data
-            .batch_vaa
-            .price_infos
-            .into_values()
-            .map(|price_info| {
-                RpcPriceFeed::from_price_info(price_info, params.verbose, params.binary)
+            .price_feeds
+            .into_iter()
+            .map(|price_feed| {
+                RpcPriceFeed::from_price_feed_message(price_feed, params.verbose, params.binary)
             })
             .collect(),
     ))
@@ -131,7 +133,7 @@ pub struct GetVaaResponse {
 
 pub async fn get_vaa(
     State(state): State<super::State>,
-    Query(params): Query<GetVaaQueryParams>,
+    QsQuery(params): QsQuery<GetVaaQueryParams>,
 ) -> Result<Json<GetVaaResponse>, RestError> {
     let price_id: PriceIdentifier = params.id.into();
 
@@ -144,18 +146,16 @@ pub async fn get_vaa(
         .map_err(|_| RestError::UpdateDataNotFound)?;
 
     let vaa = price_feeds_with_update_data
-        .batch_vaa
-        .update_data
+        .wormhole_merkle_update_data
         .get(0)
-        .map(|vaa_bytes| base64_standard_engine.encode(vaa_bytes))
+        .map(|bytes| base64_standard_engine.encode(bytes))
         .ok_or(RestError::UpdateDataNotFound)?;
 
     let publish_time = price_feeds_with_update_data
-        .batch_vaa
-        .price_infos
-        .get(&price_id)
-        .map(|price_info| price_info.publish_time)
-        .ok_or(RestError::UpdateDataNotFound)?;
+        .price_feeds
+        .get(0)
+        .ok_or(RestError::UpdateDataNotFound)?
+        .publish_time; // TODO: This should never happen.
 
     Ok(Json(GetVaaResponse { vaa, publish_time }))
 }
@@ -176,7 +176,7 @@ pub struct GetVaaCcipResponse {
 
 pub async fn get_vaa_ccip(
     State(state): State<super::State>,
-    Query(params): Query<GetVaaCcipQueryParams>,
+    QsQuery(params): QsQuery<GetVaaCcipQueryParams>,
 ) -> Result<Json<GetVaaCcipResponse>, RestError> {
     let price_id: PriceIdentifier = PriceIdentifier::new(params.data[0..32].try_into().unwrap());
     let publish_time = UnixTimestamp::from_be_bytes(params.data[32..40].try_into().unwrap());
@@ -186,14 +186,13 @@ pub async fn get_vaa_ccip(
         .get_price_feeds_with_update_data(vec![price_id], RequestTime::FirstAfter(publish_time))
         .map_err(|_| RestError::CcipUpdateDataNotFound)?;
 
-    let vaa = price_feeds_with_update_data
-        .batch_vaa
-        .update_data
+    let bytes = price_feeds_with_update_data
+        .wormhole_merkle_update_data
         .get(0) // One price feed has only a single VAA as proof.
         .ok_or(RestError::UpdateDataNotFound)?;
 
     Ok(Json(GetVaaCcipResponse {
-        data: format!("0x{}", hex::encode(vaa)),
+        data: format!("0x{}", hex::encode(bytes)),
     }))
 }
 

+ 32 - 18
hermes/src/network/rpc/types.rs → hermes/src/api/types.rs

@@ -1,15 +1,11 @@
 use {
     crate::{
         impl_deserialize_for_hex_string_wrapper,
-        store::{
-            proof::batch_vaa::PriceInfo,
+        store::types::{
+            PriceFeedMessage,
             UnixTimestamp,
         },
     },
-    base64::{
-        engine::general_purpose::STANDARD as base64_standard_engine,
-        Engine as _,
-    },
     derive_more::{
         Deref,
         DerefMut,
@@ -41,7 +37,6 @@ type Base64String = String;
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 pub struct RpcPriceFeedMetadata {
     pub emitter_chain:              u16,
-    pub attestation_time:           UnixTimestamp,
     pub sequence_number:            u64,
     pub price_service_receive_time: UnixTimestamp,
 }
@@ -51,26 +46,45 @@ pub struct RpcPriceFeed {
     pub id:        PriceIdentifier,
     pub price:     Price,
     pub ema_price: Price,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub metadata:  Option<RpcPriceFeedMetadata>,
     /// Vaa binary represented in base64.
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub vaa:       Option<Base64String>,
 }
 
 impl RpcPriceFeed {
     // TODO: Use a Encoding type to have None, Base64, and Hex variants instead of binary flag.
     // TODO: Use a Verbosity type to define None, or Full instead of verbose flag.
-    pub fn from_price_info(price_info: PriceInfo, verbose: bool, binary: bool) -> Self {
+    pub fn from_price_feed_message(
+        price_feed_message: PriceFeedMessage,
+        _verbose: bool,
+        _binary: bool,
+    ) -> Self {
         Self {
-            id:        price_info.price_feed.id,
-            price:     price_info.price_feed.get_price_unchecked(),
-            ema_price: price_info.price_feed.get_ema_price_unchecked(),
-            metadata:  verbose.then_some(RpcPriceFeedMetadata {
-                emitter_chain:              price_info.emitter_chain,
-                attestation_time:           price_info.attestation_time,
-                sequence_number:            price_info.sequence_number,
-                price_service_receive_time: price_info.receive_time,
-            }),
-            vaa:       binary.then_some(base64_standard_engine.encode(price_info.vaa_bytes)),
+            id:        PriceIdentifier::new(price_feed_message.id),
+            price:     Price {
+                price:        price_feed_message.price,
+                conf:         price_feed_message.conf,
+                expo:         price_feed_message.exponent,
+                publish_time: price_feed_message.publish_time,
+            },
+            ema_price: Price {
+                price:        price_feed_message.ema_price,
+                conf:         price_feed_message.ema_conf,
+                expo:         price_feed_message.exponent,
+                publish_time: price_feed_message.publish_time,
+            },
+            // FIXME: Handle verbose flag properly.
+            // metadata:  verbose.then_some(RpcPriceFeedMetadata {
+            //     emitter_chain:              price_feed_message.emitter_chain,
+            //     sequence_number:            price_feed_message.sequence_number,
+            //     price_service_receive_time: price_feed_message.receive_time,
+            // }),
+            metadata:  None,
+            // FIXME: The vaa is wrong, fix it
+            // vaa:       binary.then_some(base64_standard_engine.encode(message_state.proof_set.wormhole_merkle_proof.vaa)),
+            vaa:       None,
         }
     }
 }

+ 18 - 14
hermes/src/network/rpc/ws.rs → hermes/src/api/ws.rs

@@ -3,7 +3,10 @@ use {
         PriceIdInput,
         RpcPriceFeed,
     },
-    crate::store::Store,
+    crate::store::{
+        types::RequestTime,
+        Store,
+    },
     anyhow::Result,
     axum::{
         extract::{
@@ -139,20 +142,21 @@ impl Subscriber {
     ) -> Result<()> {
         for price_feed_id in price_feed_ids {
             if let Some(config) = self.price_feeds_with_config.get(&price_feed_id) {
-                let price_feeds_with_update_data = self.store.get_price_feeds_with_update_data(
-                    vec![price_feed_id],
-                    crate::store::RequestTime::Latest,
-                )?;
-                let price_info = price_feeds_with_update_data
-                    .batch_vaa
-                    .price_infos
-                    .get(&price_feed_id)
+                let price_feeds_with_update_data = self
+                    .store
+                    .get_price_feeds_with_update_data(vec![price_feed_id], RequestTime::Latest)?;
+                let price_feed = *price_feeds_with_update_data
+                    .price_feeds
+                    .iter()
+                    .find(|price_feed| price_feed.id == price_feed_id.to_bytes())
                     .ok_or_else(|| {
                         anyhow::anyhow!("Price feed {} not found.", price_feed_id.to_string())
-                    })?
-                    .clone();
-                let price_feed =
-                    RpcPriceFeed::from_price_info(price_info, config.verbose, config.binary);
+                    })?;
+                let price_feed = RpcPriceFeed::from_price_feed_message(
+                    price_feed,
+                    config.verbose,
+                    config.binary,
+                );
                 // Feed does not flush the message and will allow us
                 // to send multiple messages in a single flush.
                 self.sender
@@ -237,7 +241,7 @@ pub async fn dispatch_updates(update_feed_ids: Vec<PriceIdentifier>, state: supe
                     Ok(_) => None,
                     Err(e) => {
                         log::debug!("Error sending update to subscriber: {}", e);
-                        Some(subscriber.key().clone())
+                        Some(*subscriber.key())
                     }
                 }
             }),

+ 17 - 33
hermes/src/config.rs

@@ -1,9 +1,6 @@
 use {
     libp2p::Multiaddr,
-    std::{
-        net::SocketAddr,
-        path::PathBuf,
-    },
+    std::net::SocketAddr,
     structopt::StructOpt,
 };
 
@@ -12,24 +9,27 @@ use {
 /// Some of these arguments are not currently used, but are included for future use to guide the
 /// structure of the application.
 #[derive(StructOpt, Debug)]
-#[structopt(name = "pythnet", about = "PythNet")]
+#[structopt(name = "hermes", about = "Hermes")]
 pub enum Options {
-    /// Run the PythNet P2P service.
     Run {
-        /// A Path to a protobuf encoded ed25519 private key.
-        #[structopt(short, long)]
-        id: Option<PathBuf>,
-
-        /// A Path to a protobuf encoded secp256k1 private key.
-        #[structopt(long)]
-        id_secp256k1: Option<PathBuf>,
+        #[structopt(long, env = "PYTHNET_WS_ENDPOINT")]
+        pythnet_ws_endpoint: String,
 
         /// Network ID for Wormhole
-        #[structopt(long, env = "WORMHOLE_NETWORK_ID")]
+        #[structopt(
+            long,
+            default_value = "/wormhole/mainnet/2",
+            env = "WORMHOLE_NETWORK_ID"
+        )]
         wh_network_id: String,
 
         /// Multiaddresses for Wormhole bootstrap peers (separated by comma).
-        #[structopt(long, use_delimiter = true, env = "WORMHOLE_BOOTSTRAP_ADDRS")]
+        #[structopt(
+            long,
+            use_delimiter = true,
+            default_value = "/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7",
+            env = "WORMHOLE_BOOTSTRAP_ADDRS"
+        )]
         wh_bootstrap_addrs: Vec<Multiaddr>,
 
         /// Multiaddresses to bind Wormhole P2P to (separated by comma)
@@ -41,24 +41,8 @@ pub enum Options {
         )]
         wh_listen_addrs: Vec<Multiaddr>,
 
-        /// The address to bind the RPC server to.
+        /// The address to bind the API server to.
         #[structopt(long, default_value = "127.0.0.1:33999")]
-        rpc_addr: SocketAddr,
-
-        /// Multiaddress to bind Pyth P2P server to.
-        #[structopt(long, default_value = "/ip4/127.0.0.1/tcp/34000")]
-        p2p_addr: Multiaddr,
-
-        /// A bootstrapping peer to join the cluster.
-        #[allow(dead_code)]
-        #[structopt(long)]
-        p2p_peer: Vec<SocketAddr>,
-    },
-
-    /// Generate a new keypair.
-    Keygen {
-        /// The path to write the generated key to.
-        #[structopt(short, long)]
-        output: PathBuf,
+        api_addr: SocketAddr,
     },
 }

+ 22 - 77
hermes/src/main.rs

@@ -1,89 +1,62 @@
 #![feature(never_type)]
+#![feature(slice_group_by)]
 
 use {
     crate::store::Store,
     anyhow::Result,
-    futures::{
-        channel::mpsc::Receiver,
-        SinkExt,
-    },
-    std::time::Duration,
     structopt::StructOpt,
-    tokio::{
-        spawn,
-        time::sleep,
-    },
 };
 
+mod api;
 mod config;
 mod macros;
 mod network;
 mod store;
 
-/// A Wormhole VAA is an array of bytes. TODO: Decoding.
-#[derive(Debug, Clone, Eq, Hash, PartialEq, serde::Serialize, serde::Deserialize)]
-pub struct Vaa {
-    pub data: Vec<u8>,
-}
-
-/// A PythNet AccountUpdate is a 32-byte address and a variable length data field.
-///
-/// This type is emitted by the Geyser plugin when an observed account is updated and is forwrarded
-/// to this process via IPC.
-#[derive(Debug, Clone, Eq, Hash, PartialEq, serde::Serialize, serde::Deserialize)]
-pub struct AccountUpdate {
-    addr: [u8; 32],
-    data: Vec<u8>,
-}
-
-/// Handler for LibP2P messages. Currently these consist only of Wormhole Observations.
-fn handle_message(_observation: network::p2p::Observation) -> Result<()> {
-    println!("Rust: Received Observation");
-    Ok(())
-}
-
 /// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin.
-async fn init(_update_channel: Receiver<AccountUpdate>) -> Result<()> {
-    log::info!("Initializing PythNet...");
+async fn init() -> Result<()> {
+    log::info!("Initializing Hermes...");
 
     // Parse the command line arguments with StructOpt, will exit automatically on `--help` or
     // with invalid arguments.
     match config::Options::from_args() {
         config::Options::Run {
-            id: _,
-            id_secp256k1: _,
+            pythnet_ws_endpoint,
             wh_network_id,
             wh_bootstrap_addrs,
             wh_listen_addrs,
-            rpc_addr,
-            p2p_addr,
-            p2p_peer: _,
+            api_addr,
         } => {
-            log::info!("Starting PythNet...");
+            log::info!("Running Hermes...");
+            let store = Store::new_with_local_cache(1000);
+
+            // FIXME: Instead of spawing threads separately, we should handle all their
+            // errors properly.
 
             // Spawn the P2P layer.
-            log::info!("Starting P2P server on {}", p2p_addr);
+            log::info!("Starting P2P server on {:?}", wh_listen_addrs);
             network::p2p::spawn(
-                handle_message,
                 wh_network_id.to_string(),
                 wh_bootstrap_addrs,
                 wh_listen_addrs,
+                store.clone(),
             )
             .await?;
 
             // Spawn the RPC server.
-            log::info!("Starting RPC server on {}", rpc_addr);
+            log::info!("Starting RPC server on {}", api_addr);
 
             // TODO: Add max size to the config
-            network::rpc::spawn(rpc_addr.to_string(), Store::new_with_local_cache(1000)).await?;
+            api::spawn(api_addr.to_string(), store.clone()).await?;
+
+            // Spawn the Pythnet listener
+            // TODO: Exit the thread when it gets spawned
+            log::info!("Starting Pythnet listener using {}", pythnet_ws_endpoint);
+            network::pythnet::spawn(pythnet_ws_endpoint, store.clone()).await?;
 
             // Wait on Ctrl+C similar to main.
             tokio::signal::ctrl_c().await?;
         }
-
-        config::Options::Keygen { output: _ } => {
-            println!("Currently not implemented.");
-        }
     }
 
     Ok(())
@@ -93,43 +66,15 @@ async fn init(_update_channel: Receiver<AccountUpdate>) -> Result<()> {
 async fn main() -> Result<!> {
     env_logger::init();
 
-    // Generate a stream of fake AccountUpdates when run in binary mode. This is temporary until
-    // the Geyser component of the accumulator work is complete.
-    let (mut tx, rx) = futures::channel::mpsc::channel(1);
-
-    spawn(async move {
-        let mut data = 0u32;
-
-        loop {
-            // Simulate PythNet block time.
-            sleep(Duration::from_millis(200)).await;
-
-            // Ignore the return type of `send`, since we don't care if the receiver is closed.
-            // It's better to let the process continue to run as this is just a temporary hack.
-            let _ = SinkExt::send(
-                &mut tx,
-                AccountUpdate {
-                    addr: [0; 32],
-                    data: {
-                        data += 1;
-                        let mut data = data.to_be_bytes().to_vec();
-                        data.resize(32, 0);
-                        data
-                    },
-                },
-            )
-            .await;
-        }
-    });
-
     tokio::spawn(async move {
         // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
         // should be set to 1 for this otherwise it will only print the top-level error.
-        if let Err(result) = init(rx).await {
+        if let Err(result) = init().await {
             eprintln!("{}", result.backtrace());
             for cause in result.chain() {
                 eprintln!("{cause}");
             }
+            std::process::exit(1);
         }
     });
 

+ 1 - 1
hermes/src/network.rs

@@ -1,2 +1,2 @@
 pub mod p2p;
-pub mod rpc;
+pub mod pythnet;

+ 42 - 18
hermes/src/network/p2p.rs

@@ -10,6 +10,10 @@
 //! their infrastructure.
 
 use {
+    crate::store::{
+        types::Update,
+        Store,
+    },
     anyhow::Result,
     libp2p::Multiaddr,
     std::{
@@ -66,6 +70,7 @@ lazy_static::lazy_static! {
 extern "C" fn proxy(o: ObservationC) {
     // Create a fixed slice from the pointer and length.
     let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned();
+    // FIXME: Remove unwrap
     if let Err(e) = OBSERVATIONS.0.lock().unwrap().send(vaa) {
         log::error!("Failed to send observation: {}", e);
     }
@@ -76,15 +81,11 @@ extern "C" fn proxy(o: ObservationC) {
 /// TODO: handle_message should be capable of handling more than just Observations. But we don't
 /// have our own P2P network, we pass it in to keep the code structure and read directly from the
 /// OBSERVATIONS channel in the RPC for now.
-pub fn bootstrap<H>(
-    _handle_message: H,
+pub fn bootstrap(
     network_id: String,
     wh_bootstrap_addrs: Vec<Multiaddr>,
     wh_listen_addrs: Vec<Multiaddr>,
-) -> Result<()>
-where
-    H: Fn(Observation) -> Result<()> + 'static,
-{
+) -> Result<()> {
     let network_id_cstr = CString::new(network_id)?;
     let wh_bootstrap_addrs_cstr = CString::new(
         wh_bootstrap_addrs
@@ -114,20 +115,43 @@ where
 }
 
 // Spawn's the P2P layer as a separate thread via Go.
-pub async fn spawn<H>(
-    handle_message: H,
+pub async fn spawn(
     network_id: String,
     wh_bootstrap_addrs: Vec<Multiaddr>,
     wh_listen_addrs: Vec<Multiaddr>,
-) -> Result<()>
-where
-    H: Fn(Observation) -> Result<()> + Send + 'static,
-{
-    bootstrap(
-        handle_message,
-        network_id,
-        wh_bootstrap_addrs,
-        wh_listen_addrs,
-    )?;
+    store: Store,
+) -> Result<()> {
+    bootstrap(network_id, wh_bootstrap_addrs, wh_listen_addrs)?;
+
+    // Listen in the background for new VAA's from the p2p layer
+    // and update the state accordingly.
+    tokio::spawn(async move {
+        loop {
+            let vaa_bytes = {
+                let observation = OBSERVATIONS.1.lock();
+
+                let observation = match observation {
+                    Ok(observation) => observation,
+                    Err(e) => {
+                        log::error!("Failed to lock observation channel: {}", e);
+                        return;
+                    }
+                };
+
+                match observation.recv() {
+                    Ok(vaa_bytes) => vaa_bytes,
+                    Err(e) => {
+                        log::error!("Failed to receive observation: {}", e);
+                        return;
+                    }
+                }
+            };
+
+            if let Err(e) = store.store_update(Update::Vaa(vaa_bytes)).await {
+                log::error!("Failed to process VAA: {:?}", e);
+            }
+        }
+    });
+
     Ok(())
 }

+ 92 - 0
hermes/src/network/pythnet.rs

@@ -0,0 +1,92 @@
+//! This module connects to the Pythnet RPC server and listens for accumulator
+//! updates. It then sends the updates to the store module for processing and
+//! storage.
+
+use {
+    crate::store::{
+        types::{
+            AccumulatorMessages,
+            RawMessage,
+            Update,
+        },
+        Store,
+    },
+    anyhow::Result,
+    borsh::BorshDeserialize,
+    futures::stream::StreamExt,
+    solana_account_decoder::UiAccountEncoding,
+    solana_client::{
+        nonblocking::pubsub_client::PubsubClient,
+        rpc_config::{
+            RpcAccountInfoConfig,
+            RpcProgramAccountsConfig,
+        },
+    },
+    solana_sdk::{
+        account::Account,
+        commitment_config::CommitmentConfig,
+        pubkey::Pubkey,
+        system_program,
+    },
+    std::ops::Rem,
+};
+
+const RING_SIZE: u32 = 10_000;
+
+pub async fn spawn(pythnet_ws_endpoint: String, store: Store) -> Result<()> {
+    let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?;
+
+    let config = RpcProgramAccountsConfig {
+        account_config: RpcAccountInfoConfig {
+            commitment: Some(CommitmentConfig::confirmed()),
+            encoding: Some(UiAccountEncoding::Base64Zstd),
+            ..Default::default()
+        },
+        with_context: Some(true),
+        ..Default::default()
+    };
+
+    let (mut notif, _unsub) = client
+        .program_subscribe(&system_program::id(), Some(config))
+        .await?;
+
+    loop {
+        let update = notif.next().await;
+        log::debug!("Received Pythnet update: {:?}", update);
+
+        if let Some(update) = update {
+            // Check whether this account matches the state for this slot
+            // FIXME this is hardcoded for localnet, we need to remove it from the code
+            let pyth = Pubkey::try_from("7th6GdMuo4u1zNLzFAyMY6psunHNsGjPjo8hXvcTgKei").unwrap();
+
+            let accumulator_slot = update.context.slot - 1;
+
+            // Apparently we get the update for the previous slot, so we need to subtract 1
+            let ring_index = accumulator_slot.rem(RING_SIZE as u64) as u32;
+
+            let (candidate, _) = Pubkey::find_program_address(
+                &[
+                    b"AccumulatorState",
+                    &pyth.to_bytes(),
+                    &ring_index.to_be_bytes(),
+                ],
+                &system_program::id(),
+            );
+
+            if candidate.to_string() != update.value.pubkey {
+                continue;
+            }
+
+            let account: Account = update.value.account.decode().unwrap();
+            log::debug!("Received Accumulator update: {:?}", account);
+            let accumulator_messages = AccumulatorMessages {
+                slot:     accumulator_slot,
+                messages: Vec::<RawMessage>::try_from_slice(account.data.as_ref())?,
+            };
+
+            store
+                .store_update(Update::AccumulatorMessages(accumulator_messages))
+                .await?;
+        }
+    }
+}

+ 164 - 34
hermes/src/store.rs

@@ -1,56 +1,167 @@
 use {
     self::{
-        proof::batch_vaa::PriceInfosWithUpdateData,
-        storage::Storage,
+        proof::wormhole_merkle::{
+            construct_update_data,
+            WormholeMerkleProof,
+        },
+        storage::StorageInstance,
+        types::{
+            AccumulatorMessages,
+            MessageType,
+            PriceFeedsWithUpdateData,
+            RequestTime,
+            Update,
+        },
     },
-    anyhow::Result,
+    crate::store::{
+        proof::wormhole_merkle::{
+            construct_message_states_proofs,
+            store_wormhole_merkle_verified_message,
+        },
+        types::{
+            Message,
+            MessageState,
+            ProofSet,
+            WormholePayload,
+        },
+    },
+    anyhow::{
+        anyhow,
+        Result,
+    },
+    derive_builder::Builder,
+    moka::future::Cache,
     pyth_sdk::PriceIdentifier,
-    std::sync::Arc,
+    std::{
+        ops::Rem,
+        time::Duration,
+    },
+    wormhole::VAA,
 };
 
 pub mod proof;
 pub mod storage;
+pub mod types;
 
-pub type UnixTimestamp = u64;
-
-#[derive(Clone, PartialEq, Eq, Debug)]
-pub enum RequestTime {
-    Latest,
-    FirstAfter(UnixTimestamp),
-}
-
-pub enum Update {
-    Vaa(Vec<u8>),
-}
+pub type RingIndex = u32;
 
-pub struct PriceFeedsWithUpdateData {
-    pub batch_vaa: PriceInfosWithUpdateData,
+#[derive(Clone, PartialEq, Debug, Builder)]
+#[builder(derive(Debug), pattern = "immutable")]
+pub struct AccumulatorState {
+    pub accumulator_messages:  AccumulatorMessages,
+    pub wormhole_merkle_proof: WormholeMerkleProof,
 }
 
-pub type State = Arc<Box<dyn Storage>>;
-
 #[derive(Clone)]
 pub struct Store {
-    pub state: State,
+    pub storage:               StorageInstance,
+    pub pending_accumulations: Cache<RingIndex, AccumulatorStateBuilder>,
 }
 
 impl Store {
     pub fn new_with_local_cache(max_size_per_key: usize) -> Self {
         Self {
-            state: Arc::new(Box::new(storage::local_cache::LocalCache::new(
+            storage:               storage::local_storage::LocalStorage::new_instance(
                 max_size_per_key,
-            ))),
+            ),
+            pending_accumulations: Cache::builder()
+                .max_capacity(10_000)
+                .time_to_live(Duration::from_secs(60 * 5))
+                .build(), // FIXME: Make this configurable
         }
     }
 
-    /// Stores the update data in the store and returns the price identifiers for which
-    /// price feeds were updated.
-    pub fn store_update(&self, update: Update) -> Result<Vec<PriceIdentifier>> {
-        match update {
+    /// Stores the update data in the store
+    pub async fn store_update(&self, update: Update) -> Result<()> {
+        let ring_index = match update {
             Update::Vaa(vaa_bytes) => {
-                proof::batch_vaa::store_vaa_update(self.state.clone(), vaa_bytes)
+                let vaa = VAA::from_bytes(vaa_bytes.clone())?;
+                let payload = WormholePayload::try_from_bytes(&vaa.payload, &vaa_bytes)?;
+
+                // FIXME: Validate the VAA
+                // FIXME: Skip similar VAAs
+
+                match payload {
+                    WormholePayload::Merkle(proof) => {
+                        log::info!("Storing merkle proof for state index {:?}", proof);
+                        store_wormhole_merkle_verified_message(self, proof.clone()).await?;
+                        proof.state_index
+                    }
+                }
             }
-        }
+            Update::AccumulatorMessages(accumulator_messages) => {
+                // FIXME: Move this constant to a better place
+                const RING_SIZE: u32 = 10_000;
+                let ring_index = accumulator_messages.slot.rem(RING_SIZE as u64) as u32;
+
+                log::info!(
+                    "Storing accumulator messages for ring index {:?}: {:?}",
+                    ring_index,
+                    accumulator_messages
+                );
+
+                let pending_acc = self
+                    .pending_accumulations
+                    .entry(ring_index)
+                    .or_default()
+                    .await
+                    .into_value();
+                self.pending_accumulations
+                    .insert(
+                        ring_index,
+                        pending_acc.accumulator_messages(accumulator_messages),
+                    )
+                    .await;
+
+                ring_index
+            }
+        };
+
+        let pending_state = self.pending_accumulations.get(&ring_index);
+        let pending_state = match pending_state {
+            Some(pending_state) => pending_state,
+            // Due to some race conditions this might happen when it's processed before
+            None => return Ok(()),
+        };
+
+        let state = match pending_state.build() {
+            Ok(state) => state,
+            Err(_) => return Ok(()),
+        };
+
+        log::info!("State: {:?}", state);
+
+        let wormhole_merkle_message_states_proofs = construct_message_states_proofs(state.clone())?;
+
+        let message_states = state
+            .accumulator_messages
+            .messages
+            .iter()
+            .enumerate()
+            .map(|(idx, raw_message)| {
+                let message = Message::from_bytes(raw_message)?;
+
+                Ok(MessageState::new(
+                    message,
+                    raw_message.clone(),
+                    ProofSet {
+                        wormhole_merkle_proof: wormhole_merkle_message_states_proofs
+                            .get(idx)
+                            .ok_or(anyhow!("Missing proof for message"))?
+                            .clone(),
+                    },
+                    state.accumulator_messages.slot,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        log::info!("Message states: {:?}", message_states);
+
+        self.storage.store_message_states(message_states)?;
+
+        self.pending_accumulations.invalidate(&ring_index).await;
+
+        Ok(())
     }
 
     pub fn get_price_feeds_with_update_data(
@@ -58,16 +169,35 @@ impl Store {
         price_ids: Vec<PriceIdentifier>,
         request_time: RequestTime,
     ) -> Result<PriceFeedsWithUpdateData> {
+        let messages = self.storage.retrieve_message_states(
+            price_ids
+                .iter()
+                .map(|price_id| price_id.to_bytes())
+                .collect(),
+            types::RequestType::Some(vec![MessageType::PriceFeed]),
+            request_time,
+        )?;
+
+        let price_feeds = messages
+            .iter()
+            .map(|message_state| match message_state.message {
+                Message::PriceFeed(price_feed) => Ok(price_feed),
+                _ => Err(anyhow!("Invalid message state type")),
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let update_data = construct_update_data(messages)?;
+
         Ok(PriceFeedsWithUpdateData {
-            batch_vaa: proof::batch_vaa::get_price_infos_with_update_data(
-                self.state.clone(),
-                price_ids,
-                request_time,
-            )?,
+            price_feeds,
+            wormhole_merkle_update_data: update_data,
         })
     }
 
     pub fn get_price_feed_ids(&self) -> Vec<PriceIdentifier> {
-        proof::batch_vaa::get_price_feed_ids(self.state.clone())
+        self.storage
+            .keys()
+            .iter()
+            .map(|key| PriceIdentifier::new(key.id))
+            .collect()
     }
 }

+ 1 - 1
hermes/src/store/proof.rs

@@ -1 +1 @@
-pub mod batch_vaa;
+pub mod wormhole_merkle;

+ 0 - 175
hermes/src/store/proof/batch_vaa.rs

@@ -1,175 +0,0 @@
-use {
-    crate::store::{
-        storage::{
-            Key,
-            StorageData,
-        },
-        RequestTime,
-        State,
-        UnixTimestamp,
-    },
-    anyhow::{
-        anyhow,
-        Result,
-    },
-    pyth_sdk::{
-        Price,
-        PriceFeed,
-        PriceIdentifier,
-    },
-    pyth_wormhole_attester_sdk::{
-        BatchPriceAttestation,
-        PriceAttestation,
-        PriceStatus,
-    },
-    std::{
-        collections::{
-            HashMap,
-            HashSet,
-        },
-        time::{
-            SystemTime,
-            UNIX_EPOCH,
-        },
-    },
-    wormhole::VAA,
-};
-
-// TODO: We need to add more metadata to this struct.
-#[derive(Clone, Default, PartialEq, Debug)]
-pub struct PriceInfo {
-    pub price_feed:       PriceFeed,
-    pub vaa_bytes:        Vec<u8>,
-    pub publish_time:     UnixTimestamp,
-    pub emitter_chain:    u16,
-    pub attestation_time: UnixTimestamp,
-    pub receive_time:     UnixTimestamp,
-    pub sequence_number:  u64,
-}
-
-#[derive(Clone, Default)]
-pub struct PriceInfosWithUpdateData {
-    pub price_infos: HashMap<PriceIdentifier, PriceInfo>,
-    pub update_data: Vec<Vec<u8>>,
-}
-
-pub fn store_vaa_update(state: State, vaa_bytes: Vec<u8>) -> Result<Vec<PriceIdentifier>> {
-    // FIXME: Vaa bytes might not be a valid Pyth BatchUpdate message nor originate from Our emitter.
-    // We should check that.
-    // FIXME: We receive multiple vaas for the same update (due to different signedVAAs). We need
-    // to drop them.
-    let vaa = VAA::from_bytes(&vaa_bytes)?;
-    let batch_price_attestation = BatchPriceAttestation::deserialize(vaa.payload.as_slice())
-        .map_err(|_| anyhow!("Failed to deserialize VAA"))?;
-
-    let mut updated_price_feed_ids = Vec::new();
-
-    for price_attestation in batch_price_attestation.price_attestations {
-        let price_feed = price_attestation_to_price_feed(price_attestation.clone());
-
-        let publish_time = price_feed.get_price_unchecked().publish_time.try_into()?;
-
-        let price_info = PriceInfo {
-            price_feed,
-            vaa_bytes: vaa_bytes.clone(),
-            publish_time,
-            emitter_chain: vaa.emitter_chain.into(),
-            attestation_time: price_attestation.attestation_time.try_into()?,
-            receive_time: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
-            sequence_number: vaa.sequence,
-        };
-
-        let key = Key::BatchVaa(price_feed.id);
-        state.insert(key, publish_time, StorageData::BatchVaa(price_info))?;
-
-        // FIXME: Only add price feed if it's newer
-        // or include whether it's newer or not in the vector
-        updated_price_feed_ids.push(price_feed.id);
-    }
-
-    Ok(updated_price_feed_ids)
-}
-
-
-pub fn get_price_infos_with_update_data(
-    state: State,
-    price_ids: Vec<PriceIdentifier>,
-    request_time: RequestTime,
-) -> Result<PriceInfosWithUpdateData> {
-    let mut price_infos = HashMap::new();
-    let mut vaas: HashSet<Vec<u8>> = HashSet::new();
-    for price_id in price_ids {
-        let key = Key::BatchVaa(price_id);
-        let maybe_data = state.get(key, request_time.clone())?;
-
-        match maybe_data {
-            Some(StorageData::BatchVaa(price_info)) => {
-                vaas.insert(price_info.vaa_bytes.clone());
-                price_infos.insert(price_id, price_info);
-            }
-            None => {
-                return Err(anyhow!("No price feed found for price id: {:?}", price_id));
-            }
-        }
-    }
-    let update_data: Vec<Vec<u8>> = vaas.into_iter().collect();
-    Ok(PriceInfosWithUpdateData {
-        price_infos,
-        update_data,
-    })
-}
-
-
-pub fn get_price_feed_ids(state: State) -> Vec<PriceIdentifier> {
-    // Currently we have only one type and filter map is not necessary.
-    // But we might have more types in the future.
-    #[allow(clippy::unnecessary_filter_map)]
-    state
-        .keys()
-        .into_iter()
-        .filter_map(|key| match key {
-            Key::BatchVaa(price_id) => Some(price_id),
-        })
-        .collect()
-}
-
-/// Convert a PriceAttestation to a PriceFeed.
-///
-/// We cannot implmenet this function as From/Into trait because none of these types are defined in this crate.
-/// Ideally we need to move this method to the wormhole_attester sdk crate or have our own implementation of PriceFeed.
-pub fn price_attestation_to_price_feed(price_attestation: PriceAttestation) -> PriceFeed {
-    if price_attestation.status == PriceStatus::Trading {
-        PriceFeed::new(
-            // This conversion is done because the identifier on the wormhole_attester uses sdk v0.5.0 and this crate uses 0.7.0
-            PriceIdentifier::new(price_attestation.price_id.to_bytes()),
-            Price {
-                price:        price_attestation.price,
-                conf:         price_attestation.conf,
-                publish_time: price_attestation.publish_time,
-                expo:         price_attestation.expo,
-            },
-            Price {
-                price:        price_attestation.ema_price,
-                conf:         price_attestation.ema_conf,
-                publish_time: price_attestation.publish_time,
-                expo:         price_attestation.expo,
-            },
-        )
-    } else {
-        PriceFeed::new(
-            PriceIdentifier::new(price_attestation.price_id.to_bytes()),
-            Price {
-                price:        price_attestation.prev_price,
-                conf:         price_attestation.prev_conf,
-                publish_time: price_attestation.prev_publish_time,
-                expo:         price_attestation.expo,
-            },
-            Price {
-                price:        price_attestation.ema_price,
-                conf:         price_attestation.ema_conf,
-                publish_time: price_attestation.prev_publish_time,
-                expo:         price_attestation.expo,
-            },
-        )
-    }
-}

+ 155 - 0
hermes/src/store/proof/wormhole_merkle.rs

@@ -0,0 +1,155 @@
+use {
+    crate::store::{
+        types::MessageState,
+        AccumulatorState,
+        Store,
+    },
+    anyhow::{
+        anyhow,
+        Result,
+    },
+    byteorder::{
+        BigEndian,
+        WriteBytesExt,
+    },
+    pythnet_sdk::{
+        accumulators::{
+            merkle::{
+                MerkleAccumulator,
+                MerklePath,
+            },
+            Accumulator,
+        },
+        hashers::keccak256_160::Keccak160,
+    },
+    std::io::{
+        Cursor,
+        Write,
+    },
+};
+
+type Hash = [u8; 20];
+
+#[derive(Clone, PartialEq, Debug)]
+pub struct WormholeMerkleProof {
+    pub vaa:         Vec<u8>,
+    pub state_index: u32,
+    pub root:        Hash,
+}
+
+#[derive(Clone, PartialEq, Debug)]
+pub struct WormholeMerkleMessageProof {
+    pub vaa:   Vec<u8>,
+    pub proof: MerklePath<Keccak160>,
+}
+
+pub async fn store_wormhole_merkle_verified_message(
+    store: &Store,
+    proof: WormholeMerkleProof,
+) -> Result<()> {
+    let pending_acc = store
+        .pending_accumulations
+        .entry(proof.state_index)
+        .or_default()
+        .await
+        .into_value();
+    store
+        .pending_accumulations
+        .insert(proof.state_index, pending_acc.wormhole_merkle_proof(proof))
+        .await;
+    Ok(())
+}
+
+pub fn construct_message_states_proofs(
+    state: AccumulatorState,
+) -> Result<Vec<WormholeMerkleMessageProof>> {
+    // Check whether the state is valid
+    let merkle_acc = match MerkleAccumulator::<Keccak160>::from_set(
+        state
+            .accumulator_messages
+            .messages
+            .iter()
+            .map(|m| m.as_ref()),
+    ) {
+        Some(merkle_acc) => merkle_acc,
+        None => return Ok(vec![]), // It only happens when the message set is empty
+    };
+
+    let proof = &state.wormhole_merkle_proof;
+
+    log::info!(
+        "Merkle root: {:?}, Verified root: {:?}",
+        merkle_acc.root,
+        proof.root
+    );
+    log::info!("Valid: {}", merkle_acc.root == proof.root);
+
+    state
+        .accumulator_messages
+        .messages
+        .iter()
+        .map(|m| {
+            Ok(WormholeMerkleMessageProof {
+                vaa:   state.wormhole_merkle_proof.vaa.clone(),
+                proof: merkle_acc
+                    .prove(m.as_ref())
+                    .ok_or(anyhow!("Failed to prove message"))?,
+            })
+        })
+        .collect::<Result<Vec<WormholeMerkleMessageProof>>>()
+}
+
+pub fn construct_update_data(mut message_states: Vec<MessageState>) -> Result<Vec<Vec<u8>>> {
+    message_states.sort_by_key(
+        |m| m.proof_set.wormhole_merkle_proof.vaa.clone(), // FIXME: This is not efficient
+    );
+
+    message_states
+        .group_by(|a, b| {
+            a.proof_set.wormhole_merkle_proof.vaa == b.proof_set.wormhole_merkle_proof.vaa
+        })
+        .map(|messages| {
+            let vaa = messages
+                .get(0)
+                .ok_or(anyhow!("Empty message set"))?
+                .proof_set
+                .wormhole_merkle_proof
+                .vaa
+                .clone();
+
+            let mut cursor = Cursor::new(Vec::new());
+
+            cursor.write_u32::<BigEndian>(0x504e4155)?; // "PNAU"
+            cursor.write_u8(0x01)?; // Major version
+            cursor.write_u8(0x00)?; // Minor version
+            cursor.write_u8(0)?; // Trailing header size
+
+            cursor.write_u8(0)?; // Update type of WormholeMerkle. FIXME: Make this out of enum
+
+            // Writing VAA
+            cursor.write_u16::<BigEndian>(vaa.len().try_into()?)?;
+            cursor.write_all(&vaa)?;
+
+            // Writing number of messages
+            cursor.write_u8(messages.len().try_into()?)?;
+
+            for message in messages {
+                // Writing message
+                cursor.write_u16::<BigEndian>(message.raw_message.len().try_into()?)?;
+                cursor.write_all(&message.raw_message)?;
+
+                // Writing proof
+                cursor.write_all(
+                    &message
+                        .proof_set
+                        .wormhole_merkle_proof
+                        .proof
+                        .serialize()
+                        .ok_or(anyhow!("Unable to serialize merkle proof path"))?,
+                )?;
+            }
+
+            Ok(cursor.into_inner())
+        })
+        .collect::<Result<Vec<Vec<u8>>>>()
+}

+ 18 - 19
hermes/src/store/storage.rs

@@ -1,24 +1,16 @@
 use {
-    super::{
-        proof::batch_vaa::PriceInfo,
+    super::types::{
+        MessageIdentifier,
+        MessageKey,
+        MessageState,
         RequestTime,
-        UnixTimestamp,
+        RequestType,
     },
     anyhow::Result,
-    pyth_sdk::PriceIdentifier,
+    std::sync::Arc,
 };
 
-pub mod local_cache;
-
-#[derive(Clone, PartialEq, Debug)]
-pub enum StorageData {
-    BatchVaa(PriceInfo),
-}
-
-#[derive(Clone, PartialEq, Eq, Debug, Hash)]
-pub enum Key {
-    BatchVaa(PriceIdentifier),
-}
+pub mod local_storage;
 
 /// This trait defines the interface for update data storage
 ///
@@ -27,8 +19,15 @@ pub enum Key {
 /// data to abstract the details of the update data, and so each update data is stored
 /// under a separate key. The caller is responsible for specifying the right
 /// key for the update data they wish to access.
-pub trait Storage: Sync + Send {
-    fn insert(&self, key: Key, time: UnixTimestamp, value: StorageData) -> Result<()>;
-    fn get(&self, key: Key, request_time: RequestTime) -> Result<Option<StorageData>>;
-    fn keys(&self) -> Vec<Key>;
+pub trait Storage: Send + Sync {
+    fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
+    fn retrieve_message_states(
+        &self,
+        ids: Vec<MessageIdentifier>,
+        request_type: RequestType,
+        request_time: RequestTime,
+    ) -> Result<Vec<MessageState>>;
+    fn keys(&self) -> Vec<MessageKey>;
 }
+
+pub type StorageInstance = Arc<Box<dyn Storage>>;

+ 0 - 106
hermes/src/store/storage/local_cache.rs

@@ -1,106 +0,0 @@
-use {
-    super::{
-        super::RequestTime,
-        Key,
-        Storage,
-        StorageData,
-        UnixTimestamp,
-    },
-    anyhow::Result,
-    dashmap::DashMap,
-    std::{
-        collections::VecDeque,
-        sync::Arc,
-    },
-};
-
-#[derive(Clone, PartialEq, Debug)]
-pub struct Record {
-    pub time:  UnixTimestamp,
-    pub value: StorageData,
-}
-
-#[derive(Clone)]
-pub struct LocalCache {
-    cache:            Arc<DashMap<Key, VecDeque<Record>>>,
-    max_size_per_key: usize,
-}
-
-impl LocalCache {
-    pub fn new(max_size_per_key: usize) -> Self {
-        Self {
-            cache: Arc::new(DashMap::new()),
-            max_size_per_key,
-        }
-    }
-}
-
-impl Storage for LocalCache {
-    /// Add a new db entry to the cache.
-    ///
-    /// This method keeps the backed store sorted for efficiency, and removes
-    /// the oldest record in the cache if the max_size is reached. Entries are
-    /// usually added in increasing order and likely to be inserted near the
-    /// end of the deque. The function is optimized for this specific case.
-    fn insert(&self, key: Key, time: UnixTimestamp, value: StorageData) -> Result<()> {
-        let mut key_cache = self.cache.entry(key).or_insert_with(VecDeque::new);
-
-        let record = Record { time, value };
-
-        key_cache.push_back(record);
-
-        // Shift the pushed record until it's in the right place.
-        let mut i = key_cache.len() - 1;
-        while i > 0 && key_cache[i - 1].time > key_cache[i].time {
-            key_cache.swap(i - 1, i);
-            i -= 1;
-        }
-
-        // Remove the oldest record if the max size is reached.
-        if key_cache.len() > self.max_size_per_key {
-            key_cache.pop_front();
-        }
-
-        Ok(())
-    }
-
-    fn get(&self, key: Key, request_time: RequestTime) -> Result<Option<StorageData>> {
-        match self.cache.get(&key) {
-            Some(key_cache) => {
-                let record = match request_time {
-                    RequestTime::Latest => key_cache.back().cloned(),
-                    RequestTime::FirstAfter(time) => {
-                        // If the requested time is before the first element in the vector, we are
-                        // not sure that the first element is the closest one.
-                        if let Some(oldest_record) = key_cache.front() {
-                            if time < oldest_record.time {
-                                return Ok(None);
-                            }
-                        }
-
-                        // Binary search returns Ok(idx) if the element is found at index idx or Err(idx) if it's not
-                        // found which idx is the index where the element should be inserted to keep the vector sorted.
-                        // Getting idx within any of the match arms will give us the index of the element that is
-                        // closest after or equal to the requested time.
-                        let idx = match key_cache.binary_search_by_key(&time, |record| record.time)
-                        {
-                            Ok(idx) => idx,
-                            Err(idx) => idx,
-                        };
-
-                        // We are using `get` to handle out of bound idx. This happens if the
-                        // requested time is after the last element in the vector.
-                        key_cache.get(idx).cloned()
-                    }
-                };
-
-                Ok(record.map(|record| record.value))
-            }
-            None => Ok(None),
-        }
-    }
-
-    fn keys(&self) -> Vec<Key> {
-        self.cache.iter().map(|entry| entry.key().clone()).collect()
-    }
-}

+ 136 - 0
hermes/src/store/storage/local_storage.rs

@@ -0,0 +1,136 @@
+use {
+    super::{
+        MessageIdentifier,
+        MessageKey,
+        MessageState,
+        RequestTime,
+        RequestType,
+        Storage,
+        StorageInstance,
+    },
+    crate::store::types::MessageType,
+    anyhow::{
+        anyhow,
+        Result,
+    },
+    dashmap::DashMap,
+    std::{
+        collections::VecDeque,
+        sync::Arc,
+    },
+};
+
+#[derive(Clone)]
+pub struct LocalStorage {
+    cache:            Arc<DashMap<MessageKey, VecDeque<MessageState>>>,
+    max_size_per_key: usize,
+}
+
+impl LocalStorage {
+    pub fn new_instance(max_size_per_key: usize) -> StorageInstance {
+        Arc::new(Box::new(Self {
+            cache: Arc::new(DashMap::new()),
+            max_size_per_key,
+        }))
+    }
+
+    fn retrieve_message_state(
+        &self,
+        key: MessageKey,
+        request_time: RequestTime,
+    ) -> Option<MessageState> {
+        match self.cache.get(&key) {
+            Some(key_cache) => {
+                match request_time {
+                    RequestTime::Latest => key_cache.back().cloned(),
+                    RequestTime::FirstAfter(time) => {
+                        // If the requested time is before the first element in the vector, we are
+                        // not sure that the first element is the closest one.
+                        if let Some(oldest_record) = key_cache.front() {
+                            if time < oldest_record.time().publish_time {
+                                return None;
+                            }
+                        }
+
+                        // Binary search returns Ok(idx) if the element is found at index idx or Err(idx) if it's not
+                        // found which idx is the index where the element should be inserted to keep the vector sorted.
+                        // Getting idx within any of the match arms will give us the index of the element that is
+                        // closest after or equal to the requested time.
+                        let idx = match key_cache
+                            .binary_search_by_key(&time, |record| record.time().publish_time)
+                        {
+                            Ok(idx) => idx,
+                            Err(idx) => idx,
+                        };
+
+                        // We are using `get` to handle out of bound idx. This happens if the
+                        // requested time is after the last element in the vector.
+                        key_cache.get(idx).cloned()
+                    }
+                }
+            }
+            None => None,
+        }
+    }
+}
+
+impl Storage for LocalStorage {
+    /// Add a new db entry to the cache.
+    ///
+    /// This method keeps the backed store sorted for efficiency, and removes
+    /// the oldest record in the cache if the max_size is reached. Entries are
+    /// usually added in increasing order and likely to be inserted near the
+    /// end of the deque. The function is optimized for this specific case.
+    fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
+        for message_state in message_states {
+            let key = message_state.key();
+
+            let mut key_cache = self.cache.entry(key).or_insert_with(VecDeque::new);
+
+            key_cache.push_back(message_state);
+
+            // Shift the pushed record until it's in the right place.
+            let mut i = key_cache.len().saturating_sub(1);
+            while i > 0 && key_cache[i - 1].time() > key_cache[i].time() {
+                key_cache.swap(i - 1, i);
+                i -= 1;
+            }
+
+            // FIXME remove equal elements by key and time
+
+            // Remove the oldest record if the max size is reached.
+            if key_cache.len() > self.max_size_per_key {
+                key_cache.pop_front();
+            }
+        }
+
+        Ok(())
+    }
+
+    fn retrieve_message_states(
+        &self,
+        ids: Vec<MessageIdentifier>,
+        request_type: RequestType,
+        request_time: RequestTime,
+    ) -> Result<Vec<MessageState>> {
+        // TODO: Should we return an error if any of the ids are not found?
+        let types: Vec<MessageType> = request_type.into();
+        ids.into_iter()
+            .flat_map(|id| {
+                let request_time = request_time.clone();
+                types.iter().map(move |message_type| {
+                    let key = MessageKey {
+                        id,
+                        type_: message_type.clone(),
+                    };
+                    self.retrieve_message_state(key, request_time.clone())
+                        .ok_or(anyhow!("Message not found"))
+                })
+            })
+            .collect()
+    }
+
+    fn keys(&self) -> Vec<MessageKey> {
+        self.cache.iter().map(|entry| entry.key().clone()).collect()
+    }
+}

+ 299 - 0
hermes/src/store/types.rs

@@ -0,0 +1,299 @@
+use {
+    super::proof::wormhole_merkle::{
+        WormholeMerkleMessageProof,
+        WormholeMerkleProof,
+    },
+    anyhow::{
+        anyhow,
+        Result,
+    },
+};
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum WormholePayload {
+    Merkle(WormholeMerkleProof),
+}
+
+impl WormholePayload {
+    pub fn try_from_bytes(bytes: &[u8], vaa_bytes: &[u8]) -> Result<Self> {
+        let magic = u32::from_be_bytes(bytes[0..4].try_into()?);
+        if magic != 0x41555756u32 {
+            return Err(anyhow!("Invalid magic"));
+        }
+
+        let message_type = u8::from_be_bytes(bytes[4..5].try_into()?);
+
+        if message_type != 0 {
+            return Err(anyhow!("Invalid message type"));
+        }
+
+        let state_index = u32::from_be_bytes(bytes[5..9].try_into()?);
+        let root_digest = bytes[9..29].try_into()?;
+
+        if bytes.len() > 29 {
+            return Err(anyhow!("Invalid message length"));
+        }
+
+        Ok(Self::Merkle(WormholeMerkleProof {
+            root: root_digest,
+            state_index,
+            vaa: vaa_bytes.to_vec(),
+        }))
+    }
+}
+
+pub type RawMessage = Vec<u8>;
+pub type MessageIdentifier = [u8; 32];
+
+#[derive(Clone, PartialEq, Eq, Debug, Hash)]
+pub enum MessageType {
+    PriceFeed,
+    TwapPrice,
+}
+
+impl MessageType {
+    pub fn all() -> Vec<Self> {
+        // FIXME: This is a bit brittle, guard it in the future
+        vec![Self::PriceFeed, Self::TwapPrice]
+    }
+}
+
+#[derive(Clone, PartialEq, Debug)]
+pub struct WormholeMerkleState {
+    pub digest_proof: Vec<u8>,
+    pub tree:         Option<Vec<Vec<u8>>>,
+}
+
+#[derive(Clone, PartialEq, Eq, Debug, Hash)]
+pub struct MessageKey {
+    // -> this is the real message id
+    pub id:    MessageIdentifier, // -> this is price feed id
+    pub type_: MessageType,
+}
+
+#[derive(Clone, PartialEq, Eq, Debug, PartialOrd, Ord)]
+pub struct MessageTime {
+    pub publish_time: UnixTimestamp,
+    pub slot:         Slot,
+}
+
+#[derive(Clone, PartialEq, Debug)]
+pub struct ProofSet {
+    pub wormhole_merkle_proof: WormholeMerkleMessageProof,
+}
+
+
+#[derive(Clone, PartialEq, Debug)]
+pub struct MessageState {
+    pub publish_time: UnixTimestamp,
+    pub slot:         Slot,
+    pub id:           MessageIdentifier,
+    pub type_:        MessageType,
+    pub message:      Message,
+    pub raw_message:  RawMessage,
+    pub proof_set:    ProofSet,
+}
+
+impl MessageState {
+    pub fn time(&self) -> MessageTime {
+        MessageTime {
+            publish_time: self.publish_time,
+            slot:         self.slot,
+        }
+    }
+
+    pub fn key(&self) -> MessageKey {
+        MessageKey {
+            id:    self.id,
+            type_: self.type_.clone(),
+        }
+    }
+
+    pub fn new(message: Message, raw_message: RawMessage, proof_set: ProofSet, slot: Slot) -> Self {
+        Self {
+            publish_time: message.publish_time(),
+            slot,
+            id: *message.id(),
+            type_: message.message_type(),
+            message,
+            raw_message,
+            proof_set,
+        }
+    }
+}
+
+pub enum RequestType {
+    All,
+    Some(Vec<MessageType>),
+}
+
+impl From<RequestType> for Vec<MessageType> {
+    fn from(request_type: RequestType) -> Self {
+        match request_type {
+            RequestType::All => MessageType::all(),
+            RequestType::Some(types) => types,
+        }
+    }
+}
+
+pub type Slot = u64;
+pub type UnixTimestamp = i64;
+
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub enum RequestTime {
+    Latest,
+    FirstAfter(UnixTimestamp),
+}
+
+#[derive(Clone, PartialEq, Debug)]
+pub struct AccumulatorMessages {
+    pub slot:     Slot,
+    pub messages: Vec<RawMessage>,
+}
+
+pub enum Update {
+    Vaa(Vec<u8>),
+    AccumulatorMessages(AccumulatorMessages),
+}
+
+#[repr(C)]
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub struct PriceFeedMessage {
+    pub id:                [u8; 32],
+    pub price:             i64,
+    pub conf:              u64,
+    pub exponent:          i32,
+    pub publish_time:      i64,
+    pub prev_publish_time: i64,
+    pub ema_price:         i64,
+    pub ema_conf:          u64,
+}
+
+impl PriceFeedMessage {
+    // The size of the serialized message. Note that this is not the same as the size of the struct
+    // (because of the discriminator & struct padding/alignment).
+    pub const MESSAGE_SIZE: usize = 1 + 32 + 8 + 8 + 4 + 8 + 8 + 8 + 8;
+    pub const DISCRIMINATOR: u8 = 0;
+
+    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
+        if bytes.len() != Self::MESSAGE_SIZE {
+            return Err(anyhow!("Invalid message length"));
+        }
+
+        let mut id = [0u8; 32];
+        id.copy_from_slice(&bytes[1..33]);
+
+        let price = i64::from_be_bytes(bytes[33..41].try_into()?);
+        let conf = u64::from_be_bytes(bytes[41..49].try_into()?);
+        let exponent = i32::from_be_bytes(bytes[49..53].try_into()?);
+        let publish_time = i64::from_be_bytes(bytes[53..61].try_into()?);
+        let prev_publish_time = i64::from_be_bytes(bytes[61..69].try_into()?);
+        let ema_price = i64::from_be_bytes(bytes[69..77].try_into()?);
+        let ema_conf = u64::from_be_bytes(bytes[77..85].try_into()?);
+
+        Ok(Self {
+            id,
+            price,
+            conf,
+            exponent,
+            publish_time,
+            prev_publish_time,
+            ema_price,
+            ema_conf,
+        })
+    }
+}
+
+#[repr(C)]
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub struct TwapMessage {
+    pub id:                [u8; 32],
+    pub cumulative_price:  i128,
+    pub cumulative_conf:   u128,
+    pub num_down_slots:    u64,
+    pub exponent:          i32,
+    pub publish_time:      i64,
+    pub prev_publish_time: i64,
+    pub publish_slot:      u64,
+}
+
+#[allow(dead_code)]
+impl TwapMessage {
+    // The size of the serialized message. Note that this is not the same as the size of the struct
+    // (because of the discriminator & struct padding/alignment).
+    pub const MESSAGE_SIZE: usize = 1 + 32 + 16 + 16 + 8 + 4 + 8 + 8 + 8;
+    pub const DISCRIMINATOR: u8 = 1;
+
+    // FIXME: Use nom or a TLV ser/de library
+    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
+        if bytes.len() != Self::MESSAGE_SIZE {
+            return Err(anyhow!("Invalid message length"));
+        }
+
+        let mut id = [0u8; 32];
+        id.copy_from_slice(&bytes[1..33]);
+
+        let cumulative_price = i128::from_be_bytes(bytes[33..49].try_into()?);
+        let cumulative_conf = u128::from_be_bytes(bytes[49..65].try_into()?);
+        let num_down_slots = u64::from_be_bytes(bytes[65..73].try_into()?);
+        let exponent = i32::from_be_bytes(bytes[73..77].try_into()?);
+        let publish_time = i64::from_be_bytes(bytes[77..85].try_into()?);
+        let prev_publish_time = i64::from_be_bytes(bytes[85..93].try_into()?);
+        let publish_slot = u64::from_be_bytes(bytes[93..101].try_into()?);
+
+        Ok(Self {
+            id,
+            cumulative_price,
+            cumulative_conf,
+            num_down_slots,
+            exponent,
+            publish_time,
+            prev_publish_time,
+            publish_slot,
+        })
+    }
+}
+
+#[derive(Clone, PartialEq, Debug)]
+pub enum Message {
+    PriceFeed(PriceFeedMessage),
+    TwapPrice(TwapMessage),
+}
+
+impl Message {
+    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
+        match bytes[0] {
+            PriceFeedMessage::DISCRIMINATOR => {
+                Ok(Self::PriceFeed(PriceFeedMessage::from_bytes(bytes)?))
+            }
+            TwapMessage::DISCRIMINATOR => Ok(Self::TwapPrice(TwapMessage::from_bytes(bytes)?)),
+            _ => Err(anyhow!("Invalid message discriminator")),
+        }
+    }
+
+    pub fn message_type(&self) -> MessageType {
+        match self {
+            Self::PriceFeed(_) => MessageType::PriceFeed,
+            Self::TwapPrice(_) => MessageType::TwapPrice,
+        }
+    }
+
+    pub fn id(&self) -> &[u8; 32] {
+        match self {
+            Self::PriceFeed(msg) => &msg.id,
+            Self::TwapPrice(msg) => &msg.id,
+        }
+    }
+
+    pub fn publish_time(&self) -> i64 {
+        match self {
+            Self::PriceFeed(msg) => msg.publish_time,
+            Self::TwapPrice(msg) => msg.publish_time,
+        }
+    }
+}
+
+pub struct PriceFeedsWithUpdateData {
+    pub price_feeds:                 Vec<PriceFeedMessage>,
+    pub wormhole_merkle_update_data: Vec<Vec<u8>>,
+}

+ 1 - 0
pythnet/pythnet_sdk/.gitignore

@@ -0,0 +1 @@
+Cargo.lock

この差分においてかなりの量のファイルが変更されているため、一部のファイルを表示していません