Browse Source

feat(pyth-lazer-agent): liveness and readiness endpoints

Mike Rolish 4 tháng trước cách đây
mục cha
commit
84c6a40a3c

+ 5 - 4
.github/CODEOWNERS

@@ -2,6 +2,7 @@ apps/api-reference @pyth-network/web-team
 apps/entropy-debugger @pyth-network/web-team
 apps/insights @pyth-network/web-team
 apps/staking @pyth-network/web-team
+apps/pyth-lazer-agent @merolish
 packages/component-library @pyth-network/web-team
 packages/known-publishers @pyth-network/web-team
 Dockerfile.node @pyth-network/web-team
@@ -16,11 +17,11 @@ turbo.json @pyth-network/web-team
 .github/workflows/ci-turbo-build.yml @pyth-network/web-team
 .github/workflows/ci-turbo-test.yml @pyth-network/web-team
 
-/lazer/contracts/aptos @Riateche @ali-bahjati
-/lazer/contracts/evm @Riateche @ali-bahjati
-/lazer/contracts/solana @Riateche @ali-bahjati
+/lazer/contracts/aptos @Riateche @ali-behjati
+/lazer/contracts/evm @Riateche @ali-behjati
+/lazer/contracts/solana @Riateche @ali-behjati
 /lazer/publisher_sdk @darunrs @Riateche
-/lazer/sdk/js @ali-bahjati @keyvankhademi
+/lazer/sdk/js @ali-behjati @keyvankhademi
 /lazer/sdk/rust @darunrs @Riateche
 
 flake.lock @cprussin

+ 1 - 1
apps/pyth-lazer-agent/Cargo.lock

@@ -1643,7 +1643,7 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-agent"
-version = "0.1.1"
+version = "0.1.2"
 dependencies = [
  "anyhow",
  "backoff",

+ 1 - 1
apps/pyth-lazer-agent/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "pyth-lazer-agent"
-version = "0.1.1"
+version = "0.1.2"
 edition = "2024"
 
 [dependencies]

+ 19 - 0
apps/pyth-lazer-agent/src/http_server.rs

@@ -28,6 +28,9 @@ pub struct RelayerRequest(pub http::Request<hyper::body::Incoming>);
 const PUBLISHER_WS_URI: &str = "/v1/publisher";
 const PUBLISHER_WS_URI_V2: &str = "/v2/publisher";
 
+const READINESS_PROBE_PATH: &str = "/ready";
+const LIVENESS_PROBE_PATH: &str = "/live";
+
 pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()> {
     let listener = TcpListener::bind(&config.listen_address).await?;
     info!("listening on {:?}", &config.listen_address);
@@ -74,6 +77,22 @@ async fn request_handler(
     let request_type = match path {
         PUBLISHER_WS_URI => Request::PublisherV1,
         PUBLISHER_WS_URI_V2 => Request::PublisherV2,
+        LIVENESS_PROBE_PATH => {
+            let response = Response::builder().status(StatusCode::OK);
+            return Ok(response.body(FullBody::default())?);
+        }
+        READINESS_PROBE_PATH => {
+            let status = if lazer_publisher
+                .is_ready
+                .load(std::sync::atomic::Ordering::Relaxed)
+            {
+                StatusCode::OK
+            } else {
+                StatusCode::SERVICE_UNAVAILABLE
+            };
+            let response = Response::builder().status(status);
+            return Ok(response.body(FullBody::default())?);
+        }
         _ => {
             return Ok(Response::builder()
                 .status(StatusCode::NOT_FOUND)

+ 9 - 1
apps/pyth-lazer-agent/src/lazer_publisher.rs

@@ -14,6 +14,8 @@ use pyth_lazer_publisher_sdk::transaction::{
 };
 use solana_keypair::read_keypair_file;
 use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 use tokio::sync::broadcast;
 use tokio::{
     select,
@@ -25,6 +27,7 @@ use tracing::error;
 #[derive(Clone)]
 pub struct LazerPublisher {
     sender: Sender<FeedUpdate>,
+    pub(crate) is_ready: Arc<AtomicBool>,
 }
 
 impl LazerPublisher {
@@ -66,11 +69,13 @@ impl LazerPublisher {
             };
 
         let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
+        let is_ready = Arc::new(AtomicBool::new(false));
         for url in config.relayer_urls.iter() {
             let mut task = RelayerSessionTask {
                 url: url.clone(),
                 token: authorization_token.clone(),
                 receiver: relayer_sender.subscribe(),
+                is_ready: is_ready.clone(),
             };
             tokio::spawn(async move { task.run().await });
         }
@@ -84,7 +89,10 @@ impl LazerPublisher {
             signing_key,
         };
         tokio::spawn(async move { task.run().await });
-        Self { sender }
+        Self {
+            sender,
+            is_ready: is_ready.clone(),
+        }
     }
 
     pub async fn push_feed_update(&self, feed_update: FeedUpdate) -> Result<()> {

+ 9 - 0
apps/pyth-lazer-agent/src/relayer_session.rs

@@ -6,6 +6,8 @@ use futures_util::{SinkExt, StreamExt};
 use http::HeaderValue;
 use protobuf::Message;
 use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::{Duration, Instant};
 use tokio::net::TcpStream;
 use tokio::select;
@@ -63,6 +65,7 @@ pub struct RelayerSessionTask {
     pub url: Url,
     pub token: String,
     pub receiver: broadcast::Receiver<SignedLazerTransaction>,
+    pub is_ready: Arc<AtomicBool>,
 }
 
 impl RelayerSessionTask {
@@ -116,6 +119,9 @@ impl RelayerSessionTask {
             ws_sender: relayer_ws_sender,
         };
 
+        // If we have at least one successful connection, mark as ready.
+        self.is_ready.store(true, Ordering::Relaxed);
+
         loop {
             select! {
                 recv_result = self.receiver.recv() => {
@@ -174,6 +180,8 @@ mod tests {
         Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
     };
     use std::net::SocketAddr;
+    use std::sync::Arc;
+    use std::sync::atomic::AtomicBool;
     use tokio::net::TcpListener;
     use tokio::sync::{broadcast, mpsc};
     use url::Url;
@@ -234,6 +242,7 @@ mod tests {
             url: Url::parse("ws://127.0.0.1:12346").unwrap(),
             token: "token1".to_string(),
             receiver: relayer_receiver,
+            is_ready: Arc::new(AtomicBool::new(false)),
         };
         tokio::spawn(async move { relayer_session_task.run().await });
         tokio::time::sleep(std::time::Duration::from_millis(1000)).await;