Selaa lähdekoodia

add logic to check rpc health

0xfirefist 1 vuosi sitten
vanhempi
sitoutus
45333e91d1
1 muutettua tiedostoa jossa 42 lisäystä ja 17 poistoa
  1. 42 17
      apps/fortuna/src/api/health.rs

+ 42 - 17
apps/fortuna/src/api/health.rs

@@ -1,20 +1,29 @@
 use {
-    crate::api::{
-        ChainId,
-        RestError,
+    crate::{
+        api::{
+            ChainId,
+            RestError,
+        },
+        chain::reader::BlockStatus,
     },
     anyhow::Result,
     axum::{
         extract::State,
         Json,
     },
+    std::sync::{
+        Arc,
+        RwLock,
+    },
+    tokio::spawn,
     utoipa::ToSchema,
 };
 
-#[derive(Debug, serde::Serialize, serde::Deserialize, ToSchema, PartialEq)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, ToSchema, PartialEq)]
 pub struct RpcHealth {
-    chain_id:   ChainId,
-    is_healthy: bool,
+    chain_id:            ChainId,
+    is_healthy:          bool,
+    latest_block_number: Option<u64>,
 }
 
 /// Get the list of supported chain ids
@@ -28,20 +37,36 @@ responses(
 pub async fn health(
     State(state): State<crate::api::ApiState>,
 ) -> Result<Json<RpcHealthResponse>, RestError> {
-    let mut res = RpcHealthResponse {
-        rpcs_health: vec![],
-    };
+    let rpcs_health = Arc::new(RwLock::new(vec![]));
+
+    let threads = state
+        .chains
+        .iter()
+        .map(|(chain_id, _)| {
+            let rpcs_health = Arc::clone(&rpcs_health);
+            let chain_id = chain_id.clone();
+            let contract = Arc::clone(&state.chains.get(&chain_id).unwrap().contract);
+            spawn(async move {
+                let block_number = match contract.get_block_number(BlockStatus::Latest).await {
+                    Ok(number) => Some(number),
+                    Err(_) => None,
+                };
 
-    for (chain_id, _) in state.chains.iter() {
-        let rpc_health = RpcHealth {
-            chain_id:   chain_id.clone(),
-            is_healthy: true,
-        };
+                let mut rpcs_health = rpcs_health.write().unwrap();
+                rpcs_health.push(RpcHealth {
+                    chain_id,
+                    is_healthy: block_number.is_some(),
+                    latest_block_number: block_number,
+                });
+            })
+        })
+        .collect::<Vec<_>>();
 
-        res.rpcs_health.push(rpc_health);
-    }
+    // tokio await on threads
+    let _ = futures::future::join_all(threads).await;
 
-    Ok(Json(res))
+    let rpcs_health = rpcs_health.read().unwrap().clone();
+    Ok(Json(RpcHealthResponse { rpcs_health }))
 }
 
 #[derive(Debug, serde::Serialize, serde::Deserialize, ToSchema, PartialEq)]