Эх сурвалжийг харах

streamer: refactor config structs (#9093)

refactor streamer configs
Alex Pyattaev 3 өдөр өмнө
parent
commit
432d810d94

+ 3 - 1
bench-vote/src/main.rs

@@ -267,12 +267,14 @@ fn main() -> Result<()> {
                 max_connections_per_ipaddr_per_min: max_connections_per_ipaddr_per_min
                     .try_into()
                     .unwrap(),
+                ..Default::default()
+            };
+            let qos_config = SwQosConfig {
                 max_connections_per_unstaked_peer: max_connections_per_peer,
                 max_staked_connections: max_connections,
                 max_unstaked_connections: 0,
                 ..Default::default()
             };
-            let qos_config = SwQosConfig::default();
             let (s_reader, r_reader) = unbounded();
             read_channels.push(r_reader);
 

+ 3 - 2
core/src/validator.rs

@@ -588,17 +588,18 @@ impl ValidatorTpuConfig {
         let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
             quic_streamer_config: QuicStreamerConfig {
                 max_connections_per_ipaddr_per_min: 32,
+                ..Default::default()
+            },
+            qos_config: SwQosConfig {
                 max_unstaked_connections: 0,
                 ..Default::default()
             },
-            qos_config: SwQosConfig::default(),
         };
 
         // vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
         let vote_quic_server_config = SimpleQosQuicStreamerConfig {
             quic_streamer_config: QuicStreamerConfig {
                 max_connections_per_ipaddr_per_min: 32,
-                max_unstaked_connections: 0,
                 ..Default::default()
             },
             qos_config: SimpleQosConfig::default(),

+ 9 - 4
streamer/examples/swqos.rs

@@ -68,8 +68,10 @@ pub fn load_staked_nodes_overrides(path: &String) -> anyhow::Result<HashMap<Pubk
 
 #[derive(Debug, Parser)]
 struct Cli {
-    #[arg(short, long, default_value_t = 1)]
-    max_connections_per_peer: usize,
+    #[arg(short, long, default_value_t = 10)]
+    max_connections_per_staked_peer: usize,
+    #[arg(short, long, default_value_t = 10)]
+    max_connections_per_unstaked_peer: usize,
 
     #[arg(short, long, default_value = "0.0.0.0:8008")]
     bind_to: SocketAddr,
@@ -120,10 +122,13 @@ async fn main() -> anyhow::Result<()> {
         sender,
         staked_nodes,
         QuicStreamerConfig {
-            max_connections_per_unstaked_peer: cli.max_connections_per_peer,
             ..QuicStreamerConfig::default()
         },
-        SwQosConfig::default(),
+        SwQosConfig {
+            max_connections_per_staked_peer: cli.max_connections_per_staked_peer,
+            max_connections_per_unstaked_peer: cli.max_connections_per_unstaked_peer,
+            ..Default::default()
+        },
         cancel.clone(),
     )?;
     info!("Server listening on {}", socket.local_addr()?);

+ 5 - 0
streamer/src/nonblocking/qos.rs

@@ -51,4 +51,9 @@ pub(crate) trait QosController<C: ConnectionContext> {
         context: &C,
         connection: Connection,
     ) -> impl Future<Output = usize> + Send;
+
+    /// How many concurrent
+    fn max_concurrent_connections(&self) -> usize;
 }
+
+pub trait QosConfig {}

+ 17 - 12
streamer/src/nonblocking/quic.rs

@@ -157,7 +157,7 @@ where
         })
         .collect::<Result<Vec<_>, _>>()?;
 
