ソースを参照

feat: implement RedundantLazerClient with WebSocket connection pool

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>
Devin AI 9 ヶ月 前
コミット
c3d016c039
1 ファイル変更39 行追加54 行削除
  1. 39 54
      lazer/sdk/rust/consumer/src/client.rs

+ 39 - 54
lazer/sdk/rust/consumer/src/client.rs

@@ -3,12 +3,10 @@ use {
     tokio_stream,
     futures_util::{SinkExt, StreamExt},
     pyth_lazer_protocol::{
-        router::{Channel, PriceFeedId, PriceFeedProperty, SubscriptionParams, SubscriptionParamsRepr},
-        subscription::{Request, Response, SubscriptionId},
+        router::{Channel, PriceFeedId, PriceFeedProperty, SubscriptionParams, SubscriptionParamsRepr, JsonUpdate},
+        subscription::{Request, Response, SubscriptionId, SubscribeRequest, UnsubscribeRequest, StreamUpdatedResponse},
     },
-    serde::de::DeserializeOwned,
     std::{
-        collections::HashMap,
         sync::Arc,
         time::{Duration, Instant},
     },
@@ -18,22 +16,20 @@ use {
     },
     tokio_tungstenite::{
         connect_async,
-        tungstenite::{Message, Result as WsResult},
+        tungstenite::Message,
         MaybeTlsStream, WebSocketStream,
     },
-    tracing::{debug, error, info, warn},
+    tracing::{error, info, warn},
     ttl_cache::TtlCache,
     url::Url,
 };
 
-const TICKER_STREAM_CHANNEL_SIZE: usize = 100_000;
 const STREAM_POOL_CHANNEL_SIZE: usize = 100_000;
 const DEDUP_CACHE_SIZE: usize = 100_000;
 const DEDUP_TTL: Duration = Duration::from_secs(10);
 const RECONNECT_WAIT: Duration = Duration::from_secs(1);
 const MAX_NUM_CONNECTIONS: usize = 50;
 const CONNECTION_TTL: Duration = Duration::from_secs(23 * 60 * 60); // 23 hours
-const TTL_RECONNECT_STAGGER: Duration = Duration::from_secs(60); // 1 min
 
 type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
 
@@ -48,7 +44,7 @@ pub struct RedundantLazerClient {
     feed_ids: Vec<PriceFeedId>,
     properties: Vec<PriceFeedProperty>,
     channel: Channel,
-    timeout: Duration,
+    _timeout: Duration,
     endpoint: Url,
     subscription_id: Option<SubscriptionId>,
 }
@@ -79,7 +75,7 @@ impl RedundantLazerClient {
             feed_ids: Vec::new(),
             properties: Vec::new(),
             channel: Channel::FixedRate(pyth_lazer_protocol::router::FixedRate::MIN),
-            timeout,
+            _timeout: timeout,
             endpoint,
             subscription_id: None,
         })
@@ -101,10 +97,10 @@ impl RedundantLazerClient {
         subscription_id: SubscriptionId,
         params: SubscriptionParams,
     ) -> Result<()> {
-        let request = Request::Subscribe {
+        let request = Request::Subscribe(SubscribeRequest {
             subscription_id,
             params,
-        };
+        });
         let message = serde_json::to_string(&request)?;
         stream.send(Message::Text(message)).await?;
         Ok(())
@@ -129,7 +125,7 @@ impl RedundantLazerClient {
             json_binary_encoding: Default::default(),
             parsed: true,
             channel,
-        })?;
+        }).map_err(|e| anyhow::anyhow!(e))?;
 
         for connection in &self.connections {
             let mut state = connection.write().await;
@@ -160,14 +156,14 @@ impl RedundantLazerClient {
             json_binary_encoding: Default::default(),
             parsed: true,
             channel,
-        })?;
+        }).map_err(|e| anyhow::anyhow!(e))?;
 
         for connection in &self.connections {
             let mut state = connection.write().await;
-            let request = Request::UpdateSubscription {
+            let request = Request::Subscribe(SubscribeRequest {
                 subscription_id,
                 params: params.clone(),
-            };
+            });
             let message = serde_json::to_string(&request)?;
             state.stream.send(Message::Text(message)).await?;
         }
