Browse Source

tpu-client-next: check if worker is active when ensuring its presence in cache (#8009)

It may happen that a connection worker is cached but is no longer active. This leads to re-establishing the connection at the moment of sending. Check not only that the connection worker is present in the cache but also that it is active.
kirill lykov 2 months ago
parent
commit
a253e03bf8

+ 9 - 3
tpu-client-next/src/connection_worker.rs

@@ -163,7 +163,7 @@ impl ConnectionWorker {
 
                             // Monitor connection health proactively
                             close_reason = connection.closed() => {
-                                self.handle_connection_closed(close_reason).await;
+                                self.handle_connection_closed(close_reason);
                                 continue;
                             }
                         }
@@ -189,6 +189,9 @@ impl ConnectionWorker {
             () = main_loop => (),
             () = cancel.cancelled() => (),
         }
+        // Also cancel explicitly here so that `WorkerInfo` can check whether
+        // this worker is still active.
+        cancel.cancel();
     }
 
     /// Handles connection closure events detected by the connection monitor.
@@ -196,7 +199,7 @@ impl ConnectionWorker {
     /// This method logs the close reason with appropriate severity based on
     /// the type of closure, records statistics, and determines whether to
     /// attempt reconnection based on the error type.
-    async fn handle_connection_closed(&mut self, close_reason: ConnectionError) {
+    fn handle_connection_closed(&mut self, close_reason: ConnectionError) {
         match &close_reason {
             ConnectionError::ConnectionClosed(close) => {
                 debug!(
@@ -355,7 +358,10 @@ impl ConnectionWorker {
                         self.connection = ConnectionState::Closing;
                     }
                     ConnectError::InvalidRemoteAddress(_) => {
-                        warn!("Invalid remote address for peer: {}", self.peer);
+                        warn!(
+                            "Invalid remote address for peer: {}, attempt: {}",
+                            self.peer, retries_attempt
+                        );
                         self.connection = ConnectionState::Closing;
                     }
                     e => {

+ 18 - 3
tpu-client-next/src/workers_cache.rs

@@ -6,7 +6,9 @@
 use qualifier_attr::qualifiers;
 use {
     crate::{
-        connection_worker::ConnectionWorker, logging::debug, transaction_batch::TransactionBatch,
+        connection_worker::ConnectionWorker,
+        logging::{debug, trace},
+        transaction_batch::TransactionBatch,
         SendTransactionStats,
     },
     lru::LruCache,
@@ -70,6 +72,12 @@ impl WorkerInfo {
             .map_err(|_| WorkersCacheError::TaskJoinFailure)?;
         Ok(())
     }
+
+    /// Returns `true` if the worker is still active, i.e. its `cancel` handle
+    /// has not been cancelled and its `sender` is not closed.
+    fn is_active(&self) -> bool {
+        !(self.cancel.is_cancelled() || self.sender.is_closed())
+    }
 }
 
 /// Spawns a worker to handle communication with a given peer.
@@ -185,9 +193,14 @@ impl WorkersCache {
         handshake_timeout: Duration,
         stats: Arc<SendTransactionStats>,
     ) -> Option<ShutdownWorker> {
-        if self.contains(&peer) {
-            return None;
+        if let Some(worker) = self.workers.peek(&peer) {
+            // If the worker is active, we reuse it. Otherwise, we spawn a
+            // new one and the existing one will be popped out.
+            if worker.is_active() {
+                return None;
+            }
         }
+        trace!("No active worker for peer {peer}, respawning.");
 
         let worker = spawn_worker(
             endpoint,
@@ -487,6 +500,8 @@ mod tests {
             sleep(Duration::from_millis(500)).await;
         }
 
+        assert!(!worker_info.is_active(), "Worker should be inactive");
+
         // try to send to this worker — should fail and remove the worker
         let result = cache
             .try_send_transactions_to_address(&peer, TransactionBatch::new(vec![vec![0u8; 1]]));

+ 8 - 4
tpu-client-next/tests/connection_workers_scheduler_test.rs

@@ -527,11 +527,15 @@ async fn test_no_host() {
     // Wait for the generator to finish.
     tx_sender_shutdown.await;
 
-    // While attempting to establish a connection with a nonexistent host, we fill the worker's
-    // channel.
+    // For each transaction, we check whether the worker exists and is active.
+    // In this case, the worker will never be active because we stop it when it
+    // fails to create a connection. So the scheduler will try to create a worker
+    // `max_send_attempts` times and fail each time.
     let stats = join_scheduler(scheduler_handle).await;
-    // `5` because `config.max_reconnect_attempts` is 4
-    assert_eq!(stats.connect_error_invalid_remote_address, 5);
+    assert_eq!(
+        stats.connect_error_invalid_remote_address,
+        max_send_attempts as u64
+    );
 }
 
 // Check that when the client is rate-limited by server, we update counters