-    let max_concurrent_connections = quic_server_params.max_concurrent_connections();
+    let max_concurrent_connections = qos.max_concurrent_connections();
     let handle = tokio::spawn({
         let endpoints = endpoints.clone();
         let stats = stats.clone();
@@ -334,10 +334,9 @@ where
                 continue;
             }
 
-            let Ok(client_connection_tracker) = ClientConnectionTracker::new(
-                stats.clone(),
-                quic_server_params.max_concurrent_connections(),
-            ) else {
+            let Ok(client_connection_tracker) =
+                ClientConnectionTracker::new(stats.clone(), qos.max_concurrent_connections())
+            else {
                 stats
                     .refused_connections_too_many_open_connections
                     .fetch_add(1, Ordering::Relaxed);
@@ -1351,7 +1350,7 @@ pub mod test {
         } = setup_quic_server(
             None,
             QuicStreamerConfig::default_for_tests(),
-            SwQosConfig::default(),
+            SwQosConfig::default_for_tests(),
         );
         check_block_multiple_connections(server_address).await;
         cancel.cancel();
@@ -1372,10 +1371,12 @@ pub mod test {
         } = setup_quic_server(
             None,
             QuicStreamerConfig {
-                max_connections_per_unstaked_peer: 2,
                 ..QuicStreamerConfig::default_for_tests()
             },
-            SwQosConfig::default(),
+            SwQosConfig {
+                max_connections_per_unstaked_peer: 2,
+                ..SwQosConfig::default_for_tests()
+            },
         );
 
         let client_socket = bind_to_localhost_unique().expect("should bind - client");
@@ -1582,10 +1583,12 @@ pub mod test {
             sender,
             staked_nodes,
             QuicStreamerConfig {
-                max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes
                 ..QuicStreamerConfig::default_for_tests()
             },
-            SwQosConfig::default(),
+            SwQosConfig {
+                max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes
+                ..Default::default()
+            },
             cancel.clone(),
         )
         .unwrap();
@@ -1616,10 +1619,12 @@ pub mod test {
             sender,
             staked_nodes,
             QuicStreamerConfig {
-                max_connections_per_unstaked_peer: 2,
                 ..QuicStreamerConfig::default_for_tests()
             },
-            SwQosConfig::default(),
+            SwQosConfig {
+                max_connections_per_unstaked_peer: 2,
+                ..Default::default()
+            },
             cancel.clone(),
         )
         .unwrap();

+ 36 - 50
streamer/src/nonblocking/simple_qos.rs

@@ -11,7 +11,10 @@ use {
                 throttle_stream, ConnectionStreamCounter, STREAM_THROTTLING_INTERVAL,
             },
         },
-        quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS},
+        quic::{
+            StreamerStats, DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER,
+            DEFAULT_MAX_STAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS,
+        },
         streamer::StakedNodes,
     },
     quinn::Connection,
