Browse Source

Quorum: Add metrics over http urls (#2804)

Dani Mehrjerdi 5 months ago
parent
commit
88560c3438

+ 120 - 1
apps/quorum/Cargo.lock

@@ -413,6 +413,26 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "axum-prometheus"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42e1a6651f119707ec6c416f38fdb5be223707627fe6d47f912850634c106215"
+dependencies = [
+ "axum",
+ "bytes",
+ "futures-core",
+ "http 1.3.1",
+ "http-body 1.0.1",
+ "matchit",
+ "metrics",
+ "metrics-exporter-prometheus",
+ "pin-project",
+ "tokio",
+ "tower",
+ "tower-http",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.75"
@@ -1300,6 +1320,12 @@ version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
+[[package]]
+name = "foldhash"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
+
 [[package]]
 name = "foreign-types"
 version = "0.3.2"
@@ -1544,6 +1570,9 @@ name = "hashbrown"
 version = "0.15.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3"
+dependencies = [
+ "foldhash",
+]
 
 [[package]]
 name = "heck"
@@ -2132,6 +2161,46 @@ dependencies = [
  "autocfg",
 ]
 
+[[package]]
+name = "metrics"
+version = "0.24.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5"
+dependencies = [
+ "ahash",
+ "portable-atomic",
+]
+
+[[package]]
+name = "metrics-exporter-prometheus"
+version = "0.16.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd7399781913e5393588a8d8c6a2867bf85fb38eaf2502fdce465aad2dc6f034"
+dependencies = [
+ "base64 0.22.1",
+ "indexmap",
+ "metrics",
+ "metrics-util",
+ "quanta",
+ "thiserror 1.0.69",
+]
+
+[[package]]
+name = "metrics-util"
+version = "0.19.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+ "hashbrown 0.15.3",
+ "metrics",
+ "quanta",
+ "rand 0.9.1",
+ "rand_xoshiro",
+ "sketches-ddsketch",
+]
+
 [[package]]
 name = "mime"
 version = "0.3.17"
@@ -2513,6 +2582,26 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "pin-project"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.16"
@@ -2671,10 +2760,11 @@ dependencies = [
 
 [[package]]
 name = "quorum"
-version = "0.1.3"
+version = "0.2.0"
 dependencies = [
  "anyhow",
  "axum",
+ "axum-prometheus",
  "borsh 1.5.7",
  "clap",
  "futures",
@@ -2809,6 +2899,15 @@ dependencies = [
  "rand_core 0.5.1",
 ]
 
+[[package]]
+name = "rand_xoshiro"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41"
+dependencies = [
+ "rand_core 0.9.3",
+]
+
 [[package]]
 name = "raw-cpuid"
 version = "11.5.0"
@@ -3443,6 +3542,12 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
 
+[[package]]
+name = "sketches-ddsketch"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a"
+
 [[package]]
 name = "slab"
 version = "0.4.9"
@@ -5775,6 +5880,20 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "tower-http"
+version = "0.6.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
+dependencies = [
+ "bitflags 2.9.1",
+ "bytes",
+ "http 1.3.1",
+ "pin-project-lite",
+ "tower-layer",
+ "tower-service",
+]
+
 [[package]]
 name = "tower-layer"
 version = "0.3.3"

+ 2 - 1
apps/quorum/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "quorum"
-version = "0.1.3"
+version = "0.2.0"
 edition = "2021"
 
 [dependencies]
@@ -23,3 +23,4 @@ time = "0.3.41"
 serde_json = "1.0.140"
 futures = "0.3.31"
 serde_wormhole = "0.1.0"
+axum-prometheus = "0.8.0"

+ 10 - 1
apps/quorum/src/api.rs

@@ -3,6 +3,7 @@ use axum::{
     routing::{get, post},
     Json, Router,
 };
+use axum_prometheus::{EndpointLabel, PrometheusMetricLayerBuilder};
 use secp256k1::{
     ecdsa::{RecoverableSignature, RecoveryId},
     Message, Secp256k1,
@@ -26,16 +27,24 @@ pub type Payload<'a> = &'a RawMessage;
 pub async fn run(listen_address: SocketAddr, state: State) -> anyhow::Result<()> {
     tracing::info!("Starting server...");
 
+    let (prometheus_layer, _) = PrometheusMetricLayerBuilder::new()
+        .with_metrics_from_fn(|| state.metrics_recorder.clone())
+        .with_endpoint_label_type(EndpointLabel::MatchedPathWithFallbackFn(|_| {
+            "unknown".to_string()
+        }))
+        .build_pair();
+
     let routes = Router::new()
         .route("/live", get(|| async { "OK" }))
         .route("/observation", post(post_observation))
         .route("/ws", get(ws_route_handler))
+        .layer(prometheus_layer)
         .with_state(state);
     let listener = tokio::net::TcpListener::bind(&listen_address).await?;
 
     axum::serve(listener, routes)
         .with_graceful_shutdown(async {
-            let _ = crate::server::EXIT.subscribe().changed().await;
+            crate::server::wait_for_exit().await;
             tracing::info!("Shutting down server...");
         })
         .await?;

+ 1 - 0
apps/quorum/src/main.rs

@@ -4,6 +4,7 @@ use std::io::IsTerminal;
 use crate::server::RunOptions;
 
 mod api;
+mod metrics_server;
 mod pythnet;
 mod server;
 mod ws;

+ 38 - 0
apps/quorum/src/metrics_server.rs

@@ -0,0 +1,38 @@
+use axum::{routing::get, Router};
+use axum_prometheus::{
+    metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle},
+    PrometheusMetricLayerBuilder,
+};
+
+use crate::server::{wait_for_exit, RunOptions, State};
+
+pub const DEFAULT_METRICS_BUCKET: &[f64; 20] = &[
+    0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.25, 1.5, 2.0,
+    3.0, 5.0, 10.0,
+];
+
+pub fn setup_metrics_recorder() -> anyhow::Result<PrometheusHandle> {
+    PrometheusBuilder::new()
+        .set_buckets(DEFAULT_METRICS_BUCKET)?
+        .install_recorder()
+        .map_err(|err| anyhow::anyhow!("Failed to set up metrics recorder: {:?}", err))
+}
+
+pub async fn run(run_options: RunOptions, state: State) -> anyhow::Result<()> {
+    tracing::info!("Starting Metrics Server...");
+
+    let (_, metric_handle) = PrometheusMetricLayerBuilder::new()
+        .with_metrics_from_fn(|| state.metrics_recorder.clone())
+        .build_pair();
+    let app = Router::new();
+    let app = app.route("/metrics", get(|| async move { metric_handle.render() }));
+
+    let listener = tokio::net::TcpListener::bind(&run_options.server.metrics_addr).await?;
+    axum::serve(listener, app)
+        .with_graceful_shutdown(async {
+            let _ = wait_for_exit().await;
+            tracing::info!("Shutting down metrics server...");
+        })
+        .await?;
+    Ok(())
+}

+ 71 - 10
apps/quorum/src/server.rs

@@ -1,18 +1,26 @@
+use axum_prometheus::metrics_exporter_prometheus::PrometheusHandle;
 use clap::{crate_authors, crate_description, crate_name, crate_version, Args, Parser};
 use lazy_static::lazy_static;
 use solana_client::client_error::reqwest::Url;
 use solana_sdk::pubkey::Pubkey;
-use std::{collections::HashMap, net::SocketAddr, ops::Deref, sync::Arc};
-use tokio::sync::{watch, RwLock};
+use std::{
+    collections::HashMap, future::Future, net::SocketAddr, ops::Deref, sync::Arc, time::Duration,
+};
+use tokio::{
+    sync::{watch, RwLock},
+    time::sleep,
+};
 use wormhole_sdk::{vaa::Signature, GuardianSetInfo};
 
 use crate::{
     api::{self},
+    metrics_server::{self, setup_metrics_recorder},
     pythnet::fetch_guardian_set,
     ws::WsState,
 };
 
 const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:9000";
+const DEFAULT_METRICS_ADDR: &str = "127.0.0.1:9001";
 
 #[derive(Args, Clone, Debug)]
 #[command(next_help_heading = "Server Options")]
@@ -23,6 +31,11 @@ pub struct ServerOptions {
     #[arg(default_value = DEFAULT_LISTEN_ADDR)]
     #[arg(env = "LISTEN_ADDR")]
     pub listen_addr: SocketAddr,
+    /// Address and port the metrics will bind to.
+    #[arg(long = "metrics-addr")]
+    #[arg(default_value = DEFAULT_METRICS_ADDR)]
+    #[arg(env = "METRICS_ADDR")]
+    pub metrics_addr: SocketAddr,
 }
 
 // `Options` is a structup definition to provide clean command-line args for Hermes.
@@ -66,7 +79,17 @@ lazy_static! {
     /// - The `Receiver` side of a watch channel performs the detection based on if the change
     ///   happened after the subscribe, so it means all listeners should always be notified
     ///   correctly.
-    pub static ref EXIT: watch::Sender<bool> = watch::channel(false).0;
+
+    static ref EXIT: watch::Sender<bool> = watch::channel(false).0;
+}
+
+pub async fn wait_for_exit() {
+    let mut rx = EXIT.subscribe();
+    // Check if the exit flag is already set, if so, we don't need to wait.
+    if !(*rx.borrow()) {
+        // Wait until the exit flag is set.
+        let _ = rx.changed().await;
+    }
 }
 
 #[derive(Clone)]
@@ -81,6 +104,8 @@ pub struct StateInner {
     pub observation_lifetime: u32,
 
     pub ws: WsState,
+
+    pub metrics_recorder: PrometheusHandle,
 }
 impl Deref for State {
     type Target = Arc<StateInner>;
@@ -93,17 +118,42 @@ impl Deref for State {
 const DEFAULT_OBSERVATION_LIFETIME: u32 = 10; // In seconds
 const WEBSOCKET_NOTIFICATION_CHANNEL_SIZE: usize = 1000;
 
+async fn fault_tolerant_handler<F, Fut>(name: String, f: F)
+where
+    F: Fn() -> Fut,
+    Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
+    Fut::Output: Send + 'static,
+{
+    loop {
+        let res = tokio::spawn(f()).await;
+        match res {
+            Ok(result) => match result {
+                Ok(_) => break, // This will happen on graceful shutdown
+                Err(err) => {
+                    tracing::error!("{} returned error: {:?}", name, err);
+                    sleep(Duration::from_millis(500)).await;
+                }
+            },
+            Err(err) => {
+                tracing::error!("{} is panicked or canceled: {:?}", name, err);
+                EXIT.send_modify(|exit| *exit = true);
+                break;
+            }
+        }
+    }
+}
+
 pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
     // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
     tokio::spawn(async move {
         tracing::info!("Registered shutdown signal handler...");
         tokio::signal::ctrl_c().await.unwrap();
         tracing::info!("Shut down signal received, waiting for tasks...");
-        let _ = EXIT.send(true);
+        EXIT.send_modify(|exit| *exit = true);
     });
 
     let guardian_set = fetch_guardian_set(
-        run_options.pythnet_url,
+        run_options.pythnet_url.clone(),
         run_options.wormhole_pid,
         run_options.guardian_set_index,
     )
@@ -118,19 +168,28 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
         observation_lifetime: run_options.observation_lifetime,
 
         ws: WsState::new(WEBSOCKET_NOTIFICATION_CHANNEL_SIZE),
+
+        metrics_recorder: setup_metrics_recorder()?,
     }));
 
-    tokio::join!(async {
-        if let Err(e) = api::run(run_options.server.listen_addr, state).await {
-            tracing::error!(error = ?e, "Failed to start API server");
-        }
-    });
+    tokio::join!(
+        fault_tolerant_handler("API server".to_string(), || api::run(
+            run_options.server.listen_addr,
+            state.clone()
+        )),
+        fault_tolerant_handler("metrics server".to_string(), || metrics_server::run(
+            run_options.clone(),
+            state.clone()
+        )),
+    );
 
     Ok(())
 }
 
 #[cfg(test)]
 pub mod tests {
+    use axum_prometheus::metrics_exporter_prometheus::PrometheusBuilder;
+
     use super::*;
 
     pub fn get_state(
@@ -146,6 +205,8 @@ pub mod tests {
             guardian_set_index: 0,
 
             ws: WsState::new(1),
+
+            metrics_recorder: PrometheusBuilder::new().build_recorder().handle(),
         }))
     }
 }

