
tpu-client-next: redo certificate update (#5888)

Redo the way tpu-client-next updates the Endpoint's certificate when triggered by a NotifyKeyUpdate call
kirill lykov 6 months ago
parent commit 1c837492df
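
For context, here is a minimal sketch of how a caller wires the reworked API after this change, based on the diff below: the scheduler now takes a watch-channel receiver and the cancellation token at construction, `run` no longer takes the token, and a key update reduces to a send on the channel. The surrounding names (`leader_updater`, `transaction_receiver`, `runtime_handle`, `config`, `new_keypair`) are placeholders, not code from this commit.

    // Sketch only; placeholder names, not part of this commit.
    let (update_certificate_sender, update_certificate_receiver) = watch::channel(None);
    let cancel = CancellationToken::new();
    let scheduler = ConnectionWorkersScheduler::new(
        Box::new(leader_updater),        // Box<dyn LeaderUpdater>
        transaction_receiver,            // mpsc::Receiver<TransactionBatch>
        update_certificate_receiver,     // watch::Receiver<Option<StakeIdentity>>
        cancel.clone(),
    );
    // `run` now consumes only the config; the cancel token was passed at construction.
    let _handle = runtime_handle.spawn(scheduler.run(config));
    // Later, NotifyKeyUpdate::update_key is just a watch-channel send:
    update_certificate_sender.send(Some(StakeIdentity::new(&new_keypair)))?;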

+ 31 - 7
core/src/forwarding_stage.rs

@@ -24,13 +24,16 @@ use {
     solana_sdk::{
         fee::{FeeBudgetLimits, FeeDetails},
         packet,
+        quic::NotifyKeyUpdate,
         signer::keypair::Keypair,
         transaction::MessageHash,
         transport::TransportError,
     },
     solana_streamer::sendmmsg::{batch_send, SendPktsError},
     solana_tpu_client_next::{
-        connection_workers_scheduler::{BindTarget, ConnectionWorkersSchedulerConfig, Fanout},
+        connection_workers_scheduler::{
+            BindTarget, ConnectionWorkersSchedulerConfig, Fanout, StakeIdentity,
+        },
         leader_updater::LeaderUpdater,
         transaction_batch::TransactionBatch,
         ConnectionWorkersScheduler,
@@ -41,7 +44,10 @@ use {
         thread::{Builder, JoinHandle},
         time::{Duration, Instant},
     },
-    tokio::{runtime::Handle as RuntimeHandle, sync::mpsc},
+    tokio::{
+        runtime::Handle as RuntimeHandle,
+        sync::{mpsc, watch},
+    },
     tokio_util::sync::CancellationToken,
 };
 
@@ -522,6 +528,7 @@ impl LeaderUpdater for ForwardAddressGetter {
 
 struct TpuClientNextClient {
     sender: mpsc::Sender<TransactionBatch>,
+    update_certificate_sender: watch::Sender<Option<StakeIdentity>>,
 }
 
 const METRICS_REPORTING_INTERVAL: Duration = Duration::from_secs(3);
@@ -539,16 +546,24 @@ impl TpuClientNextClient {
         let leader_updater = forward_address_getter.clone();
 
         let config = Self::create_config(bind_socket, stake_identity);
-        let scheduler: ConnectionWorkersScheduler =
-            ConnectionWorkersScheduler::new(Box::new(leader_updater), receiver);
+        let (update_certificate_sender, update_certificate_receiver) = watch::channel(None);
+        let scheduler: ConnectionWorkersScheduler = ConnectionWorkersScheduler::new(
+            Box::new(leader_updater),
+            receiver,
+            update_certificate_receiver,
+            cancel.clone(),
+        );
         // leaking handle to this task, as it will run until the cancel signal is received
         runtime_handle.spawn(scheduler.get_stats().report_to_influxdb(
             "forwarding-stage-tpu-client",
             METRICS_REPORTING_INTERVAL,
             cancel.clone(),
         ));
-        let _handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
-        Self { sender }
+        let _handle = runtime_handle.spawn(scheduler.run(config));
+        Self {
+            sender,
+            update_certificate_sender,
+        }
     }
 
     fn create_config(
@@ -557,7 +572,7 @@ impl TpuClientNextClient {
     ) -> ConnectionWorkersSchedulerConfig {
         ConnectionWorkersSchedulerConfig {
             bind: BindTarget::Socket(bind_socket),
-            stake_identity: stake_identity.map(Into::into),
+            stake_identity: stake_identity.map(StakeIdentity::new),
             // Cache size of 128 covers all nodes above the P90 slot count threshold,
             // which together account for ~75% of total slots in the epoch.
             num_connections: 128,
@@ -585,6 +600,15 @@ impl ForwardingClient for TpuClientNextClient {
     }
 }
 
+impl NotifyKeyUpdate for TpuClientNextClient {
+    fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
+        let stake_identity = StakeIdentity::new(identity);
+        self.update_certificate_sender
+            .send(Some(stake_identity))
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
+    }
+}
+
 /// Calculate priority for a transaction:
 ///
 /// The priority is calculated as:

+ 1 - 1
rpc/src/rpc_service.rs

@@ -503,7 +503,7 @@ impl JsonRpcService {
                     config.send_transaction_service_config,
                     config.max_slots,
                     config.leader_schedule_cache,
-                    client.clone(),
+                    client,
                     config.max_complete_transaction_status_slot,
                     config.max_complete_rewards_slot,
                     config.prioritization_fee_cache,

+ 1 - 1
send-transaction-service/src/test_utils.rs

@@ -79,7 +79,7 @@ where
 
 impl Stoppable for TpuClientNextClient {
     fn stop(&self) {
-        self.cancel().unwrap();
+        self.cancel();
     }
 }
 

+ 34 - 130
send-transaction-service/src/transaction_client.rs

@@ -1,17 +1,19 @@
 use {
     crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo},
     async_trait::async_trait,
-    log::{debug, error, warn},
+    log::warn,
     solana_client::connection_cache::{ConnectionCache, Protocol},
     solana_connection_cache::client_connection::ClientConnection as TpuConnection,
     solana_keypair::Keypair,
     solana_measure::measure::Measure,
     solana_sdk::quic::NotifyKeyUpdate,
     solana_tpu_client_next::{
-        connection_workers_scheduler::{BindTarget, ConnectionWorkersSchedulerConfig, Fanout},
+        connection_workers_scheduler::{
+            BindTarget, ConnectionWorkersSchedulerConfig, Fanout, StakeIdentity,
+        },
         leader_updater::LeaderUpdater,
         transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
+        ConnectionWorkersScheduler,
     },
     std::{
         net::{SocketAddr, UdpSocket},
@@ -20,8 +22,10 @@ use {
     },
     tokio::{
         runtime::Handle,
-        sync::mpsc::{self},
-        task::JoinHandle as TokioJoinHandle,
+        sync::{
+            mpsc::{self},
+            watch,
+        },
     },
     tokio_util::sync::CancellationToken,
 };
@@ -43,8 +47,6 @@ pub trait TransactionClient {
 
     #[cfg(any(test, feature = "dev-context-only-utils"))]
     fn protocol(&self) -> Protocol;
-
-    fn exit(&self);
 }
 
 pub struct ConnectionCacheClient<T: TpuInfoWithSendStatic> {
@@ -155,8 +157,6 @@ where
     fn protocol(&self) -> Protocol {
         self.connection_cache.protocol()
     }
-
-    fn exit(&self) {}
 }
 
 impl<T> NotifyKeyUpdate for ConnectionCacheClient<T>
@@ -227,37 +227,15 @@ where
 /// * Update the validator identity keypair and propagate the changes to the
 ///   scheduler. Most of the complexity of this structure arises from this
 ///   functionality.
+#[derive(Clone)]
 pub struct TpuClientNextClient {
     runtime_handle: Handle,
     sender: mpsc::Sender<TransactionBatch>,
-    bind_socket: UdpSocket,
-    // This handle is needed to implement `NotifyKeyUpdate` trait. It's only
-    // method takes &self and thus we need to wrap with Mutex.
-    join_and_cancel: Arc<Mutex<(Option<TpuClientJoinHandle>, CancellationToken)>>,
-    leader_forward_count: u64,
-}
-
-// Implement Clone manually because `UdpSocket` implements only `try_clone`.
-impl Clone for TpuClientNextClient {
-    fn clone(&self) -> Self {
-        let bind_socket = self
-            .bind_socket
-            .try_clone()
-            .expect("Cloning bind socket should always finish successfully.");
-
-        TpuClientNextClient {
-            runtime_handle: self.runtime_handle.clone(),
-            sender: self.sender.clone(),
-            bind_socket,
-            join_and_cancel: self.join_and_cancel.clone(),
-            leader_forward_count: self.leader_forward_count,
-        }
-    }
+    update_certificate_sender: watch::Sender<Option<StakeIdentity>>,
+    #[cfg(any(test, feature = "dev-context-only-utils"))]
+    cancel: CancellationToken,
 }
 
-type TpuClientJoinHandle =
-    TokioJoinHandle<Result<ConnectionWorkersScheduler, ConnectionWorkersSchedulerError>>;
-
 const METRICS_REPORTING_INTERVAL: Duration = Duration::from_secs(3);
 impl TpuClientNextClient {
     pub fn new<T>(
@@ -276,6 +254,8 @@ impl TpuClientNextClient {
         // 1000 tps, assuming batch size is 64.
         let (sender, receiver) = mpsc::channel(128);
 
+        let (update_certificate_sender, update_certificate_receiver) = watch::channel(None);
+
         let cancel = CancellationToken::new();
 
         let leader_info_provider = CurrentLeaderInfo::new(leader_info);
@@ -285,26 +265,27 @@ impl TpuClientNextClient {
                 my_tpu_address,
                 tpu_peers,
             };
-        let config = {
-            let bind_socket = bind_socket
-                .try_clone()
-                .expect("Cloning bind socket should always finish successfully.");
-            Self::create_config(bind_socket, identity, leader_forward_count as usize)
-        };
-        let scheduler = ConnectionWorkersScheduler::new(Box::new(leader_updater), receiver);
+        let config = Self::create_config(bind_socket, identity, leader_forward_count as usize);
+
+        let scheduler = ConnectionWorkersScheduler::new(
+            Box::new(leader_updater),
+            receiver,
+            update_certificate_receiver,
+            cancel.clone(),
+        );
         // leaking handle to this task, as it will run until the cancel signal is received
         runtime_handle.spawn(scheduler.get_stats().report_to_influxdb(
             "send-transaction-service-TPU-client",
             METRICS_REPORTING_INTERVAL,
             cancel.clone(),
         ));
-        let handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
+        let _handle = runtime_handle.spawn(scheduler.run(config));
         Self {
             runtime_handle,
-            join_and_cancel: Arc::new(Mutex::new((Some(handle), cancel))),
             sender,
-            leader_forward_count,
-            bind_socket,
+            update_certificate_sender,
+            #[cfg(any(test, feature = "dev-context-only-utils"))]
+            cancel,
         }
     }
 
@@ -315,7 +296,7 @@ impl TpuClientNextClient {
     ) -> ConnectionWorkersSchedulerConfig {
         ConnectionWorkersSchedulerConfig {
             bind: BindTarget::Socket(bind_socket),
-            stake_identity: stake_identity.map(Into::into),
+            stake_identity: stake_identity.map(StakeIdentity::new),
             num_connections: MAX_CONNECTIONS,
             skip_check_transaction_age: true,
             // experimentally found parameter values
@@ -329,69 +310,17 @@ impl TpuClientNextClient {
     }
 
     #[cfg(any(test, feature = "dev-context-only-utils"))]
-    pub fn cancel(&self) -> Result<(), Box<dyn std::error::Error>> {
-        let Ok(lock) = self.join_and_cancel.lock() else {
-            return Err("Failed to stop scheduler.".into());
-        };
-        lock.1.cancel();
-        Ok(())
-    }
-
-    async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
-        let runtime_handle = self.runtime_handle.clone();
-        let bind_socket = self
-            .bind_socket
-            .try_clone()
-            .expect("Cloning bind socket should always finish successfully.");
-        let config = Self::create_config(
-            bind_socket,
-            Some(identity),
-            self.leader_forward_count as usize,
-        );
-        let handle = self.join_and_cancel.clone();
-
-        let join_handle = {
-            let Ok(mut lock) = handle.lock() else {
-                return Err("TpuClientNext task panicked.".into());
-            };
-            let (handle, token) = std::mem::take(&mut *lock);
-            token.cancel();
-            handle
-        };
-
-        if let Some(join_handle) = join_handle {
-            let Ok(result) = join_handle.await else {
-                return Err("TpuClientNext task panicked.".into());
-            };
-
-            match result {
-                Ok(scheduler) => {
-                    let cancel = CancellationToken::new();
-                    // leaking handle to this task, as it will run until the cancel signal is received
-                    runtime_handle.spawn(scheduler.get_stats().report_to_influxdb(
-                        "send-transaction-service-TPU-client",
-                        METRICS_REPORTING_INTERVAL,
-                        cancel.clone(),
-                    ));
-                    let join_handle = runtime_handle.spawn(scheduler.run(config, cancel.clone()));
-
-                    let Ok(mut lock) = handle.lock() else {
-                        return Err("TpuClientNext task panicked.".into());
-                    };
-                    *lock = (Some(join_handle), cancel);
-                }
-                Err(error) => {
-                    return Err(Box::new(error));
-                }
-            }
-        }
-        Ok(())
+    pub fn cancel(&self) {
+        self.cancel.cancel();
     }
 }
 
 impl NotifyKeyUpdate for TpuClientNextClient {
     fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
-        self.runtime_handle.block_on(self.do_update_key(identity))
+        let stake_identity = StakeIdentity::new(identity);
+        self.update_certificate_sender
+            .send(Some(stake_identity))
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
     }
 }
 
@@ -421,31 +350,6 @@ impl TransactionClient for TpuClientNextClient {
     fn protocol(&self) -> Protocol {
         Protocol::QUIC
     }
-
-    fn exit(&self) {
-        let Ok(mut lock) = self.join_and_cancel.lock() else {
-            error!("Failed to stop scheduler: TpuClientNext task panicked.");
-            return;
-        };
-        let (cancel, token) = std::mem::take(&mut *lock);
-        token.cancel();
-        let Some(handle) = cancel else {
-            error!("Client task handle was not set.");
-            return;
-        };
-        match self.runtime_handle.block_on(handle) {
-            Ok(result) => match result {
-                Ok(scheduler) => {
-                    debug!(
-                        "tpu-client-next statistics over all the connections: {:?}",
-                        scheduler.get_stats()
-                    );
-                }
-                Err(error) => error!("tpu-client-next exits with error {error}."),
-            },
-            Err(error) => error!("Failed to join task {error}."),
-        }
-    }
 }
 
 #[derive(Clone)]

+ 0 - 1
tpu-client-next/src/connection_worker.rs

@@ -97,7 +97,6 @@ impl ConnectionWorker {
         send_txs_stats: Arc<SendTransactionStats>,
     ) -> (Self, CancellationToken) {
         let cancel = CancellationToken::new();
-
         let this = Self {
             endpoint,
             peer,

+ 77 - 43
tpu-client-next/src/connection_workers_scheduler.rs

@@ -9,19 +9,19 @@ use {
             create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
         },
         transaction_batch::TransactionBatch,
-        workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
+        workers_cache::{shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
         SendTransactionStats,
     },
     async_trait::async_trait,
     log::*,
-    quinn::Endpoint,
+    quinn::{ClientConfig, Endpoint},
     solana_keypair::Keypair,
     std::{
         net::{SocketAddr, UdpSocket},
         sync::Arc,
     },
     thiserror::Error,
-    tokio::sync::mpsc,
+    tokio::sync::{mpsc, watch},
     tokio_util::sync::CancellationToken,
 };
 pub type TransactionReceiver = mpsc::Receiver<TransactionBatch>;
@@ -35,6 +35,8 @@ pub type TransactionReceiver = mpsc::Receiver<TransactionBatch>;
 pub struct ConnectionWorkersScheduler {
     leader_updater: Box<dyn LeaderUpdater>,
     transaction_receiver: TransactionReceiver,
+    update_identity_receiver: watch::Receiver<Option<StakeIdentity>>,
+    cancel: CancellationToken,
     stats: Arc<SendTransactionStats>,
 }
 
@@ -114,15 +116,13 @@ pub enum BindTarget {
 /// consumed by [`ConnectionWorkersScheduler`] to create an endpoint.
 pub struct StakeIdentity(QuicClientCertificate);
 
-impl From<Keypair> for StakeIdentity {
-    fn from(keypair: Keypair) -> Self {
-        Self(QuicClientCertificate::new(Some(&keypair)))
+impl StakeIdentity {
+    pub fn new(keypair: &Keypair) -> Self {
+        Self(QuicClientCertificate::new(Some(keypair)))
     }
-}
 
-impl From<&Keypair> for StakeIdentity {
-    fn from(keypair: &Keypair) -> Self {
-        Self(QuicClientCertificate::new(Some(keypair)))
+    pub fn as_certificate(&self) -> &QuicClientCertificate {
+        &self.0
     }
 }
 
@@ -158,11 +158,15 @@ impl ConnectionWorkersScheduler {
     pub fn new(
         leader_updater: Box<dyn LeaderUpdater>,
         transaction_receiver: mpsc::Receiver<TransactionBatch>,
+        update_identity_receiver: watch::Receiver<Option<StakeIdentity>>,
+        cancel: CancellationToken,
     ) -> Self {
         let stats = Arc::new(SendTransactionStats::default());
         Self {
             leader_updater,
             transaction_receiver,
+            update_identity_receiver,
+            cancel,
             stats,
         }
     }
@@ -184,9 +188,8 @@ impl ConnectionWorkersScheduler {
     pub async fn run(
         self,
         config: ConnectionWorkersSchedulerConfig,
-        cancel: CancellationToken,
-    ) -> Result<Self, ConnectionWorkersSchedulerError> {
-        self.run_with_broadcaster::<NonblockingBroadcaster>(config, cancel)
+    ) -> Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError> {
+        self.run_with_broadcaster::<NonblockingBroadcaster>(config)
             .await
     }
 
@@ -195,16 +198,12 @@ impl ConnectionWorkersScheduler {
     /// way transactions are send to the leaders, see [`WorkersBroadcaster`].
     ///
     /// Runs the main loop that handles worker scheduling and management for
-    /// connections. Returns the error quic statistics per connection address or
-    /// an error along with receiver for transactions. The receiver returned
-    /// back to the user because in some cases we need to re-utilize the same
-    /// receiver for the new scheduler. For example, this happens when the
-    /// identity for the validator is updated.
+    /// connections. Returns [`SendTransactionStats`] or an error.
     ///
     /// Importantly, if some transactions were not delivered due to network
     /// problems, they will not be retried when the problem is resolved.
     pub async fn run_with_broadcaster<Broadcaster: WorkersBroadcaster>(
-        mut self,
+        self,
         ConnectionWorkersSchedulerConfig {
             bind,
             stake_identity,
@@ -214,30 +213,57 @@ impl ConnectionWorkersScheduler {
             max_reconnect_attempts,
             leaders_fanout,
         }: ConnectionWorkersSchedulerConfig,
-        cancel: CancellationToken,
-    ) -> Result<Self, ConnectionWorkersSchedulerError> {
-        let endpoint = Self::setup_endpoint(bind, stake_identity)?;
+    ) -> Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError> {
+        let ConnectionWorkersScheduler {
+            mut leader_updater,
+            mut transaction_receiver,
+            mut update_identity_receiver,
+            cancel,
+            stats,
+        } = self;
+        let mut endpoint = setup_endpoint(bind, stake_identity)?;
+
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
 
 
         let mut last_error = None;
         let mut last_error = None;
+        // flag to ensure that the section handling
+        // `update_identity_receiver.changed()` is entered only once when the
+        // channel is dropped.
+        let mut identity_updater_is_active = true;
 
 
         loop {
         loop {
             let transaction_batch: TransactionBatch = tokio::select! {
             let transaction_batch: TransactionBatch = tokio::select! {
-                recv_res = self.transaction_receiver.recv() => match recv_res {
+                recv_res = transaction_receiver.recv() => match recv_res {
                     Some(txs) => txs,
                     Some(txs) => txs,
                     None => {
                     None => {
                         debug!("End of `transaction_receiver`: shutting down.");
                         debug!("End of `transaction_receiver`: shutting down.");
                         break;
                         break;
                     }
                     }
                 },
                 },
+                res = update_identity_receiver.changed(), if identity_updater_is_active => {
+                    let Ok(()) = res else {
+                        // Sender has been dropped; log and continue
+                        debug!("Certificate update channel closed; continuing without further updates.");
+                        identity_updater_is_active = false;
+                        continue;
+                    };
+
+                    let client_config = build_client_config(update_identity_receiver.borrow_and_update().as_ref());
+                    endpoint.set_default_client_config(client_config);
+                    // Flush workers since they are handling connections created
+                    // with outdated certificate.
+                    workers.flush();
+                    debug!("Updated certificate.");
+                    continue;
+                },
                 () = cancel.cancelled() => {
                     debug!("Cancelled: Shutting down");
                     break;
                 }
             };
 
-            let connect_leaders = self.leader_updater.next_leaders(leaders_fanout.connect);
+            let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect);
             let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send);
 
             // add future leaders to the cache to hide the latency of opening
@@ -250,9 +276,11 @@ impl ConnectionWorkersScheduler {
                         worker_channel_size,
                         skip_check_transaction_age,
                         max_reconnect_attempts,
-                        self.stats.clone(),
+                        stats.clone(),
                     );
-                    maybe_shutdown_worker(workers.push(peer, worker));
+                    if let Some(pop_worker) = workers.push(peer, worker) {
+                        shutdown_worker(pop_worker)
+                    }
                 }
             }
 
@@ -267,25 +295,11 @@ impl ConnectionWorkersScheduler {
         workers.shutdown().await;
 
         endpoint.close(0u32.into(), b"Closing connection");
-        self.leader_updater.stop().await;
+        leader_updater.stop().await;
         if let Some(error) = last_error {
             return Err(error);
         }
-        Ok(self)
-    }
-
-    /// Sets up the QUIC endpoint for the scheduler to handle connections.
-    fn setup_endpoint(
-        bind: BindTarget,
-        stake_identity: Option<StakeIdentity>,
-    ) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
-        let client_certificate = match stake_identity {
-            Some(identity) => identity.into(),
-            None => QuicClientCertificate::new(None),
-        };
-        let client_config = create_client_config(client_certificate);
-        let endpoint = create_client_endpoint(bind, client_config)?;
-        Ok(endpoint)
+        Ok(stats)
     }
 
     /// Spawns a worker to handle communication with a given peer.
@@ -317,6 +331,24 @@ impl ConnectionWorkersScheduler {
     }
 }
 
+/// Sets up the QUIC endpoint for the scheduler to handle connections.
+fn setup_endpoint(
+    bind: BindTarget,
+    stake_identity: Option<StakeIdentity>,
+) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
+    let client_config = build_client_config(stake_identity.as_ref());
+    let endpoint = create_client_endpoint(bind, client_config)?;
+    Ok(endpoint)
+}
+
+fn build_client_config(stake_identity: Option<&StakeIdentity>) -> ClientConfig {
+    let client_certificate = match stake_identity {
+        Some(identity) => identity.as_certificate(),
+        None => &QuicClientCertificate::new(None),
+    };
+    create_client_config(client_certificate)
+}
+
 /// [`NonblockingBroadcaster`] attempts to immediately send transactions to all
 /// the workers. If worker cannot accept transactions because it's channel is
 /// full, the transactions will not be sent to this worker.
@@ -344,7 +376,9 @@ impl WorkersBroadcaster for NonblockingBroadcaster {
                 }
                 Err(WorkersCacheError::ReceiverDropped) => {
                     // Remove the worker from the cache, if the peer has disconnected.
-                    maybe_shutdown_worker(workers.pop(*new_leader));
+                    if let Some(pop_worker) = workers.pop(*new_leader) {
+                        shutdown_worker(pop_worker)
+                    }
                 }
                 Err(err) => {
                     warn!("Connection to {new_leader} was closed, worker error: {err}");

+ 1 - 2
tpu-client-next/src/quic_networking.rs

@@ -19,8 +19,7 @@ pub use {
     solana_tls_utils::QuicClientCertificate,
 };
 
-pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
-    // adapted from QuicLazyInitializedEndpoint::create_endpoint
+pub(crate) fn create_client_config(client_certificate: &QuicClientCertificate) -> ClientConfig {
     let mut crypto = tls_client_config_builder()
         .with_client_auth_cert(
             vec![client_certificate.certificate.clone()],

+ 40 - 16
tpu-client-next/src/workers_cache.rs

@@ -10,7 +10,7 @@ use {
     thiserror::Error,
     tokio::{
         sync::mpsc::{self, error::TrySendError},
-        task::JoinHandle,
+        task::{JoinHandle, JoinSet},
     },
     tokio_util::sync::CancellationToken,
 };
@@ -101,6 +101,8 @@ impl WorkersCache {
         }
     }
 
+    /// Checks if the worker for a given peer exists and it hasn't been
+    /// cancelled.
     pub fn contains(&self, peer: &SocketAddr) -> bool {
         self.workers.contains(peer)
     }
@@ -161,10 +163,12 @@ impl WorkersCache {
                 "Failed to deliver transaction batch for leader {}, drop batch.",
                 "Failed to deliver transaction batch for leader {}, drop batch.",
                 peer.ip()
                 peer.ip()
             );
             );
-            maybe_shutdown_worker(workers.pop(peer).map(|current_worker| ShutdownWorker {
-                leader: *peer,
-                worker: current_worker,
-            }));
+            if let Some(current_worker) = workers.pop(peer) {
+                shutdown_worker(ShutdownWorker {
+                    leader: *peer,
+                    worker: current_worker,
+                })
+            }
         }
 
         send_res
@@ -195,10 +199,12 @@ impl WorkersCache {
             let send_res = current_worker.send_transactions(txs_batch).await;
             if let Err(WorkersCacheError::ReceiverDropped) = send_res {
                 // Remove the worker from the cache, if the peer has disconnected.
-                maybe_shutdown_worker(workers.pop(peer).map(|current_worker| ShutdownWorker {
-                    leader: *peer,
-                    worker: current_worker,
-                }));
+                if let Some(current_worker) = workers.pop(peer) {
+                    shutdown_worker(ShutdownWorker {
+                        leader: *peer,
+                        worker: current_worker,
+                    })
+                }
             }
 
             send_res
@@ -210,16 +216,37 @@ impl WorkersCache {
             .unwrap_or(Err(WorkersCacheError::ShutdownError))
     }
 
+    /// Flushes the cache and asynchronously shuts down all workers. This method
+    /// doesn't wait for the completion of all the shutdown tasks.
+    pub(crate) fn flush(&mut self) {
+        while let Some((peer, current_worker)) = self.workers.pop_lru() {
+            shutdown_worker(ShutdownWorker {
+                leader: peer,
+                worker: current_worker,
+            });
+        }
+    }
+
     /// Closes and removes all workers in the cache. This is typically done when
     /// shutting down the system.
+    ///
+    /// The method awaits the completion of all shutdown tasks, ensuring that
+    /// each worker is properly terminated.
     pub(crate) async fn shutdown(&mut self) {
     pub(crate) async fn shutdown(&mut self) {
         // Interrupt any outstanding `send_transactions()` calls.
         self.cancel.cancel();
 
-            let res = worker.shutdown().await;
+        let mut tasks = JoinSet::new();
+        while let Some((peer, current_worker)) = self.workers.pop_lru() {
+            let shutdown_worker = ShutdownWorker {
+                leader: peer,
+                worker: current_worker,
+            };
+            tasks.spawn(shutdown_worker.shutdown());
+        }
+        while let Some(res) = tasks.join_next().await {
             if let Err(err) = res {
-                debug!("Error while shutting down worker for {leader}: {err}");
+                debug!("A shutdown task failed: {}", err);
             }
         }
     }
@@ -243,10 +270,7 @@ impl ShutdownWorker {
     }
 }
 
-pub fn maybe_shutdown_worker(worker: Option<ShutdownWorker>) {
-    let Some(worker) = worker else {
-        return;
-    };
+pub fn shutdown_worker(worker: ShutdownWorker) {
     tokio::spawn(async move {
         let leader = worker.leader();
         let res = worker.shutdown().await;

+ 101 - 19
tpu-client-next/tests/connection_workers_scheduler_test.rs

@@ -15,11 +15,13 @@ use {
         streamer::StakedNodes,
     },
     solana_tpu_client_next::{
-        connection_workers_scheduler::{BindTarget, ConnectionWorkersSchedulerConfig, Fanout},
+        connection_workers_scheduler::{
+            BindTarget, ConnectionWorkersSchedulerConfig, Fanout, StakeIdentity,
+        },
         leader_updater::create_leader_updater,
         send_transaction_stats::SendTransactionStatsNonAtomic,
         transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
+        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats,
     },
     std::{
         collections::HashMap,
@@ -31,7 +33,7 @@ use {
     tokio::{
         sync::{
             mpsc::{channel, Receiver},
-            oneshot,
+            oneshot, watch,
         },
         task::JoinHandle,
         time::{sleep, Instant},
@@ -43,7 +45,7 @@ fn test_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerCon
     let address = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0);
     ConnectionWorkersSchedulerConfig {
         bind: BindTarget::Address(address),
-        stake_identity: stake_identity.map(Into::into),
+        stake_identity: stake_identity.map(|identity| StakeIdentity::new(&identity)),
         num_connections: 1,
         skip_check_transaction_age: false,
         // At the moment we have only one strategy to send transactions: we try
@@ -65,7 +67,8 @@ async fn setup_connection_worker_scheduler(
     transaction_receiver: Receiver<TransactionBatch>,
     stake_identity: Option<Keypair>,
 ) -> (
-    JoinHandle<Result<ConnectionWorkersScheduler, ConnectionWorkersSchedulerError>>,
+    JoinHandle<Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError>>,
+    watch::Sender<Option<StakeIdentity>>,
     CancellationToken,
 ) {
     let json_rpc_url = "http://127.0.0.1:8899";
@@ -82,23 +85,29 @@ async fn setup_connection_worker_scheduler(
         .expect("Leader updates was successfully created");
         .expect("Leader updates was successfully created");
 
 
     let cancel = CancellationToken::new();
     let cancel = CancellationToken::new();
+    let (update_identity_sender, update_identity_receiver) = watch::channel(None);
+    let scheduler = ConnectionWorkersScheduler::new(
+        leader_updater,
+        transaction_receiver,
+        update_identity_receiver,
+        cancel.clone(),
+    );
     let config = test_config(stake_identity);
-    let scheduler = ConnectionWorkersScheduler::new(leader_updater, transaction_receiver);
-    let scheduler = tokio::spawn(scheduler.run(config, cancel.clone()));
+    let scheduler = tokio::spawn(scheduler.run(config));
 
-    (scheduler, cancel)
+    (scheduler, update_identity_sender, cancel)
 }
 
 async fn join_scheduler(
     scheduler_handle: JoinHandle<
-        Result<ConnectionWorkersScheduler, ConnectionWorkersSchedulerError>,
+        Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError>,
     >,
 ) -> SendTransactionStatsNonAtomic {
-    let scheduler = scheduler_handle
+    let scheduler_stats = scheduler_handle
         .await
         .unwrap()
         .expect("Scheduler should stop successfully.");
-    scheduler.get_stats().read_and_reset()
+    scheduler_stats.read_and_reset()
 }
 
 // Specify the pessimistic time to finish generation and result checks.
@@ -198,8 +207,10 @@ async fn test_basic_transactions_sending() {
         ..
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(10));
 
-    let (scheduler_handle, _scheduler_cancel) =
+    let (scheduler_handle, update_identity_sender, _scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
+    // dropping sender will not lead to stop the scheduler.
+    drop(update_identity_sender);
 
     // Check results
     let mut received_data = Vec::with_capacity(expected_num_txs);
@@ -295,7 +306,7 @@ async fn test_connection_denied_until_allowed() {
         ..
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
-    let (scheduler_handle, _scheduler_cancel) =
+    let (scheduler_handle, _update_identity_sender, _scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
 
     // Check results
@@ -354,7 +365,7 @@ async fn test_connection_pruned_and_reopened() {
         ..
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
-    let (scheduler_handle, _scheduler_cancel) =
+    let (scheduler_handle, _update_identity_sender, _scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
 
     sleep(Duration::from_millis(400)).await;
@@ -416,7 +427,7 @@ async fn test_staked_connection() {
         ..
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
-    let (scheduler_handle, _scheduler_cancel) =
+    let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, Some(stake_identity)).await;
 
     // Check results
@@ -462,7 +473,7 @@ async fn test_connection_throttling() {
         ..
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1));
 
-    let (scheduler_handle, _scheduler_cancel) =
+    let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
 
     // Check results
@@ -504,7 +515,7 @@ async fn test_no_host() {
         ..
     } = spawn_tx_sender(tx_size, max_send_attempts, Duration::from_millis(10));
 
-    let (scheduler_handle, _scheduler_cancel) =
+    let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
 
     // Wait for all the transactions to be sent, and some extra time for the delivery to be
@@ -558,7 +569,7 @@ async fn test_rate_limiting() {
         ..
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
-    let (scheduler_handle, scheduler_cancel) =
+    let (scheduler_handle, _update_certificate_sender, scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
 
     let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -616,7 +627,7 @@ async fn test_rate_limiting_establish_connection() {
         ..
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1000));
 
-    let (scheduler_handle, scheduler_cancel) =
+    let (scheduler_handle, _update_certificate_sender, scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
 
     let actual_num_packets =
@@ -658,3 +669,74 @@ async fn test_rate_limiting_establish_connection() {
     exit.store(true, Ordering::Relaxed);
     server_handle.await.unwrap();
 }
+
+// Check that identity is updated successfully using corresponding channel.
+//
+// Since the identity update and the transactions are sent concurrently to their channels
+// and scheduler selects randomly which channel to handle first, we cannot
+// guarantee in this test that the identity has been updated before we start
+// sending transactions. Hence, instead of checking that all the transactions
+// have been delivered, we check that at least some have been.
+#[tokio::test]
+async fn test_update_identity() {
+    let stake_identity = Keypair::new();
+    let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
+    let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
+
+    let SpawnTestServerResult {
+        join_handle: server_handle,
+        exit,
+        receiver,
+        server_address,
+        stats: _stats,
+    } = setup_quic_server(
+        Some(staked_nodes),
+        TestServerConfig {
+            // Must use at least the number of endpoints (10) because
+            // `max_staked_connections` and `max_unstaked_connections` are
+            // cumulative for all the endpoints.
+            max_staked_connections: 10,
+            // Deny all unstaked connections.
+            max_unstaked_connections: 0,
+            ..Default::default()
+        },
+    );
+
+    // Setup sending txs
+    let tx_size = 1;
+    let num_txs: usize = 100;
+    let SpawnTxGenerator {
+        tx_receiver,
+        tx_sender_shutdown,
+        ..
+    } = spawn_tx_sender(tx_size, num_txs, Duration::from_millis(50));
+
+    let (scheduler_handle, update_identity_sender, scheduler_cancel) =
+        setup_connection_worker_scheduler(
+            server_address,
+            tx_receiver,
+            // Create scheduler with unstaked identity.
+            None,
+        )
+        .await;
+    // Update identity.
+    update_identity_sender
+        .send(Some(StakeIdentity::new(&stake_identity)))
+        .unwrap();
+
+    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
+    assert!(actual_num_packets > 0);
+
+    // Stop the sender.
+    tx_sender_shutdown.await;
+
+    // And the scheduler.
+    scheduler_cancel.cancel();
+
+    let stats = join_scheduler(scheduler_handle).await;
+    assert!(stats.successfully_sent > 0);
+
+    // Exit server
+    exit.store(true, Ordering::Relaxed);
+    server_handle.await.unwrap();
+}