Reisen 2 лет назад
Родитель
Сommit
cf7c6839eb
2 измененных файлов с 13 добавлено и 19 удалено
  1. 2 2
      hermes/src/api.rs
  2. 11 17
      hermes/src/api/ws.rs

+ 2 - 2
hermes/src/api.rs

@@ -1,5 +1,5 @@
 use {
-    self::ws::dispatch_updates,
+    self::ws::notify_updates,
     crate::store::Store,
     anyhow::Result,
     axum::{
@@ -65,7 +65,7 @@ pub async fn run(store: Arc<Store>, mut update_rx: Receiver<()>, rpc_addr: Strin
                 .await
                 .expect("state update channel is closed");
 
-            dispatch_updates(state.clone()).await;
+            notify_updates(state.ws.clone()).await;
         }
     });
 

+ 11 - 17
hermes/src/api/ws.rs

@@ -53,6 +53,7 @@ use {
 };
 
 pub const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);
+pub const NOTIFICATIONS_CHAN_LEN: usize = 1000;
 
 pub async fn ws_route_handler(
     ws: WebSocketUpgrade,
@@ -64,19 +65,14 @@ pub async fn ws_route_handler(
 async fn websocket_handler(stream: WebSocket, state: super::State) {
     let ws_state = state.ws.clone();
     let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst);
-
-    let (sender, receiver) = stream.split();
-
-    // TODO: Use a configured value for the buffer size or make it const static
-    // TODO: Use redis stream to source the updates instead of a channel
-    let (tx, rx) = mpsc::channel::<()>(1000);
-
-    ws_state.subscribers.insert(id, tx);
-
     log::debug!("New websocket connection, assigning id: {}", id);
 
-    let mut subscriber = Subscriber::new(id, state.store.clone(), rx, receiver, sender);
+    let (notify_sender, notify_receiver) = mpsc::channel::<()>(NOTIFICATIONS_CHAN_LEN);
+    let (sender, receiver) = stream.split();
+    let mut subscriber =
+        Subscriber::new(id, state.store.clone(), notify_receiver, receiver, sender);
 
+    ws_state.subscribers.insert(id, notify_sender);
     subscriber.run().await;
 }
 
@@ -88,7 +84,7 @@ pub struct Subscriber {
     id:                      SubscriberId,
     closed:                  bool,
     store:                   Arc<Store>,
-    update_rx:               mpsc::Receiver<()>,
+    notify_receiver:         mpsc::Receiver<()>,
     receiver:                SplitStream<WebSocket>,
     sender:                  SplitSink<WebSocket, Message>,
     price_feeds_with_config: HashMap<PriceIdentifier, PriceFeedClientConfig>,
@@ -100,7 +96,7 @@ impl Subscriber {
     pub fn new(
         id: SubscriberId,
         store: Arc<Store>,
-        update_rx: mpsc::Receiver<()>,
+        notify_receiver: mpsc::Receiver<()>,
         receiver: SplitStream<WebSocket>,
         sender: SplitSink<WebSocket, Message>,
     ) -> Self {
@@ -108,7 +104,7 @@ impl Subscriber {
             id,
             closed: false,
             store,
-            update_rx,
+            notify_receiver,
             receiver,
             sender,
             price_feeds_with_config: HashMap::new(),
@@ -128,7 +124,7 @@ impl Subscriber {
 
     async fn handle_next(&mut self) -> Result<()> {
         tokio::select! {
-            maybe_update_feeds = self.update_rx.recv() => {
+            maybe_update_feeds = self.notify_receiver.recv() => {
                 if maybe_update_feeds.is_none() {
                     return Err(anyhow!("Update channel closed. This should never happen. Closing connection."));
                 };
@@ -257,9 +253,7 @@ impl Subscriber {
     }
 }
 
-pub async fn dispatch_updates(state: super::State) {
-    let ws_state = state.ws.clone();
-
+pub async fn notify_updates(ws_state: Arc<WsState>) {
     let closed_subscribers: Vec<Option<SubscriberId>> = join_all(
         ws_state
             .subscribers