+ 3 - 5
apps/quorum/src/ws.rs

@@ -1,5 +1,5 @@
 use {
-    crate::server::{State, EXIT},
+    crate::server::{wait_for_exit, State},
     anyhow::{anyhow, Result},
     axum::{
         extract::{
@@ -16,7 +16,7 @@ use {
         sync::atomic::{AtomicUsize, Ordering},
         time::Duration,
     },
-    tokio::sync::{broadcast, watch},
+    tokio::sync::broadcast,
 };
 
 pub struct WsState {
@@ -68,7 +68,6 @@ pub struct Subscriber {
     sender: SplitSink<WebSocket, Message>,
     ping_interval: tokio::time::Interval,
     responded_to_ping: bool,
-    exit: watch::Receiver<bool>,
 }
 
 const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);
@@ -88,7 +87,6 @@ impl Subscriber {
             sender,
             ping_interval: tokio::time::interval(PING_INTERVAL_DURATION),
             responded_to_ping: true, // We start with true so we don't close the connection immediately
-            exit: EXIT.subscribe(),
         }
     }
 
@@ -122,7 +120,7 @@ impl Subscriber {
                 self.sender.send(Message::Ping(vec![].into())).await?;
                 Ok(())
             },
-            _ = self.exit.changed() => {
+            _ = wait_for_exit() => {
                 self.sender.close().await?;
                 self.closed = true;
                 Err(anyhow!("Application is shutting down. Closing connection."))