Procházet zdrojové kódy

Fix ping problem from client

Ali Behjati před 2 roky
rodič
revize
d93154a58d
1 změnil soubory, kde provedl 26 přidání a 1 odebrání
  1. 26 1
      hermes/src/api/ws.rs

+ 26 - 1
hermes/src/api/ws.rs

@@ -36,14 +36,17 @@ use {
     },
     std::{
         collections::HashMap,
+        pin::Pin,
         sync::atomic::{
             AtomicUsize,
             Ordering,
         },
+        time::Duration,
     },
     tokio::sync::mpsc,
 };
 
+pub const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);
 
 pub async fn ws_route_handler(
     ws: WebSocketUpgrade,
@@ -83,6 +86,8 @@ pub struct Subscriber {
     receiver:                SplitStream<WebSocket>,
     sender:                  SplitSink<WebSocket, Message>,
     price_feeds_with_config: HashMap<PriceIdentifier, PriceFeedClientConfig>,
+    ping_interval_future:    Pin<Box<tokio::time::Sleep>>,
+    responded_to_ping:       bool,
 }
 
 impl Subscriber {
@@ -101,6 +106,8 @@ impl Subscriber {
             receiver,
             sender,
             price_feeds_with_config: HashMap::new(),
+            ping_interval_future: Box::pin(tokio::time::sleep(PING_INTERVAL_DURATION)),
+            responded_to_ping: true, // We start with true so we don't close the connection immediately
         }
     }
 
@@ -131,6 +138,16 @@ impl Subscriber {
                     Some(message_or_err) => self.handle_client_message(message_or_err?).await?
                 }
             },
+            _  = &mut self.ping_interval_future => {
+                if !self.responded_to_ping {
+                    log::debug!("Subscriber {} did not respond to ping, closing connection.", self.id);
+                    self.closed = true;
+                    return Ok(());
+                }
+                self.responded_to_ping = false;
+                self.sender.send(Message::Ping(vec![])).await?;
+                self.ping_interval_future = Box::pin(tokio::time::sleep(PING_INTERVAL_DURATION));
+            }
         }
 
         Ok(())
@@ -180,6 +197,14 @@ impl Subscriber {
         let maybe_client_message = match message {
             Message::Text(text) => serde_json::from_str::<ClientMessage>(&text),
             Message::Binary(data) => serde_json::from_slice::<ClientMessage>(&data),
+            Message::Ping(_) => {
+                // Axum will send Pong automatically
+                return Ok(());
+            }
+            Message::Pong(_) => {
+                self.responded_to_ping = true;
+                return Ok(());
+            }
             _ => {
                 return Ok(());
             }
@@ -188,7 +213,7 @@ impl Subscriber {
         match maybe_client_message {
             Err(e) => {
                 self.sender
-                    .feed(
+                    .send(
                         serde_json::to_string(&ServerMessage::Response(
                             ServerResponseMessage::Err {
                                 error: e.to_string(),