@@ -182,7 +178,7 @@ impl RedundantLazerClient {
         if let Some(subscription_id) = self.subscription_id.take() {
             for connection in &self.connections {
                 let mut state = connection.write().await;
-                let request = Request::Unsubscribe { subscription_id };
+                let request = Request::Unsubscribe(UnsubscribeRequest { subscription_id });
                 let message = serde_json::to_string(&request)?;
                 state.stream.send(Message::Text(message))
                     .await
@@ -195,15 +191,15 @@ impl RedundantLazerClient {
     }
 
     pub async fn into_stream(
-        mut self,
-    ) -> Result<impl futures_util::Stream<Item = pyth_lazer_protocol::router::Update>> {
+        self,
+    ) -> Result<impl futures_util::Stream<Item = JsonUpdate>> {
         let (tx, rx) = mpsc::channel(STREAM_POOL_CHANNEL_SIZE);
-        let response_rx = self.start().await?;
+        let mut response_rx = self.start().await?;
         
         tokio::spawn(async move {
-            while let Ok(response) = response_rx.recv().await {
-                if let Response::StreamUpdated { update, .. } = response {
-                    if tx.send(update).await.is_err() {
+            while let Some(response) = response_rx.recv().await {
+                if let Response::StreamUpdated(StreamUpdatedResponse { payload, .. }) = response {
+                    if tx.send(payload).await.is_err() {
                         break;
                     }
                 }
@@ -214,10 +210,19 @@ impl RedundantLazerClient {
     }
 
     pub async fn start(
-        mut self,
+        self,
     ) -> Result<mpsc::Receiver<Response>> {
         let (tx, rx) = mpsc::channel(STREAM_POOL_CHANNEL_SIZE);
         let subscription_id = self.subscription_id.unwrap_or(SubscriptionId(1));
+        let params = SubscriptionParams::new(SubscriptionParamsRepr {
+            price_feed_ids: self.feed_ids.clone(),
+            properties: self.properties.clone(),
+            chains: vec![pyth_lazer_protocol::router::Chain::Evm],
+            delivery_format: Default::default(),
+            json_binary_encoding: Default::default(),
+            parsed: true,
+            channel: self.channel,
+        }).map_err(|e| anyhow::anyhow!(e))?;
 
         for connection in self.connections {
             let tx = tx.clone();
@@ -259,19 +264,19 @@ impl RedundantLazerClient {
                                     match serde_json::from_str::<Response>(&text) {
                                         Ok(response) => {
                                             match &response {
-                                                Response::Subscribed { subscription_id: _ } => {
+                                                Response::Subscribed(_) => {
                                                     info!("Subscription confirmed");
                                                 }
-                                                Response::SubscriptionError { error } => {
-                                                    error!("Subscription error: {}", error);
+                                                Response::SubscriptionError(error) => {
+                                                    error!("Subscription error: {}", error.error);
                                                     break;
                                                 }
-                                                Response::StreamUpdated { subscription_id: _, update } => {
+                                                Response::StreamUpdated(StreamUpdatedResponse { subscription_id: _, payload }) => {
                                                     // Generate a unique ID for deduplication based on update content
                                                     let update_hash = {
                                                         use std::hash::{Hash, Hasher};
                                                         let mut hasher = std::collections::hash_map::DefaultHasher::new();
-                                                        update.hash(&mut hasher);
+                                                        payload.hash(&mut hasher);
                                                         hasher.finish()
                                                     };
 
@@ -282,31 +287,11 @@ impl RedundantLazerClient {
 
                                                     // Insert into TTL cache and forward the update
                                                     state.seen_updates.insert(update_hash, true, DEDUP_TTL);
-                                                    match update {
-                                                        pyth_lazer_protocol::router::Update::Json(json_update) => {
-                                                            if let Some(parsed) = json_update.parsed {
-                                                                if tx.send(Response::StreamUpdated { 
-                                                                    subscription_id, 
-                                                                    update: pyth_lazer_protocol::router::Update::Json(
-                                                                        pyth_lazer_protocol::router::JsonUpdate {
-                                                                            parsed: Some(parsed),
-                                                                            evm: None,
-                                                                            solana: None,
-                                                                        }
-                                                                    )
-                                                                }).await.is_err() {
-                                                                    return;
-                                                                }
-                                                            }
-                                                        }
-                                                        _ => {
-                                                            if tx.send(Response::StreamUpdated { 
-                                                                subscription_id, 
-                                                                update 
-                                                            }).await.is_err() {
-                                                                return;
-                                                            }
-                                                        }
+                                                    if tx.send(Response::StreamUpdated(StreamUpdatedResponse {
+                                                        subscription_id,
+                                                        payload: payload.clone(),
+                                                    })).await.is_err() {
+                                                        return;
                                                     }
                                                 }
                                                 _ => {