Kaynağa Gözat

Add vaa verification

Ali Behjati 2 yıl önce
ebeveyn
işleme
8aeef6e6bd

+ 204 - 0
hermes/Cargo.lock

@@ -532,6 +532,18 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "bitvec"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c"
+dependencies = [
+ "funty",
+ "radium",
+ "tap",
+ "wyz",
+]
+
 [[package]]
 name = "blake2"
 version = "0.9.2"
@@ -697,6 +709,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "byte-slice-cast"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c"
+
 [[package]]
 name = "bytecount"
 version = "0.6.3"
@@ -1550,6 +1568,50 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "ethabi"
+version = "18.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7413c5f74cc903ea37386a8965a936cbeb334bd270862fdece542c1b2dcbc898"
+dependencies = [
+ "ethereum-types",
+ "hex",
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "sha3 0.10.8",
+ "thiserror",
+ "uint",
+]
+
+[[package]]
+name = "ethbloom"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c22d4b5885b6aa2fe5e8b9329fb8d232bf739e434e6b87347c63bdd00c120f60"
+dependencies = [
+ "crunchy",
+ "fixed-hash",
+ "impl-rlp",
+ "impl-serde",
+ "tiny-keccak",
+]
+
+[[package]]
+name = "ethereum-types"
+version = "0.14.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02d215cbf040552efcbe99a38372fe80ab9d00268e20012b79fcd0f073edd8ee"
+dependencies = [
+ "ethbloom",
+ "fixed-hash",
+ "impl-rlp",
+ "impl-serde",
+ "primitive-types",
+ "uint",
+]
+
 [[package]]
 name = "event-listener"
 version = "2.5.3"
@@ -1592,6 +1654,18 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "835a3dc7d1ec9e75e2b5fb4ba75396837112d2060b03f7d43bc1897c7f7211da"
 
+[[package]]
+name = "fixed-hash"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "835c052cb0c08c1acf6ffd71c022172e18723949c8282f2b9f27efbc51e64534"
+dependencies = [
+ "byteorder",
+ "rand 0.8.5",
+ "rustc-hex",
+ "static_assertions",
+]
+
 [[package]]
 name = "fixedbitset"
 version = "0.4.2"
@@ -1639,6 +1713,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "funty"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
+
 [[package]]
 name = "futures"
 version = "0.3.28"
