multiquic support in solana_streamer -- rebase of #634 to latest master (#1452)

* net-utils: support SO_REUSEPORT

tpu: use multiple quic endpoints

cluster-info: manage port range by hand...

local-cluster: keep udp tpu socket around for tests

* Missing cargo file

* sort cargo.toml

* divide the concurrent_connections among the endpoints for multiquic

* Change default multiquic endpoint count to 1

* Missing Cargo.lock changes

* revert reuseaddr changes

* revert reuseaddr changes; fmt code

* reverted port range changes

* revert DEFAULT_TPU_ENABLE_UDP change in local_cluster

* Set tpu_enable_udp to true to prevent concurrent local-cluster tests from using the same QUIC ports

* changed QUIC_ENDPOINTS to 10 for testing

* Set QUIC_ENDPOINTS back to 1 for now

---------

Co-authored-by: Trent Nelson <trent@solana.com>
Co-authored-by: Lijun Wang <lijun.wang@oracle.com>
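
For context, the whole change hinges on SO_REUSEPORT: several UDP sockets bind the same address and port, and the kernel load-balances incoming datagrams across them, which is what lets multiple QUIC endpoints serve one advertised TPU port. A minimal sketch of the mechanism, assuming the socket2 crate (Unix only; the test code in streamer/src/nonblocking/quic.rs below uses the same pattern):

use std::net::UdpSocket;
use std::os::fd::{FromRawFd, IntoRawFd};

// Bind a UDP socket with SO_REUSEPORT set so further sockets can join the port.
fn bind_reuseport(addr: std::net::SocketAddr) -> std::io::Result<UdpSocket> {
    let sock = socket2::Socket::new(
        socket2::Domain::IPV4,
        socket2::Type::DGRAM,
        Some(socket2::Protocol::UDP),
    )?;
    sock.set_reuse_port(true)?;
    sock.bind(&addr.into())?;
    Ok(unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) })
}

fn main() -> std::io::Result<()> {
    let first = bind_reuseport("127.0.0.1:0".parse().unwrap())?;
    let addr = first.local_addr()?; // kernel-assigned port
    let second = bind_reuseport(addr)?; // second socket on the very same port
    assert_eq!(first.local_addr()?.port(), second.local_addr()?.port());
    Ok(())
}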

+ 2 - 0
Cargo.lock

@@ -7419,6 +7419,7 @@ dependencies = [
  "bytes",
  "crossbeam-channel",
  "dashmap",
+ "futures 0.3.30",
  "futures-util",
  "histogram",
  "indexmap 2.2.6",
@@ -7433,6 +7434,7 @@ dependencies = [
  "rand 0.8.5",
  "rustls",
  "smallvec",
+ "socket2 0.5.7",
  "solana-logger",
  "solana-measure",
  "solana-metrics",

+ 4 - 1
client/src/connection_cache.rs

@@ -260,7 +260,7 @@ mod tests {
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
 
         let SpawnServerResult {
-            endpoint: response_recv_endpoint,
+            endpoints: mut response_recv_endpoints,
             thread: response_recv_thread,
             key_updater: _,
         } = solana_streamer::quic::spawn_server(
@@ -281,6 +281,9 @@ mod tests {
         )
         .unwrap();
 
+        let response_recv_endpoint = response_recv_endpoints
+            .pop()
+            .expect("at least one endpoint");
         let connection_cache = ConnectionCache::new_with_client_options(
             "connection_cache_test",
             1,                            // connection_pool_size

+ 9 - 7
core/src/tpu.rs

@@ -38,7 +38,9 @@ use {
     solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
     solana_streamer::{
         nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
-        quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
+        quic::{
+            spawn_server_multi, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
+        },
         streamer::StakedNodes,
     },
     solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
@@ -60,8 +62,8 @@ pub struct TpuSockets {
     pub transaction_forwards: Vec<UdpSocket>,
     pub vote: Vec<UdpSocket>,
     pub broadcast: Vec<UdpSocket>,
-    pub transactions_quic: UdpSocket,
-    pub transactions_forwards_quic: UdpSocket,
+    pub transactions_quic: Vec<UdpSocket>,
+    pub transactions_forwards_quic: Vec<UdpSocket>,
 }
 
 pub struct Tpu {
@@ -153,10 +155,10 @@ impl Tpu {
         let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
 
         let SpawnServerResult {
-            endpoint: _,
+            endpoints: _,
             thread: tpu_quic_t,
             key_updater,
-        } = spawn_server(
+        } = spawn_server_multi(
             "solQuicTpu",
             "quic_streamer_tpu",
             transactions_quic_sockets,
@@ -175,10 +177,10 @@ impl Tpu {
         .unwrap();
 
         let SpawnServerResult {
-            endpoint: _,
+            endpoints: _,
             thread: tpu_forwards_quic_t,
             key_updater: forwards_key_updater,
-        } = spawn_server(
+        } = spawn_server_multi(
             "solQuicTpuFwd",
             "quic_streamer_tpu_forwards",
             transactions_forwards_quic_sockets,

+ 74 - 17
gossip/src/cluster_info.rs

@@ -54,8 +54,9 @@ use {
     solana_ledger::shred::Shred,
     solana_measure::measure::Measure,
     solana_net_utils::{
-        bind_common, bind_common_in_range, bind_in_range, bind_two_in_range_with_offset,
-        find_available_port_in_range, multi_bind_in_range, PortRange,
+        bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config,
+        bind_more_with_config, bind_two_in_range_with_offset_and_config,
+        find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig,
     },
     solana_perf::{
         data_budget::DataBudget,
@@ -2891,8 +2892,8 @@ pub struct Sockets {
     pub serve_repair: UdpSocket,
     pub serve_repair_quic: UdpSocket,
     pub ancestor_hashes_requests: UdpSocket,
-    pub tpu_quic: UdpSocket,
-    pub tpu_forwards_quic: UdpSocket,
+    pub tpu_quic: Vec<UdpSocket>,
+    pub tpu_forwards_quic: Vec<UdpSocket>,
 }
 
 pub struct NodeConfig {
@@ -2905,6 +2906,9 @@ pub struct NodeConfig {
     pub num_tvu_sockets: NonZeroUsize,
 }
 
+// This will be adjusted and parameterized in follow-on PRs.
+const QUIC_ENDPOINTS: usize = 1;
+
 #[derive(Debug)]
 pub struct Node {
     pub info: ContactInfo,
@@ -2922,15 +2926,35 @@ impl Node {
         let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED));
         let port_range = (1024, 65535);
 
+        let udp_config = SocketConfig { reuseport: false };
+        let quic_config = SocketConfig { reuseport: true };
         let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
-            bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
+            bind_two_in_range_with_offset_and_config(
+                localhost_ip_addr,
+                port_range,
+                QUIC_PORT_OFFSET,
+                udp_config.clone(),
+                quic_config.clone(),
+            )
+            .unwrap();
+        let tpu_quic =
+            bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
         let (gossip_port, (gossip, ip_echo)) =
             bind_common_in_range(localhost_ip_addr, port_range).unwrap();
         let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
         let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap();
         let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
         let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
-            bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
+            bind_two_in_range_with_offset_and_config(
+                localhost_ip_addr,
+                port_range,
+                QUIC_PORT_OFFSET,
+                udp_config,
+                quic_config.clone(),
+            )
+            .unwrap();
+        let tpu_forwards_quic =
+            bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
         let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
         let repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
         let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
@@ -3008,7 +3032,7 @@ impl Node {
         if gossip_addr.port() != 0 {
             (
                 gossip_addr.port(),
-                bind_common(bind_ip_addr, gossip_addr.port(), false).unwrap_or_else(|e| {
+                bind_common(bind_ip_addr, gossip_addr.port()).unwrap_or_else(|e| {
                     panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e)
                 }),
             )
@@ -3017,7 +3041,16 @@ impl Node {
         }
     }
     fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) {
-        bind_in_range(bind_ip_addr, port_range).expect("Failed to bind")
+        let config = SocketConfig { reuseport: false };
+        Self::bind_with_config(bind_ip_addr, port_range, config)
+    }
+
+    fn bind_with_config(
+        bind_ip_addr: IpAddr,
+        port_range: PortRange,
+        config: SocketConfig,
+    ) -> (u16, UdpSocket) {
+        bind_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind")
     }
 
     pub fn new_single_bind(
@@ -3030,10 +3063,30 @@ impl Node {
             Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
         let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
         let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
+        let udp_config = SocketConfig { reuseport: false };
+        let quic_config = SocketConfig { reuseport: true };
         let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
-            bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
+            bind_two_in_range_with_offset_and_config(
+                bind_ip_addr,
+                port_range,
+                QUIC_PORT_OFFSET,
+                udp_config.clone(),
+                quic_config.clone(),
+            )
+            .unwrap();
+        let tpu_quic =
+            bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
         let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
-            bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
+            bind_two_in_range_with_offset_and_config(
+                bind_ip_addr,
+                port_range,
+                QUIC_PORT_OFFSET,
+                udp_config,
+                quic_config.clone(),
+            )
+            .unwrap();
+        let tpu_forwards_quic =
+            bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
         let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
         let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
         let (_, repair) = Self::bind(bind_ip_addr, port_range);
@@ -3117,21 +3170,28 @@ impl Node {
         let (tpu_port, tpu_sockets) =
             multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
 
-        let (_tpu_port_quic, tpu_quic) = Self::bind(
+        let quic_config = SocketConfig { reuseport: true };
+        let (_tpu_port_quic, tpu_quic) = Self::bind_with_config(
             bind_ip_addr,
             (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
+            quic_config.clone(),
         );
+        let tpu_quic =
+            bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
 
         let (tpu_forwards_port, tpu_forwards_sockets) =
             multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
 
-        let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind(
+        let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config(
             bind_ip_addr,
             (
                 tpu_forwards_port + QUIC_PORT_OFFSET,
                 tpu_forwards_port + QUIC_PORT_OFFSET + 1,
             ),
+            quic_config.clone(),
         );
+        let tpu_forwards_quic =
+            bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
 
         let (tpu_vote_port, tpu_vote_sockets) =
             multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
@@ -3338,7 +3398,6 @@ mod tests {
         },
         itertools::izip,
         solana_ledger::shred::Shredder,
-        solana_net_utils::MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
         solana_sdk::signature::{Keypair, Signer},
         solana_vote_program::{vote_instruction, vote_state::Vote},
         std::{
@@ -3771,10 +3830,8 @@ mod tests {
     fn new_with_external_ip_test_gossip() {
         // Can't use VALIDATOR_PORT_RANGE because if this test runs in parallel with others, the
         // port returned by `bind_in_range()` might be snatched up before `Node::new_with_external_ip()` runs
-        let port_range = (
-            VALIDATOR_PORT_RANGE.1 + MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
-            VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH),
-        );
+        let (start, end) = VALIDATOR_PORT_RANGE;
+        let port_range = (end, end + (end - start));
         let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
         let port = bind_in_range(ip, port_range).expect("Failed to bind").0;
         let config = NodeConfig {
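
The binding pattern above follows a fixed-offset convention: the UDP TPU socket and its QUIC sibling sit QUIC_PORT_OFFSET apart, and only the QUIC side opts into reuseport so it can later be fanned out into QUIC_ENDPOINTS sockets. A usage sketch against the new net-utils API (the offset value here is illustrative; the real constant lives in solana_sdk::quic):

use solana_net_utils::{bind_two_in_range_with_offset_and_config, SocketConfig};
use std::net::{IpAddr, Ipv4Addr};

const QUIC_PORT_OFFSET: u16 = 6; // illustrative; see solana_sdk::quic

fn main() -> std::io::Result<()> {
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
    let udp_config = SocketConfig { reuseport: false };
    let quic_config = SocketConfig { reuseport: true }; // QUIC side only
    let ((tpu_port, _tpu), (tpu_quic_port, _tpu_quic)) = bind_two_in_range_with_offset_and_config(
        ip,
        (1024, 65535),
        QUIC_PORT_OFFSET,
        udp_config,
        quic_config,
    )?;
    assert_eq!(tpu_quic_port, tpu_port + QUIC_PORT_OFFSET);
    Ok(())
}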

+ 3 - 1
local-cluster/src/local_cluster.rs

@@ -336,7 +336,9 @@ impl LocalCluster {
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
-            DEFAULT_TPU_ENABLE_UDP,
+            // We set tpu_enable_udp to true to prevent concurrent local-cluster tests
+            // from using the same QUIC ports due to SO_REUSEPORT.
+            true,
             32, // max connections per IpAddr per minute
             Arc::new(RwLock::new(None)),
         )

+ 95 - 26
net-utils/src/lib.rs

@@ -384,29 +384,45 @@ pub fn is_host_port(string: String) -> Result<(), String> {
     parse_host_port(&string).map(|_| ())
 }
 
+#[derive(Clone, Debug)]
+pub struct SocketConfig {
+    pub reuseport: bool,
+}
+
+impl Default for SocketConfig {
+    #[allow(clippy::derivable_impls)]
+    fn default() -> Self {
+        Self { reuseport: false }
+    }
+}
+
 #[cfg(any(windows, target_os = "ios"))]
 fn udp_socket(_reuseaddr: bool) -> io::Result<Socket> {
     let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
     Ok(sock)
 }
 
+#[cfg(any(windows, target_os = "ios"))]
+fn udp_socket_with_config(_config: SocketConfig) -> io::Result<Socket> {
+    let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
+    Ok(sock)
+}
+
+#[cfg(not(any(windows, target_os = "ios")))]
+fn udp_socket(reuseport: bool) -> io::Result<Socket> {
+    let config = SocketConfig { reuseport };
+    udp_socket_with_config(config)
+}
+
 #[cfg(not(any(windows, target_os = "ios")))]
-fn udp_socket(reuseaddr: bool) -> io::Result<Socket> {
-    use {
-        nix::sys::socket::{
-            setsockopt,
-            sockopt::{ReuseAddr, ReusePort},
-        },
-        std::os::fd::AsFd,
-    };
+fn udp_socket_with_config(config: SocketConfig) -> io::Result<Socket> {
+    use nix::sys::socket::{setsockopt, sockopt::ReusePort};
+    let SocketConfig { reuseport } = config;
 
     let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
-    let sock_fd = sock.as_fd();
 
-    if reuseaddr {
-        // best effort, i.e. ignore errors here, we'll get the failure in caller
-        setsockopt(&sock_fd, ReusePort, &true).ok();
-        setsockopt(&sock_fd, ReuseAddr, &true).ok();
+    if reuseport {
+        setsockopt(&sock, ReusePort, &true).ok();
     }
 
     Ok(sock)
@@ -418,7 +434,7 @@ pub fn bind_common_in_range(
     range: PortRange,
 ) -> io::Result<(u16, (UdpSocket, TcpListener))> {
     for port in range.0..range.1 {
-        if let Ok((sock, listener)) = bind_common(ip_addr, port, false) {
+        if let Ok((sock, listener)) = bind_common(ip_addr, port) {
             return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
         }
     }
@@ -430,7 +446,16 @@ pub fn bind_common_in_range(
 }
 
 pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> {
-    let sock = udp_socket(false)?;
+    let config = SocketConfig::default();
+    bind_in_range_with_config(ip_addr, range, config)
+}
+
+pub fn bind_in_range_with_config(
+    ip_addr: IpAddr,
+    range: PortRange,
+    config: SocketConfig,
+) -> io::Result<(u16, UdpSocket)> {
+    let sock = udp_socket_with_config(config)?;
 
     for port in range.0..range.1 {
         let addr = SocketAddr::new(ip_addr, port);
@@ -484,8 +509,9 @@ pub fn multi_bind_in_range(
             port
         }; // drop the probe, port should be available... briefly.
 
+        let config = SocketConfig { reuseport: true };
         for _ in 0..num {
-            let sock = bind_to(ip_addr, port, true);
+            let sock = bind_to_with_config(ip_addr, port, config.clone());
             if let Ok(sock) = sock {
                 sockets.push(sock);
             } else {
@@ -505,8 +531,17 @@ pub fn multi_bind_in_range(
     Ok((port, sockets))
 }
 
-pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result<UdpSocket> {
-    let sock = udp_socket(reuseaddr)?;
+pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result<UdpSocket> {
+    let config = SocketConfig { reuseport };
+    bind_to_with_config(ip_addr, port, config)
+}
+
+pub fn bind_to_with_config(
+    ip_addr: IpAddr,
+    port: u16,
+    config: SocketConfig,
+) -> io::Result<UdpSocket> {
+    let sock = udp_socket_with_config(config)?;
 
     let addr = SocketAddr::new(ip_addr, port);
 
@@ -514,12 +549,18 @@ pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result<UdpSoc
 }
 
 // binds both a UdpSocket and a TcpListener
-pub fn bind_common(
+pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> {
+    let config = SocketConfig { reuseport: false };
+    bind_common_with_config(ip_addr, port, config)
+}
+
+// binds both a UdpSocket and a TcpListener
+pub fn bind_common_with_config(
     ip_addr: IpAddr,
     port: u16,
-    reuseaddr: bool,
+    config: SocketConfig,
 ) -> io::Result<(UdpSocket, TcpListener)> {
-    let sock = udp_socket(reuseaddr)?;
+    let sock = udp_socket_with_config(config)?;
 
     let addr = SocketAddr::new(ip_addr, port);
     let sock_addr = SockAddr::from(addr);
@@ -531,6 +572,18 @@ pub fn bind_two_in_range_with_offset(
     ip_addr: IpAddr,
     range: PortRange,
     offset: u16,
+) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
+    let sock1_config = SocketConfig::default();
+    let sock2_config = SocketConfig::default();
+    bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config)
+}
+
+pub fn bind_two_in_range_with_offset_and_config(
+    ip_addr: IpAddr,
+    range: PortRange,
+    offset: u16,
+    sock1_config: SocketConfig,
+    sock2_config: SocketConfig,
 ) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
     if range.1.saturating_sub(range.0) < offset {
         return Err(io::Error::new(
@@ -539,9 +592,11 @@ pub fn bind_two_in_range_with_offset(
         ));
     }
     for port in range.0..range.1 {
-        if let Ok(first_bind) = bind_to(ip_addr, port, false) {
+        if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config.clone()) {
             if range.1.saturating_sub(port) >= offset {
-                if let Ok(second_bind) = bind_to(ip_addr, port + offset, false) {
+                if let Ok(second_bind) =
+                    bind_to_with_config(ip_addr, port + offset, sock2_config.clone())
+                {
                     return Ok((
                         (first_bind.local_addr().unwrap().port(), first_bind),
                         (second_bind.local_addr().unwrap().port(), second_bind),
@@ -563,7 +618,7 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re
     let mut tries_left = end - start;
     let mut rand_port = thread_rng().gen_range(start..end);
     loop {
-        match bind_common(ip_addr, rand_port, false) {
+        match bind_common(ip_addr, rand_port) {
             Ok(_) => {
                 break Ok(rand_port);
             }
@@ -581,6 +636,19 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re
     }
 }
 
+pub fn bind_more_with_config(
+    socket: UdpSocket,
+    num: usize,
+    config: SocketConfig,
+) -> io::Result<Vec<UdpSocket>> {
+    let addr = socket.local_addr().unwrap();
+    let ip = addr.ip();
+    let port = addr.port();
+    std::iter::once(Ok(socket))
+        .chain((1..num).map(|_| bind_to_with_config(ip, port, config.clone())))
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use {super::*, std::net::Ipv4Addr};
@@ -684,8 +752,9 @@ mod tests {
         let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
         assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000);
         let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
-        let x = bind_to(ip_addr, 2002, true).unwrap();
-        let y = bind_to(ip_addr, 2002, true).unwrap();
+        let config = SocketConfig { reuseport: true };
+        let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap();
+        let y = bind_to_with_config(ip_addr, 2002, config).unwrap();
         assert_eq!(
             x.local_addr().unwrap().port(),
             y.local_addr().unwrap().port()
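
Putting the new helpers together: a reuseport-enabled socket from bind_in_range_with_config can be fanned out with bind_more_with_config, which returns the original socket plus num - 1 clones bound to the same address. A short sketch (the port range is arbitrary):

use solana_net_utils::{bind_in_range_with_config, bind_more_with_config, SocketConfig};
use std::net::{IpAddr, Ipv4Addr};

fn main() -> std::io::Result<()> {
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
    let config = SocketConfig { reuseport: true };
    let (port, sock) = bind_in_range_with_config(ip, (8000, 10_000), config.clone())?;
    // `num` is the total count: the original socket plus num - 1 reuseport siblings.
    let sockets = bind_more_with_config(sock, 4, config)?;
    assert_eq!(sockets.len(), 4);
    assert!(sockets.iter().all(|s| s.local_addr().unwrap().port() == port));
    Ok(())
}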

+ 1 - 0
programs/sbf/Cargo.lock

@@ -6244,6 +6244,7 @@ dependencies = [
  "bytes",
  "crossbeam-channel",
  "dashmap",
+ "futures 0.3.30",
  "futures-util",
  "histogram",
  "indexmap 2.2.6",

+ 9 - 5
quic-client/tests/quic_client.rs

@@ -73,7 +73,7 @@ mod tests {
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let (s, exit, keypair) = server_args();
         let SpawnServerResult {
-            endpoint: _,
+            endpoints: _,
             thread: t,
             key_updater: _,
         } = solana_streamer::quic::spawn_server(
@@ -159,7 +159,7 @@ mod tests {
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let (s, exit, keypair) = server_args();
         let solana_streamer::nonblocking::quic::SpawnNonBlockingServerResult {
-            endpoint: _,
+            endpoints: _,
             stats: _,
             thread: t,
             max_concurrent_connections: _,
@@ -223,7 +223,7 @@ mod tests {
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let (request_recv_socket, request_recv_exit, keypair) = server_args();
         let SpawnServerResult {
-            endpoint: request_recv_endpoint,
+            endpoints: request_recv_endpoints,
             thread: request_recv_thread,
             key_updater: _,
         } = solana_streamer::quic::spawn_server(
@@ -244,7 +244,7 @@ mod tests {
         )
         .unwrap();
 
-        drop(request_recv_endpoint);
+        drop(request_recv_endpoints);
         // Response Receiver:
         let (response_recv_socket, response_recv_exit, keypair2) = server_args();
         let (sender2, receiver2) = unbounded();
@@ -253,7 +253,7 @@ mod tests {
         let port = response_recv_socket.local_addr().unwrap().port();
         let server_addr = SocketAddr::new(addr, port);
         let SpawnServerResult {
-            endpoint: response_recv_endpoint,
+            endpoints: mut response_recv_endpoints,
             thread: response_recv_thread,
             key_updater: _,
         } = solana_streamer::quic::spawn_server(
@@ -286,6 +286,10 @@ mod tests {
             key: priv_key,
         });
 
+        let response_recv_endpoint = response_recv_endpoints
+            .pop()
+            .expect("at least one endpoint");
+        drop(response_recv_endpoints);
         let endpoint =
             QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint));
         let request_sender =

+ 2 - 0
streamer/Cargo.toml

@@ -14,6 +14,7 @@ async-channel = { workspace = true }
 bytes = { workspace = true }
 crossbeam-channel = { workspace = true }
 dashmap = { workspace = true }
+futures = { workspace = true }
 futures-util = { workspace = true }
 histogram = { workspace = true }
 indexmap = { workspace = true }
@@ -39,6 +40,7 @@ x509-parser = { workspace = true }
 
 [dev-dependencies]
 assert_matches = { workspace = true }
+socket2 = { workspace = true }
 solana-logger = { workspace = true }
 
 [lib]

+ 140 - 23
streamer/src/nonblocking/quic.rs

@@ -16,9 +16,10 @@ use {
     },
     bytes::Bytes,
     crossbeam_channel::Sender,
+    futures::{stream::FuturesUnordered, Future, StreamExt as _},
     indexmap::map::{Entry, IndexMap},
     percentage::Percentage,
-    quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
+    quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
     quinn_proto::VarIntBoundsExceeded,
     rand::{thread_rng, Rng},
     smallvec::SmallVec,
@@ -40,11 +41,13 @@ use {
     std::{
         iter::repeat_with,
         net::{IpAddr, SocketAddr, UdpSocket},
+        pin::Pin,
         // CAUTION: be careful not to introduce any awaits while holding an RwLock.
         sync::{
             atomic::{AtomicBool, AtomicU64, Ordering},
             Arc, RwLock,
         },
+        task::Poll,
         time::{Duration, Instant},
     },
     tokio::{
@@ -56,6 +59,7 @@ use {
         // but if we do, the scope of the RwLock must always be a subset of the async Mutex
         // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
         // introduce any other awaits while holding the RwLock.
+        select,
         sync::{Mutex, MutexGuard},
         task::JoinHandle,
         time::{sleep, timeout},
@@ -135,7 +139,7 @@ impl ConnectionPeerType {
 }
 
 pub struct SpawnNonBlockingServerResult {
-    pub endpoint: Endpoint,
+    pub endpoints: Vec<Endpoint>,
     pub stats: Arc<StreamStats>,
     pub thread: JoinHandle<()>,
     pub max_concurrent_connections: usize,
@@ -157,23 +161,61 @@ pub fn spawn_server(
     wait_for_chunk_timeout: Duration,
     coalesce: Duration,
 ) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
-    info!("Start {name} quic server on {sock:?}");
-    let concurrent_connections = max_staked_connections + max_unstaked_connections;
+    spawn_server_multi(
+        name,
+        vec![sock],
+        keypair,
+        packet_sender,
+        exit,
+        max_connections_per_peer,
+        staked_nodes,
+        max_staked_connections,
+        max_unstaked_connections,
+        max_streams_per_ms,
+        max_connections_per_ipaddr_per_min,
+        wait_for_chunk_timeout,
+        coalesce,
+    )
+}
+
+#[allow(clippy::too_many_arguments, clippy::type_complexity)]
+pub fn spawn_server_multi(
+    name: &'static str,
+    sockets: Vec<UdpSocket>,
+    keypair: &Keypair,
+    packet_sender: Sender<PacketBatch>,
+    exit: Arc<AtomicBool>,
+    max_connections_per_peer: usize,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+    max_staked_connections: usize,
+    max_unstaked_connections: usize,
+    max_streams_per_ms: u64,
+    max_connections_per_ipaddr_per_min: u64,
+    wait_for_chunk_timeout: Duration,
+    coalesce: Duration,
+) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
+    info!("Start {name} quic server on {sockets:?}");
+    let concurrent_connections =
+        (max_staked_connections + max_unstaked_connections) / sockets.len();
     let max_concurrent_connections = concurrent_connections + concurrent_connections / 4;
     let (config, _cert) = configure_server(keypair, max_concurrent_connections)?;
 
-    let endpoint = Endpoint::new(
-        EndpointConfig::default(),
-        Some(config),
-        sock,
-        Arc::new(TokioRuntime),
-    )
-    .map_err(QuicServerError::EndpointFailed)?;
-
+    let endpoints = sockets
+        .into_iter()
+        .map(|sock| {
+            Endpoint::new(
+                EndpointConfig::default(),
+                Some(config.clone()),
+                sock,
+                Arc::new(TokioRuntime),
+            )
+            .map_err(QuicServerError::EndpointFailed)
+        })
+        .collect::<Result<Vec<_>, _>>()?;
     let stats = Arc::<StreamStats>::default();
     let handle = tokio::spawn(run_server(
         name,
-        endpoint.clone(),
+        endpoints.clone(),
         packet_sender,
         exit,
         max_connections_per_peer,
@@ -187,7 +229,7 @@ pub fn spawn_server(
         coalesce,
     ));
     Ok(SpawnNonBlockingServerResult {
-        endpoint,
+        endpoints,
         stats,
         thread: handle,
         max_concurrent_connections,
@@ -197,7 +239,7 @@ pub fn spawn_server(
 #[allow(clippy::too_many_arguments)]
 async fn run_server(
     name: &'static str,
-    incoming: Endpoint,
+    incoming: Vec<Endpoint>,
     packet_sender: Sender<PacketBatch>,
     exit: Arc<AtomicBool>,
     max_connections_per_peer: usize,
@@ -234,8 +276,37 @@ async fn run_server(
         stats.clone(),
         coalesce,
     ));
+
+    let mut accepts = incoming
+        .iter()
+        .enumerate()
+        .map(|(i, incoming)| {
+            Box::pin(EndpointAccept {
+                accept: incoming.accept(),
+                endpoint: i,
+            })
+        })
+        .collect::<FuturesUnordered<_>>();
     while !exit.load(Ordering::Relaxed) {
-        let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await;
+        let timeout_connection = select! {
+            ready = accepts.next() => {
+                if let Some((connecting, i)) = ready {
+                    accepts.push(
+                        Box::pin(EndpointAccept {
+                            accept: incoming[i].accept(),
+                            endpoint: i,
+                        }
+                    ));
+                    Ok(connecting)
+                } else {
+                    // we can't really get here - we never poll an empty FuturesUnordered
+                    continue
+                }
+            }
+            _ = tokio::time::sleep(WAIT_FOR_CONNECTION_TIMEOUT) => {
+                Err(())
+            }
+        };
 
         if last_datapoint.elapsed().as_secs() >= 5 {
             stats.report(name);
@@ -1317,6 +1388,25 @@ impl ConnectionTable {
     }
 }
 
+struct EndpointAccept<'a> {
+    endpoint: usize,
+    accept: Accept<'a>,
+}
+
+impl<'a> Future for EndpointAccept<'a> {
+    type Output = (Option<quinn::Connecting>, usize);
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
+        let i = self.endpoint;
+        // Safety:
+        // self is pinned and accept is a field so it can't get moved out. See safety docs of
+        // map_unchecked_mut.
+        unsafe { self.map_unchecked_mut(|this| &mut this.accept) }
+            .poll(cx)
+            .map(|r| (r, i))
+    }
+}
+
 #[cfg(test)]
 pub mod test {
     use {
@@ -1395,20 +1485,47 @@ pub mod test {
         SocketAddr,
         Arc<StreamStats>,
     ) {
-        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
+        let sockets = {
+            #[cfg(not(target_os = "windows"))]
+            {
+                use std::{
+                    os::fd::{FromRawFd, IntoRawFd},
+                    str::FromStr as _,
+                };
+                (0..10)
+                    .map(|_| {
+                        let sock = socket2::Socket::new(
+                            socket2::Domain::IPV4,
+                            socket2::Type::DGRAM,
+                            Some(socket2::Protocol::UDP),
+                        )
+                        .unwrap();
+                        sock.set_reuse_port(true).unwrap();
+                        sock.bind(&SocketAddr::from_str("127.0.0.1:0").unwrap().into())
+                            .unwrap();
+                        unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) }
+                    })
+                    .collect::<Vec<_>>()
+            }
+            #[cfg(target_os = "windows")]
+            {
+                vec![UdpSocket::bind("127.0.0.1:0").unwrap()]
+            }
+        };
+
         let exit = Arc::new(AtomicBool::new(false));
         let (sender, receiver) = unbounded();
         let keypair = Keypair::new();
-        let server_address = s.local_addr().unwrap();
+        let server_address = sockets[0].local_addr().unwrap();
         let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default()));
         let SpawnNonBlockingServerResult {
-            endpoint: _,
+            endpoints: _,
             stats,
             thread: t,
             max_concurrent_connections: _,
-        } = spawn_server(
+        } = spawn_server_multi(
             "quic_streamer_test",
-            s,
+            sockets,
             &keypair,
             sender,
             exit.clone(),
@@ -1844,7 +1961,7 @@ pub mod test {
         let server_address = s.local_addr().unwrap();
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let SpawnNonBlockingServerResult {
-            endpoint: _,
+            endpoints: _,
             stats: _,
             thread: t,
             max_concurrent_connections: _,
@@ -1880,7 +1997,7 @@ pub mod test {
         let server_address = s.local_addr().unwrap();
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let SpawnNonBlockingServerResult {
-            endpoint: _,
+            endpoints: _,
             stats,
             thread: t,
             max_concurrent_connections: _,
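
One subtlety in spawn_server_multi above: the combined staked and unstaked connection budget is divided evenly across the endpoints, and each endpoint then gets 25% headroom on top. A worked example with placeholder limits (the real values are the MAX_STAKED_CONNECTIONS and MAX_UNSTAKED_CONNECTIONS constants in streamer):

// Mirrors the arithmetic in spawn_server_multi; integer division throughout.
fn per_endpoint_budget(max_staked: usize, max_unstaked: usize, num_endpoints: usize) -> usize {
    let concurrent_connections = (max_staked + max_unstaked) / num_endpoints;
    concurrent_connections + concurrent_connections / 4 // +25% headroom
}

fn main() {
    // e.g. 2000 staked + 500 unstaked across 2 endpoints:
    // 2500 / 2 = 1250, plus 1250 / 4 = 312 -> 1562 concurrent per endpoint.
    assert_eq!(per_endpoint_budget(2000, 500, 2), 1562);
}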

+ 48 - 11
streamer/src/quic.rs

@@ -37,7 +37,7 @@ impl SkipClientVerification {
 }
 
 pub struct SpawnServerResult {
-    pub endpoint: Endpoint,
+    pub endpoints: Vec<Endpoint>,
     pub thread: thread::JoinHandle<()>,
     pub key_updater: Arc<EndpointKeyUpdater>,
 }
@@ -121,14 +121,16 @@ pub enum QuicServerError {
 }
 
 pub struct EndpointKeyUpdater {
-    endpoint: Endpoint,
+    endpoints: Vec<Endpoint>,
     max_concurrent_connections: usize,
 }
 
 impl NotifyKeyUpdate for EndpointKeyUpdater {
     fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
         let (config, _) = configure_server(key, self.max_concurrent_connections)?;
-        self.endpoint.set_server_config(Some(config));
+        for endpoint in &self.endpoints {
+            endpoint.set_server_config(Some(config.clone()));
+        }
         Ok(())
     }
 }
@@ -525,7 +527,42 @@ impl StreamStats {
 pub fn spawn_server(
     thread_name: &'static str,
     metrics_name: &'static str,
-    sock: UdpSocket,
+    socket: UdpSocket,
+    keypair: &Keypair,
+    packet_sender: Sender<PacketBatch>,
+    exit: Arc<AtomicBool>,
+    max_connections_per_peer: usize,
+    staked_nodes: Arc<RwLock<StakedNodes>>,
+    max_staked_connections: usize,
+    max_unstaked_connections: usize,
+    max_streams_per_ms: u64,
+    max_connections_per_ipaddr_per_min: u64,
+    wait_for_chunk_timeout: Duration,
+    coalesce: Duration,
+) -> Result<SpawnServerResult, QuicServerError> {
+    spawn_server_multi(
+        thread_name,
+        metrics_name,
+        vec![socket],
+        keypair,
+        packet_sender,
+        exit,
+        max_connections_per_peer,
+        staked_nodes,
+        max_staked_connections,
+        max_unstaked_connections,
+        max_streams_per_ms,
+        max_connections_per_ipaddr_per_min,
+        wait_for_chunk_timeout,
+        coalesce,
+    )
+}
+
+#[allow(clippy::too_many_arguments)]
+pub fn spawn_server_multi(
+    thread_name: &'static str,
+    metrics_name: &'static str,
+    sockets: Vec<UdpSocket>,
     keypair: &Keypair,
     packet_sender: Sender<PacketBatch>,
     exit: Arc<AtomicBool>,
@@ -541,9 +578,9 @@ pub fn spawn_server(
     let runtime = rt(format!("{thread_name}Rt"));
     let result = {
         let _guard = runtime.enter();
-        crate::nonblocking::quic::spawn_server(
+        crate::nonblocking::quic::spawn_server_multi(
             metrics_name,
-            sock,
+            sockets,
             keypair,
             packet_sender,
             exit,
@@ -566,11 +603,11 @@ pub fn spawn_server(
         })
         .unwrap();
     let updater = EndpointKeyUpdater {
-        endpoint: result.endpoint.clone(),
+        endpoints: result.endpoints.clone(),
         max_concurrent_connections: result.max_concurrent_connections,
     };
     Ok(SpawnServerResult {
-        endpoint: result.endpoint,
+        endpoints: result.endpoints,
         thread: handle,
         key_updater: Arc::new(updater),
     })
@@ -602,7 +639,7 @@ mod test {
         let server_address = s.local_addr().unwrap();
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let SpawnServerResult {
-            endpoint: _,
+            endpoints: _,
             thread: t,
             key_updater: _,
         } = spawn_server(
@@ -663,7 +700,7 @@ mod test {
         let server_address = s.local_addr().unwrap();
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let SpawnServerResult {
-            endpoint: _,
+            endpoints: _,
             thread: t,
             key_updater: _,
         } = spawn_server(
@@ -711,7 +748,7 @@ mod test {
         let server_address = s.local_addr().unwrap();
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
         let SpawnServerResult {
-            endpoint: _,
+            endpoints: _,
             thread: t,
             key_updater: _,
         } = spawn_server(
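
End to end, callers hand spawn_server_multi a vector of reuseport sockets where spawn_server took one; everything else keeps the same shape. A hedged sketch of a call site (numeric arguments are placeholders; the constant paths match the imports shown in core/src/tpu.rs above):

use std::{
    net::UdpSocket,
    sync::{atomic::AtomicBool, Arc, RwLock},
    time::Duration,
};

fn start_quic_server(sockets: Vec<UdpSocket>) {
    let num_sockets = sockets.len();
    let keypair = solana_sdk::signature::Keypair::new();
    let (packet_sender, _packet_receiver) = crossbeam_channel::unbounded();
    let exit = Arc::new(AtomicBool::new(false));
    let staked_nodes = Arc::new(RwLock::new(
        solana_streamer::streamer::StakedNodes::default(),
    ));
    let solana_streamer::quic::SpawnServerResult {
        endpoints,
        thread: _thread,
        key_updater: _,
    } = solana_streamer::quic::spawn_server_multi(
        "solQuicTest",
        "quic_streamer_test",
        sockets,
        &keypair,
        packet_sender,
        exit,
        1, // max_connections_per_peer (placeholder)
        staked_nodes,
        solana_streamer::quic::MAX_STAKED_CONNECTIONS,
        solana_streamer::quic::MAX_UNSTAKED_CONNECTIONS,
        solana_streamer::nonblocking::quic::DEFAULT_MAX_STREAMS_PER_MS,
        32, // max_connections_per_ipaddr_per_min (placeholder)
        solana_streamer::nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
        Duration::from_millis(1), // coalesce (placeholder)
    )
    .unwrap();
    // One quinn Endpoint per socket; all share one server config and key updater.
    assert_eq!(endpoints.len(), num_sockets);
}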

+ 2 - 2
validator/src/bootstrap.rs

@@ -85,11 +85,11 @@ fn verify_reachable_ports(
     }
     if verify_address(&node.info.tpu(Protocol::UDP).ok()) {
         udp_sockets.extend(node.sockets.tpu.iter());
-        udp_sockets.push(&node.sockets.tpu_quic);
+        udp_sockets.extend(&node.sockets.tpu_quic);
     }
     if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) {
         udp_sockets.extend(node.sockets.tpu_forwards.iter());
-        udp_sockets.push(&node.sockets.tpu_forwards_quic);
+        udp_sockets.extend(&node.sockets.tpu_forwards_quic);
     }
     if verify_address(&node.info.tpu_vote().ok()) {
         udp_sockets.extend(node.sockets.tpu_vote.iter());