소스 검색

isolate metrics for api

0xfirefist 1 년 전
부모
커밋
466cef5fd9
5개의 변경된 파일70개의 추가작업 그리고 43개의 파일을 삭제
  1. 54 14
      apps/fortuna/src/api.rs
  2. 1 1
      apps/fortuna/src/api/metrics.rs
  3. 4 6
      apps/fortuna/src/api/revelation.rs
  4. 11 7
      apps/fortuna/src/command/run.rs
  5. 0 15
      apps/fortuna/src/metrics.rs

+ 54 - 14
apps/fortuna/src/api.rs

@@ -5,7 +5,6 @@ use {
             BlockStatus,
             EntropyReader,
         },
-        metrics::Metrics,
         state::HashChainState,
     },
     anyhow::Result,
@@ -20,10 +19,19 @@ use {
         Router,
     },
     ethers::core::types::Address,
+    prometheus_client::{
+        encoding::EncodeLabelSet,
+        metrics::{
+            counter::Counter,
+            family::Family,
+        },
+        registry::Registry,
+    },
     std::{
         collections::HashMap,
         sync::Arc,
     },
+    tokio::sync::RwLock,
     url::Url,
 };
 pub use {
@@ -44,20 +52,44 @@ mod revelation;
 
 pub type ChainId = String;
 
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct RequestLabel {
+    pub value: String,
+}
+
+pub struct ApiMetrics {
+    pub metrics_registry: Arc<RwLock<Registry>>,
+    pub http_requests:    Family<RequestLabel, Counter>,
+}
+
 #[derive(Clone)]
 pub struct ApiState {
     pub chains: Arc<HashMap<ChainId, BlockchainState>>,
 
     /// Prometheus metrics
-    pub metrics: Arc<Metrics>,
+    pub metrics: Arc<ApiMetrics>,
 }
 
 impl ApiState {
-    pub fn new(chains: &[(ChainId, BlockchainState)]) -> ApiState {
-        let map: HashMap<ChainId, BlockchainState> = chains.into_iter().cloned().collect();
+    pub async fn new(
+        chains: HashMap<ChainId, BlockchainState>,
+        metrics_registry: Arc<RwLock<Registry>>,
+    ) -> ApiState {
+        let metrics = ApiMetrics {
+            http_requests: Family::default(),
+            metrics_registry,
+        };
+
+        let http_requests = metrics.http_requests.clone();
+        metrics.metrics_registry.write().await.register(
+            "http_requests",
+            "Number of HTTP requests received",
+            http_requests,
+        );
+
         ApiState {
-            chains:  Arc::new(map),
-            metrics: Arc::new(Metrics::new()),
+            chains:  Arc::new(chains),
+            metrics: Arc::new(metrics),
         }
     }
 }
@@ -185,7 +217,12 @@ mod test {
         },
         ethers::prelude::Address,
         lazy_static::lazy_static,
-        std::sync::Arc,
+        prometheus_client::registry::Registry,
+        std::{
+            collections::HashMap,
+            sync::Arc,
+        },
+        tokio::sync::RwLock,
     };
 
     const PROVIDER: Address = Address::zero();
@@ -203,7 +240,7 @@ mod test {
         ));
     }
 
