Kaynağa Gözat

[hermes] Add basic structure for price store and rpc (#717)


Co-authored-by: Reisen <Reisen@users.noreply.github.com>
Ali Behjati 2 yıl önce
ebeveyn
işleme
1af86140f1

+ 5 - 5
.pre-commit-config.yaml

@@ -58,13 +58,13 @@ repos:
         entry: cargo +nightly clippy --manifest-path ./target_chains/cosmwasm/Cargo.toml --tests --fix --allow-dirty --allow-staged -- -D warnings
         pass_filenames: false
         files: target_chains/cosmwasm
-      # Hooks for price-service/server-rust
-      - id: cargo-fmt-price-service
-        name: Cargo format for Rust Price Service
+      # Hooks for Hermes
+      - id: cargo-fmt-hermes
+        name: Cargo format for Pyth Hermes
         language: "rust"
-        entry: cargo +nightly fmt --manifest-path ./price_service/server-rust/Cargo.toml --all -- --config-path rustfmt.toml
+        entry: cargo +nightly fmt --manifest-path ./hermes/Cargo.toml --all -- --config-path rustfmt.toml
         pass_filenames: false
-        files: price_service/server-rust
+        files: hermes
       # Hooks for accumulator updater contract
       - id: cargo-fmt-accumulator-updater
         name: Cargo format for accumulator updater contract

+ 3 - 0
hermes/.gitignore

@@ -5,3 +5,6 @@
 src/network/p2p.pb.go
 src/network/p2p.proto
 tools/
+
+# Ignore Wormhole cloned repo
+wormhole/

+ 293 - 1
hermes/Cargo.lock

@@ -380,6 +380,28 @@ dependencies = [
  "tower-service",
 ]
 
+[[package]]
+name = "axum-extra"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ea61f9f77592526b73fd14fe0f5938412bda49423f8b9f372ac76a9d6cf0ad2"
+dependencies = [
+ "axum",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "mime",
+ "pin-project-lite 0.2.9",
+ "serde",
+ "serde_html_form",
+ "tokio",
+ "tower",
+ "tower-http",
+ "tower-layer",
+ "tower-service",
+]
+
 [[package]]
 name = "axum-macros"
 version = "0.3.4"
@@ -480,12 +502,68 @@ version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
 
+[[package]]
+name = "borsh"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "15bf3650200d8bffa99015595e10f1fbd17de07abbc25bb067da79e769939bfa"
+dependencies = [
+ "borsh-derive",
+ "hashbrown 0.11.2",
+]
+
+[[package]]
+name = "borsh-derive"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6441c552f230375d18e3cc377677914d2ca2b0d36e52129fe15450a2dce46775"
+dependencies = [
+ "borsh-derive-internal",
+ "borsh-schema-derive-internal",
+ "proc-macro-crate 0.1.5",
+ "proc-macro2",
+ "syn",
+]
+
+[[package]]
+name = "borsh-derive-internal"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5449c28a7b352f2d1e592a8a28bf139bc71afb0764a14f3c02500935d8c44065"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "borsh-schema-derive-internal"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdbd5696d8bfa21d53d9fe39a714a18538bad11492a42d066dbbc395fb1951c0"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "bs58"
 version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "771fe0050b883fcc3ea2359b1a96bcfbc090b7116eae7c3c512c7a083fdf23d3"
 
+[[package]]
+name = "bstr"
+version = "0.2.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
+dependencies = [
+ "lazy_static",
+ "memchr",
+ "regex-automata",
+]
+
 [[package]]
 name = "bumpalo"
 version = "3.12.0"
@@ -610,6 +688,12 @@ version = "0.9.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "520fbf3c07483f94e3e3ca9d0cfd913d7718ef2483d2cfd91c0d9e91474ab913"
 
+[[package]]
+name = "convert_case"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
+
 [[package]]
 name = "core-foundation"
 version = "0.9.3"
@@ -956,6 +1040,19 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "derive_more"
+version = "0.99.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
+dependencies = [
+ "convert_case",
+ "proc-macro2",
+ "quote",
+ "rustc_version",
+ "syn",
+]
+
 [[package]]
 name = "digest"
 version = "0.9.0"
@@ -993,6 +1090,12 @@ version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c00704156a7de8df8da0911424e30c2049957b0a714542a44e05fe693dd85313"
 
+[[package]]
+name = "dyn-clone"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68b0cf012f1230e43cd00ebb729c6bb58707ecfa8ad08b52ef3a4ccd2697fc30"
+
 [[package]]
 name = "ecdsa"
 version = "0.14.8"
@@ -1154,6 +1257,15 @@ version = "0.1.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a214f5bb88731d436478f3ae1f8a277b62124089ba9fb67f4f93fb100ef73c90"
 
+[[package]]
+name = "fixed-hash"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c"
+dependencies = [
+ "static_assertions",
+]
+
 [[package]]
 name = "fixedbitset"
 version = "0.4.2"
@@ -1411,6 +1523,15 @@ version = "1.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
 
+[[package]]
+name = "hashbrown"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
+dependencies = [
+ "ahash 0.7.6",
+]
+
 [[package]]
 name = "hashbrown"
 version = "0.12.3"
@@ -1459,10 +1580,14 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "axum",
+ "axum-extra",
  "axum-macros",
+ "base64 0.21.0",
+ "borsh",
  "bs58",
  "dashmap",
  "der 0.7.0",
+ "derive_more",
  "env_logger",
  "futures",
  "hex",
@@ -1470,6 +1595,8 @@ dependencies = [
  "libc",
  "libp2p",
  "log",
+ "pyth-sdk 0.7.0",
+ "pyth-wormhole-attester-sdk",
  "rand 0.8.5",
  "reqwest",
  "ring",
@@ -1483,6 +1610,7 @@ dependencies = [
  "structopt",
  "tokio",
  "typescript-type-def",
+ "wormhole-core",
 ]
 
 [[package]]
@@ -1514,6 +1642,15 @@ name = "hex"
 version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "hex-literal"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0"
 
 [[package]]
 name = "hex_fmt"
@@ -1831,6 +1968,15 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "keccak"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3afef3b6eff9ce9d8ff9b3601125eec7f0c8cbac7abd14f355d053fa56c98768"
+dependencies = [
+ "cpufeatures",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.4.0"
@@ -2466,7 +2612,7 @@ version = "0.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1d6d4752e6230d8ef7adf7bd5d8c4b1f6561c1014c5ba9a37445ccefe18aa1db"
 dependencies = [
- "proc-macro-crate",
+ "proc-macro-crate 1.1.3",
  "proc-macro-error",
  "proc-macro2",
  "quote",
@@ -2972,6 +3118,25 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "primitive-types"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e28720988bff275df1f51b171e1b2a18c30d194c4d2b61defdacecd625a5d94a"
+dependencies = [
+ "fixed-hash",
+ "uint",
+]
+
+[[package]]
+name = "proc-macro-crate"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d6ea3c4595b96363c13943497db34af4460fb474a95c43f4446ad341b8c9785"
+dependencies = [
+ "toml",
+]
+
 [[package]]
 name = "proc-macro-crate"
 version = "1.1.3"
@@ -3093,6 +3258,41 @@ dependencies = [
  "prost",
 ]
 
+[[package]]
+name = "pyth-sdk"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f5c805ba3dfb5b7ed6a8ffa62ec38391f485a79c7cf6b3b11d3bd44fb0325824"
+dependencies = [
+ "borsh",
+ "borsh-derive",
+ "hex",
+ "schemars",
+ "serde",
+]
+
+[[package]]
+name = "pyth-sdk"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "00bf2540203ca3c7a5712fdb8b5897534b7f6a0b6e7b0923ff00466c5f9efcb3"
+dependencies = [
+ "borsh",
+ "borsh-derive",
+ "hex",
+ "schemars",
+ "serde",
+]
+
+[[package]]
+name = "pyth-wormhole-attester-sdk"
+version = "0.1.2"
+dependencies = [
+ "hex",
+ "pyth-sdk 0.5.0",
+ "serde",
+]
+
 [[package]]
 name = "quick-error"
 version = "1.2.3"
@@ -3275,6 +3475,12 @@ dependencies = [
  "regex-syntax",
 ]
 
+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+
 [[package]]
 name = "regex-syntax"
 version = "0.6.28"
@@ -3512,6 +3718,30 @@ dependencies = [
  "windows-sys 0.42.0",
 ]
 
+[[package]]
+name = "schemars"
+version = "0.8.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02c613288622e5f0c3fdc5dbd4db1c5fbe752746b1d1a56a0630b78fd00de44f"
+dependencies = [
+ "dyn-clone",
+ "schemars_derive",
+ "serde",
+ "serde_json",
+]
+
+[[package]]
+name = "schemars_derive"
+version = "0.8.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "109da1e6b197438deb6db99952990c7f959572794b80ff93707d55a232545e7c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde_derive_internals",
+ "syn",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"
@@ -3652,6 +3882,30 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "serde_derive_internals"
+version = "0.26.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "serde_html_form"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53192e38d5c88564b924dbe9b60865ecbb71b81d38c4e61c817cffd3e36ef696"
+dependencies = [
+ "form_urlencoded",
+ "indexmap",
+ "itoa",
+ "ryu",
+ "serde",
+]
+
 [[package]]
 name = "serde_json"
 version = "1.0.93"
@@ -3742,6 +3996,16 @@ dependencies = [
  "sha2 0.9.9",
 ]
 
+[[package]]
+name = "sha3"
+version = "0.10.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bdf0c33fae925bdc080598b84bc15c55e7b9a4a43b3c704da051f977469691c9"
+dependencies = [
+ "digest 0.10.6",
+ "keccak",
+]
+
 [[package]]
 name = "signal-hook-registry"
 version = "1.4.1"
@@ -4341,6 +4605,18 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "uint"
+version = "0.9.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76f64bba2c53b04fcab63c01a7d7427eadc821e3bc48c34dc9ba29c501164b52"
+dependencies = [
+ "byteorder",
+ "crunchy",
+ "hex",
+ "static_assertions",
+]
+
 [[package]]
 name = "unicode-bidi"
 version = "0.3.10"
@@ -5025,6 +5301,22 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "wormhole-core"
+version = "0.1.0"
+source = "git+https://github.com/guibescos/wormhole?branch=reisen/sdk-solana#61bb2fb691a8df0aa0e42a21632e43b392ffa90f"
+dependencies = [
+ "borsh",
+ "bstr",
+ "byteorder",
+ "hex",
+ "hex-literal",
+ "nom",
+ "primitive-types",
+ "sha3",
+ "thiserror",
+]
+
 [[package]]
 name = "x25519-dalek"
 version = "1.1.1"

+ 12 - 1
hermes/Cargo.toml

@@ -5,8 +5,10 @@ edition                        = "2021"
 
 [dependencies]
 axum                           = { version = "0.6.9", features = ["json", "ws"] }
+axum-extra                     = { version = "0.7.2", features = ["query"] }
 axum-macros                    = { version = "0.3.4" }
 anyhow                         = { version = "1.0.69" }
+borsh                          = { version = "0.9.0" }
 bs58                           = { version = "0.4.0" }
 dashmap                        = { version = "5.4.0" }
 der                            = { version = "0.7.0" }
@@ -19,17 +21,24 @@ ring                           = { version = "0.16.20" }
 rusqlite                       = { version = "0.28.0", features = ["bundled"] }
 lazy_static                    = { version = "1.4.0" }
 libc                           = { version = "0.2.140" }
+pyth-sdk                       = { version = "0.7.0" }
 secp256k1                      = { version = "0.26.0", features = ["rand", "recovery", "serde"] }
 serde                          = { version = "1.0.152", features = ["derive"] }
 serde_arrays                   = { version = "0.1.0" }
 serde_cbor                     = { version = "0.11.2" }
-serde_json                     = { version = "1.0.93" }
+serde_json                      = { version = "1.0.93" }
 sha256                         = { version = "1.1.2" }
 structopt                      = { version = "0.3.26" }
 tokio                          = { version = "1.26.0", features = ["full"] }
 typescript-type-def            = { version = "0.5.5" }
 log                            = { version = "0.4.17" }
 
+# Parse Wormhole VAAs from our own patch. TODO: Replace with released version when wormhole releases it
+wormhole-core                  = { git = "https://github.com/guibescos/wormhole", branch = "reisen/sdk-solana"}
+
+# Parse Wormhole attester price attestations.
+pyth-wormhole-attester-sdk     = { path = "../wormhole_attester/sdk/rust/", version = "0.1.2" }
+
 # Setup LibP2P. Unfortunately the dependencies required by libp2p are shared
 # with the dependencies required by solana's geyser plugin. This means that we
 # would have to use the same version of libp2p as solana. Luckily we don't need
@@ -49,3 +58,5 @@ libp2p                         = { version = "0.51.1", features = [
     "websocket",
     "yamux",
 ]}
+base64 = "0.21.0"
+derive_more = "0.99.17"

+ 1 - 1
hermes/build.rs

@@ -72,7 +72,7 @@ fn main() {
     println!("cargo:rustc-link-search=native={out_var}");
     println!("cargo:rustc-link-lib=static=pythnet");
 
-    #[cfg(target_os = "aarch64")]
+    #[cfg(target_arch = "aarch64")]
     println!("cargo:rustc-link-lib=resolv");
 
     let status = cmd.status().unwrap();

+ 5 - 1
hermes/src/main.rs

@@ -1,6 +1,7 @@
 #![feature(never_type)]
 
 use {
+    crate::store::Store,
     anyhow::Result,
     futures::{
         channel::mpsc::Receiver,
@@ -16,6 +17,7 @@ use {
 
 mod config;
 mod network;
+mod store;
 
 /// A Wormhole VAA is an array of bytes. TODO: Decoding.
 #[derive(Debug, Clone, Eq, Hash, PartialEq, serde::Serialize, serde::Deserialize)]
@@ -63,7 +65,9 @@ async fn init(_update_channel: Receiver<AccountUpdate>) -> Result<()> {
 
             // Spawn the RPC server.
             log::info!("Starting RPC server on {}", rpc_addr);
-            network::rpc::spawn(rpc_addr.to_string()).await?;
+
+            // TODO: Add max size to the config
+            network::rpc::spawn(rpc_addr.to_string(), Store::new_with_local_cache(1000)).await?;
 
             // Wait on Ctrl+C similar to main.
             tokio::signal::ctrl_c().await?;

+ 1 - 0
hermes/src/network/p2p.rs

@@ -37,6 +37,7 @@ pub type Observation = Vec<u8>;
 // A Static Channel to pipe the `Observation` from the callback into the local Rust handler for
 // observation messages. It has to be static for now because there's no way to capture state in
 // the callback passed into Go-land.
+// TODO: Move this channel to the module level that spawns the services
 lazy_static::lazy_static! {
     pub static ref OBSERVATIONS: (
         Mutex<Sender<Observation>>,

+ 14 - 68
hermes/src/network/rpc.rs

@@ -1,79 +1,28 @@
 use {
     crate::{
         network::p2p::OBSERVATIONS,
-        Vaa,
+        store::{
+            Store,
+            Update,
+        },
     },
     anyhow::Result,
     axum::{
         routing::get,
         Router,
     },
-    dashmap::DashMap,
-    std::sync::Arc,
 };
 
 mod rest;
 
-#[derive(Clone, Default)]
-pub struct VaaCache(Arc<DashMap<String, Vec<(i64, Vaa)>>>);
-
-impl VaaCache {
-    /// Add a VAA to the cache. Keeps the cache sorted by timestamp.
-    fn add(&mut self, key: String, timestamp: i64, vaa: Vaa) -> Result<()> {
-        self.remove_expired()?;
-        let mut entry = self.0.entry(key).or_default();
-        let key = entry
-            .binary_search_by(|(t, _)| t.cmp(&timestamp))
-            .unwrap_or_else(|e| e);
-        entry.insert(key, (timestamp, vaa));
-        Ok(())
-    }
-
-    /// Remove expired VAA's from the cache.
-    fn remove_expired(&mut self) -> Result<()> {
-        let now = std::time::SystemTime::now()
-            .duration_since(std::time::UNIX_EPOCH)?
-            .as_secs() as i64;
-
-        // Scan for items older than now, remove, if the result is empty remove the key altogether.
-        for mut item in self.0.iter_mut() {
-            let (key, vaas) = item.pair_mut();
-            vaas.retain(|(t, _)| t > &now);
-            if vaas.is_empty() {
-                self.0.remove(key);
-            }
-        }
-
-        Ok(())
-    }
-
-    /// For a given set of Price IDs, return the latest VAA for each Price ID.
-    fn latest_for_ids(&self, ids: Vec<String>) -> Vec<(String, Vaa)> {
-        self.0
-            .iter()
-            .filter_map(|item| {
-                if !ids.contains(item.key()) {
-                    return None;
-                }
-
-                let (_, latest_vaa) = item.value().last()?;
-                Some((item.key().clone(), latest_vaa.clone()))
-            })
-            .collect()
-    }
-}
-
 #[derive(Clone)]
 pub struct State {
-    /// A Cache of VAA's that have been fetched from the Wormhole RPC.
-    pub vaa_cache: VaaCache,
+    pub store: Store,
 }
 
 impl State {
-    fn new() -> Self {
-        Self {
-            vaa_cache: VaaCache::default(),
-        }
+    pub fn new(store: Store) -> Self {
+        Self { store }
     }
 }
 
@@ -81,8 +30,8 @@ impl State {
 ///
 /// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
 /// packages they are based on (tokio & hyper).
-pub async fn spawn(rpc_addr: String) -> Result<()> {
-    let mut cfg = State::new();
+pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
+    let state = State::new(store);
 
     // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
     // `with_state` method which replaces `Body` with `State` in the type signature.
@@ -90,20 +39,17 @@ pub async fn spawn(rpc_addr: String) -> Result<()> {
     let app = app
         .route("/", get(rest::index))
         .route("/live", get(rest::live))
+        .route("/latest_price_feeds", get(rest::latest_price_feeds))
         .route("/latest_vaas", get(rest::latest_vaas))
-        .with_state(cfg.clone());
+        .with_state(state.clone());
 
     // Listen in the background for new VAA's from the Wormhole RPC.
     tokio::spawn(async move {
         loop {
             if let Ok(observation) = OBSERVATIONS.1.lock().unwrap().recv() {
-                let vaa = Vaa { data: observation };
-
-                // Add the VAA to the cache.
-                //
-                // TODO: We haven't deserialized the VAA yet, so we don't know the Price ID. We
-                // should this but for this PR we just use a placeholder.
-                cfg.vaa_cache.add("UnknownID".to_string(), 0, vaa).unwrap();
+                if let Err(e) = state.store.store_update(Update::Vaa(observation)) {
+                    log::error!("Failed to process VAA: {:?}", e);
+                }
             }
         }
     });

+ 86 - 11
hermes/src/network/rpc/rest.rs

@@ -1,35 +1,110 @@
+use {
+    crate::store::RequestTime,
+    base64::{
+        engine::general_purpose::STANDARD as base64_standard_engine,
+        Engine as _,
+    },
+    pyth_sdk::{
+        PriceFeed,
+        PriceIdentifier,
+    },
+};
 // This file implements a REST service for the Price Service. This is a mostly direct copy of the
 // TypeScript implementation in the `pyth-crosschain` repo. It uses `axum` as the web framework and
 // `tokio` as the async runtime.
 use {
     anyhow::Result,
     axum::{
-        extract::{
-            Query,
-            State,
+        extract::State,
+        http::StatusCode,
+        response::{
+            IntoResponse,
+            Response,
         },
-        response::IntoResponse,
         Json,
     },
+    axum_extra::extract::Query, // Axum extra Query allows us to parse multi-value query parameters.
 };
 
+pub enum RestError {
+    InvalidPriceId,
+    UpdateDataNotFound,
+}
+
+impl IntoResponse for RestError {
+    fn into_response(self) -> Response {
+        match self {
+            RestError::InvalidPriceId => {
+                (StatusCode::BAD_REQUEST, "Invalid Price Id").into_response()
+            }
+            RestError::UpdateDataNotFound => {
+                (StatusCode::NOT_FOUND, "Update data not found").into_response()
+            }
+        }
+    }
+}
+
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub struct LatestVaaQueryParams {
     ids: Vec<String>,
 }
 
-/// REST endpoint /latest_price_feeds?ids[]=...&ids[]=...&ids[]=...
-/// TODO: Replace Infallible with an actual error return type instead of unwrap() crashing the RPC.
+/// REST endpoint /latest_vaas?ids[]=...&ids[]=...&ids[]=...
+///
+/// TODO: This endpoint returns update data as an array of base64 encoded strings. We want
+/// to support other formats such as hex in the future.
 pub async fn latest_vaas(
     State(state): State<super::State>,
     Query(params): Query<LatestVaaQueryParams>,
-) -> Result<impl IntoResponse, std::convert::Infallible> {
-    Ok(Json(state.vaa_cache.latest_for_ids(params.ids)))
+) -> Result<Json<Vec<String>>, RestError> {
+    // TODO: Find better ways to validate query parameters.
+    // FIXME: Handle ids with leading 0x
+    let price_ids: Vec<PriceIdentifier> = params
+        .ids
+        .iter()
+        .map(PriceIdentifier::from_hex)
+        .collect::<Result<Vec<PriceIdentifier>, _>>()
+        .map_err(|_| RestError::InvalidPriceId)?;
+    let price_feeds_with_update_data = state
+        .store
+        .get_price_feeds_with_update_data(price_ids, RequestTime::Latest)
+        .map_err(|_| RestError::UpdateDataNotFound)?;
+    Ok(Json(
+        price_feeds_with_update_data
+            .update_data
+            .batch_vaa
+            .iter()
+            .map(|vaa_bytes| base64_standard_engine.encode(vaa_bytes))
+            .collect(),
+    ))
 }
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
-pub struct LastAccsQueryParams {
-    id: String,
+pub struct LatestPriceFeedParams {
+    ids: Vec<String>,
+}
+
+/// REST endpoint /latest_vaas?ids[]=...&ids[]=...&ids[]=...
+pub async fn latest_price_feeds(
+    State(state): State<super::State>,
+    Query(params): Query<LatestPriceFeedParams>,
+) -> Result<Json<Vec<PriceFeed>>, RestError> {
+    let price_ids: Vec<PriceIdentifier> = params
+        .ids
+        .iter()
+        .map(PriceIdentifier::from_hex)
+        .collect::<Result<Vec<PriceIdentifier>, _>>()
+        .map_err(|_| RestError::InvalidPriceId)?;
+    let price_feeds_with_update_data = state
+        .store
+        .get_price_feeds_with_update_data(price_ids, RequestTime::Latest)
+        .map_err(|_| RestError::UpdateDataNotFound)?;
+    Ok(Json(
+        price_feeds_with_update_data
+            .price_feeds
+            .into_values()
+            .collect(),
+    ))
 }
 
 // This function implements the `/live` endpoint. It returns a `200` status code. This endpoint is
@@ -41,5 +116,5 @@ pub async fn live() -> Result<impl IntoResponse, std::convert::Infallible> {
 // This is the index page for the REST service. It will list all the available endpoints.
 // TODO: Dynamically generate this list if possible.
 pub async fn index() -> impl IntoResponse {
-    Json(["/live", "/latest_price_feeds"])
+    Json(["/live", "/latest_price_feeds", "/latest_vaas"])
 }

+ 84 - 0
hermes/src/store.rs

@@ -0,0 +1,84 @@
+use {
+    self::storage::Storage,
+    anyhow::Result,
+    pyth_sdk::{
+        PriceFeed,
+        PriceIdentifier,
+    },
+    serde::{
+        Deserialize,
+        Serialize,
+    },
+    std::{
+        collections::HashMap,
+        sync::Arc,
+    },
+};
+
+mod proof;
+mod storage;
+
+pub type UnixTimestamp = u64;
+
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub enum RequestTime {
+    Latest,
+    FirstAfter(UnixTimestamp),
+}
+
+pub enum Update {
+    Vaa(Vec<u8>),
+}
+
+#[derive(Clone, Default, Serialize, Deserialize)]
+pub struct UpdateData {
+    pub batch_vaa: Vec<Vec<u8>>,
+}
+
+// TODO: A price feed might not have update data in all different
+// formats. For example, Batch VAA and Merkle updates will result
+// in different price feeds. We need to figure out how to handle
+// it properly.
+#[derive(Clone, Default)]
+pub struct PriceFeedsWithUpdateData {
+    pub price_feeds: HashMap<PriceIdentifier, PriceFeed>,
+    pub update_data: UpdateData,
+}
+
+pub type State = Arc<Box<dyn Storage>>;
+
+#[derive(Clone)]
+pub struct Store {
+    pub state: State,
+}
+
+impl Store {
+    pub fn new_with_local_cache(max_size_per_key: usize) -> Self {
+        Self {
+            state: Arc::new(Box::new(storage::local_cache::LocalCache::new(
+                max_size_per_key,
+            ))),
+        }
+    }
+
+    // TODO: This should return the updated feeds so the subscribers can be notified.
+    pub fn store_update(&self, update: Update) -> Result<()> {
+        match update {
+            Update::Vaa(vaa_bytes) => {
+                proof::batch_vaa::store_vaa_update(self.state.clone(), vaa_bytes)
+            }
+        }
+    }
+
+    pub fn get_price_feeds_with_update_data(
+        &self,
+        price_ids: Vec<PriceIdentifier>,
+        request_time: RequestTime,
+    ) -> Result<PriceFeedsWithUpdateData> {
+        proof::batch_vaa::get_price_feeds_with_update_data(
+            self.state.clone(),
+            price_ids,
+            request_time,
+        )
+    }
+}

+ 1 - 0
hermes/src/store/proof.rs

@@ -0,0 +1 @@
+pub mod batch_vaa;

+ 139 - 0
hermes/src/store/proof/batch_vaa.rs

@@ -0,0 +1,139 @@
+use {
+    crate::store::{
+        storage::{
+            Key,
+            StorageData,
+        },
+        PriceFeedsWithUpdateData,
+        RequestTime,
+        State,
+        UnixTimestamp,
+        UpdateData,
+    },
+    anyhow::{
+        anyhow,
+        Result,
+    },
+    pyth_sdk::{
+        Price,
+        PriceFeed,
+        PriceIdentifier,
+    },
+    pyth_wormhole_attester_sdk::{
+        BatchPriceAttestation,
+        PriceAttestation,
+        PriceStatus,
+    },
+    std::collections::{
+        HashMap,
+        HashSet,
+    },
+    wormhole::VAA,
+};
+
+// TODO: We need to add more metadata to this struct.
+#[derive(Clone, Default, PartialEq, Debug)]
+pub struct PriceInfo {
+    pub price_feed:   PriceFeed,
+    pub vaa_bytes:    Vec<u8>,
+    pub publish_time: UnixTimestamp,
+}
+
+
+pub fn store_vaa_update(state: State, vaa_bytes: Vec<u8>) -> Result<()> {
+    // FIXME: Vaa bytes might not be a valid Pyth BatchUpdate message nor originate from Our emitter.
+    // We should check that.
+    let vaa = VAA::from_bytes(&vaa_bytes)?;
+    let batch_price_attestation = BatchPriceAttestation::deserialize(vaa.payload.as_slice())
+        .map_err(|_| anyhow!("Failed to deserialize VAA"))?;
+
+    for price_attestation in batch_price_attestation.price_attestations {
+        let price_feed = price_attestation_to_price_feed(price_attestation);
+
+        let publish_time = price_feed.get_price_unchecked().publish_time.try_into()?;
+
+        let price_info = PriceInfo {
+            price_feed,
+            vaa_bytes: vaa_bytes.clone(),
+            publish_time,
+        };
+
+        let key = Key::new(price_feed.id.to_bytes().to_vec());
+        state.insert(key, publish_time, StorageData::BatchVaa(price_info))?;
+    }
+    Ok(())
+}
+
+
+pub fn get_price_feeds_with_update_data(
+    state: State,
+    price_ids: Vec<PriceIdentifier>,
+    request_time: RequestTime,
+) -> Result<PriceFeedsWithUpdateData> {
+    let mut price_feeds = HashMap::new();
+    let mut vaas: HashSet<Vec<u8>> = HashSet::new();
+    for price_id in price_ids {
+        let key = Key::new(price_id.to_bytes().to_vec());
+        let maybe_data = state.get(key, request_time.clone())?;
+
+        match maybe_data {
+            Some(StorageData::BatchVaa(price_info)) => {
+                price_feeds.insert(price_info.price_feed.id, price_info.price_feed);
+                vaas.insert(price_info.vaa_bytes);
+            }
+            None => {
+                log::info!("No price feed found for price id: {:?}", price_id);
+                return Err(anyhow!("No price feed found for price id: {:?}", price_id));
+            }
+        }
+    }
+    let update_data = UpdateData {
+        batch_vaa: vaas.into_iter().collect(),
+    };
+    Ok(PriceFeedsWithUpdateData {
+        price_feeds,
+        update_data,
+    })
+}
+
+
+/// Convert a PriceAttestation to a PriceFeed.
+///
+/// We cannot implmenet this function as From/Into trait because none of these types are defined in this crate.
+/// Ideally we need to move this method to the wormhole_attester sdk crate or have our own implementation of PriceFeed.
+pub fn price_attestation_to_price_feed(price_attestation: PriceAttestation) -> PriceFeed {
+    if price_attestation.status == PriceStatus::Trading {
+        PriceFeed::new(
+            // This conversion is done because the identifier on the wormhole_attester uses sdk v0.5.0 and this crate uses 0.7.0
+            PriceIdentifier::new(price_attestation.price_id.to_bytes()),
+            Price {
+                price:        price_attestation.price,
+                conf:         price_attestation.conf,
+                publish_time: price_attestation.publish_time,
+                expo:         price_attestation.expo,
+            },
+            Price {
+                price:        price_attestation.ema_price,
+                conf:         price_attestation.ema_conf,
+                publish_time: price_attestation.publish_time,
+                expo:         price_attestation.expo,
+            },
+        )
+    } else {
+        PriceFeed::new(
+            PriceIdentifier::new(price_attestation.price_id.to_bytes()),
+            Price {
+                price:        price_attestation.prev_price,
+                conf:         price_attestation.prev_conf,
+                publish_time: price_attestation.prev_publish_time,
+                expo:         price_attestation.expo,
+            },
+            Price {
+                price:        price_attestation.ema_price,
+                conf:         price_attestation.ema_conf,
+                publish_time: price_attestation.prev_publish_time,
+                expo:         price_attestation.expo,
+            },
+        )
+    }
+}

+ 40 - 0
hermes/src/store/storage.rs

@@ -0,0 +1,40 @@
+use {
+    super::{
+        proof::batch_vaa::PriceInfo,
+        RequestTime,
+        UnixTimestamp,
+    },
+    anyhow::Result,
+    derive_more::{
+        Deref,
+        DerefMut,
+    },
+};
+
+pub mod local_cache;
+
+#[derive(Clone, PartialEq, Debug)]
+pub enum StorageData {
+    BatchVaa(PriceInfo),
+}
+
+#[derive(Clone, PartialEq, Eq, Debug, Hash, Deref, DerefMut)]
+pub struct Key(Vec<u8>);
+
+impl Key {
+    pub fn new(key: Vec<u8>) -> Self {
+        Self(key)
+    }
+}
+
+/// This trait defines the interface for update data storage
+///
+/// Price update data for Pyth can come in multiple formats, for example VAA's and
+/// Merkle proofs. The abstraction therefore allows storing these as binary
+/// data to abstract the details of the update data, and so each update data is stored
+/// under a separate key. The caller is responsible for specifying the right
+/// key for the update data they wish to access.
+pub trait Storage: Sync + Send {
+    fn insert(&self, key: Key, time: UnixTimestamp, value: StorageData) -> Result<()>;
+    fn get(&self, key: Key, request_time: RequestTime) -> Result<Option<StorageData>>;
+}

+ 102 - 0
hermes/src/store/storage/local_cache.rs

@@ -0,0 +1,102 @@
+use {
+    super::{
+        super::RequestTime,
+        Key,
+        Storage,
+        StorageData,
+        UnixTimestamp,
+    },
+    anyhow::Result,
+    dashmap::DashMap,
+    std::{
+        collections::VecDeque,
+        sync::Arc,
+    },
+};
+
+#[derive(Clone, PartialEq, Debug)]
+pub struct Record {
+    pub time:  UnixTimestamp,
+    pub value: StorageData,
+}
+
+#[derive(Clone)]
+pub struct LocalCache {
+    cache:            Arc<DashMap<Key, VecDeque<Record>>>,
+    max_size_per_key: usize,
+}
+
+impl LocalCache {
+    pub fn new(max_size_per_key: usize) -> Self {
+        Self {
+            cache: Arc::new(DashMap::new()),
+            max_size_per_key,
+        }
+    }
+}
+
+impl Storage for LocalCache {
+    /// Add a new db entry to the cache.
+    ///
+    /// This method keeps the backed store sorted for efficiency, and removes
+    /// the oldest record in the cache if the max_size is reached. Entries are
+    /// usually added in increasing order and likely to be inserted near the
+    /// end of the deque. The function is optimized for this specific case.
+    fn insert(&self, key: Key, time: UnixTimestamp, value: StorageData) -> Result<()> {
+        let mut key_cache = self.cache.entry(key).or_insert_with(VecDeque::new);
+
+        let record = Record { time, value };
+
+        key_cache.push_back(record);
+
+        // Shift the pushed record until it's in the right place.
+        let mut i = key_cache.len() - 1;
+        while i > 0 && key_cache[i - 1].time > key_cache[i].time {
+            key_cache.swap(i - 1, i);
+            i -= 1;
+        }
+
+        // Remove the oldest record if the max size is reached.
+        if key_cache.len() > self.max_size_per_key {
+            key_cache.pop_front();
+        }
+
+        Ok(())
+    }
+
+    fn get(&self, key: Key, request_time: RequestTime) -> Result<Option<StorageData>> {
+        match self.cache.get(&key) {
+            Some(key_cache) => {
+                let record = match request_time {
+                    RequestTime::Latest => key_cache.back().cloned(),
+                    RequestTime::FirstAfter(time) => {
+                        // If the requested time is before the first element in the vector, we are
+                        // not sure that the first element is the closest one.
+                        if let Some(oldest_record) = key_cache.front() {
+                            if time < oldest_record.time {
+                                return Ok(None);
+                            }
+                        }
+
+                        // Binary search returns Ok(idx) if the element is found at index idx or Err(idx) if it's not
+                        // found which idx is the index where the element should be inserted to keep the vector sorted.
+                        // Getting idx within any of the match arms will give us the index of the element that is
+                        // closest after or equal to the requested time.
+                        let idx = match key_cache.binary_search_by_key(&time, |record| record.time)
+                        {
+                            Ok(idx) => idx,
+                            Err(idx) => idx,
+                        };
+
+                        // We are using `get` to handle out of bound idx. This happens if the
+                        // requested time is after the last element in the vector.
+                        key_cache.get(idx).cloned()
+                    }
+                };
+
+                Ok(record.map(|record| record.value))
+            }
+            None => Ok(None),
+        }
+    }
+}