
Support simple QOS -- with QOS trait refactoring (#8437)

* simple QOS streams/s per peer

minimum one stream per throttle window for simple QOS

WIP streamer_support_qos_as_trait

WIP refactoring QUIC streamer

cleaning up code

WIP refactoring

continued refactoring

continued refactoring

clean up code

fmt code

Fixed some compilation issues

Fixed some compilation issues

Ignore manual async warning as it is intentional

Moved more swqos functions out

* Handle rebase errors

* Clean up code

* Clean up code

* Add missing connection stats update

* cleanup code

* Refactor out qos.rs

* Clean up connection context interface

* Clean up CancellationToken usage

* rename to try_cache_connection

* simplify the try_cache_connection interface; remove last_update from the return value

* Update comments

* Vote using simple QOS

* Support using simple QOS

* Dedup code

* added unit tests for simple QOS

added unit tests for simple QOS

make test exact

rename test name

* Correct unit test for simple qos

* Addressed some feedback from Alex

* Handle some merge conflicts

* Fixed a merge conflict issue

* Addressed some comments from Alex

* Addressed some comments from Alex, simplify QOS interface

* decouple QosConfig from Server Config to avoid conflicting parameters related to QOS

* unit tests for simple QOS -- part 1

* unit tests for simple QOS -- part 2

* clippy issue

* clean up code

* resolved a unit test failure due to merge conflicts

* removed a println

* resolved some merge conflicts

* Applying an update from Alex -- do not allow a blanket allow(deprecated)

* Clippy issue
Lijun Wang, 3 weeks ago
parent
commit f2ec733e1a

+ 5 - 2
bench-vote/src/main.rs

@@ -16,9 +16,10 @@ use {
     solana_pubkey::Pubkey,
     solana_signer::Signer,
     solana_streamer::{
+        nonblocking::swqos::SwQosConfig,
         packet::PacketBatchRecycler,
         quic::{
-            spawn_server_with_cancel, QuicServerParams, DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER,
+            spawn_server_with_cancel, QuicStreamerConfig, DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER,
             DEFAULT_MAX_STAKED_CONNECTIONS,
         },
         streamer::{receiver, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
@@ -262,7 +263,7 @@ fn main() -> Result<()> {
         let stats = Arc::new(StreamerReceiveStats::new("bench-vote-test"));
 
         if let Some(quic_params) = &quic_params {
-            let quic_server_params = QuicServerParams {
+            let quic_server_params = QuicStreamerConfig {
                 max_connections_per_ipaddr_per_min: max_connections_per_ipaddr_per_min
                     .try_into()
                     .unwrap(),
@@ -271,6 +272,7 @@ fn main() -> Result<()> {
                 max_unstaked_connections: 0,
                 ..Default::default()
             };
+            let qos_config = SwQosConfig::default();
             let (s_reader, r_reader) = unbounded();
             read_channels.push(r_reader);
 
@@ -282,6 +284,7 @@ fn main() -> Result<()> {
                 s_reader,
                 quic_params.staked_nodes.clone(),
                 quic_server_params,
+                qos_config,
                 cancel.clone(),
             )
             .unwrap();

+ 17 - 8
core/src/tpu.rs

@@ -49,7 +49,10 @@ use {
         vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
     },
     solana_streamer::{
-        quic::{spawn_server_with_cancel, QuicServerParams, SpawnServerResult},
+        quic::{
+            spawn_server_with_cancel, spawn_simple_qos_server_with_cancel,
+            SimpleQosQuicStreamerConfig, SpawnServerResult, SwQosQuicStreamerConfig,
+        },
         streamer::StakedNodes,
     },
     solana_turbine::{
@@ -96,6 +99,9 @@ impl SigVerifier {
     }
 }
 
+// Conservatively allow 20 TPS per validator.
+pub const MAX_VOTES_PER_SECOND: u64 = 20;
+
 pub struct Tpu {
     fetch_stage: FetchStage,
     sig_verifier: SigVerifier,
@@ -146,9 +152,9 @@ impl Tpu {
         banking_tracer_channels: Channels,
         tracer_thread_hdl: TracerThread,
         tpu_enable_udp: bool,
-        tpu_quic_server_config: QuicServerParams,
-        tpu_fwd_quic_server_config: QuicServerParams,
-        vote_quic_server_config: QuicServerParams,
+        tpu_quic_server_config: SwQosQuicStreamerConfig,
+        tpu_fwd_quic_server_config: SwQosQuicStreamerConfig,
+        vote_quic_server_config: SimpleQosQuicStreamerConfig,
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
         block_production_method: BlockProductionMethod,
         block_production_num_workers: NonZeroUsize,
@@ -209,14 +215,15 @@ impl Tpu {
             endpoints: _,
             thread: tpu_vote_quic_t,
             key_updater: vote_streamer_key_updater,
-        } = spawn_server_with_cancel(
+        } = spawn_simple_qos_server_with_cancel(
             "solQuicTVo",
             "quic_streamer_tpu_vote",
             tpu_vote_quic_sockets,
             keypair,
             vote_packet_sender.clone(),
             staked_nodes.clone(),
-            vote_quic_server_config,
+            vote_quic_server_config.quic_streamer_config,
+            vote_quic_server_config.qos_config,
             cancel.clone(),
         )
         .unwrap();
@@ -234,7 +241,8 @@ impl Tpu {
                 keypair,
                 packet_sender,
                 staked_nodes.clone(),
-                tpu_quic_server_config,
+                tpu_quic_server_config.quic_streamer_config,
+                tpu_quic_server_config.qos_config,
                 cancel.clone(),
             )
             .unwrap();
@@ -256,7 +264,8 @@ impl Tpu {
                 keypair,
                 forwarded_packet_sender,
                 staked_nodes.clone(),
-                tpu_fwd_quic_server_config,
+                tpu_fwd_quic_server_config.quic_streamer_config,
+                tpu_fwd_quic_server_config.qos_config,
                 cancel,
             )
             .unwrap();
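
For reference, a minimal sketch (not part of this diff) of how a vote-server config could be assembled from the new types, assuming the per-peer stream budget is derived from MAX_VOTES_PER_SECOND; the example_vote_quic_config helper is hypothetical:

// Illustrative sketch only; field names come from the types shown in this change.
use solana_streamer::{
    nonblocking::simple_qos::SimpleQosConfig,
    quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig},
};

fn example_vote_quic_config() -> SimpleQosQuicStreamerConfig {
    SimpleQosQuicStreamerConfig {
        quic_streamer_config: QuicStreamerConfig {
            // Votes are accepted from staked peers only.
            max_unstaked_connections: 0,
            ..Default::default()
        },
        qos_config: SimpleQosConfig {
            // Mirror the conservative 20 TPS per validator budget.
            max_streams_per_second: 20,
        },
    }
}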

+ 33 - 14
core/src/validator.rs

@@ -134,7 +134,12 @@ use {
     solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig,
     solana_shred_version::compute_shred_version,
     solana_signer::Signer,
-    solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
+    solana_streamer::{
+        nonblocking::{simple_qos::SimpleQosConfig, swqos::SwQosConfig},
+        quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig, SwQosQuicStreamerConfig},
+        socket::SocketAddrSpace,
+        streamer::StakedNodes,
+    },
     solana_time_utils::timestamp,
     solana_tpu_client::tpu_client::{
         DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
@@ -562,32 +567,46 @@ pub struct ValidatorTpuConfig {
     /// Controls if to enable UDP for TPU transactions.
     pub tpu_enable_udp: bool,
     /// QUIC server config for regular TPU
-    pub tpu_quic_server_config: QuicServerParams,
+    pub tpu_quic_server_config: SwQosQuicStreamerConfig,
     /// QUIC server config for TPU forward
-    pub tpu_fwd_quic_server_config: QuicServerParams,
+    pub tpu_fwd_quic_server_config: SwQosQuicStreamerConfig,
     /// QUIC server config for Vote
-    pub vote_quic_server_config: QuicServerParams,
+    pub vote_quic_server_config: SimpleQosQuicStreamerConfig,
 }
 
 impl ValidatorTpuConfig {
     /// A convenient function to build a ValidatorTpuConfig for testing with good
     /// default.
     pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
-        let tpu_quic_server_config = QuicServerParams {
-            max_connections_per_ipaddr_per_min: 32,
-            accumulator_channel_size: 100_000, // smaller channel size for faster test
-            ..Default::default()
+        let tpu_quic_server_config = SwQosQuicStreamerConfig {
+            quic_streamer_config: QuicStreamerConfig {
+                max_connections_per_ipaddr_per_min: 32,
+                accumulator_channel_size: 100_000, // smaller channel size for faster test
+                ..Default::default()
+            },
+            qos_config: SwQosConfig::default(),
         };
 
-        let tpu_fwd_quic_server_config = QuicServerParams {
-            max_connections_per_ipaddr_per_min: 32,
-            max_unstaked_connections: 0,
-            accumulator_channel_size: 100_000, // smaller channel size for faster test
-            ..Default::default()
+        let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
+            quic_streamer_config: QuicStreamerConfig {
+                max_connections_per_ipaddr_per_min: 32,
+                max_unstaked_connections: 0,
+                accumulator_channel_size: 100_000, // smaller channel size for faster test
+                ..Default::default()
+            },
+            qos_config: SwQosConfig::default(),
         };
 
         // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
-        let vote_quic_server_config = tpu_fwd_quic_server_config.clone();
+        let vote_quic_server_config = SimpleQosQuicStreamerConfig {
+            quic_streamer_config: QuicStreamerConfig {
+                max_connections_per_ipaddr_per_min: 32,
+                max_unstaked_connections: 0,
+                accumulator_channel_size: 100_000, // smaller channel size for faster test
+                ..Default::default()
+            },
+            qos_config: SimpleQosConfig::default(),
+        };
 
         ValidatorTpuConfig {
             use_quic: DEFAULT_TPU_USE_QUIC,

+ 12 - 6
quic-client/tests/quic_client.rs

@@ -12,7 +12,8 @@ mod tests {
         solana_perf::packet::PacketBatch,
         solana_quic_client::nonblocking::quic_client::{QuicClient, QuicLazyInitializedEndpoint},
         solana_streamer::{
-            quic::{QuicServerParams, SpawnServerResult},
+            nonblocking::swqos::SwQosConfig,
+            quic::{QuicStreamerConfig, SpawnServerResult},
             streamer::StakedNodes,
         },
         solana_tls_utils::{new_dummy_x509_certificate, QuicClientCertificate},
@@ -80,7 +81,8 @@ mod tests {
             &keypair,
             sender,
             staked_nodes,
-            QuicServerParams::default_for_tests(),
+            QuicStreamerConfig::default_for_tests(),
+            SwQosConfig::default(),
             cancel.clone(),
         )
         .unwrap();
@@ -160,7 +162,8 @@ mod tests {
             &keypair,
             sender,
             staked_nodes,
-            QuicServerParams::default_for_tests(),
+            QuicStreamerConfig::default_for_tests(),
+            SwQosConfig::default(),
             cancel.clone(),
         )
         .unwrap();
@@ -218,7 +221,8 @@ mod tests {
             &keypair,
             sender,
             staked_nodes.clone(),
-            QuicServerParams::default_for_tests(),
+            QuicStreamerConfig::default_for_tests(),
+            SwQosConfig::default(),
             request_recv_cancel.clone(),
         )
         .unwrap();
@@ -242,7 +246,8 @@ mod tests {
             &keypair2,
             sender2,
             staked_nodes,
-            QuicServerParams::default_for_tests(),
+            QuicStreamerConfig::default_for_tests(),
+            SwQosConfig::default(),
             response_recv_cancel.clone(),
         )
         .unwrap();
@@ -328,7 +333,8 @@ mod tests {
             &keypair,
             sender,
             staked_nodes,
-            QuicServerParams::default_for_tests(),
+            QuicStreamerConfig::default_for_tests(),
+            SwQosConfig::default(),
             cancel.clone(),
         )
         .unwrap();

+ 5 - 3
streamer/examples/swqos.rs

@@ -15,7 +15,8 @@ use {
     solana_net_utils::sockets::{bind_to_with_config, SocketConfiguration},
     solana_pubkey::Pubkey,
     solana_streamer::{
-        nonblocking::quic::SpawnNonBlockingServerResult, quic::QuicServerParams,
+        nonblocking::{quic::SpawnNonBlockingServerResult, swqos::SwQosConfig},
+        quic::QuicStreamerConfig,
         streamer::StakedNodes,
     },
     std::{
@@ -117,10 +118,11 @@ async fn main() -> anyhow::Result<()> {
         &keypair,
         sender,
         staked_nodes,
-        QuicServerParams {
+        QuicStreamerConfig {
             max_connections_per_peer: cli.max_connections_per_peer,
-            ..QuicServerParams::default()
+            ..QuicStreamerConfig::default()
         },
+        SwQosConfig::default(),
         cancel.clone(),
     )?;
     info!("Server listening on {}", socket.local_addr()?);

+ 3 - 0
streamer/src/nonblocking/mod.rs

@@ -1,8 +1,11 @@
 pub mod connection_rate_limiter;
+pub mod qos;
 pub mod quic;
 #[cfg(feature = "dev-context-only-utils")]
 pub mod recvmmsg;
 pub mod sendmmsg;
+pub mod simple_qos;
 mod stream_throttle;
+pub mod swqos;
 #[cfg(feature = "dev-context-only-utils")]
 pub mod testing_utilities;

+ 55 - 0
streamer/src/nonblocking/qos.rs

@@ -0,0 +1,55 @@
+use {
+    crate::nonblocking::quic::{ClientConnectionTracker, ConnectionPeerType},
+    quinn::Connection,
+    std::future::Future,
+    tokio_util::sync::CancellationToken,
+};
+
+/// A trait to provide context about a connection, such as peer type,
+/// remote pubkey. This is opaque to the framework and is provided by
+/// the concrete implementation of QosController.
+pub(crate) trait ConnectionContext: Clone + Send + Sync {
+    fn peer_type(&self) -> ConnectionPeerType;
+    fn remote_pubkey(&self) -> Option<solana_pubkey::Pubkey>;
+}
+
+/// A trait to manage QoS for connections. This includes
+/// 1) deriving the ConnectionContext for a connection
+/// 2) managing connection caching and connection limits, stream limits
+pub(crate) trait QosController<C: ConnectionContext> {
+    /// Build the ConnectionContext for a connection
+    fn build_connection_context(&self, connection: &Connection) -> C;
+
+    /// Try to add a new connection to the connection table. This is an async operation that
+    /// returns a Future. If successful, the Future resolves to Some containing a CancellationToken
+    /// and a ConnectionStreamCounter to track the streams created on this connection.
+    /// Otherwise, the Future resolves to None.
+    fn try_add_connection(
+        &self,
+        client_connection_tracker: ClientConnectionTracker,
+        connection: &quinn::Connection,
+        context: &mut C,
+    ) -> impl Future<Output = Option<CancellationToken>> + Send;
+
+    /// Called when a new stream is received on a connection
+    fn on_new_stream(&self, context: &C) -> impl Future<Output = ()> + Send;
+
+    /// Called when a stream is accepted on a connection
+    fn on_stream_accepted(&self, context: &C);
+
+    /// Called when a stream is finished successfully
+    fn on_stream_finished(&self, context: &C);
+
+    /// Called when a stream has an error
+    fn on_stream_error(&self, context: &C);
+
+    /// Called when a stream is closed
+    fn on_stream_closed(&self, context: &C);
+
+    /// Remove a connection. Return the number of open connections after removal.
+    fn remove_connection(
+        &self,
+        context: &C,
+        connection: Connection,
+    ) -> impl Future<Output = usize> + Send;
+}
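
To make the shape of the new trait concrete, here is a minimal crate-internal sketch of an implementation that admits every connection and never throttles. It is illustrative only: the PassthroughQos and PassthroughContext names are hypothetical, and SwQos and SimpleQos are the real implementations introduced in this change.

// Sketch only; mirrors the trait signatures in nonblocking/qos.rs.
use {
    crate::nonblocking::{
        qos::{ConnectionContext, QosController},
        quic::{ClientConnectionTracker, ConnectionPeerType},
    },
    quinn::Connection,
    std::future::Future,
    tokio_util::sync::CancellationToken,
};

#[derive(Clone)]
struct PassthroughContext {
    peer_type: ConnectionPeerType,
}

impl ConnectionContext for PassthroughContext {
    fn peer_type(&self) -> ConnectionPeerType {
        self.peer_type
    }

    fn remote_pubkey(&self) -> Option<solana_pubkey::Pubkey> {
        None
    }
}

struct PassthroughQos;

impl QosController<PassthroughContext> for PassthroughQos {
    fn build_connection_context(&self, _connection: &Connection) -> PassthroughContext {
        PassthroughContext {
            peer_type: ConnectionPeerType::Unstaked,
        }
    }

    fn try_add_connection(
        &self,
        _client_connection_tracker: ClientConnectionTracker,
        _connection: &quinn::Connection,
        _context: &mut PassthroughContext,
    ) -> impl Future<Output = Option<CancellationToken>> + Send {
        // Accept every connection; a real implementation would consult
        // connection tables and limits here.
        async move { Some(CancellationToken::new()) }
    }

    fn on_new_stream(&self, _context: &PassthroughContext) -> impl Future<Output = ()> + Send {
        // No throttling.
        async move {}
    }

    fn on_stream_accepted(&self, _context: &PassthroughContext) {}
    fn on_stream_finished(&self, _context: &PassthroughContext) {}
    fn on_stream_error(&self, _context: &PassthroughContext) {}
    fn on_stream_closed(&self, _context: &PassthroughContext) {}

    fn remove_connection(
        &self,
        _context: &PassthroughContext,
        _connection: Connection,
    ) -> impl Future<Output = usize> + Send {
        async move { 0 }
    }
}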

File diff suppressed because it is too large
+ 186 - 493
streamer/src/nonblocking/quic.rs


+ 1122 - 0
streamer/src/nonblocking/simple_qos.rs

@@ -0,0 +1,1122 @@
+use {
+    crate::{
+        nonblocking::{
+            qos::{ConnectionContext, QosController},
+            quic::{
+                get_connection_stake, update_open_connections_stat, ClientConnectionTracker,
+                ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey,
+                ConnectionTableType,
+            },
+            stream_throttle::{
+                throttle_stream, ConnectionStreamCounter, STREAM_THROTTLING_INTERVAL,
+            },
+        },
+        quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS},
+        streamer::StakedNodes,
+    },
+    quinn::Connection,
+    solana_time_utils as timing,
+    std::{
+        future::Future,
+        sync::{
+            atomic::{AtomicU64, Ordering},
+            Arc, RwLock,
+        },
+    },
+    tokio::sync::{Mutex, MutexGuard},
+    tokio_util::sync::CancellationToken,
+};
+
+#[derive(Clone)]
+pub struct SimpleQosConfig {
+    pub max_streams_per_second: u64,
+}
+
+impl Default for SimpleQosConfig {
+    fn default() -> Self {
+        SimpleQosConfig {
+            max_streams_per_second: DEFAULT_MAX_STREAMS_PER_MS * 1000,
+        }
+    }
+}
+
+pub struct SimpleQos {
+    max_streams_per_second: u64,
+    max_staked_connections: usize,
+    max_connections_per_peer: usize,
+    stats: Arc<StreamerStats>,
+    staked_connection_table: Arc<Mutex<ConnectionTable>>,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+}
+
+impl SimpleQos {
+    pub fn new(
+        qos_config: SimpleQosConfig,
+        max_connections_per_peer: usize,
+        max_staked_connections: usize,
+        stats: Arc<StreamerStats>,
+        staked_nodes: Arc<RwLock<StakedNodes>>,
+        cancel: CancellationToken,
+    ) -> Self {
+        Self {
+            max_streams_per_second: qos_config.max_streams_per_second,
+            max_connections_per_peer,
+            max_staked_connections,
+            stats,
+            staked_nodes,
+            staked_connection_table: Arc::new(Mutex::new(ConnectionTable::new(
+                ConnectionTableType::Staked,
+                cancel,
+            ))),
+        }
+    }
+
+    fn cache_new_connection(
+        &self,
+        client_connection_tracker: ClientConnectionTracker,
+        connection: &Connection,
+        mut connection_table_l: MutexGuard<ConnectionTable>,
+        conn_context: &SimpleQosConnectionContext,
+    ) -> Result<
+        (
+            Arc<AtomicU64>,
+            CancellationToken,
+            Arc<ConnectionStreamCounter>,
+        ),
+        ConnectionHandlerError,
+    > {
+        let remote_addr = connection.remote_address();
+
+        debug!(
+            "Peer type {:?}, from peer {}",
+            conn_context.peer_type(),
+            remote_addr,
+        );
+
+        if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
+            .try_add_connection(
+                ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey),
+                remote_addr.port(),
+                client_connection_tracker,
+                Some(connection.clone()),
+                conn_context.peer_type(),
+                conn_context.last_update.clone(),
+                self.max_connections_per_peer,
+            )
+        {
+            update_open_connections_stat(&self.stats, &connection_table_l);
+            drop(connection_table_l);
+
+            Ok((last_update, cancel_connection, stream_counter))
+        } else {
+            self.stats
+                .connection_add_failed
+                .fetch_add(1, Ordering::Relaxed);
+            Err(ConnectionHandlerError::ConnectionAddError)
+        }
+    }
+
+    fn max_streams_per_throttling_interval(&self, _context: &SimpleQosConnectionContext) -> u64 {
+        let interval_ms = STREAM_THROTTLING_INTERVAL.as_millis() as u64;
+        (self.max_streams_per_second * interval_ms / 1000).max(1)
+    }
+}
+
+#[derive(Clone)]
+pub struct SimpleQosConnectionContext {
+    peer_type: ConnectionPeerType,
+    remote_pubkey: Option<solana_pubkey::Pubkey>,
+    remote_address: std::net::SocketAddr,
+    last_update: Arc<AtomicU64>,
+    stream_counter: Option<Arc<ConnectionStreamCounter>>,
+}
+
+impl ConnectionContext for SimpleQosConnectionContext {
+    fn peer_type(&self) -> ConnectionPeerType {
+        self.peer_type
+    }
+
+    fn remote_pubkey(&self) -> Option<solana_pubkey::Pubkey> {
+        self.remote_pubkey
+    }
+}
+
+impl QosController<SimpleQosConnectionContext> for SimpleQos {
+    fn build_connection_context(&self, connection: &Connection) -> SimpleQosConnectionContext {
+        let (peer_type, remote_pubkey, _total_stake) =
+            get_connection_stake(connection, &self.staked_nodes).map_or(
+                (ConnectionPeerType::Unstaked, None, 0),
+                |(pubkey, stake, total_stake, _max_stake, _min_stake)| {
+                    (ConnectionPeerType::Staked(stake), Some(pubkey), total_stake)
+                },
+            );
+
+        SimpleQosConnectionContext {
+            peer_type,
+            remote_pubkey,
+            remote_address: connection.remote_address(),
+            last_update: Arc::new(AtomicU64::new(timing::timestamp())),
+            stream_counter: None,
+        }
+    }
+
+    #[allow(clippy::manual_async_fn)]
+    fn try_add_connection(
+        &self,
+        client_connection_tracker: ClientConnectionTracker,
+        connection: &quinn::Connection,
+        conn_context: &mut SimpleQosConnectionContext,
+    ) -> impl Future<Output = Option<CancellationToken>> + Send {
+        async move {
+            const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2;
+            match conn_context.peer_type() {
+                ConnectionPeerType::Staked(stake) => {
+                    let mut connection_table_l = self.staked_connection_table.lock().await;
+
+                    if connection_table_l.total_size >= self.max_staked_connections {
+                        let num_pruned =
+                            connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
+
+                        debug!(
+                            "Pruned {} staked connections to make room for new staked connection \
+                             from {}",
+                            num_pruned,
+                            connection.remote_address(),
+                        );
+                        self.stats
+                            .num_evictions_staked
+                            .fetch_add(num_pruned, Ordering::Relaxed);
+                        update_open_connections_stat(&self.stats, &connection_table_l);
+                    }
+
+                    if connection_table_l.total_size < self.max_staked_connections {
+                        if let Ok((last_update, cancel_connection, stream_counter)) = self
+                            .cache_new_connection(
+                                client_connection_tracker,
+                                connection,
+                                connection_table_l,
+                                conn_context,
+                            )
+                        {
+                            self.stats
+                                .connection_added_from_staked_peer
+                                .fetch_add(1, Ordering::Relaxed);
+                            conn_context.last_update = last_update;
+                            conn_context.stream_counter = Some(stream_counter);
+                            return Some(cancel_connection);
+                        }
+                    }
+                    None
+                }
+                ConnectionPeerType::Unstaked => None,
+            }
+        }
+    }
+
+    fn on_stream_accepted(&self, conn_context: &SimpleQosConnectionContext) {
+        conn_context
+            .stream_counter
+            .as_ref()
+            .unwrap()
+            .stream_count
+            .fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn on_stream_error(&self, _conn_context: &SimpleQosConnectionContext) {}
+
+    fn on_stream_closed(&self, _conn_context: &SimpleQosConnectionContext) {}
+
+    #[allow(clippy::manual_async_fn)]
+    fn remove_connection(
+        &self,
+        conn_context: &SimpleQosConnectionContext,
+        connection: Connection,
+    ) -> impl Future<Output = usize> + Send {
+        async move {
+            let stable_id = connection.stable_id();
+            let remote_addr = connection.remote_address();
+
+            let mut connection_table = self.staked_connection_table.lock().await;
+            let removed_connection_count = connection_table.remove_connection(
+                ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey()),
+                remote_addr.port(),
+                stable_id,
+            );
+            update_open_connections_stat(&self.stats, &connection_table);
+            removed_connection_count
+        }
+    }
+
+    fn on_stream_finished(&self, context: &SimpleQosConnectionContext) {
+        context
+            .last_update
+            .store(timing::timestamp(), Ordering::Relaxed);
+    }
+
+    #[allow(clippy::manual_async_fn)]
+    fn on_new_stream(
+        &self,
+        context: &SimpleQosConnectionContext,
+    ) -> impl Future<Output = ()> + Send {
+        async move {
+            let peer_type = context.peer_type();
+            let remote_addr = context.remote_address;
+            let stream_counter: &Arc<ConnectionStreamCounter> =
+                context.stream_counter.as_ref().unwrap();
+
+            let max_streams_per_throttling_interval =
+                self.max_streams_per_throttling_interval(context);
+
+            throttle_stream(
+                &self.stats,
+                peer_type,
+                remote_addr,
+                stream_counter,
+                max_streams_per_throttling_interval,
+            )
+            .await;
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        crate::{
+            nonblocking::{
+                quic::{ConnectionTable, ConnectionTableType},
+                testing_utilities::get_client_config,
+            },
+            quic::{configure_server, StreamerStats},
+            streamer::StakedNodes,
+        },
+        quinn::Endpoint,
+        solana_keypair::{Keypair, Signer},
+        solana_net_utils::sockets::bind_to_localhost_unique,
+        std::{
+            collections::HashMap,
+            sync::{
+                atomic::{AtomicU64, Ordering},
+                Arc, RwLock,
+            },
+        },
+        tokio_util::sync::CancellationToken,
+    };
+
+    async fn create_connection_with_keypairs(
+        server_keypair: &Keypair,
+        client_keypair: &Keypair,
+    ) -> (Connection, Endpoint, Endpoint) {
+        // Create server endpoint
+        let (server_config, _) = configure_server(server_keypair).unwrap();
+        let server_socket = bind_to_localhost_unique().expect("should bind - server");
+        let server_addr = server_socket.local_addr().unwrap();
+        let server_endpoint = Endpoint::new(
+            quinn::EndpointConfig::default(),
+            Some(server_config),
+            server_socket,
+            Arc::new(quinn::TokioRuntime),
+        )
+        .unwrap();
+
+        // Create client endpoint
+        let client_socket = bind_to_localhost_unique().expect("should bind - client");
+        let mut client_endpoint = Endpoint::new(
+            quinn::EndpointConfig::default(),
+            None,
+            client_socket,
+            Arc::new(quinn::TokioRuntime),
+        )
+        .unwrap();
+
+        let client_config = get_client_config(client_keypair);
+        client_endpoint.set_default_client_config(client_config);
+
+        // Accept connection on server side
+        let server_connection_future = async {
+            let incoming = server_endpoint.accept().await.unwrap();
+            incoming.await.unwrap()
+        };
+
+        // Connect from client side
+        let client_connect_future = client_endpoint.connect(server_addr, "localhost").unwrap();
+
+        // Wait for both to complete - we want the server-side connection
+        let (server_connection, client_connection) =
+            tokio::join!(server_connection_future, client_connect_future);
+
+        let _client_connection = client_connection.unwrap();
+
+        (server_connection, client_endpoint, server_endpoint)
+    }
+
+    async fn create_server_side_connection() -> (Connection, Endpoint, Endpoint) {
+        let server_keypair = Keypair::new();
+        let client_keypair = Keypair::new();
+        create_connection_with_keypairs(&server_keypair, &client_keypair).await
+    }
+
+    fn create_staked_nodes_with_keypairs(
+        server_keypair: &Keypair,
+        client_keypair: &Keypair,
+        stake_amount: u64,
+    ) -> Arc<RwLock<StakedNodes>> {
+        let mut stakes = HashMap::new();
+        stakes.insert(server_keypair.pubkey(), stake_amount);
+        stakes.insert(client_keypair.pubkey(), stake_amount);
+
+        let overrides: HashMap<solana_pubkey::Pubkey, u64> = HashMap::new();
+
+        Arc::new(RwLock::new(StakedNodes::new(Arc::new(stakes), overrides)))
+    }
+
+    #[tokio::test]
+    async fn test_cache_new_connection_success() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let connection_table = ConnectionTable::new(ConnectionTableType::Staked, cancel);
+        let connection_table_guard = tokio::sync::Mutex::new(connection_table);
+        let connection_table_l = connection_table_guard.lock().await;
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        // Create server-side accepted connection
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_server_side_connection().await;
+
+        // Create test connection context using the server-side connection
+        let remote_addr = server_connection.remote_address();
+        let conn_context = SimpleQosConnectionContext {
+            peer_type: ConnectionPeerType::Staked(1000),
+            remote_pubkey: Some(solana_pubkey::Pubkey::new_unique()),
+            remote_address: remote_addr,
+            last_update: Arc::new(AtomicU64::new(0)),
+            stream_counter: None,
+        };
+
+        // Test
+        let result = simple_qos.cache_new_connection(
+            client_tracker,
+            &server_connection, // Use server-side connection
+            connection_table_l,
+            &conn_context,
+        );
+
+        // Verify success
+        assert!(result.is_ok());
+        let (_last_update, cancel_token, stream_counter) = result.unwrap();
+        assert!(!cancel_token.is_cancelled());
+        assert_eq!(stream_counter.stream_count.load(Ordering::Relaxed), 0);
+    }
+
+    #[tokio::test]
+    async fn test_cache_new_connection_max_connections_reached() {
+        // Setup with connection limit of 1
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            1,   // max_connections_per_peer (set to 1 to trigger limit)
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let mut connection_table =
+            ConnectionTable::new(ConnectionTableType::Staked, cancel.clone());
+
+        // Create first server-side connection and add it to reach the limit
+        let (connection1, _client_endpoint1, _server_endpoint1) =
+            create_server_side_connection().await;
+        let remote_addr = connection1.remote_address();
+        let key = ConnectionTableKey::new(remote_addr.ip(), None);
+
+        let client_tracker1 = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        // Add first connection to reach the limit
+        let _ = connection_table.try_add_connection(
+            key,
+            remote_addr.port(),
+            client_tracker1,
+            Some(connection1),
+            ConnectionPeerType::Staked(1000),
+            Arc::new(AtomicU64::new(0)),
+            1, // max_connections_per_peer
+        );
+
+        let connection_table_guard = tokio::sync::Mutex::new(connection_table);
+        let connection_table_l = connection_table_guard.lock().await;
+
+        // Try to add second connection (should fail)
+        let (connection2, _client_endpoint2, _server_endpoint2) =
+            create_server_side_connection().await;
+        let client_tracker2 = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let conn_context = SimpleQosConnectionContext {
+            peer_type: ConnectionPeerType::Staked(1000),
+            remote_pubkey: None,
+            remote_address: remote_addr,
+            last_update: Arc::new(AtomicU64::new(0)),
+            stream_counter: None,
+        };
+
+        // Test
+        let result = simple_qos.cache_new_connection(
+            client_tracker2,
+            &connection2, // Use server-side connection
+            connection_table_l,
+            &conn_context,
+        );
+
+        // Verify failure due to connection limit
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            ConnectionHandlerError::ConnectionAddError
+        ));
+
+        // Verify stats were updated
+        assert_eq!(stats.connection_add_failed.load(Ordering::Relaxed), 1);
+    }
+
+    #[tokio::test]
+    async fn test_cache_new_connection_updates_stats() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let connection_table = ConnectionTable::new(ConnectionTableType::Staked, cancel);
+        let connection_table_guard = tokio::sync::Mutex::new(connection_table);
+        let connection_table_l = connection_table_guard.lock().await;
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        // Create server-side accepted connection
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_server_side_connection().await;
+        let remote_addr = server_connection.remote_address();
+
+        let conn_context = SimpleQosConnectionContext {
+            peer_type: ConnectionPeerType::Staked(1000),
+            remote_pubkey: Some(solana_pubkey::Pubkey::new_unique()),
+            remote_address: remote_addr,
+            last_update: Arc::new(AtomicU64::new(0)),
+            stream_counter: None,
+        };
+
+        // Record initial stats
+        let initial_open_connections = stats.open_staked_connections.load(Ordering::Relaxed);
+
+        // Test
+        let result = simple_qos.cache_new_connection(
+            client_tracker,
+            &server_connection,
+            connection_table_l,
+            &conn_context,
+        );
+
+        if result.is_ok() {
+            // Verify stats were updated (open connections should increase)
+            assert!(
+                stats.open_staked_connections.load(Ordering::Relaxed) > initial_open_connections
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_build_connection_context_unstaked_peer() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        // Create server-side accepted connection
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_server_side_connection().await;
+
+        // Test - build connection context for unstaked peer
+        let context = simple_qos.build_connection_context(&server_connection);
+
+        // Verify unstaked peer context
+        assert!(matches!(context.peer_type(), ConnectionPeerType::Unstaked));
+        assert_eq!(context.remote_pubkey(), None);
+        assert_eq!(context.remote_address, server_connection.remote_address());
+        assert!(context.last_update.load(Ordering::Relaxed) > 0); // Should have timestamp
+        assert!(context.stream_counter.is_none()); // Should be None initially
+    }
+
+    #[tokio::test]
+    async fn test_build_connection_context_staked_peer() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs for staked connection
+        let server_keypair = Keypair::new();
+        let client_keypair = Keypair::new();
+        let stake_amount = 50_000_000; // 50M lamports
+
+        // Create staked nodes with both keypairs
+        let staked_nodes =
+            create_staked_nodes_with_keypairs(&server_keypair, &client_keypair, stake_amount);
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        // Create connection using the staked keypairs
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_connection_with_keypairs(&server_keypair, &client_keypair).await;
+
+        // Test - build connection context for staked peer
+        let context = simple_qos.build_connection_context(&server_connection);
+
+        // Verify staked peer context
+        assert!(matches!(context.peer_type(), ConnectionPeerType::Staked(_)));
+        if let ConnectionPeerType::Staked(stake) = context.peer_type() {
+            assert_eq!(stake, stake_amount);
+        }
+        assert_eq!(context.remote_pubkey(), Some(client_keypair.pubkey()));
+        assert_eq!(context.remote_address, server_connection.remote_address());
+        assert!(context.last_update.load(Ordering::Relaxed) > 0); // Should have timestamp
+        assert!(context.stream_counter.is_none()); // Should be None initially
+    }
+
+    #[tokio::test]
+    async fn test_try_add_connection_staked_peer_success() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs for staked connection
+        let server_keypair = Keypair::new();
+        let client_keypair = Keypair::new();
+        let stake_amount = 50_000_000; // 50M lamports
+
+        // Create staked nodes with both keypairs
+        let staked_nodes =
+            create_staked_nodes_with_keypairs(&server_keypair, &client_keypair, stake_amount);
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        // Create connection using the staked keypairs
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_connection_with_keypairs(&server_keypair, &client_keypair).await;
+
+        // Build connection context
+        let mut conn_context = simple_qos.build_connection_context(&server_connection);
+
+        // Test - try to add staked connection
+        let result = simple_qos
+            .try_add_connection(client_tracker, &server_connection, &mut conn_context)
+            .await;
+
+        // Verify successful connection addition
+        assert!(result.is_some());
+        let cancel_token = result.unwrap();
+        assert!(!cancel_token.is_cancelled());
+
+        // Verify context was updated with stream counter
+        assert!(conn_context.stream_counter.is_some());
+        assert_eq!(
+            conn_context
+                .stream_counter
+                .as_ref()
+                .unwrap()
+                .stream_count
+                .load(Ordering::Relaxed),
+            0
+        );
+
+        // Verify stats were updated
+        assert_eq!(
+            stats
+                .connection_added_from_staked_peer
+                .load(Ordering::Relaxed),
+            1
+        );
+    }
+
+    #[tokio::test]
+    async fn test_try_add_connection_unstaked_peer_rejected() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        // Create unstaked connection
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_server_side_connection().await;
+
+        // Build connection context (will be unstaked)
+        let mut conn_context = simple_qos.build_connection_context(&server_connection);
+
+        // Test - try to add unstaked connection (should be rejected)
+        let result = simple_qos
+            .try_add_connection(client_tracker, &server_connection, &mut conn_context)
+            .await;
+
+        // Verify unstaked connection was rejected
+        assert!(result.is_none());
+
+        // Verify context stream counter was not set
+        assert!(conn_context.stream_counter.is_none());
+
+        // Verify no staked peer connection stats were incremented
+        assert_eq!(
+            stats
+                .connection_added_from_staked_peer
+                .load(Ordering::Relaxed),
+            0
+        );
+    }
+
+    #[tokio::test]
+    async fn test_try_add_connection_max_staked_connections_with_pruning() {
+        // Setup with very low max connections to trigger pruning
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs for staked connections
+        let server_keypair1 = Keypair::new();
+        let client_keypair1 = Keypair::new();
+        let server_keypair2 = Keypair::new();
+        let client_keypair2 = Keypair::new();
+        let stake_amount = 50_000_000; // 50M lamports
+
+        let mut stakes = HashMap::new();
+        stakes.insert(server_keypair1.pubkey(), stake_amount);
+        stakes.insert(client_keypair1.pubkey(), stake_amount);
+        stakes.insert(server_keypair2.pubkey(), stake_amount);
+        // client 2 has higher stake so that it can prune client 1
+        stakes.insert(client_keypair2.pubkey(), stake_amount * 2);
+
+        let overrides: HashMap<solana_pubkey::Pubkey, u64> = HashMap::new();
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(Arc::new(stakes), overrides)));
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10, // max_connections_per_peer
+            1,  // max_staked_connections (set to 1 to trigger pruning)
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        // Add first connection to fill the table
+        let client_tracker1 = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection1, _client_endpoint1, _server_endpoint1) =
+            create_connection_with_keypairs(&server_keypair1, &client_keypair1).await;
+
+        let mut conn_context1 = simple_qos.build_connection_context(&server_connection1);
+
+        let result1 = simple_qos
+            .try_add_connection(client_tracker1, &server_connection1, &mut conn_context1)
+            .await;
+
+        assert!(result1.is_some()); // First connection should succeed
+
+        // Try to add second connection (should trigger pruning)
+        let client_tracker2 = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection2, _client_endpoint2, _server_endpoint2) =
+            create_connection_with_keypairs(&server_keypair2, &client_keypair2).await;
+
+        let mut conn_context2 = simple_qos.build_connection_context(&server_connection2);
+
+        let result2 = simple_qos
+            .try_add_connection(client_tracker2, &server_connection2, &mut conn_context2)
+            .await;
+
+        // Verify second connection succeeded (after pruning)
+        assert!(result2.is_some());
+
+        // Verify pruning stats were updated
+        assert!(stats.num_evictions_staked.load(Ordering::Relaxed) > 0);
+    }
+
+    #[tokio::test]
+    async fn test_try_add_connection_max_staked_connections_no_pruning_possible() {
+        // Setup with max connections = 1 and high stake that can't be pruned
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs with different stake amounts
+        let server_keypair1 = Keypair::new();
+        let client_keypair1 = Keypair::new();
+        let server_keypair2 = Keypair::new();
+        let client_keypair2 = Keypair::new();
+        let high_stake = 100_000_000; // 100M lamports
+        let low_stake = 10_000_000; // 10M lamports
+
+        let mut stakes = HashMap::new();
+        stakes.insert(server_keypair1.pubkey(), high_stake);
+        stakes.insert(client_keypair1.pubkey(), high_stake);
+        stakes.insert(server_keypair2.pubkey(), low_stake);
+        stakes.insert(client_keypair2.pubkey(), low_stake);
+
+        let overrides: HashMap<solana_pubkey::Pubkey, u64> = HashMap::new();
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(Arc::new(stakes), overrides)));
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10, // max_connections_per_peer
+            1,  // max_staked_connections (set to 1)
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        // Add high-stake connection first
+        let client_tracker1 = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection1, _client_endpoint1, _server_endpoint1) =
+            create_connection_with_keypairs(&server_keypair1, &client_keypair1).await;
+
+        let mut conn_context1 = simple_qos.build_connection_context(&server_connection1);
+
+        let result1 = simple_qos
+            .try_add_connection(client_tracker1, &server_connection1, &mut conn_context1)
+            .await;
+
+        assert!(result1.is_some()); // First high-stake connection should succeed
+
+        // Try to add low-stake connection (should fail as it can't prune the high-stake one)
+        let client_tracker2 = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection2, _client_endpoint2, _server_endpoint2) =
+            create_connection_with_keypairs(&server_keypair2, &client_keypair2).await;
+
+        let mut conn_context2 = simple_qos.build_connection_context(&server_connection2);
+
+        let result2 = simple_qos
+            .try_add_connection(client_tracker2, &server_connection2, &mut conn_context2)
+            .await;
+
+        // Verify second connection failed (couldn't prune higher stake)
+        assert!(result2.is_none());
+
+        // Verify context was not updated
+        assert!(conn_context2.stream_counter.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_try_add_connection_context_updates() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs for staked connection
+        let server_keypair = Keypair::new();
+        let client_keypair = Keypair::new();
+        let stake_amount = 50_000_000; // 50M lamports
+
+        let staked_nodes =
+            create_staked_nodes_with_keypairs(&server_keypair, &client_keypair, stake_amount);
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_connection_with_keypairs(&server_keypair, &client_keypair).await;
+
+        let mut conn_context = simple_qos.build_connection_context(&server_connection);
+
+        // Record initial context state
+        let initial_last_update = conn_context.last_update.load(Ordering::Relaxed);
+        assert!(conn_context.stream_counter.is_none());
+
+        // Test - try to add connection
+        let result = simple_qos
+            .try_add_connection(client_tracker, &server_connection, &mut conn_context)
+            .await;
+
+        // Verify connection was added successfully
+        assert!(result.is_some());
+
+        // Verify context was properly updated
+        assert!(conn_context.stream_counter.is_some());
+
+        // Verify last_update was updated (should be same or newer)
+        let updated_last_update = conn_context.last_update.load(Ordering::Relaxed);
+        assert!(updated_last_update >= initial_last_update);
+
+        // Verify stream counter starts at 0
+        assert_eq!(
+            conn_context
+                .stream_counter
+                .as_ref()
+                .unwrap()
+                .stream_count
+                .load(Ordering::Relaxed),
+            0
+        );
+    }
+
+    #[tokio::test]
+    async fn test_on_stream_accepted_increments_counter() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs for staked connection
+        let server_keypair = Keypair::new();
+        let client_keypair = Keypair::new();
+        let stake_amount = 50_000_000; // 50M lamports
+
+        let staked_nodes =
+            create_staked_nodes_with_keypairs(&server_keypair, &client_keypair, stake_amount);
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_connection_with_keypairs(&server_keypair, &client_keypair).await;
+
+        // Build connection context and add connection to initialize stream counter
+        let mut conn_context = simple_qos.build_connection_context(&server_connection);
+
+        let result = simple_qos
+            .try_add_connection(client_tracker, &server_connection, &mut conn_context)
+            .await;
+
+        assert!(result.is_some()); // Connection should be added successfully
+        assert!(conn_context.stream_counter.is_some()); // Stream counter should be set
+
+        // Record initial stream count
+        let initial_stream_count = conn_context
+            .stream_counter
+            .as_ref()
+            .unwrap()
+            .stream_count
+            .load(Ordering::Relaxed);
+        assert_eq!(initial_stream_count, 0);
+
+        // Test - call on_stream_accepted
+        simple_qos.on_stream_accepted(&conn_context);
+
+        // Verify stream count was incremented
+        let updated_stream_count = conn_context
+            .stream_counter
+            .as_ref()
+            .unwrap()
+            .stream_count
+            .load(Ordering::Relaxed);
+        assert_eq!(updated_stream_count, initial_stream_count + 1);
+    }
+
+    #[tokio::test]
+    async fn test_remove_connection_success() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs for staked connection
+        let server_keypair = Keypair::new();
+        let client_keypair = Keypair::new();
+        let stake_amount = 50_000_000; // 50M lamports
+
+        let staked_nodes =
+            create_staked_nodes_with_keypairs(&server_keypair, &client_keypair, stake_amount);
+
+        let simple_qos = SimpleQos::new(
+            SimpleQosConfig::default(),
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_connection_with_keypairs(&server_keypair, &client_keypair).await;
+
+        // Build connection context and add connection first
+        let mut conn_context = simple_qos.build_connection_context(&server_connection);
+
+        let add_result = simple_qos
+            .try_add_connection(client_tracker, &server_connection, &mut conn_context)
+            .await;
+
+        assert!(add_result.is_some()); // Connection should be added successfully
+
+        // Record initial stats
+        let initial_open_connections = stats.open_staked_connections.load(Ordering::Relaxed);
+        assert!(initial_open_connections > 0); // Should have at least one connection
+
+        // Test - remove the connection
+        let removed_count = simple_qos
+            .remove_connection(&conn_context, server_connection.clone())
+            .await;
+
+        // Verify connection was removed
+        assert_eq!(removed_count, 1); // Should have removed exactly 1 connection
+
+        // Verify stats were updated (open connections should decrease)
+        let final_open_connections = stats.open_staked_connections.load(Ordering::Relaxed);
+        assert!(final_open_connections < initial_open_connections);
+        assert_eq!(final_open_connections, initial_open_connections - 1);
+    }
+
+    #[tokio::test]
+    async fn test_on_new_stream_throttles_correctly() {
+        // Setup
+        let cancel = CancellationToken::new();
+        let stats = Arc::new(StreamerStats::default());
+
+        // Create keypairs for staked connection
+        let server_keypair = Keypair::new();
+        let client_keypair = Keypair::new();
+        let stake_amount = 50_000_000; // 50M lamports
+
+        let staked_nodes =
+            create_staked_nodes_with_keypairs(&server_keypair, &client_keypair, stake_amount);
+
+        // Set a specific max_streams_per_second for testing
+        let qos_config = SimpleQosConfig {
+            max_streams_per_second: 10, // 10 streams per second
+        };
+
+        let simple_qos = SimpleQos::new(
+            qos_config,
+            10,  // max_connections_per_peer
+            100, // max_staked_connections
+            stats.clone(),
+            staked_nodes,
+            cancel.clone(),
+        );
+
+        let client_tracker = ClientConnectionTracker {
+            stats: stats.clone(),
+        };
+
+        let (server_connection, _client_endpoint, _server_endpoint) =
+            create_connection_with_keypairs(&server_keypair, &client_keypair).await;
+
+        // Build connection context and add connection to initialize stream counter
+        let mut conn_context = simple_qos.build_connection_context(&server_connection);
+
+        let result = simple_qos
+            .try_add_connection(client_tracker, &server_connection, &mut conn_context)
+            .await;
+
+        assert!(result.is_some()); // Connection should be added successfully
+        assert!(conn_context.stream_counter.is_some()); // Stream counter should be set
+
+        // Test - call on_new_stream and measure timing
+        let start_time = std::time::Instant::now();
+
+        simple_qos.on_new_stream(&conn_context).await;
+
+        let elapsed = start_time.elapsed();
+
+        // The function should complete (may or may not sleep depending on current throttling state)
+        // We just verify it doesn't panic and completes successfully
+        assert!(elapsed < std::time::Duration::from_secs(1)); // Should not take too long
+    }
+}

+ 49 - 2
streamer/src/nonblocking/stream_throttle.rs

@@ -9,6 +9,7 @@ use {
         },
         time::{Duration, Instant},
     },
+    tokio::time::sleep,
 };
 
 const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
@@ -31,6 +32,7 @@ pub(crate) struct StakedStreamLoadEMA {
     max_staked_load_in_ema_window: u64,
     // Maximum number of streams for an unstaked connection in stream throttling window
     max_unstaked_load_in_throttling_window: u64,
+    max_streams_per_ms: u64,
 }
 
 impl StakedStreamLoadEMA {
@@ -63,6 +65,7 @@ impl StakedStreamLoadEMA {
             stats,
             max_staked_load_in_ema_window,
             max_unstaked_load_in_throttling_window,
+            max_streams_per_ms,
         }
     }
 
@@ -181,16 +184,20 @@ impl StakedStreamLoadEMA {
             }
         }
     }
+
+    pub(crate) fn max_streams_per_ms(&self) -> u64 {
+        self.max_streams_per_ms
+    }
 }
 
 #[derive(Debug)]
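+/// Counts the streams a connection has opened within the current throttling interval.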
-pub(crate) struct ConnectionStreamCounter {
+pub struct ConnectionStreamCounter {
     pub(crate) stream_count: AtomicU64,
     last_throttling_instant: RwLock<tokio::time::Instant>,
 }
 
 impl ConnectionStreamCounter {
-    pub(crate) fn new() -> Self {
+    pub fn new() -> Self {
         Self {
             stream_count: AtomicU64::default(),
             last_throttling_instant: RwLock::new(tokio::time::Instant::now()),
@@ -219,6 +226,46 @@ impl ConnectionStreamCounter {
     }
 }
 
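+/// Sleeps out the remainder of the current throttling interval when the peer has already
+/// opened `max_streams_per_throttling_interval` streams within it, updating the throttled
+/// stream stats so the peer is forced to back off.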
+pub(crate) async fn throttle_stream(
+    stats: &StreamerStats,
+    peer_type: ConnectionPeerType,
+    remote_addr: std::net::SocketAddr,
+    stream_counter: &Arc<ConnectionStreamCounter>,
+    max_streams_per_throttling_interval: u64,
+) {
+    let throttle_interval_start = stream_counter.reset_throttling_params_if_needed();
+    let streams_read_in_throttle_interval = stream_counter.stream_count.load(Ordering::Relaxed);
+    if streams_read_in_throttle_interval >= max_streams_per_throttling_interval {
+        // The peer is sending faster than we're willing to read. Sleep for what's
+        // left of this read interval so the peer backs off.
+        let throttle_duration =
+            STREAM_THROTTLING_INTERVAL.saturating_sub(throttle_interval_start.elapsed());
+
+        if !throttle_duration.is_zero() {
+            debug!(
+                "Throttling stream from {remote_addr:?}, peer type: {peer_type:?}, \
+                 max_streams_per_interval: {max_streams_per_throttling_interval}, \
+                 read_interval_streams: {streams_read_in_throttle_interval} throttle_duration: \
+                 {throttle_duration:?}"
+            );
+            stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
+            match peer_type {
+                ConnectionPeerType::Unstaked => {
+                    stats
+                        .throttled_unstaked_streams
+                        .fetch_add(1, Ordering::Relaxed);
+                }
+                ConnectionPeerType::Staked(_) => {
+                    stats
+                        .throttled_staked_streams
+                        .fetch_add(1, Ordering::Relaxed);
+                }
+            }
+            sleep(throttle_duration).await;
+        }
+    }
+}
+
 #[cfg(test)]
 pub mod test {
     use {

+ 613 - 0
streamer/src/nonblocking/swqos.rs

@@ -0,0 +1,613 @@
+use {
+    crate::{
+        nonblocking::{
+            qos::{ConnectionContext, QosController},
+            quic::{
+                get_connection_stake, update_open_connections_stat, ClientConnectionTracker,
+                ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey,
+                ConnectionTableType, CONNECTION_CLOSE_CODE_DISALLOWED,
+                CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT, CONNECTION_CLOSE_REASON_DISALLOWED,
+                CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT,
+            },
+            stream_throttle::{
+                throttle_stream, ConnectionStreamCounter, StakedStreamLoadEMA,
+                STREAM_THROTTLING_INTERVAL_MS,
+            },
+        },
+        quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS},
+        streamer::StakedNodes,
+    },
+    percentage::Percentage,
+    quinn::{Connection, VarInt, VarIntBoundsExceeded},
+    solana_packet::PACKET_DATA_SIZE,
+    solana_quic_definitions::{
+        QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO,
+        QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
+        QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
+        QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
+    },
+    solana_time_utils as timing,
+    std::{
+        future::Future,
+        sync::{
+            atomic::{AtomicU64, Ordering},
+            Arc, RwLock,
+        },
+    },
+    tokio::sync::{Mutex, MutexGuard},
+    tokio_util::sync::CancellationToken,
+};
+
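+/// Configuration for stake-weighted QOS. `max_streams_per_ms` caps the aggregate number of
+/// streams the streamer will admit per millisecond; the stake-weighted QOS splits that
+/// budget across peers in proportion to their stake.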
+#[derive(Clone)]
+pub struct SwQosConfig {
+    pub max_streams_per_ms: u64,
+}
+
+impl Default for SwQosConfig {
+    fn default() -> Self {
+        SwQosConfig {
+            max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
+        }
+    }
+}
+
+pub struct SwQos {
+    max_staked_connections: usize,
+    max_unstaked_connections: usize,
+    max_connections_per_peer: usize,
+    staked_stream_load_ema: Arc<StakedStreamLoadEMA>,
+    stats: Arc<StreamerStats>,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+    unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
+    staked_connection_table: Arc<Mutex<ConnectionTable>>,
+}
+
+// Per-connection QoS context for stake-weighted QoS
+#[derive(Clone)]
+pub struct SwQosConnectionContext {
+    peer_type: ConnectionPeerType,
+    max_stake: u64,
+    min_stake: u64,
+    remote_pubkey: Option<solana_pubkey::Pubkey>,
+    total_stake: u64,
+    in_staked_table: bool,
+    last_update: Arc<AtomicU64>,
+    remote_address: std::net::SocketAddr,
+    stream_counter: Option<Arc<ConnectionStreamCounter>>,
+}
+
+impl ConnectionContext for SwQosConnectionContext {
+    fn peer_type(&self) -> ConnectionPeerType {
+        self.peer_type
+    }
+
+    fn remote_pubkey(&self) -> Option<solana_pubkey::Pubkey> {
+        self.remote_pubkey
+    }
+}
+
+impl SwQos {
+    pub fn new(
+        qos_config: SwQosConfig,
+        max_staked_connections: usize,
+        max_unstaked_connections: usize,
+        max_connections_per_peer: usize,
+        stats: Arc<StreamerStats>,
+        staked_nodes: Arc<RwLock<StakedNodes>>,
+        cancel: CancellationToken,
+    ) -> Self {
+        Self {
+            max_staked_connections,
+            max_unstaked_connections,
+            max_connections_per_peer,
+            staked_stream_load_ema: Arc::new(StakedStreamLoadEMA::new(
+                stats.clone(),
+                max_unstaked_connections,
+                qos_config.max_streams_per_ms,
+            )),
+            stats,
+            staked_nodes,
+            unstaked_connection_table: Arc::new(Mutex::new(ConnectionTable::new(
+                ConnectionTableType::Unstaked,
+                cancel.clone(),
+            ))),
+            staked_connection_table: Arc::new(Mutex::new(ConnectionTable::new(
+                ConnectionTableType::Staked,
+                cancel,
+            ))),
+        }
+    }
+}
+
+/// Calculate the ratio for per connection receive window from a staked peer
+fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
+    // Testing shows the maximum throughput from a connection is achieved at receive_window =
+    // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the
+    // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to
+    // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. The linear relation giving the ratio 'r' for
+    // stake 's' is r(s) = a * s + b. Given max_stake, min_stake, max_ratio, and min_ratio,
+    // we can solve for a and b.
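+    //
+    // For example (illustrative numbers only): with min_stake = 0 and max_stake = 10_000,
+    // a = (max_ratio - min_ratio) / 10_000 and b = min_ratio, so a stake of 5_000 maps to
+    // the midpoint of the ratio range.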
+
+    if stake > max_stake {
+        return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
+    }
+
+    let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
+    let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
+    if max_stake > min_stake {
+        let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
+        let b = max_ratio as f64 - ((max_stake as f64) * a);
+        let ratio = (a * stake as f64) + b;
+        ratio.round() as u64
+    } else {
+        QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
+    }
+}
+
+fn compute_recieve_window(
+    max_stake: u64,
+    min_stake: u64,
+    peer_type: ConnectionPeerType,
+) -> Result<VarInt, VarIntBoundsExceeded> {
+    match peer_type {
+        ConnectionPeerType::Unstaked => {
+            VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO)
+        }
+        ConnectionPeerType::Staked(peer_stake) => {
+            let ratio =
+                compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake);
+            VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio)
+        }
+    }
+}
+
+fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
+    match peer_type {
+        ConnectionPeerType::Staked(peer_stake) => {
+            // No checked math for f64 type. So let's explicitly check for 0 here
+            if total_stake == 0 || peer_stake > total_stake {
+                warn!(
+                    "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \
+                     {total_stake:?}"
+                );
+
+                QUIC_MIN_STAKED_CONCURRENT_STREAMS
+            } else {
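+                // Linearly scale the allowed concurrent streams with the peer's share of the
+                // total stake, clamped between QUIC_MIN_STAKED_CONCURRENT_STREAMS and
+                // QUIC_MAX_STAKED_CONCURRENT_STREAMS.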
+                let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS
+                    - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
+
+                (((peer_stake as f64 / total_stake as f64) * delta) as usize
+                    + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
+                    .clamp(
+                        QUIC_MIN_STAKED_CONCURRENT_STREAMS,
+                        QUIC_MAX_STAKED_CONCURRENT_STREAMS,
+                    )
+            }
+        }
+        ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
+    }
+}
+
+impl SwQos {
+    fn cache_new_connection(
+        &self,
+        client_connection_tracker: ClientConnectionTracker,
+        connection: &Connection,
+        mut connection_table_l: MutexGuard<ConnectionTable>,
+        conn_context: &SwQosConnectionContext,
+    ) -> Result<
+        (
+            Arc<AtomicU64>,
+            CancellationToken,
+            Arc<ConnectionStreamCounter>,
+        ),
+        ConnectionHandlerError,
+    > {
+        if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
+            conn_context.peer_type(),
+            conn_context.total_stake,
+        ) as u64)
+        {
+            let remote_addr = connection.remote_address();
+            let receive_window = compute_recieve_window(
+                conn_context.max_stake,
+                conn_context.min_stake,
+                conn_context.peer_type(),
+            );
+
+            debug!(
+                "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}",
+                conn_context.peer_type(),
+                conn_context.total_stake,
+                max_uni_streams.into_inner(),
+                receive_window,
+                remote_addr,
+            );
+
+            if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
+                .try_add_connection(
+                    ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey),
+                    remote_addr.port(),
+                    client_connection_tracker,
+                    Some(connection.clone()),
+                    conn_context.peer_type(),
+                    conn_context.last_update.clone(),
+                    self.max_connections_per_peer,
+                )
+            {
+                update_open_connections_stat(&self.stats, &connection_table_l);
+                drop(connection_table_l);
+
+                if let Ok(receive_window) = receive_window {
+                    connection.set_receive_window(receive_window);
+                }
+                connection.set_max_concurrent_uni_streams(max_uni_streams);
+
+                Ok((last_update, cancel_connection, stream_counter))
+            } else {
+                self.stats
+                    .connection_add_failed
+                    .fetch_add(1, Ordering::Relaxed);
+                Err(ConnectionHandlerError::ConnectionAddError)
+            }
+        } else {
+            connection.close(
+                CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(),
+                CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT,
+            );
+            self.stats
+                .connection_add_failed_invalid_stream_count
+                .fetch_add(1, Ordering::Relaxed);
+            Err(ConnectionHandlerError::MaxStreamError)
+        }
+    }
+
+    fn prune_unstaked_connection_table(
+        &self,
+        unstaked_connection_table: &mut ConnectionTable,
+        max_unstaked_connections: usize,
+        stats: Arc<StreamerStats>,
+    ) {
+        if unstaked_connection_table.total_size >= max_unstaked_connections {
+            const PRUNE_TABLE_TO_PERCENTAGE: u8 = 90;
+            let max_percentage_full = Percentage::from(PRUNE_TABLE_TO_PERCENTAGE);
+
+            let max_connections = max_percentage_full.apply_to(max_unstaked_connections);
+            let num_pruned = unstaked_connection_table.prune_oldest(max_connections);
+            stats
+                .num_evictions_unstaked
+                .fetch_add(num_pruned, Ordering::Relaxed);
+        }
+    }
+
+    async fn prune_unstaked_connections_and_add_new_connection(
+        &self,
+        client_connection_tracker: ClientConnectionTracker,
+        connection: &Connection,
+        connection_table: Arc<Mutex<ConnectionTable>>,
+        max_connections: usize,
+        conn_context: &SwQosConnectionContext,
+    ) -> Result<
+        (
+            Arc<AtomicU64>,
+            CancellationToken,
+            Arc<ConnectionStreamCounter>,
+        ),
+        ConnectionHandlerError,
+    > {
+        let stats = self.stats.clone();
+        if max_connections > 0 {
+            let mut connection_table = connection_table.lock().await;
+            self.prune_unstaked_connection_table(&mut connection_table, max_connections, stats);
+            self.cache_new_connection(
+                client_connection_tracker,
+                connection,
+                connection_table,
+                conn_context,
+            )
+        } else {
+            connection.close(
+                CONNECTION_CLOSE_CODE_DISALLOWED.into(),
+                CONNECTION_CLOSE_REASON_DISALLOWED,
+            );
+            Err(ConnectionHandlerError::ConnectionAddError)
+        }
+    }
+
+    fn max_streams_per_throttling_interval(&self, conn_context: &SwQosConnectionContext) -> u64 {
+        self.staked_stream_load_ema
+            .available_load_capacity_in_throttling_duration(
+                conn_context.peer_type,
+                conn_context.total_stake,
+            )
+    }
+}
+
+impl QosController<SwQosConnectionContext> for SwQos {
+    fn build_connection_context(&self, connection: &Connection) -> SwQosConnectionContext {
+        get_connection_stake(connection, &self.staked_nodes).map_or(
+            SwQosConnectionContext {
+                peer_type: ConnectionPeerType::Unstaked,
+                max_stake: 0,
+                min_stake: 0,
+                total_stake: 0,
+                remote_pubkey: None,
+                in_staked_table: false,
+                remote_address: connection.remote_address(),
+                stream_counter: None,
+                last_update: Arc::new(AtomicU64::new(timing::timestamp())),
+            },
+            |(pubkey, stake, total_stake, max_stake, min_stake)| {
+                // The heuristic is that the stake should be large enough to let at least one
+                // stream through within one throttling interval, during which we admit at most
+                // (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) streams in total.
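+                //
+                // For example (illustrative values only): with max_streams_per_ms = 250 and a
+                // 100 ms throttling interval, min_stake_ratio = 1 / 25_000, so a peer holding
+                // less than 1/25,000 of the total stake is treated as unstaked.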
+
+                let peer_type = {
+                    let max_streams_per_ms = self.staked_stream_load_ema.max_streams_per_ms();
+                    let min_stake_ratio =
+                        1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64;
+                    let stake_ratio = stake as f64 / total_stake as f64;
+                    if stake_ratio < min_stake_ratio {
+                        // If it is a staked connection with ultra low stake ratio, treat it as unstaked.
+                        ConnectionPeerType::Unstaked
+                    } else {
+                        ConnectionPeerType::Staked(stake)
+                    }
+                };
+
+                SwQosConnectionContext {
+                    peer_type,
+                    max_stake,
+                    min_stake,
+                    total_stake,
+                    remote_pubkey: Some(pubkey),
+                    in_staked_table: false,
+                    remote_address: connection.remote_address(),
+                    last_update: Arc::new(AtomicU64::new(timing::timestamp())),
+                    stream_counter: None,
+                }
+            },
+        )
+    }
+
+    #[allow(clippy::manual_async_fn)]
+    fn try_add_connection(
+        &self,
+        client_connection_tracker: ClientConnectionTracker,
+        connection: &quinn::Connection,
+        conn_context: &mut SwQosConnectionContext,
+    ) -> impl Future<Output = Option<CancellationToken>> + Send {
+        async move {
+            const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2;
+
+            match conn_context.peer_type() {
+                ConnectionPeerType::Staked(stake) => {
+                    let mut connection_table_l = self.staked_connection_table.lock().await;
+
+                    if connection_table_l.total_size >= self.max_staked_connections {
+                        let num_pruned =
+                            connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
+                        self.stats
+                            .num_evictions_staked
+                            .fetch_add(num_pruned, Ordering::Relaxed);
+                        update_open_connections_stat(&self.stats, &connection_table_l);
+                    }
+
+                    if connection_table_l.total_size < self.max_staked_connections {
+                        if let Ok((last_update, cancel_connection, stream_counter)) = self
+                            .cache_new_connection(
+                                client_connection_tracker,
+                                connection,
+                                connection_table_l,
+                                conn_context,
+                            )
+                        {
+                            self.stats
+                                .connection_added_from_staked_peer
+                                .fetch_add(1, Ordering::Relaxed);
+                            conn_context.in_staked_table = true;
+                            conn_context.last_update = last_update;
+                            conn_context.stream_counter = Some(stream_counter);
+                            return Some(cancel_connection);
+                        }
+                    } else {
+                        // If we couldn't prune a connection in the staked connection table, let's
+                        // put this connection in the unstaked connection table. If needed, prune a
+                        // connection from the unstaked connection table.
+                        if let Ok((last_update, cancel_connection, stream_counter)) = self
+                            .prune_unstaked_connections_and_add_new_connection(
+                                client_connection_tracker,
+                                connection,
+                                self.unstaked_connection_table.clone(),
+                                self.max_unstaked_connections,
+                                conn_context,
+                            )
+                            .await
+                        {
+                            self.stats
+                                .connection_added_from_staked_peer
+                                .fetch_add(1, Ordering::Relaxed);
+                            conn_context.in_staked_table = false;
+                            conn_context.last_update = last_update;
+                            conn_context.stream_counter = Some(stream_counter);
+                            return Some(cancel_connection);
+                        } else {
+                            self.stats
+                                .connection_add_failed_on_pruning
+                                .fetch_add(1, Ordering::Relaxed);
+                            self.stats
+                                .connection_add_failed_staked_node
+                                .fetch_add(1, Ordering::Relaxed);
+                        }
+                    }
+                }
+                ConnectionPeerType::Unstaked => {
+                    if let Ok((last_update, cancel_connection, stream_counter)) = self
+                        .prune_unstaked_connections_and_add_new_connection(
+                            client_connection_tracker,
+                            connection,
+                            self.unstaked_connection_table.clone(),
+                            self.max_unstaked_connections,
+                            conn_context,
+                        )
+                        .await
+                    {
+                        self.stats
+                            .connection_added_from_unstaked_peer
+                            .fetch_add(1, Ordering::Relaxed);
+                        conn_context.in_staked_table = false;
+                        conn_context.last_update = last_update;
+                        conn_context.stream_counter = Some(stream_counter);
+                        return Some(cancel_connection);
+                    } else {
+                        self.stats
+                            .connection_add_failed_unstaked_node
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                }
+            }
+
+            None
+        }
+    }
+
+    fn on_stream_accepted(&self, conn_context: &SwQosConnectionContext) {
+        self.staked_stream_load_ema
+            .increment_load(conn_context.peer_type);
+        conn_context
+            .stream_counter
+            .as_ref()
+            .unwrap()
+            .stream_count
+            .fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn on_stream_error(&self, _conn_context: &SwQosConnectionContext) {
+        self.staked_stream_load_ema.update_ema_if_needed();
+    }
+
+    fn on_stream_closed(&self, _conn_context: &SwQosConnectionContext) {
+        self.staked_stream_load_ema.update_ema_if_needed();
+    }
+
+    #[allow(clippy::manual_async_fn)]
+    fn remove_connection(
+        &self,
+        conn_context: &SwQosConnectionContext,
+        connection: Connection,
+    ) -> impl Future<Output = usize> + Send {
+        async move {
+            let mut lock = if conn_context.in_staked_table {
+                self.staked_connection_table.lock().await
+            } else {
+                self.unstaked_connection_table.lock().await
+            };
+
+            let stable_id = connection.stable_id();
+            let remote_addr = connection.remote_address();
+
+            let removed_count = lock.remove_connection(
+                ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey()),
+                remote_addr.port(),
+                stable_id,
+            );
+            update_open_connections_stat(&self.stats, &lock);
+            removed_count
+        }
+    }
+
+    fn on_stream_finished(&self, context: &SwQosConnectionContext) {
+        context
+            .last_update
+            .store(timing::timestamp(), Ordering::Relaxed);
+    }
+
+    #[allow(clippy::manual_async_fn)]
+    fn on_new_stream(&self, context: &SwQosConnectionContext) -> impl Future<Output = ()> + Send {
+        async move {
+            let peer_type = context.peer_type();
+            let remote_addr = context.remote_address;
+            let stream_counter: &Arc<ConnectionStreamCounter> =
+                context.stream_counter.as_ref().unwrap();
+
+            let max_streams_per_throttling_interval =
+                self.max_streams_per_throttling_interval(context);
+
+            throttle_stream(
+                &self.stats,
+                peer_type,
+                remote_addr,
+                stream_counter,
+                max_streams_per_throttling_interval,
+            )
+            .await;
+        }
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use super::*;
+
+    #[test]
+    fn test_cacluate_receive_window_ratio_for_staked_node() {
+        let mut max_stake = 10000;
+        let mut min_stake = 0;
+        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake);
+        assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO);
+
+        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
+        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
+        assert_eq!(ratio, max_ratio);
+
+        let ratio =
+            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2);
+        let average_ratio =
+            (QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2;
+        assert_eq!(ratio, average_ratio);
+
+        max_stake = 10000;
+        min_stake = 10000;
+        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
+        assert_eq!(ratio, max_ratio);
+
+        max_stake = 0;
+        min_stake = 0;
+        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
+        assert_eq!(ratio, max_ratio);
+
+        max_stake = 1000;
+        min_stake = 10;
+        let ratio =
+            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10);
+        assert_eq!(ratio, max_ratio);
+    }
+
+    #[test]
+    fn test_max_allowed_uni_streams() {
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0),
+            QUIC_MIN_STAKED_CONCURRENT_STREAMS
+        );
+        let delta =
+            (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000),
+            QUIC_MAX_STAKED_CONCURRENT_STREAMS,
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000),
+            ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
+                .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS)
+        );
+        assert_eq!(
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+        );
+    }
+}

+ 8 - 5
streamer/src/nonblocking/testing_utilities.rs

@@ -2,8 +2,8 @@
 use {
     super::quic::{SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID},
     crate::{
-        nonblocking::quic::spawn_server_with_cancel,
-        quic::{QuicServerParams, StreamerStats},
+        nonblocking::{quic::spawn_server_with_cancel, swqos::SwQosConfig},
+        quic::{QuicStreamerConfig, StreamerStats},
         streamer::StakedNodes,
     },
     crossbeam_channel::{unbounded, Receiver},
@@ -77,16 +77,18 @@ pub fn create_quic_server_sockets() -> Vec<UdpSocket> {
 
 pub fn setup_quic_server(
     option_staked_nodes: Option<StakedNodes>,
-    quic_server_params: QuicServerParams,
+    quic_server_params: QuicStreamerConfig,
+    qos_config: SwQosConfig,
 ) -> SpawnTestServerResult {
     let sockets = create_quic_server_sockets();
-    setup_quic_server_with_sockets(sockets, option_staked_nodes, quic_server_params)
+    setup_quic_server_with_sockets(sockets, option_staked_nodes, quic_server_params, qos_config)
 }
 
 pub fn setup_quic_server_with_sockets(
     sockets: Vec<UdpSocket>,
     option_staked_nodes: Option<StakedNodes>,
-    quic_server_params: QuicServerParams,
+    quic_server_params: QuicStreamerConfig,
+    qos_config: SwQosConfig,
 ) -> SpawnTestServerResult {
     let (sender, receiver) = unbounded();
     let keypair = Keypair::new();
@@ -106,6 +108,7 @@ pub fn setup_quic_server_with_sockets(
         sender,
         staked_nodes,
         quic_server_params,
+        qos_config,
         cancel.clone(),
     )
     .unwrap();

+ 275 - 19
streamer/src/quic.rs

@@ -1,6 +1,11 @@
 use {
     crate::{
-        nonblocking::quic::{ALPN_TPU_PROTOCOL_ID, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
+        nonblocking::{
+            qos::{ConnectionContext, QosController},
+            quic::{ALPN_TPU_PROTOCOL_ID, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
+            simple_qos::{SimpleQos, SimpleQosConfig},
+            swqos::{SwQos, SwQosConfig},
+        },
         streamer::StakedNodes,
     },
     crossbeam_channel::Sender,
@@ -598,6 +603,7 @@ impl StreamerStats {
 }
 
 #[deprecated(since = "3.0.0", note = "Use spawn_server_with_cancel instead")]
+#[allow(deprecated)]
 pub fn spawn_server_multi(
     thread_name: &'static str,
     metrics_name: &'static str,
@@ -622,32 +628,58 @@ pub fn spawn_server_multi(
 }
 
 #[derive(Clone)]
+#[deprecated(since = "3.1.0", note = "Use QuicStreamerConfig instead")]
 pub struct QuicServerParams {
     pub max_connections_per_peer: usize,
     pub max_staked_connections: usize,
     pub max_unstaked_connections: usize,
+    pub max_connections_per_ipaddr_per_min: u64,
+    pub wait_for_chunk_timeout: Duration,
+    pub accumulator_channel_size: usize,
+    pub num_threads: NonZeroUsize,
     pub max_streams_per_ms: u64,
+}
+
+#[derive(Clone)]
+pub struct QuicStreamerConfig {
+    pub max_connections_per_peer: usize,
+    pub max_staked_connections: usize,
+    pub max_unstaked_connections: usize,
     pub max_connections_per_ipaddr_per_min: u64,
     pub wait_for_chunk_timeout: Duration,
     pub accumulator_channel_size: usize,
     pub num_threads: NonZeroUsize,
 }
 
+#[derive(Clone)]
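+/// Pairs the generic streamer configuration with the stake-weighted QOS configuration.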
+pub struct SwQosQuicStreamerConfig {
+    pub quic_streamer_config: QuicStreamerConfig,
+    pub qos_config: SwQosConfig,
+}
+
+#[derive(Clone)]
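+/// Pairs the generic streamer configuration with the simple streams-per-second QOS
+/// configuration.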
+pub struct SimpleQosQuicStreamerConfig {
+    pub quic_streamer_config: QuicStreamerConfig,
+    pub qos_config: SimpleQosConfig,
+}
+
+#[allow(deprecated)]
 impl Default for QuicServerParams {
     fn default() -> Self {
         QuicServerParams {
             max_connections_per_peer: 1,
             max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
             max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
-            max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
             max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             accumulator_channel_size: DEFAULT_ACCUMULATOR_CHANNEL_SIZE,
             num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"),
+            max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
         }
     }
 }
 
+#[allow(deprecated)]
 impl QuicServerParams {
     #[cfg(feature = "dev-context-only-utils")]
     pub const DEFAULT_NUM_SERVER_THREADS_FOR_TEST: NonZeroUsize = NonZeroUsize::new(8).unwrap();
@@ -660,6 +692,35 @@ impl QuicServerParams {
             ..Self::default()
         }
     }
+}
+
+impl Default for QuicStreamerConfig {
+    fn default() -> Self {
+        Self {
+            max_connections_per_peer: 1,
+            max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
+            max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
+            max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
+            wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
+            accumulator_channel_size: DEFAULT_ACCUMULATOR_CHANNEL_SIZE,
+            num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"),
+        }
+    }
+}
+
+impl QuicStreamerConfig {
+    #[cfg(feature = "dev-context-only-utils")]
+    pub const DEFAULT_NUM_SERVER_THREADS_FOR_TEST: NonZeroUsize = NonZeroUsize::new(8).unwrap();
+
+    #[cfg(feature = "dev-context-only-utils")]
+    pub fn default_for_tests() -> Self {
+        // Shrink the channel size to avoid a massive allocation for tests
+        Self {
+            accumulator_channel_size: 100_000,
+            num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
+            ..Self::default()
+        }
+    }
 
     pub(crate) fn max_concurrent_connections(&self) -> usize {
         let conns = self.max_staked_connections + self.max_unstaked_connections;
@@ -667,7 +728,23 @@ impl QuicServerParams {
     }
 }
 
+#[allow(deprecated)]
+impl From<&QuicServerParams> for QuicStreamerConfig {
+    fn from(params: &QuicServerParams) -> Self {
+        Self {
+            max_connections_per_peer: params.max_connections_per_peer,
+            max_staked_connections: params.max_staked_connections,
+            max_unstaked_connections: params.max_unstaked_connections,
+            max_connections_per_ipaddr_per_min: params.max_connections_per_ipaddr_per_min,
+            wait_for_chunk_timeout: params.wait_for_chunk_timeout,
+            accumulator_channel_size: params.accumulator_channel_size,
+            num_threads: params.num_threads,
+        }
+    }
+}
+
 #[deprecated(since = "3.1.0", note = "Use spawn_server_with_cancel instead")]
+#[allow(deprecated)]
 pub fn spawn_server(
     thread_name: &'static str,
     metrics_name: &'static str,
@@ -689,6 +766,10 @@ pub fn spawn_server(
             thread::sleep(Duration::from_millis(100));
         }
     });
+    let quic_server_config: QuicStreamerConfig = (&quic_server_params).into();
+    let qos_config = SwQosConfig {
+        max_streams_per_ms: quic_server_params.max_streams_per_ms,
+    };
     spawn_server_with_cancel(
         thread_name,
         metrics_name,
@@ -696,33 +777,40 @@ pub fn spawn_server(
         keypair,
         packet_sender,
         staked_nodes,
-        quic_server_params,
+        quic_server_config,
+        qos_config,
         cancel,
     )
 }
 
-/// Spawns a tokio runtime and a streamer instance inside it.
-pub fn spawn_server_with_cancel(
+/// Generic function to spawn a QUIC server with any QoS implementation
+fn spawn_server_with_cancel_generic<Q, C>(
     thread_name: &'static str,
     metrics_name: &'static str,
+    stats: Arc<StreamerStats>,
     sockets: impl IntoIterator<Item = UdpSocket>,
     keypair: &Keypair,
     packet_sender: Sender<PacketBatch>,
-    staked_nodes: Arc<RwLock<StakedNodes>>,
-    quic_server_params: QuicServerParams,
+    quic_server_params: QuicStreamerConfig,
     cancel: CancellationToken,
-) -> Result<SpawnServerResult, QuicServerError> {
+    qos: Arc<Q>,
+) -> Result<SpawnServerResult, QuicServerError>
+where
+    Q: QosController<C> + Send + Sync + 'static,
+    C: ConnectionContext + Send + Sync + 'static,
+{
     let runtime = rt(format!("{thread_name}Rt"), quic_server_params.num_threads);
     let result = {
         let _guard = runtime.enter();
-        crate::nonblocking::quic::spawn_server_with_cancel(
+        crate::nonblocking::quic::spawn_server_with_cancel_and_qos(
             metrics_name,
+            stats,
             sockets,
             keypair,
             packet_sender,
-            staked_nodes,
             quic_server_params,
             cancel,
+            qos,
         )
     }?;
     let handle = thread::Builder::new()
@@ -743,6 +831,77 @@ pub fn spawn_server_with_cancel(
     })
 }
 
+/// Spawns a tokio runtime and a streamer instance inside it.
+pub fn spawn_server_with_cancel(
+    thread_name: &'static str,
+    metrics_name: &'static str,
+    sockets: impl IntoIterator<Item = UdpSocket>,
+    keypair: &Keypair,
+    packet_sender: Sender<PacketBatch>,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+    quic_server_params: QuicStreamerConfig,
+    qos_config: SwQosConfig,
+    cancel: CancellationToken,
+) -> Result<SpawnServerResult, QuicServerError> {
+    let stats = Arc::<StreamerStats>::default();
+    let swqos = Arc::new(SwQos::new(
+        qos_config,
+        quic_server_params.max_staked_connections,
+        quic_server_params.max_unstaked_connections,
+        quic_server_params.max_connections_per_peer,
+        stats.clone(),
+        staked_nodes,
+        cancel.clone(),
+    ));
+    spawn_server_with_cancel_generic(
+        thread_name,
+        metrics_name,
+        stats,
+        sockets,
+        keypair,
+        packet_sender,
+        quic_server_params,
+        cancel,
+        swqos,
+    )
+}
+
+/// Spawns a tokio runtime and a streamer instance inside it.
+pub fn spawn_simple_qos_server_with_cancel(
+    thread_name: &'static str,
+    metrics_name: &'static str,
+    sockets: impl IntoIterator<Item = UdpSocket>,
+    keypair: &Keypair,
+    packet_sender: Sender<PacketBatch>,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+    quic_server_params: QuicStreamerConfig,
+    qos_config: SimpleQosConfig,
+    cancel: CancellationToken,
+) -> Result<SpawnServerResult, QuicServerError> {
+    let stats = Arc::<StreamerStats>::default();
+
+    let simple_qos = Arc::new(SimpleQos::new(
+        qos_config,
+        quic_server_params.max_connections_per_peer,
+        quic_server_params.max_staked_connections,
+        stats.clone(),
+        staked_nodes,
+        cancel.clone(),
+    ));
+
+    spawn_server_with_cancel_generic(
+        thread_name,
+        metrics_name,
+        stats,
+        sockets,
+        keypair,
+        packet_sender,
+        quic_server_params,
+        cancel,
+        simple_qos,
+    )
+}
+
 #[cfg(test)]
 mod test {
     use {
@@ -750,17 +909,22 @@ mod test {
         crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams},
         crossbeam_channel::unbounded,
         solana_net_utils::sockets::bind_to_localhost_unique,
-        std::net::SocketAddr,
+        solana_pubkey::Pubkey,
+        solana_signer::Signer,
+        std::{collections::HashMap, net::SocketAddr},
     };
 
     fn rt_for_test() -> Runtime {
         rt(
             "solQuicTestRt".to_string(),
-            QuicServerParams::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
+            QuicStreamerConfig::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
         )
     }
 
-    fn setup_quic_server() -> (
+    fn setup_quic_server_with_params(
+        server_params: QuicStreamerConfig,
+        staked_nodes: Arc<RwLock<StakedNodes>>,
+    ) -> (
         std::thread::JoinHandle<()>,
         crossbeam_channel::Receiver<PacketBatch>,
         SocketAddr,
@@ -770,7 +934,6 @@ mod test {
         let (sender, receiver) = unbounded();
         let keypair = Keypair::new();
         let server_address = s.local_addr().unwrap();
-        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let cancel = CancellationToken::new();
         let SpawnServerResult {
             endpoints: _,
@@ -783,13 +946,57 @@ mod test {
             &keypair,
             sender,
             staked_nodes,
-            QuicServerParams::default_for_tests(),
+            server_params,
+            SwQosConfig::default(),
             cancel.clone(),
         )
         .unwrap();
         (t, receiver, server_address, cancel)
     }
 
+    fn setup_simple_qos_quic_server_with_params(
+        server_params: SimpleQosQuicStreamerConfig,
+        staked_nodes: Arc<RwLock<StakedNodes>>,
+    ) -> (
+        std::thread::JoinHandle<()>,
+        crossbeam_channel::Receiver<PacketBatch>,
+        SocketAddr,
+        CancellationToken,
+    ) {
+        let s = bind_to_localhost_unique().expect("should bind");
+        let (sender, receiver) = unbounded();
+        let keypair = Keypair::new();
+        let server_address = s.local_addr().unwrap();
+        let cancel = CancellationToken::new();
+        let SpawnServerResult {
+            endpoints: _,
+            thread: t,
+            key_updater: _,
+        } = spawn_simple_qos_server_with_cancel(
+            "solQuicTest",
+            "quic_streamer_test",
+            [s],
+            &keypair,
+            sender,
+            staked_nodes,
+            server_params.quic_streamer_config,
+            server_params.qos_config,
+            cancel.clone(),
+        )
+        .unwrap();
+        (t, receiver, server_address, cancel)
+    }
+
+    fn setup_quic_server() -> (
+        std::thread::JoinHandle<()>,
+        crossbeam_channel::Receiver<PacketBatch>,
+        SocketAddr,
+        CancellationToken,
+    ) {
+        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+        setup_quic_server_with_params(QuicStreamerConfig::default_for_tests(), staked_nodes)
+    }
+
     #[test]
     fn test_quic_server_exit() {
         let (t, _receiver, _server_address, cancel) = setup_quic_server();
@@ -838,10 +1045,11 @@ mod test {
             &keypair,
             sender,
             staked_nodes,
-            QuicServerParams {
+            QuicStreamerConfig {
                 max_connections_per_peer: 2,
-                ..QuicServerParams::default_for_tests()
+                ..QuicStreamerConfig::default_for_tests()
             },
+            SwQosConfig::default(),
             cancel.clone(),
         )
         .unwrap();
@@ -863,6 +1071,53 @@ mod test {
         t.join().unwrap();
     }
 
+    #[test]
+    fn test_quic_server_multiple_packets_with_simple_qos() {
+        // Send multiple writes from a staked node under the simple streams-per-second QoS
+        // mode, using a client with very low stake, and verify that it can still send all
+        // packets within the rate limit.
+        agave_logger::setup();
+        let client_keypair = Keypair::new();
+        let rich_node_keypair = Keypair::new();
+
+        let stakes = HashMap::from([
+            (client_keypair.pubkey(), 1_000), // very small staked node
+            (rich_node_keypair.pubkey(), 1_000_000_000),
+        ]);
+        let staked_nodes = StakedNodes::new(
+            Arc::new(stakes),
+            HashMap::<Pubkey, u64>::default(), // overrides
+        );
+
+        let server_params = QuicStreamerConfig {
+            max_unstaked_connections: 0,
+            ..QuicStreamerConfig::default_for_tests()
+        };
+        let qos_config = SimpleQosConfig {
+            max_streams_per_second: 20, // low limit to ensure staked node can send all packets
+        };
+        let server_params = SimpleQosQuicStreamerConfig {
+            quic_streamer_config: server_params,
+            qos_config,
+        };
+        let (t, receiver, server_address, cancel) = setup_simple_qos_quic_server_with_params(
+            server_params,
+            Arc::new(RwLock::new(staked_nodes)),
+        );
+
+        let runtime = rt_for_test();
+        let num_expected_packets = 20;
+
+        runtime.block_on(check_multiple_packets(
+            receiver,
+            server_address,
+            Some(&client_keypair),
+            num_expected_packets,
+        ));
+        cancel.cancel();
+        t.join().unwrap();
+    }
+
     #[test]
     fn test_quic_server_unstaked_node_connect_failure() {
         agave_logger::setup();
@@ -883,10 +1138,11 @@ mod test {
             &keypair,
             sender,
             staked_nodes,
-            QuicServerParams {
+            QuicStreamerConfig {
                 max_unstaked_connections: 0,
-                ..QuicServerParams::default_for_tests()
+                ..QuicStreamerConfig::default_for_tests()
             },
+            SwQosConfig::default(),
             cancel.clone(),
         )
         .unwrap();

+ 37 - 18
tpu-client-next/tests/connection_workers_scheduler_test.rs

@@ -9,11 +9,12 @@ use {
     solana_rpc_client::nonblocking::rpc_client::RpcClient,
     solana_signer::Signer,
     solana_streamer::{
-        nonblocking::testing_utilities::{
-            make_client_endpoint, setup_quic_server, SpawnTestServerResult,
+        nonblocking::{
+            swqos::SwQosConfig,
+            testing_utilities::{make_client_endpoint, setup_quic_server, SpawnTestServerResult},
         },
         packet::PacketBatch,
-        quic::QuicServerParams,
+        quic::QuicStreamerConfig,
         streamer::StakedNodes,
     },
     solana_tpu_client_next::{
@@ -200,7 +201,11 @@ async fn test_basic_transactions_sending() {
         server_address,
         stats: _stats,
         cancel,
-    } = setup_quic_server(None, QuicServerParams::default_for_tests());
+    } = setup_quic_server(
+        None,
+        QuicStreamerConfig::default_for_tests(),
+        SwQosConfig::default(),
+    );
 
     // Setup sending txs
     let tx_size = 1;
@@ -290,7 +295,11 @@ async fn test_connection_denied_until_allowed() {
         server_address,
         stats: _stats,
         cancel,
-    } = setup_quic_server(None, QuicServerParams::default_for_tests());
+    } = setup_quic_server(
+        None,
+        QuicStreamerConfig::default_for_tests(),
+        SwQosConfig::default(),
+    );
 
     // To prevent server from accepting a new connection, we use the following observation.
     // Since max_connections_per_peer == 1 (< max_unstaked_connections == 500), if we create a first
@@ -353,11 +362,12 @@ async fn test_connection_pruned_and_reopened() {
         cancel,
     } = setup_quic_server(
         None,
-        QuicServerParams {
+        QuicStreamerConfig {
             max_connections_per_peer: 100,
             max_unstaked_connections: 1,
-            ..QuicServerParams::default_for_tests()
+            ..QuicStreamerConfig::default_for_tests()
         },
+        SwQosConfig::default(),
     );
 
     // Setup sending txs
@@ -409,14 +419,15 @@ async fn test_staked_connection() {
         cancel,
     } = setup_quic_server(
         Some(staked_nodes),
-        QuicServerParams {
+        QuicStreamerConfig {
             // Must use at least the number of endpoints (10) because
             // `max_staked_connections` and `max_unstaked_connections` are
             // cumulative for all the endpoints.
             max_staked_connections: 10,
             max_unstaked_connections: 0,
-            ..QuicServerParams::default_for_tests()
+            ..QuicStreamerConfig::default_for_tests()
         },
+        SwQosConfig::default(),
     );
 
     // Setup sending txs
@@ -462,7 +473,11 @@ async fn test_connection_throttling() {
         server_address,
         stats: _stats,
         cancel,
-    } = setup_quic_server(None, QuicServerParams::default_for_tests());
+    } = setup_quic_server(
+        None,
+        QuicStreamerConfig::default_for_tests(),
+        SwQosConfig::default(),
+    );
 
     // Setup sending txs
     let tx_size = 1;
@@ -555,11 +570,12 @@ async fn test_rate_limiting() {
         cancel,
     } = setup_quic_server(
         None,
-        QuicServerParams {
+        QuicStreamerConfig {
             max_connections_per_peer: 100,
             max_connections_per_ipaddr_per_min: 1,
-            ..QuicServerParams::default_for_tests()
+            ..QuicStreamerConfig::default_for_tests()
         },
+        SwQosConfig::default(),
     );
 
     // open a connection to consume the limit
@@ -619,11 +635,12 @@ async fn test_rate_limiting_establish_connection() {
         cancel,
     } = setup_quic_server(
         None,
-        QuicServerParams {
+        QuicStreamerConfig {
             max_connections_per_peer: 100,
             max_connections_per_ipaddr_per_min: 1,
-            ..QuicServerParams::default_for_tests()
+            ..QuicStreamerConfig::default_for_tests()
         },
+        SwQosConfig::default(),
     );
 
     let connection_to_reach_limit = make_client_endpoint(&server_address, None).await;
@@ -700,15 +717,16 @@ async fn test_update_identity() {
         cancel,
     } = setup_quic_server(
         Some(staked_nodes),
-        QuicServerParams {
+        QuicStreamerConfig {
             // Must use at least the number of endpoints (10) because
             // `max_staked_connections` and `max_unstaked_connections` are
             // cumulative for all the endpoints.
             max_staked_connections: 10,
             // Deny all unstaked connections.
             max_unstaked_connections: 0,
-            ..QuicServerParams::default_for_tests()
+            ..QuicStreamerConfig::default_for_tests()
         },
+        SwQosConfig::default(),
     );
 
     // Setup sending txs
@@ -763,11 +781,12 @@ async fn test_proactive_connection_close_detection() {
         cancel,
     } = setup_quic_server(
         None,
-        QuicServerParams {
+        QuicStreamerConfig {
             max_connections_per_peer: 1,
             max_unstaked_connections: 1,
-            ..QuicServerParams::default_for_tests()
+            ..QuicStreamerConfig::default_for_tests()
         },
+        SwQosConfig::default(),
     );
 
     // Setup controlled transaction sending

+ 37 - 23
validator/src/commands/run/execute.rs

@@ -36,6 +36,7 @@ use {
         repair::repair_handler::RepairHandlerType,
         snapshot_packager_service::SnapshotPackagerService,
         system_monitor_service::SystemMonitorService,
+        tpu::MAX_VOTES_PER_SECOND,
         validator::{
             is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod,
             SchedulerPacing, Validator, ValidatorConfig, ValidatorError, ValidatorStartProgress,
@@ -60,7 +61,10 @@ use {
     solana_pubkey::Pubkey,
     solana_runtime::{runtime_config::RuntimeConfig, snapshot_utils},
     solana_signer::Signer,
-    solana_streamer::quic::QuicServerParams,
+    solana_streamer::{
+        nonblocking::{simple_qos::SimpleQosConfig, swqos::SwQosConfig},
+        quic::{QuicStreamerConfig, SimpleQosQuicStreamerConfig, SwQosQuicStreamerConfig},
+    },
     solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
     solana_turbine::{
         broadcast_stage::BroadcastStageType,
@@ -942,32 +946,42 @@ pub fn execute(
     // the one pushed by bootstrap.
     node.info.hot_swap_pubkey(identity_keypair.pubkey());
 
-    let tpu_quic_server_config = QuicServerParams {
-        max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
-        max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
-        max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
-        max_streams_per_ms,
-        max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
-        num_threads: tpu_transaction_receive_threads,
-        ..Default::default()
+    let tpu_quic_server_config = SwQosQuicStreamerConfig {
+        quic_streamer_config: QuicStreamerConfig {
+            max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
+            max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
+            max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
+            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
+            num_threads: tpu_transaction_receive_threads,
+            ..Default::default()
+        },
+        qos_config: SwQosConfig { max_streams_per_ms },
     };
 
-    let tpu_fwd_quic_server_config = QuicServerParams {
-        max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
-        max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
-        max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
-        max_streams_per_ms,
-        max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
-        num_threads: tpu_transaction_forward_receive_threads,
-        ..Default::default()
+    let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
+        quic_streamer_config: QuicStreamerConfig {
+            max_connections_per_peer: tpu_max_connections_per_peer.try_into().unwrap(),
+            max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
+            max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
+            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
+            num_threads: tpu_transaction_forward_receive_threads,
+            ..Default::default()
+        },
+        qos_config: SwQosConfig { max_streams_per_ms },
     };
 
-    // Vote shares TPU forward's characteristics, except that we accept 1 connection
-    // per peer and no unstaked connections are accepted.
-    let mut vote_quic_server_config = tpu_fwd_quic_server_config.clone();
-    vote_quic_server_config.max_connections_per_peer = 1;
-    vote_quic_server_config.max_unstaked_connections = 0;
-    vote_quic_server_config.num_threads = tpu_vote_transaction_receive_threads;
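+    // Votes are served with the simple streams-per-second QOS: one connection per peer and
+    // a per-peer stream rate capped at MAX_VOTES_PER_SECOND.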
+    let vote_quic_server_config = SimpleQosQuicStreamerConfig {
+        quic_streamer_config: QuicStreamerConfig {
+            max_connections_per_peer: 1,
+            max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
+            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
+            num_threads: tpu_vote_transaction_receive_threads,
+            ..Default::default()
+        },
+        qos_config: SimpleQosConfig {
+            max_streams_per_second: MAX_VOTES_PER_SECOND,
+        },
+    };
 
     let validator = match Validator::new(
         node,

+ 27 - 14
vortexor/src/vortexor.rs

@@ -11,8 +11,11 @@ use {
     solana_perf::packet::PacketBatch,
     solana_quic_definitions::NotifyKeyUpdate,
     solana_streamer::{
-        nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
-        quic::{spawn_server_with_cancel, EndpointKeyUpdater, QuicServerParams},
+        nonblocking::{quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, swqos::SwQosConfig},
+        quic::{
+            spawn_server_with_cancel, EndpointKeyUpdater, QuicStreamerConfig,
+            SwQosQuicStreamerConfig,
+        },
         streamer::StakedNodes,
     },
     std::{
@@ -116,16 +119,20 @@ impl Vortexor {
         identity_keypair: &Keypair,
         cancel: CancellationToken,
     ) -> Self {
-        let mut quic_server_params = QuicServerParams {
-            max_connections_per_peer,
-            max_staked_connections: max_tpu_staked_connections,
-            max_unstaked_connections: max_tpu_unstaked_connections,
-            max_streams_per_ms,
-            max_connections_per_ipaddr_per_min,
-            wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
-            ..Default::default()
+        let quic_server_params = SwQosQuicStreamerConfig {
+            quic_streamer_config: QuicStreamerConfig {
+                max_connections_per_peer,
+                max_staked_connections: max_tpu_staked_connections,
+                max_unstaked_connections: max_tpu_unstaked_connections,
+                max_connections_per_ipaddr_per_min,
+                wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
+                ..Default::default()
+            },
+            qos_config: SwQosConfig { max_streams_per_ms },
         };
 
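+        // Start from the TPU server params; the staked/unstaked connection limits are
+        // adjusted below for the TPU-forward server.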
+        let mut quic_fwd_server_params = quic_server_params.clone();
+
         let TpuSockets {
             tpu_quic,
             tpu_quic_fwd,
@@ -138,15 +145,20 @@ impl Vortexor {
             identity_keypair,
             tpu_sender.clone(),
             staked_nodes.clone(),
-            quic_server_params.clone(),
+            quic_server_params.quic_streamer_config,
+            quic_server_params.qos_config,
             cancel.clone(),
         )
         .unwrap();
 
         // For TPU forward -- we disallow unstaked connections. Allocate all connection resources
         // for staked connections:
-        quic_server_params.max_staked_connections = max_fwd_staked_connections;
-        quic_server_params.max_unstaked_connections = max_fwd_unstaked_connections;
+        quic_fwd_server_params
+            .quic_streamer_config
+            .max_staked_connections = max_fwd_staked_connections;
+        quic_fwd_server_params
+            .quic_streamer_config
+            .max_unstaked_connections = max_fwd_unstaked_connections;
         let tpu_fwd_result = spawn_server_with_cancel(
             "solVtxTpuFwd",
             "quic_vortexor_tpu_forwards",
@@ -154,7 +166,8 @@ impl Vortexor {
             identity_keypair,
             tpu_fwd_sender,
             staked_nodes.clone(),
-            quic_server_params,
+            quic_fwd_server_params.quic_streamer_config,
+            quic_fwd_server_params.qos_config,
             cancel.clone(),
         )
         .unwrap();

+ 4 - 2
votor/src/voting_service.rs

@@ -277,7 +277,8 @@ mod tests {
         },
         solana_signer::Signer,
         solana_streamer::{
-            quic::{spawn_server_with_cancel, QuicServerParams, SpawnServerResult},
+            nonblocking::swqos::SwQosConfig,
+            quic::{spawn_server_with_cancel, QuicStreamerConfig, SpawnServerResult},
             socket::SocketAddrSpace,
             streamer::StakedNodes,
         },
@@ -388,7 +389,8 @@ mod tests {
             &Keypair::new(),
             sender,
             staked_nodes,
-            QuicServerParams::default_for_tests(),
+            QuicStreamerConfig::default_for_tests(),
+            SwQosConfig::default(),
             cancel_token.clone(),
         )
         .unwrap();

Some files were not shown because too many files changed in this diff