Эх сурвалжийг харах

Support closing connections when QUIC connection drop (#6857)

* Support closing connections when QUIC connection drop

* Added a unit test on connection close

* removed the superflous connection.closed().await it is already locally closed
Lijun Wang 4 сар өмнө
parent
commit
9d97d498bd

+ 17 - 1
quic-client/src/lib.rs

@@ -12,8 +12,11 @@ use {
             QuicClient, QuicClientConnection as NonblockingQuicClientConnection,
             QuicLazyInitializedEndpoint,
         },
-        quic_client::QuicClientConnection as BlockingQuicClientConnection,
+        quic_client::{
+            close_quic_connection, QuicClientConnection as BlockingQuicClientConnection,
+        },
     },
+    log::debug,
     quic_client::get_runtime,
     quinn::{Endpoint, EndpointConfig, TokioRuntime},
     solana_connection_cache::{
@@ -72,6 +75,19 @@ impl ConnectionPool for QuicPool {
     }
 }
 
+impl Drop for QuicPool {
+    fn drop(&mut self) {
+        debug!(
+            "Dropping QuicPool with {} connections",
+            self.connections.len()
+        );
+        for connection in self.connections.drain(..) {
+            // Explicitly drop each connection to ensure resources are released
+            close_quic_connection(connection.0.clone());
+        }
+    }
+}
+
 pub struct QuicConfig {
     // Arc to prevent having to copy the struct
     client_certificate: RwLock<Arc<QuicClientCertificate>>,

+ 20 - 0
quic-client/src/nonblocking/quic_client.rs

@@ -229,6 +229,26 @@ pub struct QuicClient {
     stats: Arc<ClientStats>,
 }
 
+const CONNECTION_CLOSE_CODE_APPLICATION_CLOSE: u32 = 0u32;
+const CONNECTION_CLOSE_REASON_APPLICATION_CLOSE: &[u8] = b"dropped";
+
+impl QuicClient {
+    /// Explicitly close the connection. Must be called manually if cleanup is needed.
+    pub async fn close(&self) {
+        let mut conn_guard = self.connection.lock().await;
+        if let Some(conn) = conn_guard.take() {
+            debug!(
+                "Closing connection to {} connection_id: {:?}",
+                self.addr, conn.connection
+            );
+            conn.connection.close(
+                CONNECTION_CLOSE_CODE_APPLICATION_CLOSE.into(),
+                CONNECTION_CLOSE_REASON_APPLICATION_CLOSE,
+            );
+        }
+    }
+}
+
 impl QuicClient {
     pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
         Self {

+ 6 - 0
quic-client/src/quic_client.rs

@@ -180,3 +180,9 @@ impl ClientConnection for QuicClientConnection {
         Ok(())
     }
 }
+
+pub(crate) fn close_quic_connection(connection: Arc<QuicClient>) {
+    // Close the connection and release resources
+    trace!("Closing QUIC connection to {}", connection.server_addr());
+    RUNTIME.block_on(connection.close());
+}

+ 51 - 2
quic-client/tests/quic_client.rs

@@ -3,12 +3,14 @@ mod tests {
     use {
         crossbeam_channel::{unbounded, Receiver},
         log::*,
-        solana_connection_cache::connection_cache_stats::ConnectionCacheStats,
+        solana_connection_cache::{
+            client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
+        },
         solana_keypair::Keypair,
         solana_net_utils::sockets::{bind_to, localhost_port_range_for_tests},
         solana_packet::PACKET_DATA_SIZE,
         solana_perf::packet::PacketBatch,
-        solana_quic_client::nonblocking::quic_client::QuicLazyInitializedEndpoint,
+        solana_quic_client::nonblocking::quic_client::{QuicClient, QuicLazyInitializedEndpoint},
         solana_streamer::{
             quic::{QuicServerParams, SpawnServerResult},
             streamer::StakedNodes,
@@ -310,4 +312,51 @@ mod tests {
         response_recv_thread.join().unwrap();
         info!("Response receiver exited!");
     }
+
+    #[tokio::test]
+    async fn test_connection_close() {
+        solana_logger::setup();
+        let (sender, receiver) = unbounded();
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+        let (s, exit, keypair) = server_args();
+        let solana_streamer::nonblocking::quic::SpawnNonBlockingServerResult {
+            endpoints: _,
+            stats: _,
+            thread: t,
+            max_concurrent_connections: _,
+        } = solana_streamer::nonblocking::quic::spawn_server(
+            "quic_streamer_test",
+            s.try_clone().unwrap(),
+            &keypair,
+            sender,
+            exit.clone(),
+            staked_nodes,
+            QuicServerParams::default_for_tests(),
+        )
+        .unwrap();
+
+        let addr = s.local_addr().unwrap().ip();
+        let port = s.local_addr().unwrap().port();
+        let tpu_addr = SocketAddr::new(addr, port);
+        let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
+        let client = QuicClient::new(Arc::new(QuicLazyInitializedEndpoint::default()), tpu_addr);
+
+        // Send a full size packet with single byte writes.
+        let num_bytes = PACKET_DATA_SIZE;
+        let num_expected_packets: usize = 3;
+        let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
+        let client_stats = ClientStats::default();
+        for packet in packets {
+            let _ = client
+                .send_buffer(&packet, &client_stats, connection_cache_stats.clone())
+                .await;
+        }
+
+        nonblocking_check_packets(receiver, num_bytes, num_expected_packets).await;
+        exit.store(true, Ordering::Relaxed);
+
+        t.await.unwrap();
+        // We close the connection after the server is down, this should not block
+        client.close().await;
+    }
 }