Bladeren bron

Added option to specify tpu addresses in vortexor (#6116)

* Added option to specify tpu addresses in vortexor

* Added option to specify tpu addresses in vortexor

* fmt log messages

* use multi_bind_in_range_with_config
Lijun Wang 6 maanden geleden
bovenliggende
commit
0142ceb24e
4 gewijzigde bestanden met toevoegingen van 83 en 20 verwijderingen
  1. 13 0
      vortexor/src/cli.rs
  2. 33 5
      vortexor/src/main.rs
  3. 35 15
      vortexor/src/vortexor.rs
  4. 2 0
      vortexor/tests/vortexor.rs

+ 13 - 0
vortexor/src/cli.rs

@@ -92,6 +92,19 @@ pub struct Cli {
     #[arg(long, value_parser = parse_port_range, value_name = "MIN_PORT-MAX_PORT", default_value = get_default_port_range())]
     pub dynamic_port_range: (u16, u16),
 
+    /// Optional TPU address to bind to. If not specified, the vortexor will bind to
+    /// the first available port in the dynamic port range. When this argument is
+    /// specified, the --bind-address and --dynamic-port-range arguments are ignored.
+    #[arg(long, value_name = "HOST:PORT")]
+    pub tpu_address: Option<SocketAddr>,
+
+    /// Optional TPU-forward address to bind to. If not specified, the vortexor will bind to
+    /// the first available port in the dynamic port range after binding the tpu_address.
+    /// When this argument is specified, the --bind-address and --dynamic-port-range
+    /// arguments are ignored.
+    #[arg(long, value_name = "HOST:PORT")]
+    pub tpu_forward_address: Option<SocketAddr>,
+
     /// Controls the max concurrent connections per IpAddr.
     #[arg(long, default_value_t = DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER)]
     pub max_connections_per_peer: usize,

+ 33 - 5
vortexor/src/main.rs

@@ -5,7 +5,7 @@ use {
     solana_core::banking_trace::BankingTracer,
     solana_logger::redirect_stderr_to_file,
     solana_net_utils::{bind_in_range_with_config, SocketConfig},
-    solana_sdk::{signature::read_keypair_file, signer::Signer},
+    solana_sdk::{quic::QUIC_PORT_OFFSET, signature::read_keypair_file, signer::Signer},
     solana_streamer::streamer::StakedNodes,
     solana_vortexor::{
         cli::Cli,
@@ -20,7 +20,7 @@ use {
     std::{
         collections::HashMap,
         env,
-        net::IpAddr,
+        net::{IpAddr, SocketAddr},
         sync::{atomic::AtomicBool, Arc, RwLock},
         time::Duration,
     },
@@ -77,14 +77,21 @@ pub fn main() {
     let tpu_coalesce = Duration::from_millis(args.tpu_coalesce_ms);
     let dynamic_port_range = args.dynamic_port_range;
 
+    let tpu_address = args.tpu_address;
+    let tpu_forward_address = args.tpu_forward_address;
     let max_streams_per_ms = args.max_streams_per_ms;
     let exit = Arc::new(AtomicBool::new(false));
     // To be linked with the Tpu sigverify and forwarder service
     let (tpu_sender, tpu_receiver) = bounded(DEFAULT_CHANNEL_SIZE);
     let (tpu_fwd_sender, _tpu_fwd_receiver) = bounded(DEFAULT_CHANNEL_SIZE);
 
-    let tpu_sockets =
-        Vortexor::create_tpu_sockets(*bind_address, dynamic_port_range, num_quic_endpoints);
+    let tpu_sockets = Vortexor::create_tpu_sockets(
+        *bind_address,
+        dynamic_port_range,
+        tpu_address,
+        tpu_forward_address,
+        num_quic_endpoints,
+    );
 
     let (banking_tracer, _) = BankingTracer::new(
         None, // Not interesed in banking tracing
@@ -125,7 +132,7 @@ pub fn main() {
         DEFAULT_SENDER_THREADS_COUNT,
         DEFAULT_BATCH_SIZE,
         DEFAULT_RECV_TIMEOUT,
-        destinations,
+        destinations.clone(),
     );
 
     info!("Creating the SigVerifier");
@@ -156,6 +163,27 @@ pub fn main() {
         tpu_sockets.tpu_quic_fwd[0].local_addr()
     );
 
+    let tpu_address = tpu_sockets.tpu_quic[0].local_addr().unwrap();
+    let tpu_public_address = SocketAddr::new(
+        tpu_address.ip(),
+        tpu_address.port().saturating_sub(QUIC_PORT_OFFSET),
+    );
+    let tpu_fwd_address = tpu_sockets.tpu_quic_fwd[0].local_addr().unwrap();
+    let tpu_fwd_public_address = SocketAddr::new(
+        tpu_fwd_address.ip(),
+        tpu_fwd_address.port().saturating_sub(QUIC_PORT_OFFSET),
+    );
+
+    for destination in destinations.read().unwrap().iter() {
+        info!(
+            "To pair the validator with receiver address {destination} with this \
+             vortexor, add the following arguments in the validator's start command: \
+              --tpu-vortexor-receiver-address {destination} \
+              --public-tpu-address {tpu_public_address} \
+              --public-tpu-forward-address {tpu_fwd_public_address}",
+        );
+    }
+
     let vortexor = Vortexor::create_vortexor(
         tpu_sockets,
         staked_nodes,

+ 35 - 15
vortexor/src/vortexor.rs

@@ -4,7 +4,7 @@ use {
         banking_trace::TracedSender, sigverify::TransactionSigVerifier,
         sigverify_stage::SigVerifyStage,
     },
-    solana_net_utils::{bind_in_range_with_config, bind_more_with_config, SocketConfig},
+    solana_net_utils::{multi_bind_in_range_with_config, SocketConfig},
     solana_perf::packet::PacketBatch,
     solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair},
     solana_streamer::{
@@ -13,7 +13,7 @@ use {
         streamer::StakedNodes,
     },
     std::{
-        net::UdpSocket,
+        net::{SocketAddr, UdpSocket},
         sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
         thread::{self, JoinHandle},
         time::Duration,
@@ -56,26 +56,27 @@ impl Vortexor {
     pub fn create_tpu_sockets(
         bind_address: std::net::IpAddr,
         dynamic_port_range: (u16, u16),
+        tpu_address: Option<SocketAddr>,
+        tpu_forward_address: Option<SocketAddr>,
         num_quic_endpoints: usize,
     ) -> TpuSockets {
         let quic_config = SocketConfig::default().reuseport(true);
 
-        let (_, tpu_quic) =
-            bind_in_range_with_config(bind_address, dynamic_port_range, quic_config)
-                .expect("expected bind to succeed");
-
-        let tpu_quic_port = tpu_quic.local_addr().unwrap().port();
-        let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config).unwrap();
-
-        let (_, tpu_quic_fwd) = bind_in_range_with_config(
+        let tpu_quic = bind_sockets(
             bind_address,
-            (tpu_quic_port.saturating_add(1), dynamic_port_range.1),
+            dynamic_port_range,
+            tpu_address,
+            num_quic_endpoints,
             quic_config,
-        )
-        .expect("expected bind to succeed");
+        );
 
-        let tpu_quic_fwd =
-            bind_more_with_config(tpu_quic_fwd, num_quic_endpoints, quic_config).unwrap();
+        let tpu_quic_fwd = bind_sockets(
+            bind_address,
+            dynamic_port_range,
+            tpu_forward_address,
+            num_quic_endpoints,
+            quic_config,
+        );
 
         TpuSockets {
             tpu_quic,
@@ -177,3 +178,22 @@ impl Vortexor {
         Ok(())
     }
 }
+
+/// Binds the sockets to the specified address and port range if address is Some.
+/// If the address is None, it binds to the specified bind_address and port range.
+fn bind_sockets(
+    bind_address: std::net::IpAddr,
+    port_range: (u16, u16),
+    address: Option<SocketAddr>,
+    num_quic_endpoints: usize,
+    quic_config: SocketConfig,
+) -> Vec<UdpSocket> {
+    let (bind_address, port_range) = address
+        .map(|addr| (addr.ip(), (addr.port(), addr.port().saturating_add(1))))
+        .unwrap_or((bind_address, port_range));
+
+    let (_, sockets) =
+        multi_bind_in_range_with_config(bind_address, port_range, quic_config, num_quic_endpoints)
+            .expect("expected bind to succeed");
+    sockets
+}

+ 2 - 0
vortexor/tests/vortexor.rs

@@ -49,6 +49,8 @@ async fn test_vortexor() {
     let tpu_sockets = Vortexor::create_tpu_sockets(
         bind_address,
         VALIDATOR_PORT_RANGE,
+        None, // tpu_address
+        None, // tpu_forward_address
         DEFAULT_NUM_QUIC_ENDPOINTS,
     );