瀏覽代碼

Streamer: code cleanup (#7320)

* clean up the server spawn API
* remove pub from functions which do not need to be pub
* remove some obviously wrong comments
Alex Pyattaev 3 月之前
父節點
當前提交
8854e65d81

+ 2 - 2
bench-vote/src/main.rs

@@ -18,7 +18,7 @@ use {
     solana_streamer::{
         packet::PacketBatchRecycler,
         quic::{
-            spawn_server_multi, QuicServerParams, DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER,
+            spawn_server, QuicServerParams, DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER,
             DEFAULT_MAX_STAKED_CONNECTIONS,
         },
         streamer::{receiver, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
@@ -261,7 +261,7 @@ fn main() -> Result<()> {
             let (s_reader, r_reader) = unbounded();
             read_channels.push(r_reader);
 
-            let server = spawn_server_multi(
+            let server = spawn_server(
                 "solRcvrBenVote",
                 "bench_vote_metrics",
                 read_sockets,

+ 4 - 4
core/src/tpu.rs

@@ -55,7 +55,7 @@ use {
         vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
     },
     solana_streamer::{
-        quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
+        quic::{spawn_server, QuicServerParams, SpawnServerResult},
         streamer::StakedNodes,
     },
     solana_turbine::{
@@ -212,7 +212,7 @@ impl Tpu {
             endpoints: _,
             thread: tpu_vote_quic_t,
             key_updater: vote_streamer_key_updater,
-        } = spawn_server_multi(
+        } = spawn_server(
             "solQuicTVo",
             "quic_streamer_tpu_vote",
             tpu_vote_quic_sockets,
@@ -230,7 +230,7 @@ impl Tpu {
                 endpoints: _,
                 thread: tpu_quic_t,
                 key_updater,
-            } = spawn_server_multi(
+            } = spawn_server(
                 "solQuicTpu",
                 "quic_streamer_tpu",
                 transactions_quic_sockets,
@@ -252,7 +252,7 @@ impl Tpu {
                 endpoints: _,
                 thread: tpu_forwards_quic_t,
                 key_updater: forwards_key_updater,
-            } = spawn_server_multi(
+            } = spawn_server(
                 "solQuicTpuFwd",
                 "quic_streamer_tpu_forwards",
                 transactions_forwards_quic_sockets,

+ 5 - 5
quic-client/tests/quic_client.rs

@@ -78,7 +78,7 @@ mod tests {
         } = solana_streamer::quic::spawn_server(
             "solQuicTest",
             "quic_streamer_test",
-            s.try_clone().unwrap(),
+            vec![s.try_clone().unwrap()],
             &keypair,
             sender,
             exit.clone(),
@@ -158,7 +158,7 @@ mod tests {
             max_concurrent_connections: _,
         } = solana_streamer::nonblocking::quic::spawn_server(
             "quic_streamer_test",
-            s.try_clone().unwrap(),
+            vec![s.try_clone().unwrap()],
             &keypair,
             sender,
             exit.clone(),
@@ -216,7 +216,7 @@ mod tests {
         } = solana_streamer::quic::spawn_server(
             "solQuicTest",
             "quic_streamer_test",
-            request_recv_socket.try_clone().unwrap(),
+            [request_recv_socket.try_clone().unwrap()],
             &keypair,
             sender,
             request_recv_exit.clone(),
@@ -240,7 +240,7 @@ mod tests {
         } = solana_streamer::quic::spawn_server(
             "solQuicTest",
             "quic_streamer_test",
-            response_recv_socket,
+            [response_recv_socket],
             &keypair2,
             sender2,
             response_recv_exit.clone(),
@@ -326,7 +326,7 @@ mod tests {
             max_concurrent_connections: _,
         } = solana_streamer::nonblocking::quic::spawn_server(
             "quic_streamer_test",
-            s.try_clone().unwrap(),
+            vec![s.try_clone().unwrap()],
             &keypair,
             sender,
             exit.clone(),

+ 12 - 14
streamer/src/nonblocking/quic.rs

@@ -140,18 +140,19 @@ pub struct SpawnNonBlockingServerResult {
     pub max_concurrent_connections: usize,
 }
 
-pub fn spawn_server(
+#[deprecated(since = "3.0.0", note = "Use spawn_server instead")]
+pub fn spawn_server_multi(
     name: &'static str,
-    sock: UdpSocket,
+    sockets: impl IntoIterator<Item = UdpSocket>,
     keypair: &Keypair,
     packet_sender: Sender<PacketBatch>,
     exit: Arc<AtomicBool>,
     staked_nodes: Arc<RwLock<StakedNodes>>,
     quic_server_params: QuicServerParams,
 ) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
-    spawn_server_multi(
+    spawn_server(
         name,
-        vec![sock],
+        sockets,
         keypair,
         packet_sender,
         exit,
@@ -160,15 +161,17 @@ pub fn spawn_server(
     )
 }
 
-pub fn spawn_server_multi(
+/// Spawn a streamer instance in the current tokio runtime.
+pub fn spawn_server(
     name: &'static str,
-    sockets: Vec<UdpSocket>,
+    sockets: impl IntoIterator<Item = UdpSocket>,
     keypair: &Keypair,
     packet_sender: Sender<PacketBatch>,
     exit: Arc<AtomicBool>,
     staked_nodes: Arc<RwLock<StakedNodes>>,
     quic_server_params: QuicServerParams,
 ) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
+    let sockets: Vec<_> = sockets.into_iter().collect();
     info!("Start {name} quic server on {sockets:?}");
     let QuicServerParams {
         max_unstaked_connections,
@@ -459,7 +462,7 @@ fn get_connection_stake(
     ))
 }
 
-pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
+fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
     match peer_type {
         ConnectionPeerType::Staked(peer_stake) => {
             // No checked math for f64 type. So let's explicitly check for 0 here
@@ -493,11 +496,6 @@ enum ConnectionHandlerError {
 
 #[derive(Clone)]
 struct NewConnectionHandlerParams {
-    // In principle, the code can be made to work with a crossbeam channel
-    // as long as we're careful never to use a blocking recv or send call
-    // but I've found that it's simply too easy to accidentally block
-    // in async code when using the crossbeam channel, so for the sake of maintainability,
-    // we're sticking with an async channel
     packet_sender: Sender<PacketAccumulator>,
     remote_pubkey: Option<Pubkey>,
     peer_type: ConnectionPeerType,
@@ -2006,7 +2004,7 @@ pub mod test {
             max_concurrent_connections: _,
         } = spawn_server(
             "quic_streamer_test",
-            s,
+            [s],
             &keypair,
             sender,
             exit.clone(),
@@ -2039,7 +2037,7 @@ pub mod test {
             max_concurrent_connections: _,
         } = spawn_server(
             "quic_streamer_test",
-            s,
+            [s],
             &keypair,
             sender,
             exit.clone(),

+ 2 - 2
streamer/src/nonblocking/testing_utilities.rs

@@ -1,6 +1,6 @@
 //! Contains utility functions to create server and client for test purposes.
 use {
-    super::quic::{spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID},
+    super::quic::{spawn_server, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID},
     crate::{
         quic::{QuicServerParams, StreamerStats},
         streamer::StakedNodes,
@@ -97,7 +97,7 @@ pub fn setup_quic_server_with_sockets(
         stats,
         thread: handle,
         max_concurrent_connections: _,
-    } = spawn_server_multi(
+    } = spawn_server(
         "quic_streamer_test",
         sockets,
         &keypair,

+ 13 - 11
streamer/src/quic.rs

@@ -115,7 +115,7 @@ pub(crate) fn configure_server(
     Ok((server_config, cert_chain_pem))
 }
 
-pub fn rt(name: String, num_threads: NonZeroUsize) -> Runtime {
+fn rt(name: String, num_threads: NonZeroUsize) -> Runtime {
     tokio::runtime::Builder::new_multi_thread()
         .thread_name(name)
         .worker_threads(num_threads.get())
@@ -582,20 +582,21 @@ impl StreamerStats {
     }
 }
 
-pub fn spawn_server(
+#[deprecated(since = "3.0.0", note = "Use spawn_server instead")]
+pub fn spawn_server_multi(
     thread_name: &'static str,
     metrics_name: &'static str,
-    socket: UdpSocket,
+    sockets: Vec<UdpSocket>,
     keypair: &Keypair,
     packet_sender: Sender<PacketBatch>,
     exit: Arc<AtomicBool>,
     staked_nodes: Arc<RwLock<StakedNodes>>,
     quic_server_params: QuicServerParams,
 ) -> Result<SpawnServerResult, QuicServerError> {
-    spawn_server_multi(
+    spawn_server(
         thread_name,
         metrics_name,
-        vec![socket],
+        sockets,
         keypair,
         packet_sender,
         exit,
@@ -647,10 +648,11 @@ impl QuicServerParams {
     }
 }
 
-pub fn spawn_server_multi(
+/// Spawns a tokio runtime and a streamer instance inside it.
+pub fn spawn_server(
     thread_name: &'static str,
     metrics_name: &'static str,
-    sockets: Vec<UdpSocket>,
+    sockets: impl IntoIterator<Item = UdpSocket>,
     keypair: &Keypair,
     packet_sender: Sender<PacketBatch>,
     exit: Arc<AtomicBool>,
@@ -660,7 +662,7 @@ pub fn spawn_server_multi(
     let runtime = rt(format!("{thread_name}Rt"), quic_server_params.num_threads);
     let result = {
         let _guard = runtime.enter();
-        crate::nonblocking::quic::spawn_server_multi(
+        crate::nonblocking::quic::spawn_server(
             metrics_name,
             sockets,
             keypair,
@@ -724,7 +726,7 @@ mod test {
         } = spawn_server(
             "solQuicTest",
             "quic_streamer_test",
-            s,
+            [s],
             &keypair,
             sender,
             exit.clone(),
@@ -779,7 +781,7 @@ mod test {
         } = spawn_server(
             "solQuicTest",
             "quic_streamer_test",
-            s,
+            [s],
             &keypair,
             sender,
             exit.clone(),
@@ -824,7 +826,7 @@ mod test {
         } = spawn_server(
             "solQuicTest",
             "quic_streamer_test",
-            s,
+            [s],
             &keypair,
             sender,
             exit.clone(),

+ 3 - 3
vortexor/src/vortexor.rs

@@ -12,7 +12,7 @@ use {
     solana_quic_definitions::NotifyKeyUpdate,
     solana_streamer::{
         nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
-        quic::{spawn_server_multi, EndpointKeyUpdater, QuicServerParams},
+        quic::{spawn_server, EndpointKeyUpdater, QuicServerParams},
         streamer::StakedNodes,
     },
     std::{
@@ -133,7 +133,7 @@ impl Vortexor {
             tpu_quic_fwd,
         } = tpu_sockets;
 
-        let tpu_result = spawn_server_multi(
+        let tpu_result = spawn_server(
             "solVtxTpu",
             "quic_vortexor_tpu",
             tpu_quic,
@@ -149,7 +149,7 @@ impl Vortexor {
         // for staked connections:
         quic_server_params.max_staked_connections = max_fwd_staked_connections;
         quic_server_params.max_unstaked_connections = max_fwd_unstaked_connections;
-        let tpu_fwd_result = spawn_server_multi(
+        let tpu_fwd_result = spawn_server(
             "solVtxTpuFwd",
             "quic_vortexor_tpu_forwards",
             tpu_quic_fwd,