-    fn test_server() -> (TestServer, Arc<MockEntropyReader>, Arc<MockEntropyReader>) {
+    async fn test_server() -> (TestServer, Arc<MockEntropyReader>, Arc<MockEntropyReader>) {
         let eth_read = Arc::new(MockEntropyReader::with_requests(10, &[]));
 
         let eth_state = BlockchainState {
@@ -215,6 +252,8 @@ mod test {
             confirmed_block_status: BlockStatus::Latest,
         };
 
+        let metrics_registry = Arc::new(RwLock::new(Registry::default()));
+
         let avax_read = Arc::new(MockEntropyReader::with_requests(10, &[]));
 
         let avax_state = BlockchainState {
@@ -226,10 +265,11 @@ mod test {
             confirmed_block_status: BlockStatus::Latest,
         };
 
-        let api_state = ApiState::new(&[
-            ("ethereum".into(), eth_state),
-            ("avalanche".into(), avax_state),
-        ]);
+        let mut chains = HashMap::new();
+        chains.insert("ethereum".into(), eth_state);
+        chains.insert("avalanche".into(), avax_state);
+
+        let api_state = ApiState::new(chains, metrics_registry).await;
 
         let app = api::routes(api_state);
         (TestServer::new(app).unwrap(), eth_read, avax_read)
@@ -247,7 +287,7 @@ mod test {
 
     #[tokio::test]
     async fn test_revelation() {
-        let (server, eth_contract, avax_contract) = test_server();
+        let (server, eth_contract, avax_contract) = test_server().await;
 
         // Can't access a revelation if it hasn't been requested
         get_and_assert_status(
@@ -376,7 +416,7 @@ mod test {
 
     #[tokio::test]
     async fn test_revelation_confirmation_delay() {
-        let (server, eth_contract, avax_contract) = test_server();
+        let (server, eth_contract, avax_contract) = test_server().await;
 
         eth_contract.insert(PROVIDER, 0, 10, false);
         eth_contract.insert(PROVIDER, 1, 11, false);

+ 1 - 1
apps/fortuna/src/api/metrics.rs

@@ -9,7 +9,7 @@ use {
 };
 
 pub async fn metrics(State(state): State<crate::api::ApiState>) -> impl IntoResponse {
-    let registry = state.metrics.registry.read().await;
+    let registry = state.metrics.metrics_registry.read().await;
     let mut buffer = String::new();
 
     // Should not fail if the metrics are valid and there is memory available

+ 4 - 6
apps/fortuna/src/api/revelation.rs

@@ -1,10 +1,8 @@
 use {
-    crate::{
-        api::{
-            ChainId,
-            RestError,
-        },
-        metrics::RequestLabel,
+    crate::api::{
+        ChainId,
+        RequestLabel,
+        RestError,
     },
     anyhow::Result,
     axum::{

+ 11 - 7
apps/fortuna/src/command/run.rs

@@ -41,6 +41,7 @@ use {
         },
         types::Address,
     },
+    prometheus_client::registry::Registry,
     std::{
         collections::HashMap,
         net::SocketAddr,
@@ -49,7 +50,10 @@ use {
     },
     tokio::{
         spawn,
-        sync::watch,
+        sync::{
+            watch,
+            RwLock,
+        },
         time,
     },
     tower_http::cors::CorsLayer,
@@ -62,7 +66,7 @@ const TRACK_INTERVAL: Duration = Duration::from_secs(10);
 pub async fn run_api(
     socket_addr: SocketAddr,
     chains: HashMap<String, api::BlockchainState>,
-    metrics_registry: Arc<metrics::Metrics>,
+    metrics_registry: Arc<RwLock<Registry>>,
     mut rx_exit: watch::Receiver<bool>,
 ) -> Result<()> {
     #[derive(OpenApi)]
@@ -84,10 +88,7 @@ pub async fn run_api(
     )]
     struct ApiDoc;
 
-    let api_state = api::ApiState {
-        chains:  Arc::new(chains),
-        metrics: metrics_registry,
-    };
+    let api_state = api::ApiState::new(chains, metrics_registry).await;
 
     // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
     // `with_state` method which replaces `Body` with `State` in the type signature.
@@ -240,6 +241,9 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         Ok::<(), Error>(())
     });
 
+    let registry = Arc::new(RwLock::new(Registry::default()));
+
+
     let metrics_registry = Arc::new(metrics::Metrics::new());
 
     if let Some(keeper_private_key) = opts.load_keeper_private_key()? {
@@ -269,7 +273,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         opts.provider.clone(),
         metrics_registry.clone(),
     ));
-    run_api(opts.addr.clone(), chains, metrics_registry.clone(), rx_exit).await?;
+    run_api(opts.addr.clone(), chains, registry.clone(), rx_exit).await?;
 
     Ok(())
 }

+ 0 - 15
apps/fortuna/src/metrics.rs

@@ -12,11 +12,6 @@ use {
     tokio::sync::RwLock,
 };
 
-#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
-pub struct RequestLabel {
-    pub value: String,
-}
-
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
 pub struct AccountLabel {
     pub chain_id: String,
@@ -26,8 +21,6 @@ pub struct AccountLabel {
 pub struct Metrics {
     pub registry: RwLock<Registry>,
 
-    pub http_requests: Family<RequestLabel, Counter>,
-
     pub current_sequence_number: Family<AccountLabel, Gauge>,
     pub end_sequence_number:     Family<AccountLabel, Gauge>,
     pub balance:                 Family<AccountLabel, Gauge<f64, AtomicU64>>,
@@ -42,13 +35,6 @@ impl Metrics {
     pub fn new() -> Self {
         let mut metrics_registry = Registry::default();
 
-        let http_requests = Family::<RequestLabel, Counter>::default();
-        metrics_registry.register(
-            "http_requests",
-            "Number of HTTP requests received",
-            http_requests.clone(),
-        );
-
         let current_sequence_number = Family::<AccountLabel, Gauge>::default();
         metrics_registry.register(
             "current_sequence_number",
@@ -99,7 +85,6 @@ impl Metrics {
 
         Metrics {
             registry: RwLock::new(metrics_registry),
-            http_requests,
             current_sequence_number,
             end_sequence_number,
             requests,