
Streamer: transition SimpleQos to TokenBucket (#8896)

* Transition SimpleQos to TokenBucket
* Add us_to_have_tokens and a matching test to TokenBucket
* Make ConnectionTable generic over the stream counter impl

Co-authored-by: kirill lykov <lykov.kirill@gmail.com>
Alex Pyattaev 13 hours ago
parent
commit
e8cbabe410
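The net effect of the change: SimpleQos drops the fixed-interval ConnectionStreamCounter throttling and gives each peer connection its own TokenBucket, sized so that both the burst and the refill rate equal `max_streams_per_second`. The sketch below is a condensed, synchronous stand-in for that throttling path; the real code in simple_qos.rs further down is async and uses the TokenBucket from solana-net-utils, while the `Bucket` type here is purely illustrative and only its method names mirror the diff.

```rust
use std::{
    thread::sleep,
    time::{Duration, Instant},
};

/// Simplified stand-in for solana_net_utils::token_bucket::TokenBucket.
struct Bucket {
    capacity: f64,
    tokens: f64,
    refill_per_us: f64,
    last: Instant,
}

impl Bucket {
    fn new(initial: u64, capacity: u64, per_second: f64) -> Self {
        Self {
            capacity: capacity as f64,
            tokens: initial as f64,
            refill_per_us: per_second / 1_000_000.0,
            last: Instant::now(),
        }
    }

    fn refill(&mut self) {
        let elapsed_us = self.last.elapsed().as_micros() as f64;
        self.last = Instant::now();
        self.tokens = (self.tokens + elapsed_us * self.refill_per_us).min(self.capacity);
    }

    /// Try to take `n` tokens; Err if not enough are available right now.
    fn consume_tokens(&mut self, n: u64) -> Result<(), ()> {
        self.refill();
        if self.tokens >= n as f64 {
            self.tokens -= n as f64;
            Ok(())
        } else {
            Err(())
        }
    }

    /// Microseconds until `n` tokens could be consumed, assuming nobody
    /// else drains the bucket in the meantime (None if n > capacity).
    fn us_to_have_tokens(&mut self, n: u64) -> Option<u64> {
        if n as f64 > self.capacity {
            return None;
        }
        self.refill();
        let missing = (n as f64 - self.tokens).max(0.0);
        Some((missing / self.refill_per_us) as u64)
    }
}

fn main() {
    // One bucket per connection: burst and refill rate both equal
    // max_streams_per_second, mirroring the SimpleQos factory in this diff.
    let max_streams_per_second = 10;
    let mut bucket = Bucket::new(
        max_streams_per_second,
        max_streams_per_second,
        max_streams_per_second as f64,
    );

    for stream in 0..20 {
        // Throttling loop: take a token or wait until one is available.
        while bucket.consume_tokens(1).is_err() {
            let wait_us = bucket.us_to_have_tokens(1).expect("1 <= capacity");
            sleep(Duration::from_micros(wait_us));
        }
        println!("stream {stream} admitted");
    }
}
```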

+ 38 - 1
net-utils/src/token_bucket.rs

@@ -88,6 +88,24 @@ impl TokenBucket {
         }
     }
 
+    /// Returns time in microseconds until `num_tokens` worth of new
+    /// tokens can be consumed.
+    ///
+    /// The estimate assumes no tokens are consumed in the meantime,
+    /// so the actual wait may be longer.
+    /// Returns `None` if `num_tokens` exceeds the bucket capacity.
+    #[inline]
+    pub fn us_to_have_tokens(&self, num_tokens: u64) -> Option<u64> {
+        if num_tokens > self.max_tokens {
+            return None;
+        }
+
+        match num_tokens.checked_sub(self.current_tokens()) {
+            Some(missing) => Some((missing as f64 / self.new_tokens_per_us) as u64),
+            None => Some(0),
+        }
+    }
+
     /// Retrieves monotonic time since bucket creation.
     fn time_us(&self) -> u64 {
         cfg_if! {
@@ -348,7 +366,7 @@ pub mod test {
     };
 
     #[test]
-    fn test_token_bucket() {
+    fn test_token_bucket_basics() {
         let tb = TokenBucket::new(100, 100, 1000.0);
         assert_eq!(tb.current_tokens(), 100);
         tb.consume_tokens(50).expect("Bucket is initially full");
@@ -370,6 +388,25 @@ pub mod test {
         thread::sleep(Duration::from_millis(120));
         assert_eq!(tb.current_tokens(), 100, "Bucket should not overfill");
     }
+
+    #[test]
+    fn test_token_bucket_us_to_have_tokens() {
+        let tb = TokenBucket::new(1000, 1000, 1000.0);
+        assert_eq!(tb.current_tokens(), 1000);
+        tb.consume_tokens(1000).expect("Bucket is initially full");
+        assert!(
+            tb.current_tokens() < 100,
+            "Shoult not have many tokens left in bucket"
+        );
+
+        let t = tb
+            .us_to_have_tokens(500)
+            .expect("500 < bucket capacity (1000)")
+            / 1000; // convert to ms
+        assert!(t > 100, "time to fill should be ~ 500ms (got {t})");
+        assert!(t <= 500, "time to fill should be less than 500ms (got {t})");
+    }
+
     #[test]
     fn test_keyed_rate_limiter() {
         let prototype_bucket = TokenBucket::new(100, 100, 1000.0);
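Worked numbers for the new us_to_have_tokens test above: the bucket refills at 1000 tokens per second, i.e. 0.001 tokens per microsecond. Right after consume_tokens(1000) drains it, a request for 500 tokens is missing roughly 500 tokens, so us_to_have_tokens(500) ≈ 500 / 0.001 = 500_000 µs ≈ 500 ms. The assertions accept the 100-500 ms range because a few tokens refill between the drain and the query, which can only shorten the reported wait.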

+ 8 - 1
streamer/src/nonblocking/qos.rs

@@ -56,4 +56,11 @@ pub(crate) trait QosController<C: ConnectionContext> {
     fn max_concurrent_connections(&self) -> usize;
 }
 
-pub trait QosConfig {}
+/// Marker trait for the shared per-connection state tracked by the streamer
+pub(crate) trait OpaqueStreamerCounter: Send + Sync + 'static {}
+
+#[cfg(test)]
+pub(crate) struct NullStreamerCounter;
+
+#[cfg(test)]
+impl OpaqueStreamerCounter for NullStreamerCounter {}

+ 27 - 21
streamer/src/nonblocking/quic.rs

@@ -2,8 +2,7 @@ use {
     crate::{
         nonblocking::{
             connection_rate_limiter::ConnectionRateLimiter,
-            qos::{ConnectionContext, QosController},
-            stream_throttle::ConnectionStreamCounter,
+            qos::{ConnectionContext, OpaqueStreamerCounter, QosController},
         },
         quic::{configure_server, QuicServerError, QuicStreamerConfig, StreamerStats},
         streamer::StakedNodes,
@@ -411,9 +410,9 @@ pub(crate) enum ConnectionHandlerError {
     MaxStreamError,
 }
 
-pub(crate) fn update_open_connections_stat(
+pub(crate) fn update_open_connections_stat<S: OpaqueStreamerCounter>(
     stats: &StreamerStats,
-    connection_table: &ConnectionTable,
+    connection_table: &ConnectionTable<S>,
 ) {
     if connection_table.is_staked() {
         stats
@@ -858,8 +857,7 @@ fn handle_chunks(
     Ok(StreamState::Finished)
 }
 
-#[derive(Debug)]
-struct ConnectionEntry {
+struct ConnectionEntry<S: OpaqueStreamerCounter> {
     cancel: CancellationToken,
     peer_type: ConnectionPeerType,
     last_update: Arc<AtomicU64>,
@@ -867,10 +865,10 @@ struct ConnectionEntry {
     // We do not explicitly use it, but its drop is triggered when ConnectionEntry is dropped.
     _client_connection_tracker: ClientConnectionTracker,
     connection: Option<Connection>,
-    stream_counter: Arc<ConnectionStreamCounter>,
+    stream_counter: Arc<S>,
 }
 
-impl ConnectionEntry {
+impl<S: OpaqueStreamerCounter> ConnectionEntry<S> {
     fn new(
         cancel: CancellationToken,
         peer_type: ConnectionPeerType,
@@ -878,7 +876,7 @@ impl ConnectionEntry {
         port: u16,
         client_connection_tracker: ClientConnectionTracker,
         connection: Option<Connection>,
-        stream_counter: Arc<ConnectionStreamCounter>,
+        stream_counter: Arc<S>,
     ) -> Self {
         Self {
             cancel,
@@ -903,7 +901,7 @@ impl ConnectionEntry {
     }
 }
 
-impl Drop for ConnectionEntry {
+impl<S: OpaqueStreamerCounter> Drop for ConnectionEntry<S> {
     fn drop(&mut self) {
         if let Some(conn) = self.connection.take() {
             conn.close(
@@ -935,8 +933,8 @@ pub(crate) enum ConnectionTableType {
 }
 
 // Map of IP to list of connection entries
-pub(crate) struct ConnectionTable {
-    table: IndexMap<ConnectionTableKey, Vec<ConnectionEntry>>,
+pub(crate) struct ConnectionTable<S: OpaqueStreamerCounter> {
+    table: IndexMap<ConnectionTableKey, Vec<ConnectionEntry<S>>>,
     pub(crate) total_size: usize,
     table_type: ConnectionTableType,
     cancel: CancellationToken,
@@ -945,7 +943,7 @@ pub(crate) struct ConnectionTable {
 /// Prune the connection which has the oldest update
 ///
 /// Return number pruned
-impl ConnectionTable {
+impl<S: OpaqueStreamerCounter> ConnectionTable<S> {
     pub(crate) fn new(table_type: ConnectionTableType, cancel: CancellationToken) -> Self {
         Self {
             table: IndexMap::default(),
@@ -994,7 +992,7 @@ impl ConnectionTable {
             })
             .map(|index| {
                 let connection = self.table[index].first();
-                let stake = connection.map(|connection: &ConnectionEntry| connection.stake());
+                let stake = connection.map(|connection: &ConnectionEntry<S>| connection.stake());
                 (index, stake)
             })
             .take(sample_size)
@@ -1007,7 +1005,7 @@ impl ConnectionTable {
         num_pruned
     }
 
-    pub(crate) fn try_add_connection(
+    pub(crate) fn try_add_connection<F: FnOnce() -> Arc<S>>(
         &mut self,
         key: ConnectionTableKey,
         port: u16,
@@ -1016,11 +1014,8 @@ impl ConnectionTable {
         peer_type: ConnectionPeerType,
         last_update: Arc<AtomicU64>,
         max_connections_per_peer: usize,
-    ) -> Option<(
-        Arc<AtomicU64>,
-        CancellationToken,
-        Arc<ConnectionStreamCounter>,
-    )> {
+        stream_counter_factory: F,
+    ) -> Option<(Arc<AtomicU64>, CancellationToken, Arc<S>)> {
         let connection_entry = self.table.entry(key).or_default();
         let has_connection_capacity = connection_entry
             .len()
@@ -1032,7 +1027,7 @@ impl ConnectionTable {
             let stream_counter = connection_entry
                 .first()
                 .map(|entry| entry.stream_counter.clone())
-                .unwrap_or(Arc::new(ConnectionStreamCounter::new()));
+                .unwrap_or_else(stream_counter_factory);
             connection_entry.push(ConnectionEntry::new(
                 cancel.clone(),
                 peer_type,
@@ -1116,6 +1111,7 @@ pub mod test {
     use {
         super::*,
         crate::nonblocking::{
+            qos::NullStreamerCounter,
             swqos::SwQosConfig,
             testing_utilities::{
                 check_multiple_streams, get_client_config, make_client_endpoint, setup_quic_server,
@@ -1666,6 +1662,7 @@ pub mod test {
                     ConnectionPeerType::Unstaked,
                     Arc::new(AtomicU64::new(i as u64)),
                     max_connections_per_peer,
+                    || Arc::new(NullStreamerCounter {}),
                 )
                 .unwrap();
         }
@@ -1679,6 +1676,7 @@ pub mod test {
                 ConnectionPeerType::Unstaked,
                 Arc::new(AtomicU64::new(5)),
                 max_connections_per_peer,
+                || Arc::new(NullStreamerCounter {}),
             )
             .unwrap();
 
@@ -1722,6 +1720,7 @@ pub mod test {
                     ConnectionPeerType::Unstaked,
                     Arc::new(AtomicU64::new(i as u64)),
                     max_connections_per_peer,
+                    || Arc::new(NullStreamerCounter {}),
                 )
                 .unwrap();
         }
@@ -1758,6 +1757,7 @@ pub mod test {
                     ConnectionPeerType::Unstaked,
                     Arc::new(AtomicU64::new(i as u64)),
                     max_connections_per_peer,
+                    || Arc::new(NullStreamerCounter {}),
                 )
                 .unwrap();
         });
@@ -1773,6 +1773,7 @@ pub mod test {
                 ConnectionPeerType::Unstaked,
                 Arc::new(AtomicU64::new(10)),
                 max_connections_per_peer,
+                || Arc::new(NullStreamerCounter {})
             )
             .is_none());
 
@@ -1788,6 +1789,7 @@ pub mod test {
                 ConnectionPeerType::Unstaked,
                 Arc::new(AtomicU64::new(10)),
                 max_connections_per_peer,
+                || Arc::new(NullStreamerCounter {})
             )
             .is_some());
 
@@ -1828,6 +1830,7 @@ pub mod test {
                     ConnectionPeerType::Staked((i + 1) as u64),
                     Arc::new(AtomicU64::new(i as u64)),
                     max_connections_per_peer,
+                    || Arc::new(NullStreamerCounter {}),
                 )
                 .unwrap();
         }
@@ -1872,6 +1875,7 @@ pub mod test {
                     ConnectionPeerType::Unstaked,
                     Arc::new(AtomicU64::new((i * 2) as u64)),
                     max_connections_per_peer,
+                    || Arc::new(NullStreamerCounter {}),
                 )
                 .unwrap();
 
@@ -1884,6 +1888,7 @@ pub mod test {
                     ConnectionPeerType::Unstaked,
                     Arc::new(AtomicU64::new((i * 2 + 1) as u64)),
                     max_connections_per_peer,
+                    || Arc::new(NullStreamerCounter {}),
                 )
                 .unwrap();
         }
@@ -1899,6 +1904,7 @@ pub mod test {
                 ConnectionPeerType::Unstaked,
                 Arc::new(AtomicU64::new((num_ips * 2) as u64)),
                 max_connections_per_peer,
+                || Arc::new(NullStreamerCounter {}),
             )
             .unwrap();
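The factory closure added to try_add_connection is what keeps ConnectionTable agnostic to the counter type: a counter is built only for a peer's first connection and shared by the rest. A minimal stand-in for that get-or-create pattern (not the real ConnectionTable, which also tracks ports, peer types, timestamps, and pruning) might look like this:

```rust
use std::{collections::HashMap, net::IpAddr, sync::Arc};

/// Stand-in for the OpaqueStreamerCounter marker trait.
trait Counter: Send + Sync + 'static {}

struct Table<S: Counter> {
    // One entry per peer key; all connections of a peer share one counter.
    table: HashMap<IpAddr, Vec<Arc<S>>>,
}

impl<S: Counter> Table<S> {
    fn new() -> Self {
        Self { table: HashMap::new() }
    }

    /// Reuse the peer's existing counter, or build one with the factory.
    fn try_add_connection<F: FnOnce() -> Arc<S>>(&mut self, key: IpAddr, factory: F) -> Arc<S> {
        let entry = self.table.entry(key).or_default();
        let counter = entry.first().map(Arc::clone).unwrap_or_else(factory);
        entry.push(counter.clone());
        counter
    }
}

// Each QoS picks its own counter type; a unit struct is enough here.
struct NullCounter;
impl Counter for NullCounter {}

fn main() {
    let mut table: Table<NullCounter> = Table::new();
    let key: IpAddr = "127.0.0.1".parse().unwrap();
    let first = table.try_add_connection(key, || Arc::new(NullCounter));
    let second = table.try_add_connection(key, || Arc::new(NullCounter));
    // Both connections of the same peer share the same counter instance.
    assert!(Arc::ptr_eq(&first, &second));
}
```

In the actual diff, SwQos passes `|| Arc::new(ConnectionStreamCounter::new())` and SimpleQos passes a TokenBucket factory, as seen in the swqos.rs and simple_qos.rs hunks below.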
 

+ 61 - 98
streamer/src/nonblocking/simple_qos.rs

@@ -1,15 +1,12 @@
 use {
     crate::{
         nonblocking::{
-            qos::{ConnectionContext, QosController},
+            qos::{ConnectionContext, OpaqueStreamerCounter, QosController},
             quic::{
                 get_connection_stake, update_open_connections_stat, ClientConnectionTracker,
                 ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey,
                 ConnectionTableType,
             },
-            stream_throttle::{
-                throttle_stream, ConnectionStreamCounter, STREAM_THROTTLING_INTERVAL,
-            },
         },
         quic::{
             StreamerStats, DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER,
@@ -18,6 +15,7 @@ use {
         streamer::StakedNodes,
     },
     quinn::Connection,
+    solana_net_utils::token_bucket::TokenBucket,
     solana_time_utils as timing,
     std::{
         future::Future,
@@ -25,8 +23,12 @@ use {
             atomic::{AtomicU64, Ordering},
             Arc, RwLock,
         },
+        time::Duration,
+    },
+    tokio::{
+        sync::{Mutex, MutexGuard},
+        time::sleep,
     },
-    tokio::sync::{Mutex, MutexGuard},
     tokio_util::sync::CancellationToken,
 };
 
@@ -47,10 +49,12 @@ impl Default for SimpleQosConfig {
     }
 }
 
+impl OpaqueStreamerCounter for TokenBucket {}
+
 pub struct SimpleQos {
     config: SimpleQosConfig,
     stats: Arc<StreamerStats>,
-    staked_connection_table: Arc<Mutex<ConnectionTable>>,
+    staked_connection_table: Arc<Mutex<ConnectionTable<TokenBucket>>>,
     staked_nodes: Arc<RwLock<StakedNodes>>,
 }
 
@@ -76,16 +80,9 @@ impl SimpleQos {
         &self,
         client_connection_tracker: ClientConnectionTracker,
         connection: &Connection,
-        mut connection_table_l: MutexGuard<ConnectionTable>,
+        mut connection_table_l: MutexGuard<ConnectionTable<TokenBucket>>,
         conn_context: &SimpleQosConnectionContext,
-    ) -> Result<
-        (
-            Arc<AtomicU64>,
-            CancellationToken,
-            Arc<ConnectionStreamCounter>,
-        ),
-        ConnectionHandlerError,
-    > {
+    ) -> Result<(Arc<AtomicU64>, CancellationToken, Arc<TokenBucket>), ConnectionHandlerError> {
         let remote_addr = connection.remote_address();
 
         debug!(
@@ -93,21 +90,27 @@ impl SimpleQos {
             conn_context.peer_type(),
             remote_addr,
         );
-
+        let key = ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey);
         if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
             .try_add_connection(
-                ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey),
+                key,
                 remote_addr.port(),
                 client_connection_tracker,
                 Some(connection.clone()),
                 conn_context.peer_type(),
                 conn_context.last_update.clone(),
                 self.config.max_connections_per_peer,
+                || {
+                    Arc::new(TokenBucket::new(
+                        self.config.max_streams_per_second,
+                        self.config.max_streams_per_second,
+                        self.config.max_streams_per_second as f64,
+                    ))
+                },
             )
         {
             update_open_connections_stat(&self.stats, &connection_table_l);
             drop(connection_table_l);
-
             Ok((last_update, cancel_connection, stream_counter))
         } else {
             self.stats
@@ -116,11 +119,6 @@ impl SimpleQos {
             Err(ConnectionHandlerError::ConnectionAddError)
         }
     }
-
-    fn max_streams_per_throttling_interval(&self, _context: &SimpleQosConnectionContext) -> u64 {
-        let interval_ms = STREAM_THROTTLING_INTERVAL.as_millis() as u64;
-        (self.config.max_streams_per_second * interval_ms / 1000).max(1)
-    }
 }
 
 #[derive(Clone)]
@@ -129,7 +127,7 @@ pub struct SimpleQosConnectionContext {
     remote_pubkey: Option<solana_pubkey::Pubkey>,
     remote_address: std::net::SocketAddr,
     last_update: Arc<AtomicU64>,
-    stream_counter: Option<Arc<ConnectionStreamCounter>>,
+    stream_counter: Option<Arc<TokenBucket>>,
 }
 
 impl ConnectionContext for SimpleQosConnectionContext {
@@ -214,14 +212,7 @@ impl QosController<SimpleQosConnectionContext> for SimpleQos {
         }
     }
 
-    fn on_stream_accepted(&self, conn_context: &SimpleQosConnectionContext) {
-        conn_context
-            .stream_counter
-            .as_ref()
-            .unwrap()
-            .stream_count
-            .fetch_add(1, Ordering::Relaxed);
-    }
+    fn on_stream_accepted(&self, _conn_context: &SimpleQosConnectionContext) {}
 
     fn on_stream_error(&self, _conn_context: &SimpleQosConnectionContext) {}
 
@@ -262,20 +253,32 @@ impl QosController<SimpleQosConnectionContext> for SimpleQos {
         async move {
             let peer_type = context.peer_type();
             let remote_addr = context.remote_address;
-            let stream_counter: &Arc<ConnectionStreamCounter> =
-                context.stream_counter.as_ref().unwrap();
-
-            let max_streams_per_throttling_interval =
-                self.max_streams_per_throttling_interval(context);
-
-            throttle_stream(
-                &self.stats,
-                peer_type,
-                remote_addr,
-                stream_counter,
-                max_streams_per_throttling_interval,
-            )
-            .await;
+            let stream_counter = context
+                .stream_counter
+                .as_ref()
+                .expect("This will always be populated before streams are opened");
+
+            while stream_counter.consume_tokens(1).is_err() {
+                debug!("Throttling stream from {remote_addr:?}");
+                self.stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
+                match peer_type {
+                    ConnectionPeerType::Unstaked => {
+                        self.stats
+                            .throttled_unstaked_streams
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                    ConnectionPeerType::Staked(_) => {
+                        self.stats
+                            .throttled_staked_streams
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                }
+                let min_sleep = stream_counter.us_to_have_tokens(1).expect(
+                    "Valid QoS configurations guarantee enough token bucket fits at least one \
+                     token",
+                );
+                sleep(Duration::from_micros(min_sleep)).await;
+            }
         }
     }
 
@@ -423,9 +426,8 @@ mod tests {
 
         // Verify success
         assert!(result.is_ok());
-        let (_last_update, cancel_token, stream_counter) = result.unwrap();
+        let (_last_update, cancel_token, _stream_counter) = result.unwrap();
         assert!(!cancel_token.is_cancelled());
-        assert_eq!(stream_counter.stream_count.load(Ordering::Relaxed), 0);
     }
 
     #[tokio::test]
@@ -467,6 +469,7 @@ mod tests {
             ConnectionPeerType::Staked(1000),
             Arc::new(AtomicU64::new(0)),
             1, // max_connections_per_peer
+            || Arc::new(TokenBucket::new(1, 1, 1.0)),
         );
 
         let connection_table_guard = tokio::sync::Mutex::new(connection_table);
@@ -497,10 +500,6 @@ mod tests {
 
         // Verify failure due to connection limit
         assert!(result.is_err());
-        assert!(matches!(
-            result.unwrap_err(),
-            ConnectionHandlerError::ConnectionAddError
-        ));
 
         // Verify stats were updated
         assert_eq!(stats.connection_add_failed.load(Ordering::Relaxed), 1);
@@ -674,15 +673,6 @@ mod tests {
 
         // Verify context was updated with stream counter
         assert!(conn_context.stream_counter.is_some());
-        assert_eq!(
-            conn_context
-                .stream_counter
-                .as_ref()
-                .unwrap()
-                .stream_count
-                .load(Ordering::Relaxed),
-            0
-        );
 
         // Verify stats were updated
         assert_eq!(
@@ -926,17 +916,6 @@ mod tests {
         // Verify last_update was updated (should be same or newer)
         let updated_last_update = conn_context.last_update.load(Ordering::Relaxed);
         assert!(updated_last_update >= initial_last_update);
-
-        // Verify stream counter starts at 0
-        assert_eq!(
-            conn_context
-                .stream_counter
-                .as_ref()
-                .unwrap()
-                .stream_count
-                .load(Ordering::Relaxed),
-            0
-        );
     }
 
     #[tokio::test]
@@ -976,27 +955,6 @@ mod tests {
 
         assert!(result.is_some()); // Connection should be added successfully
         assert!(conn_context.stream_counter.is_some()); // Stream counter should be set
-
-        // Record initial stream count
-        let initial_stream_count = conn_context
-            .stream_counter
-            .as_ref()
-            .unwrap()
-            .stream_count
-            .load(Ordering::Relaxed);
-        assert_eq!(initial_stream_count, 0);
-
-        // Test - call on_stream_accepted
-        simple_qos.on_stream_accepted(&conn_context);
-
-        // Verify stream count was incremented
-        let updated_stream_count = conn_context
-            .stream_counter
-            .as_ref()
-            .unwrap()
-            .stream_count
-            .load(Ordering::Relaxed);
-        assert_eq!(updated_stream_count, initial_stream_count + 1);
     }
 
     #[tokio::test]
@@ -1069,8 +1027,9 @@ mod tests {
             create_staked_nodes_with_keypairs(&server_keypair, &client_keypair, stake_amount);
 
         // Set a specific max_streams_per_second for testing
+        let max_streams_per_second = 10;
         let qos_config = SimpleQosConfig {
-            max_streams_per_second: 10,
+            max_streams_per_second,
             max_staked_connections: 100,
             max_connections_per_peer: 10,
         };
@@ -1097,12 +1056,16 @@ mod tests {
         // Test - call on_new_stream and measure timing
         let start_time = std::time::Instant::now();
 
-        simple_qos.on_new_stream(&conn_context).await;
+        // This should take roughly 1 second to complete
+        // due to rate limit (since we allow initial burst)
+        for _ in 0..max_streams_per_second * 2 {
+            simple_qos.on_new_stream(&conn_context).await;
+        }
 
         let elapsed = start_time.elapsed();
 
-        // The function should complete (may or may not sleep depending on current throttling state)
-        // We just verify it doesn't panic and completes successfully
-        assert!(elapsed < std::time::Duration::from_secs(1)); // Should not take too long
+        // we cannot verify the timing precisely, so we check rough bounds
+        assert!(elapsed > std::time::Duration::from_millis(950)); // Should not take too little time!
+        assert!(elapsed < std::time::Duration::from_millis(1200)); // Should not take too long!
     }
 }
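On the bounds in the adjusted timing test: with max_streams_per_second = 10 the bucket starts with a full 10-token burst, so the first ten on_new_stream calls return immediately and the remaining ten each wait for a refill at 10 tokens per second, about one second in total. The 950-1200 ms window leaves slack for timer resolution and scheduler jitter.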

+ 6 - 1
streamer/src/nonblocking/stream_throttle.rs

@@ -1,5 +1,8 @@
 use {
-    crate::{nonblocking::quic::ConnectionPeerType, quic::StreamerStats},
+    crate::{
+        nonblocking::{qos::OpaqueStreamerCounter, quic::ConnectionPeerType},
+        quic::StreamerStats,
+    },
     std::{
         cmp,
         sync::{
@@ -189,6 +192,8 @@ pub struct ConnectionStreamCounter {
     last_throttling_instant: RwLock<tokio::time::Instant>,
 }
 
+impl OpaqueStreamerCounter for ConnectionStreamCounter {}
+
 impl ConnectionStreamCounter {
     pub fn new() -> Self {
         Self {

+ 6 - 6
streamer/src/nonblocking/swqos.rs

@@ -79,8 +79,8 @@ pub struct SwQos {
     staked_stream_load_ema: Arc<StakedStreamLoadEMA>,
     stats: Arc<StreamerStats>,
     staked_nodes: Arc<RwLock<StakedNodes>>,
-    unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
-    staked_connection_table: Arc<Mutex<ConnectionTable>>,
+    unstaked_connection_table: Arc<Mutex<ConnectionTable<ConnectionStreamCounter>>>,
+    staked_connection_table: Arc<Mutex<ConnectionTable<ConnectionStreamCounter>>>,
 }
 
 // QoS Params for Stake weighted QoS
@@ -210,7 +210,7 @@ impl SwQos {
         &self,
         client_connection_tracker: ClientConnectionTracker,
         connection: &Connection,
-        mut connection_table_l: MutexGuard<ConnectionTable>,
+        mut connection_table_l: MutexGuard<ConnectionTable<ConnectionStreamCounter>>,
         conn_context: &SwQosConnectionContext,
     ) -> Result<
         (
@@ -254,6 +254,7 @@ impl SwQos {
                     conn_context.peer_type(),
                     conn_context.last_update.clone(),
                     max_connections_per_peer,
+                    || Arc::new(ConnectionStreamCounter::new()),
                 )
             {
                 update_open_connections_stat(&self.stats, &connection_table_l);
@@ -285,7 +286,7 @@ impl SwQos {
 
     fn prune_unstaked_connection_table(
         &self,
-        unstaked_connection_table: &mut ConnectionTable,
+        unstaked_connection_table: &mut ConnectionTable<ConnectionStreamCounter>,
         max_unstaked_connections: usize,
         stats: Arc<StreamerStats>,
     ) {
@@ -305,7 +306,7 @@ impl SwQos {
         &self,
         client_connection_tracker: ClientConnectionTracker,
         connection: &Connection,
-        connection_table: Arc<Mutex<ConnectionTable>>,
+        connection_table: Arc<Mutex<ConnectionTable<ConnectionStreamCounter>>>,
         max_connections: usize,
         conn_context: &SwQosConnectionContext,
     ) -> Result<
@@ -610,7 +611,6 @@ pub mod test {
     }
 
     #[test]
-
     fn test_max_allowed_uni_streams() {
         assert_eq!(
             compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0),