refactor(hermes): state->wormhole/metrics downcasting (#1535)

* refactor(hermes): state->wormhole/metrics downcasting
* fix(hermes): work around clippy auto-fix bug
Reisen 1 year ago
parent
commit
b6f5bf14d4
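
The "downcasting" in the title is not `Any`-based: each service trait is blanket-implemented for every type whose reference converts into a reference to the matching sub-state, so callers can bound on the trait instead of the concrete `State`. A minimal, self-contained sketch of the shape, using hypothetical `Foo` names (the real code applies it to `MetricsState` and `WormholeState`):

use std::sync::Arc;

// Sub-state owned by the aggregate State (cf. MetricsState/WormholeState).
struct FooState {
    counter: std::sync::atomic::AtomicU64,
}

struct State {
    foo: FooState,
}

// The "downcast": borrow the sub-state out of the aggregate State.
impl<'a> From<&'a State> for &'a FooState {
    fn from(state: &'a State) -> &'a FooState {
        &state.foo
    }
}

trait Foo {
    fn bump(&self) -> u64;
}

// Blanket impl: any type that can hand out a &FooState gets the service.
impl<T> Foo for T
where
    for<'a> &'a T: Into<&'a FooState>,
    T: Sync,
{
    fn bump(&self) -> u64 {
        let foo: &FooState = self.into();
        foo.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1
    }
}

fn main() {
    let state = Arc::new(State {
        foo: FooState { counter: 0.into() },
    });
    // Functions can now take `S: Foo` and accept State or a test double.
    assert_eq!(state.bump(), 1);
}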

+ 2 - 2
apps/hermes/src/api.rs

@@ -27,7 +27,7 @@ mod ws;
 pub struct ApiState<S = State> {
     pub state:   Arc<S>,
     pub ws:      Arc<ws::WsState>,
-    pub metrics: Arc<metrics_middleware::Metrics>,
+    pub metrics: Arc<metrics_middleware::ApiMetrics>,
 }
 
 /// Manually implement `Clone` as the derive macro will try and slap `Clone` on
@@ -49,7 +49,7 @@ impl ApiState<State> {
         requester_ip_header_name: String,
     ) -> Self {
         Self {
-            metrics: Arc::new(metrics_middleware::Metrics::new(state.clone())),
+            metrics: Arc::new(metrics_middleware::ApiMetrics::new(state.clone())),
             ws: Arc::new(ws::WsState::new(
                 ws_whitelist,
                 requester_ip_header_name,

+ 25 - 16
apps/hermes/src/api/metrics_middleware.rs

@@ -1,6 +1,6 @@
 use {
     super::ApiState,
-    crate::state::State as AppState,
+    crate::state::metrics::Metrics,
     axum::{
         extract::{
             MatchedPath,
@@ -22,13 +22,19 @@ use {
     tokio::time::Instant,
 };
 
-pub struct Metrics {
+pub struct ApiMetrics {
     pub requests:  Family<Labels, Counter>,
     pub latencies: Family<Labels, Histogram>,
 }
 
-impl Metrics {
-    pub fn new(state: Arc<AppState>) -> Self {
+impl ApiMetrics {
+    pub fn new<S>(state: Arc<S>) -> Self
+    where
+        S: Metrics,
+        S: Send,
+        S: Sync,
+        S: 'static,
+    {
         let new = Self {
             requests:  Family::default(),
             latencies: Family::new_with_constructor(|| {
@@ -46,15 +52,21 @@ impl Metrics {
             let latencies = new.latencies.clone();
 
             tokio::spawn(async move {
-                let mut metrics_registry = state.metrics_registry.write().await;
-
-                metrics_registry.register("api_requests", "Total number of API requests", requests);
+                Metrics::register(
+                    &*state,
+                    ("api_requests", "Total number of API requests", requests),
+                )
+                .await;
 
-                metrics_registry.register(
-                    "api_request_latency_seconds",
-                    "API request latency in seconds",
-                    latencies,
-                );
+                Metrics::register(
+                    &*state,
+                    (
+                        "api_request_latency_seconds",
+                        "API request latency in seconds",
+                        latencies,
+                    ),
+                )
+                .await;
             });
         }
 
@@ -80,13 +92,11 @@ pub async fn track_metrics<B>(
     } else {
         req.uri().path().to_owned()
     };
-    let method = req.method().clone();
 
+    let method = req.method().clone();
     let response = next.run(req).await;
-
     let latency = start.elapsed().as_secs_f64();
     let status = response.status().as_u16();
-
     let labels = Labels {
         method: method.to_string(),
         path,
@@ -94,7 +104,6 @@ pub async fn track_metrics<B>(
     };
 
     api_state.metrics.requests.get_or_create(&labels).inc();
-
     api_state
         .metrics
         .latencies

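`ApiMetrics::new` is now generic over any state implementing the `Metrics` trait (ws.rs makes the same change for `WsMetrics`), and registration goes through `Metrics::register` rather than grabbing `metrics_registry` directly. Underneath it is still the prometheus-client registry; a self-contained sketch of that layer, assuming the prometheus-client crate and a trimmed-down `Labels`:

use prometheus_client::{
    encoding::{text::encode, EncodeLabelSet},
    metrics::{counter::Counter, family::Family},
    registry::Registry,
};

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Labels {
    method: String,
    path: String,
}

fn main() {
    let mut registry = Registry::default();

    // Families are cheaply clonable handles over shared storage, which is
    // why the constructor can clone them into the registration task.
    let requests = Family::<Labels, Counter>::default();
    registry.register("api_requests", "Total number of API requests", requests.clone());

    requests
        .get_or_create(&Labels { method: "GET".into(), path: "/live".into() })
        .inc();

    // What `Metrics::encode` wraps: text-encode the whole registry.
    let mut buffer = String::new();
    encode(&mut buffer, &registry).expect("writing to a String does not fail");
    println!("{buffer}");
}
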
+ 1 - 1
apps/hermes/src/api/rest/v2/sse.rs

@@ -128,7 +128,7 @@ where
                     {
                         Ok(Some(update)) => Ok(Event::default()
                             .json_data(update)
-                            .unwrap_or_else(|e| error_event(e))),
+                            .unwrap_or_else(error_event)),
                         Ok(None) => Ok(Event::default().comment("No update available")),
                         Err(e) => Ok(error_event(e)),
                     }
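
The sse.rs change is clippy's `redundant_closure` lint: a closure that only forwards its argument can be replaced by the function path itself, as in `unwrap_or_else(error_event)`. In miniature:

fn double(x: i32) -> i32 {
    x * 2
}

fn main() {
    // Before: the closure only forwards its argument (lint fires here).
    let a: Vec<i32> = (1..4).map(|x| double(x)).collect();
    // After: name the function directly.
    let b: Vec<i32> = (1..4).map(double).collect();
    assert_eq!(a, b);
}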

+ 22 - 11
apps/hermes/src/api/ws.rs

@@ -12,6 +12,7 @@ use {
             AggregationEvent,
             RequestTime,
         },
+        metrics::Metrics,
         State,
     },
     anyhow::{
@@ -115,12 +116,18 @@ pub struct Labels {
     pub status:      Status,
 }
 
-pub struct Metrics {
+pub struct WsMetrics {
     pub interactions: Family<Labels, Counter>,
 }
 
-impl Metrics {
-    pub fn new(state: Arc<State>) -> Self {
+impl WsMetrics {
+    pub fn new<S>(state: Arc<S>) -> Self
+    where
+        S: Metrics,
+        S: Send,
+        S: Sync,
+        S: 'static,
+    {
         let new = Self {
             interactions: Family::default(),
         };
@@ -129,11 +136,15 @@ impl Metrics {
             let interactions = new.interactions.clone();
 
             tokio::spawn(async move {
-                state.metrics_registry.write().await.register(
-                    "ws_interactions",
-                    "Total number of websocket interactions",
-                    interactions,
-                );
+                Metrics::register(
+                    &*state,
+                    (
+                        "ws_interactions",
+                        "Total number of websocket interactions",
+                        interactions,
+                    ),
+                )
+                .await;
             });
         }
 
@@ -146,7 +157,7 @@ pub struct WsState {
     pub bytes_limit_whitelist:    Vec<IpNet>,
     pub rate_limiter:             DefaultKeyedRateLimiter<IpAddr>,
     pub requester_ip_header_name: String,
-    pub metrics:                  Metrics,
+    pub metrics:                  WsMetrics,
 }
 
 impl WsState {
@@ -158,7 +169,7 @@ impl WsState {
             ))),
             bytes_limit_whitelist: whitelist,
             requester_ip_header_name,
-            metrics: Metrics::new(state.clone()),
+            metrics: WsMetrics::new(state.clone()),
         }
     }
 }
@@ -344,7 +355,7 @@ where
             _ = self.exit.changed() => {
                 self.sender.close().await?;
                 self.closed = true;
-                return Err(anyhow!("Application is shutting down. Closing connection."));
+                Err(anyhow!("Application is shutting down. Closing connection."))
             }
         }
     }

+ 5 - 9
apps/hermes/src/metrics_server.rs

@@ -5,7 +5,10 @@
 use {
     crate::{
         config::RunOptions,
-        state::State as AppState,
+        state::{
+            metrics::Metrics,
+            State as AppState,
+        },
     },
     anyhow::Result,
     axum::{
@@ -15,7 +18,6 @@ use {
         routing::get,
         Router,
     },
-    prometheus_client::encoding::text::encode,
     std::sync::Arc,
 };
 
@@ -43,13 +45,7 @@ pub async fn run(opts: RunOptions, state: Arc<AppState>) -> Result<()> {
 }
 
 pub async fn metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
-    let registry = state.metrics_registry.read().await;
-    let mut buffer = String::new();
-
-    // Should not fail if the metrics are valid and there is memory available
-    // to write to the buffer.
-    encode(&mut buffer, &registry).unwrap();
-
+    let buffer = Metrics::encode(&*state).await;
     (
         [(
             header::CONTENT_TYPE,
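
The handler now delegates to the `Metrics` trait. The `&*state` dance is needed because `state` is an `Arc<AppState>` while the blanket impl targets `AppState`: deref the Arc, reborrow, and use fully-qualified syntax. A minimal sketch with hypothetical names:

use std::sync::Arc;

trait Encode {
    fn encode(&self) -> String;
}

struct AppState(&'static str);

impl Encode for AppState {
    fn encode(&self) -> String {
        self.0.to_string()
    }
}

fn main() {
    let state = Arc::new(AppState("metrics"));
    // Method syntax auto-derefs through the Arc; `Encode::encode(&*state)`
    // spells out the same call, which reads better with blanket impls.
    assert_eq!(state.encode(), Encode::encode(&*state));
}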

+ 9 - 6
apps/hermes/src/network/pythnet.rs

@@ -7,7 +7,6 @@ use {
         api::types::PriceFeedMetadata,
         config::RunOptions,
         network::wormhole::{
-            update_guardian_set,
             BridgeData,
             GuardianSet,
             GuardianSetData,
@@ -22,6 +21,7 @@ use {
                 Aggregates,
                 Update,
             },
+            wormhole::Wormhole,
             State,
         },
     },
@@ -215,11 +215,14 @@ pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<!> {
 /// This method performs the necessary work to pull down the bridge state and associated guardian
 /// sets from a deployed Wormhole contract. Note that we only fetch the last two accounts due to
 the fact that during a Wormhole upgrade, there will only be messages produced from those two.
-async fn fetch_existing_guardian_sets(
-    state: Arc<State>,
+async fn fetch_existing_guardian_sets<S>(
+    state: Arc<S>,
     pythnet_http_endpoint: String,
     wormhole_contract_addr: Pubkey,
-) -> Result<()> {
+) -> Result<()>
+where
+    S: Wormhole,
+{
     let client = RpcClient::new(pythnet_http_endpoint.to_string());
     let bridge = fetch_bridge_data(&client, &wormhole_contract_addr).await?;
 
@@ -233,7 +236,7 @@ async fn fetch_existing_guardian_sets(
         "Retrieved Current GuardianSet.",
     );
 
-    update_guardian_set(&state, bridge.guardian_set_index, current).await;
+    Wormhole::update_guardian_set(&*state, bridge.guardian_set_index, current).await;
 
     // If there is more than one guardian set, we want to fetch the previous one as well, as it
     // may still be in transition phase if a guardian upgrade has just occurred.
@@ -251,7 +254,7 @@ async fn fetch_existing_guardian_sets(
             "Retrieved Previous GuardianSet.",
         );
 
-        update_guardian_set(&state, bridge.guardian_set_index - 1, previous).await;
+        Wormhole::update_guardian_set(&*state, bridge.guardian_set_index - 1, previous).await;
     }
 
     Ok(())

+ 16 - 218
apps/hermes/src/network/wormhole.rs

@@ -8,19 +8,14 @@ use {
     crate::{
         config::RunOptions,
         state::{
-            aggregate::{
-                Aggregates,
-                Update,
-            },
+            wormhole::Wormhole,
             State,
         },
     },
     anyhow::{
         anyhow,
-        ensure,
         Result,
     },
-    chrono::DateTime,
     futures::StreamExt,
     proto::spy::v1::{
         filter_entry::Filter,
@@ -29,45 +24,16 @@ use {
         FilterEntry,
         SubscribeSignedVaaRequest,
     },
-    pythnet_sdk::{
-        wire::v1::{
-            WormholeMessage,
-            WormholePayload,
-        },
-        ACCUMULATOR_EMITTER_ADDRESS,
-    },
-    secp256k1::{
-        ecdsa::{
-            RecoverableSignature,
-            RecoveryId,
-        },
-        Message,
-        Secp256k1,
-    },
-    serde_wormhole::RawMessage,
-    sha3::{
-        Digest,
-        Keccak256,
-    },
+    pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS,
     std::{
         sync::Arc,
         time::Duration,
     },
     tokio::time::Instant,
     tonic::Request,
-    wormhole_sdk::{
-        vaa::{
-            Body,
-            Header,
-        },
-        Address,
-        Chain,
-        Vaa,
-    },
+    wormhole_sdk::Chain,
 };
 
-const OBSERVED_CACHE_SIZE: usize = 1000;
-
 pub type VaaBytes = Vec<u8>;
 
 #[derive(Eq, PartialEq, Clone, Hash, Debug)]
@@ -116,13 +82,6 @@ pub struct GuardianSetData {
     pub _expiration_time: u32,
 }
 
-/// Update the guardian set with the given ID in the state.
-#[tracing::instrument(skip(state, guardian_set))]
-pub async fn update_guardian_set(state: &State, id: u32, guardian_set: GuardianSet) {
-    let mut guardian_sets = state.guardian_set.write().await;
-    guardian_sets.insert(id, guardian_set);
-}
-
 /// Wormhole `prost` compiled definitions.
 ///
 /// We use `prost` to build the protobuf definitions from the upstream Wormhole repository. Which
@@ -180,7 +139,13 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
 }
 
 #[tracing::instrument(skip(opts, state))]
-async fn run(opts: RunOptions, state: Arc<State>) -> Result<!> {
+async fn run<S>(opts: RunOptions, state: Arc<S>) -> Result<!>
+where
+    S: Wormhole,
+    S: Sync,
+    S: Send,
+    S: 'static,
+{
     let mut client = SpyRpcServiceClient::connect(opts.wormhole.spy_rpc_addr).await?;
     let mut stream = client
         .subscribe_signed_vaa(Request::new(SubscribeSignedVaaRequest {
@@ -195,180 +160,13 @@ async fn run(opts: RunOptions, state: Arc<State>) -> Result<!> {
         .into_inner();
 
     while let Some(Ok(message)) = stream.next().await {
-        if let Err(e) = process_message(state.clone(), message.vaa_bytes).await {
-            tracing::debug!(error = ?e, "Skipped VAA.");
-        }
+        let state = state.clone();
+        tokio::spawn(async move {
+            if let Err(e) = state.process_message(message.vaa_bytes).await {
+                tracing::debug!(error = ?e, "Skipped VAA.");
+            }
+        });
     }
 
     Err(anyhow!("Wormhole gRPC stream terminated."))
 }
-
-/// Process a message received via a Wormhole gRPC connection.
-#[tracing::instrument(skip(state, vaa_bytes))]
-pub async fn process_message(state: Arc<State>, vaa_bytes: Vec<u8>) -> Result<()> {
-    let vaa = serde_wormhole::from_slice::<Vaa<&RawMessage>>(&vaa_bytes)?;
-
-    // Log VAA Processing.
-    let vaa_timestamp = DateTime::from_timestamp(vaa.timestamp as i64, 0)
-        .ok_or(anyhow!("Failed to parse VAA Timestamp"))?
-        .format("%Y-%m-%dT%H:%M:%S.%fZ")
-        .to_string();
-
-    let slot = match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
-        WormholePayload::Merkle(proof) => proof.slot,
-    };
-    tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");
-
-    // Check VAA hasn't already been seen.
-    ensure!(
-        !state.observed_vaa_seqs.read().await.contains(&vaa.sequence),
-        "Previously observed VAA: {}",
-        vaa.sequence
-    );
-
-    // Check VAA source is valid, we don't want to process other protocols VAAs.
-    validate_vaa_source(&vaa)?;
-
-    // Verify the VAA has been signed by a known guardian set.
-    let vaa = verify_vaa(
-        state
-            .guardian_set
-            .read()
-            .await
-            .get(&vaa.guardian_set_index)
-            .ok_or_else(|| anyhow!("Unknown guardian set: {}", vaa.guardian_set_index))?,
-        vaa,
-    )?;
-
-    // Finally, store the resulting VAA in Hermes.
-    let sequence = vaa.sequence;
-    tokio::spawn(async move {
-        store_vaa(state.clone(), sequence, vaa_bytes).await;
-    });
-
-    Ok(())
-}
-
-// Rejects VAAs from invalid sources.
-#[tracing::instrument(skip(vaa))]
-fn validate_vaa_source(vaa: &Vaa<&RawMessage>) -> Result<()> {
-    ensure!(
-        vaa.emitter_chain == Chain::Pythnet,
-        "VAA from non-Pythnet Chain."
-    );
-    ensure!(
-        vaa.emitter_address == Address(ACCUMULATOR_EMITTER_ADDRESS),
-        "VAA from non-Pythnet Emitter: {}",
-        vaa.emitter_address
-    );
-    Ok(())
-}
-
-/// Validate a VAA extracted from a Wormhole gRPC message.
-#[tracing::instrument(skip(guardian_set, vaa))]
-pub fn verify_vaa<'a>(
-    guardian_set: &GuardianSet,
-    vaa: Vaa<&'a RawMessage>,
-) -> Result<Vaa<&'a RawMessage>> {
-    let (header, body): (Header, Body<&RawMessage>) = vaa.into();
-    let digest = body.digest()?;
-
-    // Ideally we need to test the signatures but currently Wormhole doesn't give us any easy way
-    // to do it, so we just bypass the check in tests.
-    let quorum = if cfg!(test) {
-        0
-    } else {
-        (guardian_set.keys.len() * 2) / 3 + 1
-    };
-
-    let secp = Secp256k1::new();
-    let mut last_signer_id: Option<usize> = None;
-    let mut signatures = vec![];
-    for signature in header.signatures.into_iter() {
-        // Do not collect more signatures than necessary to reduce on-chain gas spent during
-        // signature verification.
-        if signatures.len() >= quorum {
-            break;
-        }
-
-        let signer_id: usize = signature.index.into();
-        if signer_id >= guardian_set.keys.len() {
-            return Err(anyhow!(
-                "Signer ID is out of range. Signer ID: {}, guardian set size: {}",
-                signer_id,
-                guardian_set.keys.len()
-            ));
-        }
-
-        // On-chain verification expects signatures to be sorted by signer ID. We can exit early if
-        // this constraint is violated.
-        if let Some(true) = last_signer_id.map(|v| v >= signer_id) {
-            return Err(anyhow!(
-                "Signatures are not sorted by signer ID. Last signer ID: {:?}, current signer ID: {}",
-                last_signer_id,
-                signer_id
-            ));
-        }
-
-        // Recover the public key from an [u8; 65] serialized ECDSA signature in (v, r, s) format
-        let recid = RecoveryId::from_i32(signature.signature[64].into())?;
-
-        // An address is the last 20 bytes of the Keccak256 hash of the uncompressed public key.
-        let pubkey: &[u8; 65] = &secp
-            .recover_ecdsa(
-                &Message::from_slice(&digest.secp256k_hash)?,
-                &RecoverableSignature::from_compact(&signature.signature[..64], recid)?,
-            )?
-            .serialize_uncompressed();
-
-        // The address is the last 20 bytes of the Keccak256 hash of the public key
-        let address: [u8; 32] = Keccak256::new_with_prefix(&pubkey[1..]).finalize().into();
-        let address: [u8; 20] = address[address.len() - 20..].try_into()?;
-
-        // Confirm the recovered address matches an address in the guardian set.
-        if guardian_set.keys.get(signer_id) == Some(&address) {
-            signatures.push(signature);
-        }
-
-        last_signer_id = Some(signer_id);
-    }
-
-    // Check if we have enough correct signatures
-    if signatures.len() < quorum {
-        return Err(anyhow!(
-            "Not enough correct signatures. Expected {:?}, received {:?}",
-            quorum,
-            signatures.len()
-        ));
-    }
-
-    Ok((
-        Header {
-            signatures,
-            ..header
-        },
-        body,
-    )
-        .into())
-}
-
-#[tracing::instrument(skip(state, vaa_bytes))]
-pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) {
-    // Check VAA hasn't already been seen, this may have been checked previously
-    // but due to async nature it's possible other threads have mutated the state
-    // since this VAA started processing.
-    let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await;
-    if observed_vaa_seqs.contains(&sequence) {
-        return;
-    }
-
-    // Clear old cached VAA sequences.
-    while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
-        observed_vaa_seqs.pop_first();
-    }
-
-    // Hand the VAA to the aggregate store.
-    if let Err(e) = Aggregates::store_update(&*state, Update::Vaa(vaa_bytes)).await {
-        tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
-    }
-}
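
Besides moving the VAA pipeline into `state::wormhole`, the loop changes behavior: each message is now handled in a spawned task, so one slow `process_message` no longer stalls the gRPC stream. A toy sketch of the shape, assuming tokio, with a channel standing in for the spy stream:

use std::sync::Arc;
use tokio::sync::mpsc;

async fn process(state: Arc<String>, msg: u64) {
    // Stand-in for state.process_message(vaa_bytes).
    println!("{state}: processed {msg}");
}

#[tokio::main]
async fn main() {
    let state = Arc::new("hermes".to_string());
    let (tx, mut rx) = mpsc::channel(8);
    for i in 0..3u64 {
        tx.send(i).await.unwrap();
    }
    drop(tx); // close the stream

    let mut handles = Vec::new();
    while let Some(msg) = rx.recv().await {
        let state = state.clone();
        // Before: `process(state, msg).await` ran inline, serializing work.
        handles.push(tokio::spawn(async move { process(state, msg).await }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}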

+ 6 - 0
apps/hermes/src/price_feeds_metadata.rs

@@ -16,6 +16,12 @@ pub struct PriceFeedMetaState {
     pub data: RwLock<Vec<PriceFeedMetadata>>,
 }
 
+impl Default for PriceFeedMetaState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl PriceFeedMetaState {
     pub fn new() -> Self {
         Self {
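
This `Default` impl (like the matching one added for `WormholeState`) satisfies clippy's `new_without_default` lint: a type with a zero-argument `new` should implement `Default` too, delegating so the two cannot drift apart. In miniature:

struct Meta {
    data: Vec<u32>,
}

impl Meta {
    fn new() -> Self {
        Self { data: Vec::new() }
    }
}

impl Default for Meta {
    fn default() -> Self {
        Self::new()
    }
}

fn main() {
    assert_eq!(Meta::default().data, Meta::new().data);
}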

+ 21 - 33
apps/hermes/src/state.rs

@@ -8,29 +8,21 @@ use {
         },
         benchmarks::BenchmarksState,
         cache::CacheState,
+        metrics::MetricsState,
+        wormhole::WormholeState,
     },
-    crate::{
-        network::wormhole::GuardianSet,
-        price_feeds_metadata::PriceFeedMetaState,
-    },
+    crate::price_feeds_metadata::PriceFeedMetaState,
     prometheus_client::registry::Registry,
     reqwest::Url,
-    std::{
-        collections::{
-            BTreeMap,
-            BTreeSet,
-        },
-        sync::Arc,
-    },
-    tokio::sync::{
-        broadcast::Sender,
-        RwLock,
-    },
+    std::sync::Arc,
+    tokio::sync::broadcast::Sender,
 };
 
 pub mod aggregate;
 pub mod benchmarks;
 pub mod cache;
+pub mod metrics;
+pub mod wormhole;
 
 pub struct State {
     /// State for the `Cache` service for short-lived storage of updates.
@@ -45,15 +37,11 @@ pub struct State {
     /// State for accessing/storing Pyth price aggregates.
     pub aggregates: AggregateState,
 
-    /// Sequence numbers of lately observed Vaas. Store uses this set
-    /// to ignore the previously observed Vaas as a performance boost.
-    pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
-
-    /// Wormhole guardian sets. It is used to verify Vaas before using them.
-    pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
+    /// State for tracking wormhole state when reading VAAs.
+    pub wormhole: WormholeState,
 
-    /// Metrics registry
-    pub metrics_registry: RwLock<Registry>,
+    /// Metrics registry for tracking process metrics and timings.
+    pub metrics: MetricsState,
 }
 
 impl State {
@@ -64,13 +52,12 @@ impl State {
     ) -> Arc<Self> {
         let mut metrics_registry = Registry::default();
         Arc::new(Self {
-            cache:             CacheState::new(cache_size),
-            benchmarks:        BenchmarksState::new(benchmarks_endpoint),
-            price_feed_meta:   PriceFeedMetaState::new(),
-            aggregates:        AggregateState::new(update_tx, &mut metrics_registry),
-            observed_vaa_seqs: RwLock::new(Default::default()),
-            guardian_set:      RwLock::new(Default::default()),
-            metrics_registry:  RwLock::new(metrics_registry),
+            cache:           CacheState::new(cache_size),
+            benchmarks:      BenchmarksState::new(benchmarks_endpoint),
+            price_feed_meta: PriceFeedMetaState::new(),
+            aggregates:      AggregateState::new(update_tx, &mut metrics_registry),
+            wormhole:        WormholeState::new(),
+            metrics:         MetricsState::new(metrics_registry),
         })
     }
 }
@@ -78,8 +65,9 @@ impl State {
 #[cfg(test)]
 pub mod test {
     use {
+        self::wormhole::Wormhole,
         super::*,
-        crate::network::wormhole::update_guardian_set,
+        crate::network::wormhole::GuardianSet,
         tokio::sync::broadcast::Receiver,
     };
 
@@ -88,8 +76,8 @@ pub mod test {
         let state = State::new(update_tx, cache_size, None);
 
         // Add an initial guardian set with public key 0
-        update_guardian_set(
-            &state,
+        Wormhole::update_guardian_set(
+            &*state,
             0,
             GuardianSet {
                 keys: vec![[0; 20]],

+ 13 - 13
apps/hermes/src/state/aggregate.rs

@@ -122,7 +122,7 @@ pub struct AggregateStateData {
     /// probes.
     pub latest_observed_slot: Option<Slot>,
 
-    /// Metrics
+    /// Aggregate Specific Metrics
     pub metrics: metrics::Metrics,
 }
 
@@ -613,7 +613,7 @@ mod test {
     }
 
     pub async fn store_multiple_concurrent_valid_updates(state: Arc<State>, updates: Vec<Update>) {
-        let res = join_all(updates.into_iter().map(|u| (&state).store_update(u))).await;
+        let res = join_all(updates.into_iter().map(|u| state.store_update(u))).await;
         // Check that all store_update calls succeeded
         assert!(res.into_iter().all(|r| r.is_ok()));
     }
@@ -639,13 +639,13 @@ mod test {
 
         // Check the price ids are stored correctly
         assert_eq!(
-            (&*state).get_price_feed_ids().await,
+            (*state).get_price_feed_ids().await,
             vec![PriceIdentifier::new([100; 32])].into_iter().collect()
         );
 
         // Check get_price_feeds_with_update_data retrieves the correct
         // price feed with correct update data.
-        let price_feeds_with_update_data = (&*state)
+        let price_feeds_with_update_data = (*state)
             .get_price_feeds_with_update_data(
                 &[PriceIdentifier::new([100; 32])],
                 RequestTime::Latest,
@@ -764,7 +764,7 @@ mod test {
 
         // Check the price ids are stored correctly
         assert_eq!(
-            (&*state).get_price_feed_ids().await,
+            (*state).get_price_feed_ids().await,
             vec![
                 PriceIdentifier::new([100; 32]),
                 PriceIdentifier::new([200; 32])
@@ -774,7 +774,7 @@ mod test {
         );
 
         // Check that price feed 2 exists
-        assert!((&*state)
+        assert!((*state)
             .get_price_feeds_with_update_data(
                 &[PriceIdentifier::new([200; 32])],
                 RequestTime::Latest,
@@ -801,11 +801,11 @@ mod test {
 
         // Check that price feed 2 does not exist anymore
         assert_eq!(
-            (&*state).get_price_feed_ids().await,
+            (*state).get_price_feed_ids().await,
             vec![PriceIdentifier::new([100; 32]),].into_iter().collect()
         );
 
-        assert!((&*state)
+        assert!((*state)
             .get_price_feeds_with_update_data(
                 &[PriceIdentifier::new([200; 32])],
                 RequestTime::Latest,
@@ -847,7 +847,7 @@ mod test {
         MockClock::advance(Duration::from_secs(1));
 
         // Get the price feeds with update data
-        let price_feeds_with_update_data = (&*state)
+        let price_feeds_with_update_data = (*state)
             .get_price_feeds_with_update_data(
                 &[PriceIdentifier::new([100; 32])],
                 RequestTime::Latest,
@@ -873,13 +873,13 @@ mod test {
             .unwrap();
 
         // Check the state is ready
-        assert!((&state).is_ready().await);
+        assert!(state.is_ready().await);
 
         // Advance the clock to make the prices stale
         MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD);
         MockClock::advance(READINESS_STALENESS_THRESHOLD);
         // Check the state is not ready
-        assert!(!(&state).is_ready().await);
+        assert!(!state.is_ready().await);
     }
 
     /// Test that the state retains the latest slots upon cache eviction.
@@ -922,7 +922,7 @@ mod test {
 
         // Check the last 100 slots are retained
         for slot in 900..1000 {
-            let price_feeds_with_update_data = (&*state)
+            let price_feeds_with_update_data = (*state)
                 .get_price_feeds_with_update_data(
                     &[
                         PriceIdentifier::new([100; 32]),
@@ -939,7 +939,7 @@ mod test {
 
         // Check nothing else is retained
         for slot in 0..900 {
-            assert!((&*state)
+            assert!((*state)
                 .get_price_feeds_with_update_data(
                     &[
                         PriceIdentifier::new([100; 32]),

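The test tweaks are clippy's `needless_borrow` lint: method-call syntax auto-references the receiver, so the explicit borrow in `(&*state).method()` (or `(&state).method()`) was redundant. Briefly:

use std::sync::Arc;

struct S;

impl S {
    fn id(&self) -> u8 {
        7
    }
}

fn main() {
    let state = Arc::new(S);
    // Both resolve to the same `&S` receiver; the borrows added nothing.
    assert_eq!((*state).id(), state.id());
    assert_eq!(state.id(), 7);
}
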
+ 60 - 0
apps/hermes/src/state/metrics.rs

@@ -0,0 +1,60 @@
+use {
+    super::State,
+    prometheus_client::{
+        encoding::text::encode,
+        registry::{
+            Metric,
+            Registry,
+        },
+    },
+    tokio::sync::RwLock,
+};
+
+pub struct MetricsState {
+    /// Metrics registry, allows interfacing with backends.
+    pub registry: RwLock<Registry>,
+}
+
+impl MetricsState {
+    pub fn new(metrics_registry: Registry) -> Self {
+        Self {
+            registry: RwLock::new(metrics_registry),
+        }
+    }
+}
+
+/// Allow downcasting State into MetricsState for functions that depend on the `Metrics` service.
+impl<'a> From<&'a State> for &'a MetricsState {
+    fn from(state: &'a State) -> &'a MetricsState {
+        &state.metrics
+    }
+}
+
+#[async_trait::async_trait]
+pub trait Metrics {
+    async fn register(&self, metric: (&str, &str, impl Metric));
+    async fn encode(&self) -> String;
+}
+
+#[async_trait::async_trait]
+impl<T> Metrics for T
+where
+    for<'a> &'a T: Into<&'a MetricsState>,
+    T: Sync,
+{
+    async fn register(&self, metric: (&str, &str, impl Metric)) {
+        self.into()
+            .registry
+            .write()
+            .await
+            .register(metric.0, metric.1, metric.2);
+    }
+
+    /// Encode known Metrics in OpenTelemetry format.
+    async fn encode(&self) -> String {
+        let registry = self.into().registry.read().await;
+        let mut buffer = String::new();
+        encode(&mut buffer, &registry).unwrap();
+        buffer
+    }
+}
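
This file is the template for the whole refactor: `From<&State>` supplies the downcast and the blanket impl attaches the trait to anything that can produce a `&MetricsState`. A self-contained async variant of the earlier sketch, assuming tokio and async-trait, with `register` reduced to recording names:

use std::sync::Arc;
use tokio::sync::RwLock;

struct MetricsState {
    names: RwLock<Vec<String>>,
}

struct State {
    metrics: MetricsState,
}

impl<'a> From<&'a State> for &'a MetricsState {
    fn from(state: &'a State) -> &'a MetricsState {
        &state.metrics
    }
}

#[async_trait::async_trait]
trait Metrics {
    async fn register(&self, name: &str);
    async fn encode(&self) -> String;
}

#[async_trait::async_trait]
impl<T> Metrics for T
where
    for<'a> &'a T: Into<&'a MetricsState>,
    T: Sync,
{
    async fn register(&self, name: &str) {
        // The where-clause makes `self.into()` resolve to &MetricsState.
        self.into().names.write().await.push(name.to_string());
    }

    async fn encode(&self) -> String {
        self.into().names.read().await.join("\n")
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(State {
        metrics: MetricsState { names: RwLock::new(Vec::new()) },
    });
    // Same call shape as the real code: deref the Arc, fully-qualified call.
    Metrics::register(&*state, "api_requests").await;
    assert_eq!(Metrics::encode(&*state).await, "api_requests");
}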

+ 274 - 0
apps/hermes/src/state/wormhole.rs

@@ -0,0 +1,274 @@
+use {
+    super::{
+        aggregate::{
+            Aggregates,
+            Update,
+        },
+        State,
+    },
+    crate::network::wormhole::GuardianSet,
+    anyhow::{
+        anyhow,
+        ensure,
+        Result,
+    },
+    chrono::DateTime,
+    pythnet_sdk::{
+        wire::v1::{
+            WormholeMessage,
+            WormholePayload,
+        },
+        ACCUMULATOR_EMITTER_ADDRESS,
+    },
+    secp256k1::{
+        ecdsa::{
+            RecoverableSignature,
+            RecoveryId,
+        },
+        Message,
+        Secp256k1,
+    },
+    serde_wormhole::RawMessage,
+    sha3::{
+        Digest,
+        Keccak256,
+    },
+    std::collections::{
+        BTreeMap,
+        BTreeSet,
+    },
+    tokio::sync::RwLock,
+    wormhole_sdk::{
+        vaa::{
+            Body,
+            Header,
+        },
+        Address,
+        Chain,
+        Vaa,
+    },
+};
+
+const OBSERVED_CACHE_SIZE: usize = 1000;
+
+pub struct WormholeState {
+    /// Sequence numbers of lately observed Vaas.
+    ///
+    /// Store uses this set to ignore the previously observed Vaas as a performance boost.
+    observed_vaa_seqs: RwLock<BTreeSet<u64>>,
+
+    /// Wormhole guardian sets. It is used to verify Vaas before using them.
+    guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
+}
+
+impl Default for WormholeState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl WormholeState {
+    pub fn new() -> Self {
+        Self {
+            observed_vaa_seqs: RwLock::new(BTreeSet::new()),
+            guardian_set:      RwLock::new(BTreeMap::new()),
+        }
+    }
+}
+
+/// Allow downcasting State into WormholeState for functions that depend on the `Wormhole` service.
+impl<'a> From<&'a State> for &'a WormholeState {
+    fn from(state: &'a State) -> &'a WormholeState {
+        &state.wormhole
+    }
+}
+
+#[async_trait::async_trait]
+pub trait Wormhole: Aggregates {
+    async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>);
+    async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()>;
+    async fn update_guardian_set(&self, id: u32, guardian_set: GuardianSet);
+}
+
+#[async_trait::async_trait]
+impl<T> Wormhole for T
+where
+    for<'a> &'a T: Into<&'a WormholeState>,
+    T: Sync,
+    T: Aggregates,
+{
+    /// Update the guardian set with the given ID in the state.
+    #[tracing::instrument(skip(self, guardian_set))]
+    async fn update_guardian_set(&self, id: u32, guardian_set: GuardianSet) {
+        let mut guardian_sets = self.into().guardian_set.write().await;
+        guardian_sets.insert(id, guardian_set);
+    }
+
+    #[tracing::instrument(skip(self, vaa_bytes))]
+    async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) {
+        // Check VAA hasn't already been seen, this may have been checked previously
+        // but due to async nature it's possible other threads have mutated the state
+        // since this VAA started processing.
+        let mut observed_vaa_seqs = self.into().observed_vaa_seqs.write().await;
+        if observed_vaa_seqs.contains(&sequence) {
+            return;
+        }
+
+        // Clear old cached VAA sequences.
+        while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
+            observed_vaa_seqs.pop_first();
+        }
+
+        // Hand the VAA to the aggregate store.
+        if let Err(e) = Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await {
+            tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
+        }
+    }
+
+    async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()> {
+        let vaa = serde_wormhole::from_slice::<Vaa<&RawMessage>>(&vaa_bytes)?;
+
+        // Log VAA Processing.
+        let vaa_timestamp = DateTime::from_timestamp(vaa.timestamp as i64, 0)
+            .ok_or(anyhow!("Failed to parse VAA Timestamp"))?
+            .format("%Y-%m-%dT%H:%M:%S.%fZ")
+            .to_string();
+
+        let slot = match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
+            WormholePayload::Merkle(proof) => proof.slot,
+        };
+        tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");
+
+        // Check VAA hasn't already been seen.
+        ensure!(
+            !self
+                .into()
+                .observed_vaa_seqs
+                .read()
+                .await
+                .contains(&vaa.sequence),
+            "Previously observed VAA: {}",
+            vaa.sequence
+        );
+
+        // Check VAA source is valid, we don't want to process other protocols VAAs.
+        validate_vaa_source(&vaa)?;
+
+        // Verify the VAA has been signed by a known guardian set.
+        let vaa = verify_vaa(
+            self.into()
+                .guardian_set
+                .read()
+                .await
+                .get(&vaa.guardian_set_index)
+                .ok_or_else(|| anyhow!("Unknown guardian set: {}", vaa.guardian_set_index))?,
+            vaa,
+        )?;
+
+        // Finally, store the resulting VAA in Hermes.
+        self.store_vaa(vaa.sequence, vaa_bytes).await;
+        Ok(())
+    }
+}
+// Rejects VAAs from invalid sources.
+#[tracing::instrument(skip(vaa))]
+fn validate_vaa_source(vaa: &Vaa<&RawMessage>) -> Result<()> {
+    ensure!(
+        vaa.emitter_chain == Chain::Pythnet,
+        "VAA from non-Pythnet Chain."
+    );
+    ensure!(
+        vaa.emitter_address == Address(ACCUMULATOR_EMITTER_ADDRESS),
+        "VAA from non-Pythnet Emitter: {}",
+        vaa.emitter_address
+    );
+    Ok(())
+}
+
+/// Validate a VAA extracted from a Wormhole gRPC message.
+#[tracing::instrument(skip(guardian_set, vaa))]
+fn verify_vaa<'a>(
+    guardian_set: &GuardianSet,
+    vaa: Vaa<&'a RawMessage>,
+) -> Result<Vaa<&'a RawMessage>> {
+    let (header, body): (Header, Body<&RawMessage>) = vaa.into();
+    let digest = body.digest()?;
+
+    // Ideally we need to test the signatures but currently Wormhole doesn't give us any easy way
+    // to do it, so we just bypass the check in tests.
+    let quorum = if cfg!(test) {
+        0
+    } else {
+        (guardian_set.keys.len() * 2) / 3 + 1
+    };
+
+    let secp = Secp256k1::new();
+    let mut last_signer_id: Option<usize> = None;
+    let mut signatures = vec![];
+    for signature in header.signatures.into_iter() {
+        // Do not collect more signatures than necessary to reduce on-chain gas spent during
+        // signature verification.
+        if signatures.len() >= quorum {
+            break;
+        }
+
+        let signer_id: usize = signature.index.into();
+        if signer_id >= guardian_set.keys.len() {
+            return Err(anyhow!(
+                "Signer ID is out of range. Signer ID: {}, guardian set size: {}",
+                signer_id,
+                guardian_set.keys.len()
+            ));
+        }
+
+        // On-chain verification expects signatures to be sorted by signer ID. We can exit early if
+        // this constraint is violated.
+        if let Some(true) = last_signer_id.map(|v| v >= signer_id) {
+            return Err(anyhow!(
+                "Signatures are not sorted by signer ID. Last signer ID: {:?}, current signer ID: {}",
+                last_signer_id,
+                signer_id
+            ));
+        }
+
+        // Recover the public key from an [u8; 65] serialized ECDSA signature in (v, r, s) format
+        let recid = RecoveryId::from_i32(signature.signature[64].into())?;
+
+        // An address is the last 20 bytes of the Keccak256 hash of the uncompressed public key.
+        let pubkey: &[u8; 65] = &secp
+            .recover_ecdsa(
+                &Message::from_slice(&digest.secp256k_hash)?,
+                &RecoverableSignature::from_compact(&signature.signature[..64], recid)?,
+            )?
+            .serialize_uncompressed();
+
+        // The address is the last 20 bytes of the Keccak256 hash of the public key
+        let address: [u8; 32] = Keccak256::new_with_prefix(&pubkey[1..]).finalize().into();
+        let address: [u8; 20] = address[address.len() - 20..].try_into()?;
+
+        // Confirm the recovered address matches an address in the guardian set.
+        if guardian_set.keys.get(signer_id) == Some(&address) {
+            signatures.push(signature);
+        }
+
+        last_signer_id = Some(signer_id);
+    }
+
+    // Check if we have enough correct signatures
+    if signatures.len() < quorum {
+        return Err(anyhow!(
+            "Not enough correct signatures. Expected {:?}, received {:?}",
+            quorum,
+            signatures.len()
+        ));
+    }
+
+    Ok((
+        Header {
+            signatures,
+            ..header
+        },
+        body,
+    )
+        .into())
+}
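
A worked check of the quorum rule in `verify_vaa`: `(keys.len() * 2) / 3 + 1` demands strictly more than two-thirds of the guardian set, e.g. 13 of Wormhole's 19 mainnet guardians.

// Quorum as computed in verify_vaa, with integer division.
fn quorum(n_guardians: usize) -> usize {
    (n_guardians * 2) / 3 + 1
}

fn main() {
    assert_eq!(quorum(19), 13); // 38 / 3 = 12, + 1
    assert_eq!(quorum(3), 3);   // small sets need every guardian
    assert_eq!(quorum(1), 1);
}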