@@ -1927,6 +2007,7 @@ dependencies = [
  "derive_builder",
  "derive_more",
  "env_logger 0.10.0",
+ "ethabi",
  "futures",
  "hex",
  "lazy_static",
@@ -2235,6 +2316,44 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "impl-codec"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba6a270039626615617f3f36d15fc827041df3b78c439da2cadfa47455a77f2f"
+dependencies = [
+ "parity-scale-codec",
+]
+
+[[package]]
+name = "impl-rlp"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f28220f89297a075ddc7245cd538076ee98b01f2a9c23a53a4f1105d5a322808"
+dependencies = [
+ "rlp",
+]
+
+[[package]]
+name = "impl-serde"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ebc88fc67028ae3db0c853baa36269d398d5f45b6982f95549ff5def78c935cd"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "impl-trait-for-tuples"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11d7a9f6330b71fea57921c9b61c47ee6e84f72d394754eff6163ae67e7395eb"
+dependencies = [
+ "proc-macro2 1.0.56",
+ "quote 1.0.27",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "indexmap"
 version = "1.9.3"
@@ -3645,6 +3764,32 @@ dependencies = [
  "stable_deref_trait",
 ]
 
+[[package]]
+name = "parity-scale-codec"
+version = "3.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ddb756ca205bd108aee3c62c6d3c994e1df84a59b9d6d4a5ea42ee1fd5a9a28"
+dependencies = [
+ "arrayvec 0.7.2",
+ "bitvec",
+ "byte-slice-cast",
+ "impl-trait-for-tuples",
+ "parity-scale-codec-derive",
+ "serde",
+]
+
+[[package]]
+name = "parity-scale-codec-derive"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "86b26a931f824dd4eca30b3e43bb4f31cd5f0d3a403c5f5ff27106b805bfde7b"
+dependencies = [
+ "proc-macro-crate 1.3.1",
+ "proc-macro2 1.0.56",
+ "quote 1.0.27",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "parity-send-wrapper"
 version = "0.1.0"
@@ -3899,6 +4044,19 @@ version = "0.2.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
+[[package]]
+name = "primitive-types"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9f3486ccba82358b11a77516035647c34ba167dfa53312630de83b12bd4f3d66"
+dependencies = [
+ "fixed-hash",
+ "impl-codec",
+ "impl-rlp",
+ "impl-serde",
+ "uint",
+]
+
 [[package]]
 name = "proc-macro-crate"
 version = "0.1.5"
@@ -4181,6 +4339,12 @@ dependencies = [
  "proc-macro2 1.0.56",
 ]
 
+[[package]]
+name = "radium"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
+
 [[package]]
 name = "rand"
 version = "0.7.3"
@@ -4425,6 +4589,16 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "rlp"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bb919243f34364b6bd2fc10ef797edbfa75f33c252e7998527479c6d6b47e1ec"
+dependencies = [
+ "bytes",
+ "rustc-hex",
+]
+
 [[package]]
 name = "rpassword"
 version = "6.0.1"
@@ -4472,6 +4646,12 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
 
+[[package]]
+name = "rustc-hex"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6"
+
 [[package]]
 name = "rustc_version"
 version = "0.3.3"
@@ -5854,6 +6034,12 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
 
+[[package]]
+name = "tap"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
+
 [[package]]
 name = "tempfile"
 version = "3.5.0"
@@ -5968,6 +6154,15 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "tiny-keccak"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
+dependencies = [
+ "crunchy",
+]
+
 [[package]]
 name = "tinyvec"
 version = "1.6.0"
@@ -6896,6 +7091,15 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "wyz"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed"
+dependencies = [
+ "tap",
+]
+
 [[package]]
 name = "x25519-dalek"
 version = "1.2.0"

+ 7 - 0
hermes/Cargo.toml

@@ -68,6 +68,13 @@ wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag =
 pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", rev = "7d593d87e07a1e2486e7ca21597d664ee72be1ec", features = ["library"] }
 
 strum = { version = "0.24", features = ["derive"] }
+ethabi = { version = "18.0.0", features = ["serde"] }
 
 [patch.crates-io]
 serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" }
+
+[profile.release]
+panic = 'abort'
+
+[profile.dev]
+panic = 'abort'

+ 1 - 1
hermes/src/api.rs

@@ -32,7 +32,7 @@ impl State {
 ///
 /// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
 /// packages they are based on (tokio & hyper).
-pub async fn spawn(rpc_addr: String, store: Store) -> Result<()> {
+pub async fn spawn(store: Store, rpc_addr: String) -> Result<()> {
     let state = State::new(store);
 
     // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the

+ 11 - 3
hermes/src/main.rs

@@ -36,23 +36,31 @@ async fn init() -> Result<()> {
             // Spawn the P2P layer.
             log::info!("Starting P2P server on {:?}", wh_listen_addrs);
             network::p2p::spawn(
+                store.clone(),
                 wh_network_id.to_string(),
                 wh_bootstrap_addrs,
                 wh_listen_addrs,
-                store.clone(),
             )
             .await?;
 
+            // Spawn the Ethereum guardian set watcher
+            network::ethereum::spawn(
+                store.clone(),
+                "https://rpc.ankr.com/eth".to_owned(),
+                "0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B".to_owned(),
+            )
+            .await;
+
             // Spawn the RPC server.
             log::info!("Starting RPC server on {}", api_addr);
 
             // TODO: Add max size to the config
-            api::spawn(api_addr.to_string(), store.clone()).await?;
+            api::spawn(store.clone(), api_addr.to_string()).await?;
 
             // Spawn the Pythnet listener
             // TODO: Exit the thread when it gets spawned
             log::info!("Starting Pythnet listener using {}", pythnet_ws_endpoint);
-            network::pythnet::spawn(pythnet_ws_endpoint, store.clone()).await?;
+            network::pythnet::spawn(store.clone(), pythnet_ws_endpoint).await?;
 
             // Wait on Ctrl+C similar to main.
             tokio::signal::ctrl_c().await?;

+ 1 - 0
hermes/src/network.rs

@@ -1,2 +1,3 @@
+pub mod ethereum;
 pub mod p2p;
 pub mod pythnet;

+ 157 - 0
hermes/src/network/ethereum.rs

@@ -0,0 +1,157 @@
+//! This module connects to Ethereum network to fetch the latest Wormhole Guardian set periodically
+//! and sends the set to the store module for processing and storage. To have maximum security,
+//! protocols should consider running their own Ethereum validator.
+//!
+//! We are not using ethers-rs because it has dependency conflict with our solana dependencies, and
+//! the logic here is quite simple. The ABIs has been taken from the contract on Etherscan.
+//!
+//! TODO: We are polling the Ethereum network for the latest guardian set and this can cause a downtime
+//! until the new guardian set is polled. We can instead subscribe to the contract events to get the
+//! latest guardian set.
+
+use {
+    crate::store::Store,
+    anyhow::{
+        anyhow,
+        Result,
+    },
+    ethabi::Function,
+    reqwest::Client,
+    wormhole_sdk::GuardianAddress,
+};
+
+async fn query(
+    rpc_endpoint: String,
+    contract: String,
+    method: Function,
+    params: Vec<ethabi::Token>,
+) -> Result<Vec<ethabi::Token>> {
+    let client = Client::new();
+
+    let res = client
+        .post(rpc_endpoint.as_str())
+        .json(&serde_json::json!({
+            "method": "eth_call",
+            "params": [
+                {
+                    "to": contract,
+                    "data": "0x".to_owned() + hex::encode(method.encode_input(&params)?).as_str()
+                },
+                "latest"
+            ],
+            "id": 1,
+            "jsonrpc": "2.0"
+        }))
+        .send()
+        .await?;
+
+    let res: serde_json::Value = res.json().await?;
+
+    let res = res
+        .get("result")
+        .ok_or(anyhow!("Invalid RPC Response, 'result' not found"))?
+        .as_str()
+        .ok_or(anyhow!("Invalid result"))?;
+
+    let res = hex::decode(&res[2..]).unwrap();
+    let res = method.decode_output(&res)?;
+
+    Ok(res)
+}
+
+async fn run(store: Store, rpc_endpoint: String, wormhole_contract: String) -> Result<()> {
+    loop {
+        let get_current_index_method = serde_json::from_str::<Function>(
+            r#"{"inputs":[],"name":"getCurrentGuardianSetIndex","outputs":[{"internalType":"uint32","name":"","type":"uint32"}],
+            "stateMutability":"view","type":"function"}"#,
+        )?;
+
+        let current_index = query(
+            rpc_endpoint.clone(),
+            wormhole_contract.clone(),
+            get_current_index_method,
+            vec![],
+        )
+        .await?;
+
+        let current_index = match current_index.as_slice() {
+            &[ethabi::Token::Uint(index)] => Ok(index),
+            _ => Err(anyhow!(
+                "Unexpected tokens {:?}. Expected a single uint",
+                current_index
+            )),
+        }?;
+
+        let get_guardian_set_method = serde_json::from_str::<Function>(
+            r#"{"inputs":[{"internalType":"uint32","name":"index","type":"uint32"}],"name":"getGuardianSet",
+            "outputs":[{"components":[{"internalType":"address[]","name":"keys","type":"address[]"},
+            {"internalType":"uint32","name":"expirationTime","type":"uint32"}],"internalType":"struct Structs.GuardianSet",
+            "name":"","type":"tuple"}],"stateMutability":"view","type":"function"}"#,
+        )?;
+
+        let guardian_set = query(
+            rpc_endpoint.clone(),
+            wormhole_contract.clone(),
+            get_guardian_set_method,
+            vec![ethabi::Token::Uint(current_index)],
+        )
+        .await?;
+
+        let guardian_set = match guardian_set.as_slice() {
+            [ethabi::Token::Tuple(guardian_set)] => Ok(guardian_set.clone()),
+            _ => Err(anyhow!(
+                "Unexpected tokens {:?}. Excepted a single tuple",
+                guardian_set
+            )),
+        }?;
+
+        let guardian_set = match guardian_set.as_slice() {
+            [ethabi::Token::Array(guardian_set), _expiration_time] => Ok(guardian_set.clone()),
+            _ => Err(anyhow!(
+                "Unexpected tokens {:?}. Expected a tuple of (array, uint)",
+                guardian_set
+            )),
+        }?;
+
+        let guardian_set = guardian_set
+            .into_iter()
+            .map(|guardian| match guardian {
+                ethabi::Token::Address(address) => Ok(address),
+                _ => Err(anyhow!(
+                    "Unexpected token {:?}. Expected a single address",
+                    guardian
+                )),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        log::info!("Guardian set: {:?}", guardian_set);
+
+        store
+            .update_guardian_set(
+                guardian_set
+                    .into_iter()
+                    .map(|address| GuardianAddress(address.0))
+                    .collect(),
+            )
+            .await;
+
+        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
+    }
+}
+
+pub async fn spawn(store: Store, rpc_endpoint: String, wormhole_contract: String) {
+    tokio::spawn(async move {
+        loop {
+            if let Err(e) = run(
+                store.clone(),
+                rpc_endpoint.clone(),
+                wormhole_contract.clone(),
+            )
+            .await
+            {
+                log::error!("Error in ethereum network: {}", e);
+                // TODO: Add a backoff here.
+            }
+        }
+    });
+}

+ 1 - 1
hermes/src/network/p2p.rs

@@ -116,10 +116,10 @@ pub fn bootstrap(
 
 // Spawn's the P2P layer as a separate thread via Go.
 pub async fn spawn(
+    store: Store,
     network_id: String,
     wh_bootstrap_addrs: Vec<Multiaddr>,
     wh_listen_addrs: Vec<Multiaddr>,
-    store: Store,
 ) -> Result<()> {
     bootstrap(network_id, wh_bootstrap_addrs, wh_listen_addrs)?;
 

+ 1 - 1
hermes/src/network/pythnet.rs

@@ -34,7 +34,7 @@ use {
     },
 };
 
-pub async fn spawn(pythnet_ws_endpoint: String, store: Store) -> Result<()> {
+pub async fn spawn(store: Store, pythnet_ws_endpoint: String) -> Result<()> {
     let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?;
 
     let config = RpcProgramAccountsConfig {

+ 34 - 6
hermes/src/store.rs

@@ -22,6 +22,9 @@ use {
         types::{
             MessageState,
             ProofSet,
+        },
+        wormhole::{
+            parse_and_verify_vaa,
             WormholePayload,
         },
     },
@@ -35,14 +38,21 @@ use {
     pyth_sdk::PriceIdentifier,
     std::{
         collections::HashSet,
+        sync::Arc,
         time::Duration,
     },
-    wormhole_sdk::Vaa,
+    tokio::sync::RwLock,
+    wormhole_sdk::{
+        Address,
+        Chain,
+        GuardianAddress,
+    },
 };
 
 pub mod proof;
 pub mod storage;
 pub mod types;
+pub mod wormhole;
 
 #[derive(Clone, PartialEq, Debug, Builder)]
 #[builder(derive(Debug), pattern = "immutable")]
@@ -55,10 +65,13 @@ pub struct AccumulatorState {
 pub struct Store {
     pub storage:               StorageInstance,
     pub pending_accumulations: Cache<Slot, AccumulatorStateBuilder>,
+    pub guardian_set:          Arc<RwLock<Option<Vec<GuardianAddress>>>>,
 }
 
 impl Store {
     pub fn new_with_local_cache(max_size_per_key: usize) -> Self {
+        // TODO: Should we return an Arc<Self>? Although we are currently safe to be cloned without
+        // an Arc but it is easily to miss and cause a bug.
         Self {
             storage:               storage::local_storage::LocalStorage::new_instance(
                 max_size_per_key,
@@ -67,6 +80,7 @@ impl Store {
                 .max_capacity(10_000)
                 .time_to_live(Duration::from_secs(60 * 5))
                 .build(), // FIXME: Make this configurable
+            guardian_set:          Arc::new(RwLock::new(None)),
         }
     }
 
@@ -74,12 +88,22 @@ impl Store {
     pub async fn store_update(&self, update: Update) -> Result<()> {
         let slot = match update {
             Update::Vaa(vaa_bytes) => {
-                let vaa =
-                    serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&vaa_bytes)?;
-                let payload = WormholePayload::try_from_bytes(vaa.payload, &vaa_bytes)?;
+                let body = parse_and_verify_vaa(self, &vaa_bytes).await;
+                let body = match body {
+                    Ok(body) => body,
+                    Err(err) => {
+                        log::info!("Ignoring invalid VAA: {:?}", err);
+                        return Ok(());
+                    }
+                };
 
-                // FIXME: Validate the VAA
-                // FIXME: Skip similar VAAs
+                if body.emitter_chain != Chain::Pythnet
+                    || body.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS)
+                {
+                    return Ok(()); // Ignore VAA from other emitters
+                }
+
+                let payload = WormholePayload::try_from_bytes(body.payload, &vaa_bytes)?;
 
                 match payload {
                     WormholePayload::Merkle(proof) => {
@@ -165,6 +189,10 @@ impl Store {
         Ok(())
     }
 
+    pub async fn update_guardian_set(&self, guardian_set: Vec<GuardianAddress>) {
+        self.guardian_set.write().await.replace(guardian_set);
+    }
+
     pub fn get_price_feeds_with_update_data(
         &self,
         price_ids: Vec<PriceIdentifier>,

+ 1 - 45
hermes/src/store/types.rs

@@ -1,12 +1,5 @@
 use {
-    super::proof::wormhole_merkle::{
-        WormholeMerkleMessageProof,
-        WormholeMerkleProof,
-    },
-    anyhow::{
-        anyhow,
-        Result,
-    },
+    super::proof::wormhole_merkle::WormholeMerkleMessageProof,
     borsh::BorshDeserialize,
     pyth_oracle::{
         Message,
@@ -16,43 +9,6 @@ use {
     strum::EnumIter,
 };
 
-#[derive(Clone, Debug, PartialEq)]
-pub enum WormholePayload {
-    Merkle(WormholeMerkleProof),
-}
-
-impl WormholePayload {
-    pub fn try_from_bytes(bytes: &[u8], vaa_bytes: &[u8]) -> Result<Self> {
-        if bytes.len() != 37 {
-            return Err(anyhow!("Invalid message length"));
-        }
-
-        // TODO: Use byte string literals for this check
-        let magic = u32::from_be_bytes(bytes[0..4].try_into()?);
-        if magic != 0x41555756u32 {
-            return Err(anyhow!("Invalid magic"));
-        }
-
-        let message_type = u8::from_be_bytes(bytes[4..5].try_into()?);
-
-        if message_type != 0 {
-            return Err(anyhow!("Invalid message type"));
-        }
-
-        let slot = u64::from_be_bytes(bytes[5..13].try_into()?);
-        let ring_size = u32::from_be_bytes(bytes[13..17].try_into()?);
-        let root_digest = bytes[17..37].try_into()?;
-
-
-        Ok(Self::Merkle(WormholeMerkleProof {
-            root: root_digest,
-            slot,
-            ring_size,
-            vaa: vaa_bytes.to_vec(),
-        }))
-    }
-}
-
 
 // TODO: We can use strum on Message enum to derive this.
 #[derive(Clone, Debug, Eq, PartialEq, Hash, EnumIter)]

+ 105 - 0
hermes/src/store/wormhole.rs

@@ -0,0 +1,105 @@
+use {
+    super::{
+        proof::wormhole_merkle::WormholeMerkleProof,
+        Store,
+    },
+    anyhow::{
+        anyhow,
+        Result,
+    },
+    secp256k1::{
+        ecdsa::{
+            RecoverableSignature,
+            RecoveryId,
+        },
+        Message,
+        Secp256k1,
+    },
+    serde_wormhole::RawMessage,
+    wormhole_sdk::{
+        vaa::{
+            Body,
+            Header,
+        },
+        GuardianAddress,
+        Vaa,
+    },
+};
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum WormholePayload {
+    Merkle(WormholeMerkleProof),
+}
+
+impl WormholePayload {
+    pub fn try_from_bytes(bytes: &[u8], vaa_bytes: &[u8]) -> Result<Self> {
+        if bytes.len() != 37 {
+            return Err(anyhow!("Invalid message length"));
+        }
+
+        // TODO: Use byte string literals for this check
+        let magic = u32::from_be_bytes(bytes[0..4].try_into()?);
+        if magic != 0x41555756u32 {
+            return Err(anyhow!("Invalid magic"));
+        }
+
+        let message_type = u8::from_be_bytes(bytes[4..5].try_into()?);
+
+        if message_type != 0 {
+            return Err(anyhow!("Invalid message type"));
+        }
+
+        let slot = u64::from_be_bytes(bytes[5..13].try_into()?);
+        let ring_size = u32::from_be_bytes(bytes[13..17].try_into()?);
+        let root_digest = bytes[17..37].try_into()?;
+
+
+        Ok(Self::Merkle(WormholeMerkleProof {
+            root: root_digest,
+            slot,
+            ring_size,
+            vaa: vaa_bytes.to_vec(),
+        }))
+    }
+}
+
+/// Parses and verifies a VAA to ensure it is signed by the Wormhole guardian set.
+pub async fn parse_and_verify_vaa<'a>(
+    store: &Store,
+    vaa_bytes: &'a [u8],
+) -> Result<Body<&'a RawMessage>> {
+    let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(vaa_bytes)?;
+    let (header, body): (Header, Body<&RawMessage>) = vaa.into();
+    let digest = body.digest()?;
+
+    let mut num_correct_signers = 0;
+    for sig in header.signatures.iter() {
+        let signer_id: usize = sig.index.into();
+        let sig = sig.signature;
+
+        let recid = RecoveryId::from_i32(sig[64].into())?;
+
+        // Recover the public key from ecdsa signature from [u8; 65] that has (v, r, s) format
+        let secp = Secp256k1::new();
+        let pubkey = &secp
+            .recover_ecdsa(
+                &Message::from_slice(&digest.secp256k_hash)?,
+                &RecoverableSignature::from_compact(&sig[..64], recid)?,
+            )?
+            .serialize();
+
+        let address = GuardianAddress(pubkey[pubkey.len() - 20..].try_into()?);
+
+        if let Some(guardian_set) = store.guardian_set.read().await.as_ref() {
+            if guardian_set.get(signer_id) == Some(&address) {
+                num_correct_signers += 1;
+            }
+        }
+    }
+
+    if num_correct_signers < header.signatures.len() * 2 / 3 {
+        return Err(anyhow!("Not enough correct signatures"));
+    }
+
+    Ok(body)
+}