@@ -30,20 +33,22 @@ use {
 #[derive(Clone)]
 pub struct SimpleQosConfig {
     pub max_streams_per_second: u64,
+    pub max_staked_connections: usize,
+    pub max_connections_per_peer: usize,
 }
 
 impl Default for SimpleQosConfig {
     fn default() -> Self {
         SimpleQosConfig {
             max_streams_per_second: DEFAULT_MAX_STREAMS_PER_MS * 1000,
+            max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
+            max_connections_per_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER,
         }
     }
 }
 
 pub struct SimpleQos {
-    max_streams_per_second: u64,
-    max_staked_connections: usize,
-    max_connections_per_peer: usize,
+    config: SimpleQosConfig,
     stats: Arc<StreamerStats>,
     staked_connection_table: Arc<Mutex<ConnectionTable>>,
     staked_nodes: Arc<RwLock<StakedNodes>>,
@@ -51,17 +56,13 @@ pub struct SimpleQos {
 
 impl SimpleQos {
     pub fn new(
-        qos_config: SimpleQosConfig,
-        max_connections_per_peer: usize,
-        max_staked_connections: usize,
+        config: SimpleQosConfig,
         stats: Arc<StreamerStats>,
         staked_nodes: Arc<RwLock<StakedNodes>>,
         cancel: CancellationToken,
     ) -> Self {
         Self {
-            max_streams_per_second: qos_config.max_streams_per_second,
-            max_connections_per_peer,
-            max_staked_connections,
+            config,
             stats,
             staked_nodes,
             staked_connection_table: Arc::new(Mutex::new(ConnectionTable::new(
@@ -101,7 +102,7 @@ impl SimpleQos {
                 Some(connection.clone()),
                 conn_context.peer_type(),
                 conn_context.last_update.clone(),
-                self.max_connections_per_peer,
+                self.config.max_connections_per_peer,
             )
         {
             update_open_connections_stat(&self.stats, &connection_table_l);
@@ -118,7 +119,7 @@ impl SimpleQos {
 
     fn max_streams_per_throttling_interval(&self, _context: &SimpleQosConnectionContext) -> u64 {
         let interval_ms = STREAM_THROTTLING_INTERVAL.as_millis() as u64;
-        (self.max_streams_per_second * interval_ms / 1000).max(1)
+        (self.config.max_streams_per_second * interval_ms / 1000).max(1)
     }
 }
 
@@ -173,7 +174,7 @@ impl QosController<SimpleQosConnectionContext> for SimpleQos {
                 ConnectionPeerType::Staked(stake) => {
                     let mut connection_table_l = self.staked_connection_table.lock().await;
 
-                    if connection_table_l.total_size >= self.max_staked_connections {
+                    if connection_table_l.total_size >= self.config.max_staked_connections {
                         let num_pruned =
                             connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
 
@@ -189,7 +190,7 @@ impl QosController<SimpleQosConnectionContext> for SimpleQos {
                         update_open_connections_stat(&self.stats, &connection_table_l);
                     }
 
-                    if connection_table_l.total_size < self.max_staked_connections {
+                    if connection_table_l.total_size < self.config.max_staked_connections {
                         if let Ok((last_update, cancel_connection, stream_counter)) = self
                             .cache_new_connection(
                                 client_connection_tracker,
@@ -277,6 +278,11 @@ impl QosController<SimpleQosConnectionContext> for SimpleQos {
             .await;
         }
     }
+
+    fn max_concurrent_connections(&self) -> usize {
+        // Allow 25% more connections than required to allow for handshake
+        self.config.max_staked_connections * 5 / 4
+    }
 }
 
 #[cfg(test)]
@@ -380,8 +386,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -432,9 +436,10 @@ mod tests {
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
 
         let simple_qos = SimpleQos::new(
-            SimpleQosConfig::default(),
-            1,   // max_connections_per_peer (set to 1 to trigger limit)
-            100, // max_staked_connections
+            SimpleQosConfig {
+                max_connections_per_peer: 1,
+                ..Default::default()
+            },
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -510,8 +515,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -566,8 +569,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -605,8 +606,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -647,8 +646,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -705,8 +702,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -767,9 +762,10 @@ mod tests {
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(Arc::new(stakes), overrides)));
 
         let simple_qos = SimpleQos::new(
-            SimpleQosConfig::default(),
-            10, // max_connections_per_peer
-            1,  // max_staked_connections (set to 1 to trigger pruning)
+            SimpleQosConfig {
+                max_staked_connections: 1,
+                ..Default::default()
+            },
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -836,9 +832,10 @@ mod tests {
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(Arc::new(stakes), overrides)));
 
         let simple_qos = SimpleQos::new(
-            SimpleQosConfig::default(),
-            10, // max_connections_per_peer
-            1,  // max_staked_connections (set to 1)
+            SimpleQosConfig {
+                max_staked_connections: 1,
+                ..Default::default()
+            },
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -897,8 +894,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -960,8 +955,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -1022,8 +1015,6 @@ mod tests {
 
         let simple_qos = SimpleQos::new(
             SimpleQosConfig::default(),
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
             stats.clone(),
             staked_nodes,
             cancel.clone(),
@@ -1079,17 +1070,12 @@ mod tests {
 
         // Set a specific max_streams_per_second for testing
         let qos_config = SimpleQosConfig {
-            max_streams_per_second: 10, // 10 streams per second
+            max_streams_per_second: 10,
+            max_staked_connections: 100,
+            max_connections_per_peer: 10,
         };
 
-        let simple_qos = SimpleQos::new(
-            qos_config,
-            10,  // max_connections_per_peer
-            100, // max_staked_connections
-            stats.clone(),
-            staked_nodes,
-            cancel.clone(),
-        );
+        let simple_qos = SimpleQos::new(qos_config, stats.clone(), staked_nodes, cancel.clone());
 
         let client_tracker = ClientConnectionTracker {
             stats: stats.clone(),

+ 41 - 22
streamer/src/nonblocking/swqos.rs

@@ -14,7 +14,11 @@ use {
                 STREAM_THROTTLING_INTERVAL_MS,
             },
         },
-        quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS},
+        quic::{
+            StreamerStats, DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER,
+            DEFAULT_MAX_QUIC_CONNECTIONS_PER_UNSTAKED_PEER, DEFAULT_MAX_STAKED_CONNECTIONS,
+            DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS,
+        },
         streamer::StakedNodes,
     },
     percentage::Percentage,
@@ -41,21 +45,37 @@ use {
 #[derive(Clone)]
 pub struct SwQosConfig {
     pub max_streams_per_ms: u64,
+    pub max_staked_connections: usize,
+    pub max_unstaked_connections: usize,
+    pub max_connections_per_staked_peer: usize,
+    pub max_connections_per_unstaked_peer: usize,
 }
 
 impl Default for SwQosConfig {
     fn default() -> Self {
         SwQosConfig {
             max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
+            max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
+            max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
+            max_connections_per_staked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER,
+            max_connections_per_unstaked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_UNSTAKED_PEER,
+        }
+    }
+}
+
+impl SwQosConfig {
+    #[cfg(feature = "dev-context-only-utils")]
+    pub fn default_for_tests() -> Self {
+        Self {
+            max_connections_per_unstaked_peer: 1,
+            max_connections_per_staked_peer: 1,
+            ..Self::default()
         }
     }
 }
 
 pub struct SwQos {
-    max_staked_connections: usize,
-    max_unstaked_connections: usize,
-    max_connections_per_staked_peer: usize,
-    max_connections_per_unstaked_peer: usize,
+    config: SwQosConfig,
     staked_stream_load_ema: Arc<StakedStreamLoadEMA>,
     stats: Arc<StreamerStats>,
     staked_nodes: Arc<RwLock<StakedNodes>>,
@@ -89,24 +109,17 @@ impl ConnectionContext for SwQosConnectionContext {
 
 impl SwQos {
     pub fn new(
-        qos_config: SwQosConfig,
-        max_staked_connections: usize,
-        max_unstaked_connections: usize,
-        max_connections_per_staked_peer: usize,
-        max_connections_per_unstaked_peer: usize,
+        config: SwQosConfig,
         stats: Arc<StreamerStats>,
         staked_nodes: Arc<RwLock<StakedNodes>>,
         cancel: CancellationToken,
     ) -> Self {
         Self {
-            max_staked_connections,
-            max_unstaked_connections,
-            max_connections_per_staked_peer,
-            max_connections_per_unstaked_peer,
+            config: config.clone(),
             staked_stream_load_ema: Arc::new(StakedStreamLoadEMA::new(
                 stats.clone(),
-                max_unstaked_connections,
-                qos_config.max_streams_per_ms,
+                config.max_unstaked_connections,
+                config.max_streams_per_ms,
             )),
             stats,
             staked_nodes,
@@ -229,8 +242,8 @@ impl SwQos {
             );
 
             let max_connections_per_peer = match conn_context.peer_type() {
-                ConnectionPeerType::Unstaked => self.max_connections_per_unstaked_peer,
-                ConnectionPeerType::Staked(_) => self.max_connections_per_staked_peer,
+                ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer,
+                ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer,
             };
             if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
                 .try_add_connection(
@@ -391,7 +404,7 @@ impl QosController<SwQosConnectionContext> for SwQos {
                 ConnectionPeerType::Staked(stake) => {
                     let mut connection_table_l = self.staked_connection_table.lock().await;
 
-                    if connection_table_l.total_size >= self.max_staked_connections {
+                    if connection_table_l.total_size >= self.config.max_staked_connections {
                         let num_pruned =
                             connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
                         self.stats
@@ -400,7 +413,7 @@ impl QosController<SwQosConnectionContext> for SwQos {
                         update_open_connections_stat(&self.stats, &connection_table_l);
                     }
 
-                    if connection_table_l.total_size < self.max_staked_connections {
+                    if connection_table_l.total_size < self.config.max_staked_connections {
                         if let Ok((last_update, cancel_connection, stream_counter)) = self
                             .cache_new_connection(
                                 client_connection_tracker,
@@ -426,7 +439,7 @@ impl QosController<SwQosConnectionContext> for SwQos {
                                 client_connection_tracker,
                                 connection,
                                 self.unstaked_connection_table.clone(),
-                                self.max_unstaked_connections,
+                                self.config.max_unstaked_connections,
                                 conn_context,
                             )
                             .await
@@ -454,7 +467,7 @@ impl QosController<SwQosConnectionContext> for SwQos {
                             client_connection_tracker,
                             connection,
                             self.unstaked_connection_table.clone(),
-                            self.max_unstaked_connections,
+                            self.config.max_unstaked_connections,
                             conn_context,
                         )
                         .await
@@ -550,6 +563,12 @@ impl QosController<SwQosConnectionContext> for SwQos {
             .await;
         }
     }
+
+    fn max_concurrent_connections(&self) -> usize {
+        // Allow 25% more connections than required to allow for handshake
+
+        (self.config.max_staked_connections + self.config.max_unstaked_connections) * 5 / 4
+    }
 }
 
 #[cfg(test)]

+ 0 - 4
streamer/src/nonblocking/testing_utilities.rs

@@ -48,10 +48,6 @@ where
 
     let swqos = Arc::new(SwQos::new(
         qos_config,
-        quic_server_params.max_staked_connections,
-        quic_server_params.max_unstaked_connections,
-        quic_server_params.max_connections_per_staked_peer,
-        quic_server_params.max_connections_per_unstaked_peer,
         stats.clone(),
         staked_nodes,
         cancel.clone(),

+ 11 - 30
streamer/src/quic.rs

@@ -560,10 +560,6 @@ impl StreamerStats {
 
 #[derive(Clone)]
 pub struct QuicStreamerConfig {
-    pub max_connections_per_unstaked_peer: usize,
-    pub max_connections_per_staked_peer: usize,
-    pub max_staked_connections: usize,
-    pub max_unstaked_connections: usize,
     pub max_connections_per_ipaddr_per_min: u64,
     pub wait_for_chunk_timeout: Duration,
     pub num_threads: NonZeroUsize,
@@ -584,10 +580,6 @@ pub struct SimpleQosQuicStreamerConfig {
 impl Default for QuicStreamerConfig {
     fn default() -> Self {
         Self {
-            max_connections_per_unstaked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_UNSTAKED_PEER,
-            max_connections_per_staked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER,
-            max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
-            max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
             max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"),
@@ -602,17 +594,10 @@ impl QuicStreamerConfig {
     #[cfg(feature = "dev-context-only-utils")]
     pub fn default_for_tests() -> Self {
         Self {
-            max_connections_per_unstaked_peer: 1,
-            max_connections_per_staked_peer: 1,
             num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
             ..Self::default()
         }
     }
-
-    pub(crate) fn max_concurrent_connections(&self) -> usize {
-        let conns = self.max_staked_connections + self.max_unstaked_connections;
-        conns + conns / 4
-    }
 }
 
 /// Generic function to spawn a tokio runtime with a QUIC server
@@ -680,10 +665,6 @@ pub fn spawn_stake_wighted_qos_server(
     let stats = Arc::<StreamerStats>::default();
     let swqos = Arc::new(SwQos::new(
         qos_config,
-        quic_server_params.max_staked_connections,
-        quic_server_params.max_unstaked_connections,
-        quic_server_params.max_connections_per_unstaked_peer,
-        quic_server_params.max_connections_per_unstaked_peer,
         stats.clone(),
         staked_nodes,
         cancel.clone(),
@@ -714,11 +695,8 @@ pub fn spawn_simple_qos_server(
     cancel: CancellationToken,
 ) -> Result<SpawnServerResult, QuicServerError> {
     let stats = Arc::<StreamerStats>::default();
-
     let simple_qos = Arc::new(SimpleQos::new(
         qos_config,
-        quic_server_params.max_connections_per_staked_peer,
-        quic_server_params.max_staked_connections,
         stats.clone(),
         staked_nodes,
         cancel.clone(),
@@ -815,7 +793,7 @@ mod test {
             sender,
             staked_nodes,
             server_params,
-            SwQosConfig::default(),
+            SwQosConfig::default_for_tests(),
             cancel.clone(),
         )
         .unwrap();
@@ -871,10 +849,12 @@ mod test {
             sender,
             staked_nodes,
             QuicStreamerConfig {
-                max_connections_per_unstaked_peer: 2,
                 ..QuicStreamerConfig::default_for_tests()
             },
-            SwQosConfig::default(),
+            SwQosConfig {
+                max_connections_per_unstaked_peer: 2,
+                ..Default::default()
+            },
             cancel.clone(),
         )
         .unwrap();
@@ -915,13 +895,12 @@ mod test {
         );
 
         let server_params = QuicStreamerConfig {
-            max_unstaked_connections: 0,
-            max_connections_per_staked_peer: 1,
-            max_connections_per_unstaked_peer: 0,
             ..QuicStreamerConfig::default_for_tests()
         };
         let qos_config = SimpleQosConfig {
+            max_connections_per_peer: 1,
             max_streams_per_second: 20, // low limit to ensure staked node can send all packets
+            ..Default::default()
         };
         let server_params = SimpleQosQuicStreamerConfig {
             quic_streamer_config: server_params,
@@ -964,10 +943,12 @@ mod test {
             sender,
             staked_nodes,
             QuicStreamerConfig {
-                max_unstaked_connections: 0,
                 ..QuicStreamerConfig::default_for_tests()
             },
-            SwQosConfig::default(),
+            SwQosConfig {
+                max_unstaked_connections: 0,
+                ..Default::default()
+            },
             cancel.clone(),
         )
         .unwrap();

+ 27 - 15
tpu-client-next/tests/connection_workers_scheduler_test.rs

@@ -310,13 +310,13 @@ async fn test_connection_denied_until_allowed() {
         cancel,
     } = setup_quic_server(
         None,
-        QuicStreamerConfig {
+        QuicStreamerConfig::default_for_tests(),
+        SwQosConfig {
             // To prevent server from accepting a new connection, we
             // set max_connections_per_peer == 1
             max_connections_per_unstaked_peer: 1,
-            ..QuicStreamerConfig::default_for_tests()
+            ..Default::default()
         },
-        SwQosConfig::default(),
     );
 
     // If we create a blocking connection and try to create connections to send TXs,
@@ -386,11 +386,13 @@ async fn test_connection_pruned_and_reopened() {
     } = setup_quic_server(
         None,
         QuicStreamerConfig {
+            ..QuicStreamerConfig::default_for_tests()
+        },
+        SwQosConfig {
             max_connections_per_unstaked_peer: 100,
             max_unstaked_connections: 1,
-            ..QuicStreamerConfig::default_for_tests()
+            ..Default::default()
         },
-        SwQosConfig::default(),
     );
 
     // Setup sending txs
@@ -443,14 +445,16 @@ async fn test_staked_connection() {
     } = setup_quic_server(
         Some(staked_nodes),
         QuicStreamerConfig {
+            ..QuicStreamerConfig::default()
+        },
+        SwQosConfig {
             // Must use at least the number of endpoints (10) because
             // `max_staked_connections` and `max_unstaked_connections` are
             // cumulative for all the endpoints.
             max_staked_connections: 10,
             max_unstaked_connections: 0,
-            ..QuicStreamerConfig::default_for_tests()
+            ..Default::default()
         },
-        SwQosConfig::default(),
     );
 
     // Setup sending txs
@@ -594,11 +598,13 @@ async fn test_rate_limiting() {
     } = setup_quic_server(
         None,
         QuicStreamerConfig {
-            max_connections_per_unstaked_peer: 100,
             max_connections_per_ipaddr_per_min: 1,
             ..QuicStreamerConfig::default_for_tests()
         },
-        SwQosConfig::default(),
+        SwQosConfig {
+            max_connections_per_unstaked_peer: 100,
+            ..Default::default()
+        },
     );
 
     // open a connection to consume the limit
@@ -656,11 +662,13 @@ async fn test_rate_limiting_establish_connection() {
     } = setup_quic_server(
         None,
         QuicStreamerConfig {
-            max_connections_per_unstaked_peer: 100,
             max_connections_per_ipaddr_per_min: 1,
             ..QuicStreamerConfig::default_for_tests()
         },
-        SwQosConfig::default(),
+        SwQosConfig {
+            max_connections_per_unstaked_peer: 100,
+            ..Default::default()
+        },
     );
 
     let connection_to_reach_limit = make_client_endpoint(&server_address, None).await;
@@ -738,15 +746,17 @@ async fn test_update_identity() {
     } = setup_quic_server(
         Some(staked_nodes),
         QuicStreamerConfig {
+            ..QuicStreamerConfig::default_for_tests()
+        },
+        SwQosConfig {
             // Must use at least the number of endpoints (10) because
             // `max_staked_connections` and `max_unstaked_connections` are
             // cumulative for all the endpoints.
             max_staked_connections: 10,
             // Deny all unstaked connections.
             max_unstaked_connections: 0,
-            ..QuicStreamerConfig::default_for_tests()
+            ..Default::default()
         },
-        SwQosConfig::default(),
     );
 
     // Setup sending txs
@@ -802,11 +812,13 @@ async fn test_proactive_connection_close_detection() {
     } = setup_quic_server(
         None,
         QuicStreamerConfig {
+            ..QuicStreamerConfig::default_for_tests()
+        },
+        SwQosConfig {
             max_connections_per_unstaked_peer: 1,
             max_unstaked_connections: 1,
-            ..QuicStreamerConfig::default_for_tests()
+            ..Default::default()
         },
-        SwQosConfig::default(),
     );
 
     // Setup controlled transaction sending

+ 13 - 10
validator/src/commands/run/execute.rs

@@ -937,6 +937,11 @@ pub fn execute(
 
     let tpu_quic_server_config = SwQosQuicStreamerConfig {
         quic_streamer_config: QuicStreamerConfig {
+            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
+            num_threads: tpu_transaction_receive_threads,
+            ..Default::default()
+        },
+        qos_config: SwQosConfig {
             max_connections_per_unstaked_peer: tpu_max_connections_per_unstaked_peer
                 .try_into()
                 .unwrap(),
@@ -945,15 +950,17 @@ pub fn execute(
                 .unwrap(),
             max_staked_connections: tpu_max_staked_connections.try_into().unwrap(),
             max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(),
-            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
-            num_threads: tpu_transaction_receive_threads,
-            ..Default::default()
+            max_streams_per_ms,
         },
-        qos_config: SwQosConfig { max_streams_per_ms },
     };
 
     let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
         quic_streamer_config: QuicStreamerConfig {
+            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
+            num_threads: tpu_transaction_forward_receive_threads,
+            ..Default::default()
+        },
+        qos_config: SwQosConfig {
             max_connections_per_staked_peer: tpu_max_connections_per_staked_peer
                 .try_into()
                 .unwrap(),
@@ -962,23 +969,19 @@ pub fn execute(
                 .unwrap(),
             max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
             max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(),
-            max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
-            num_threads: tpu_transaction_forward_receive_threads,
-            ..Default::default()
+            max_streams_per_ms,
         },
-        qos_config: SwQosConfig { max_streams_per_ms },
     };
 
     let vote_quic_server_config = SimpleQosQuicStreamerConfig {
         quic_streamer_config: QuicStreamerConfig {
-            max_connections_per_unstaked_peer: 1,
-            max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(),
             max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
             num_threads: tpu_vote_transaction_receive_threads,
             ..Default::default()
         },
         qos_config: SimpleQosConfig {
             max_streams_per_second: MAX_VOTES_PER_SECOND,
+            ..Default::default()
         },
     };
 

+ 9 - 10
vortexor/src/vortexor.rs

@@ -121,14 +121,17 @@ impl Vortexor {
     ) -> Self {
         let quic_server_params = SwQosQuicStreamerConfig {
             quic_streamer_config: QuicStreamerConfig {
-                max_connections_per_unstaked_peer: max_connections_per_peer,
-                max_staked_connections: max_tpu_staked_connections,
-                max_unstaked_connections: max_tpu_unstaked_connections,
                 max_connections_per_ipaddr_per_min,
                 wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
                 ..Default::default()
             },
-            qos_config: SwQosConfig { max_streams_per_ms },
+            qos_config: SwQosConfig {
+                max_connections_per_unstaked_peer: max_connections_per_peer,
+                max_connections_per_staked_peer: max_connections_per_peer,
+                max_staked_connections: max_tpu_staked_connections,
+                max_unstaked_connections: max_tpu_unstaked_connections,
+                max_streams_per_ms,
+            },
         };
 
         let mut quic_fwd_server_params = quic_server_params.clone();
@@ -153,12 +156,8 @@ impl Vortexor {
 
         // Fot TPU forward -- we disallow unstaked connections. Allocate all connection resources
         // for staked connections:
-        quic_fwd_server_params
-            .quic_streamer_config
-            .max_staked_connections = max_fwd_staked_connections;
-        quic_fwd_server_params
-            .quic_streamer_config
-            .max_unstaked_connections = max_fwd_unstaked_connections;
+        quic_fwd_server_params.qos_config.max_staked_connections = max_fwd_staked_connections;
+        quic_fwd_server_params.qos_config.max_unstaked_connections = max_fwd_unstaked_connections;
         let tpu_fwd_result = spawn_stake_wighted_qos_server(
             "solVtxTpuFwd",
             "quic_vortexor_tpu_forwards",