Pārlūkot izejas kodu

XDP: add bond interface support for zero copy (#9004)

add bond support

Co-authored-by: Alessandro Decina <alessandro.d@gmail.com>
Greg Cusack 1 nedēļu atpakaļ
vecāks
revīzija
07345d33df
4 mainītis faili ar 57 papildinājumiem un 27 dzēšanām
  1. 1 0
      CHANGELOG.md
  2. 11 3
      core/src/validator.rs
  3. 41 4
      turbine/src/xdp.rs
  4. 4 20
      xdp/src/tx_loop.rs

+ 1 - 0
CHANGELOG.md

@@ -66,6 +66,7 @@ without warning. From v4.0.0 onward, symbols in these crates will be unavailable
 
 #### Breaking
 * When XDP is enabled, the validator process requires the `CAP_NET_RAW`, `CAP_NET_ADMIN`, `CAP_BPF`, and `CAP_PERFMON` capabilities. These can be configured in the systemd service file by setting `CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN CAP_BPF CAP_PERFMON` under the `[Service]` section or directly on the binary with the command `sudo setcap cap_net_raw,cap_net_admin,cap_bpf,cap_perfmon=p <path/to/agave-validator>` (this command must be run each time the binary is replaced)
+* Enabling XDP zero copy on systems configured with an LACP bond requires manually passing `--experimental-retransmit-xdp-interface <real-interface>` (e.g. `eno17395np0`, not `bond0`), as zero copy is only available on physical interfaces.
 * Require increased `memlock` limits - recommended setting is `LimitMEMLOCK=2000000000` in systemd service configuration. Lack of sufficient limit (on Linux) will cause startup error.
 * Remove deprecated arguments
   * `--accounts-index-memory-limit-mb`

+ 11 - 3
core/src/validator.rs

@@ -147,7 +147,7 @@ use {
     solana_turbine::{
         self,
         broadcast_stage::BroadcastStageType,
-        xdp::{XdpConfig, XdpRetransmitter},
+        xdp::{master_ip_if_bonded, XdpConfig, XdpRetransmitter},
     },
     solana_unified_scheduler_pool::DefaultSchedulerPool,
     solana_validator_exit::Exit,
@@ -156,7 +156,7 @@ use {
     std::{
         borrow::Cow,
         collections::{HashMap, HashSet},
-        net::SocketAddr,
+        net::{IpAddr, SocketAddr},
         num::{NonZeroU64, NonZeroUsize},
         path::{Path, PathBuf},
         str::FromStr,
@@ -1578,7 +1578,15 @@ impl Validator {
                     .local_addr()
                     .expect("failed to get local address")
                     .port();
-                let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port)
+                let src_ip = match node.bind_ip_addrs.active() {
+                    IpAddr::V4(ip) if !ip.is_unspecified() => Some(ip),
+                    IpAddr::V4(_unspecified) => xdp_config
+                        .interface
+                        .as_ref()
+                        .and_then(|iface| master_ip_if_bonded(iface)),
+                    _ => panic!("IPv6 not supported"),
+                };
+                let (rtx, sender) = XdpRetransmitter::new(xdp_config, src_port, src_ip)
                     .expect("failed to create xdp retransmitter");
                 (Some(rtx), Some(sender))
             } else {

+ 41 - 4
turbine/src/xdp.rs

@@ -13,7 +13,11 @@ use {
 use {
     crossbeam_channel::{Sender, TrySendError},
     solana_ledger::shred,
-    std::{error::Error, net::SocketAddr, thread},
+    std::{
+        error::Error,
+        net::{Ipv4Addr, SocketAddr},
+        thread,
+    },
 };
 
 #[derive(Clone, Debug)]
@@ -105,12 +109,20 @@ pub struct XdpRetransmitter {
 
 impl XdpRetransmitter {
     #[cfg(not(target_os = "linux"))]
-    pub fn new(_config: XdpConfig, _src_port: u16) -> Result<(Self, XdpSender), Box<dyn Error>> {
+    pub fn new(
+        _config: XdpConfig,
+        _src_port: u16,
+        _src_ip: Option<Ipv4Addr>,
+    ) -> Result<(Self, XdpSender), Box<dyn Error>> {
         Err("XDP is only supported on Linux".into())
     }
 
     #[cfg(target_os = "linux")]
-    pub fn new(config: XdpConfig, src_port: u16) -> Result<(Self, XdpSender), Box<dyn Error>> {
+    pub fn new(
+        config: XdpConfig,
+        src_port: u16,
+        src_ip: Option<Ipv4Addr>,
+    ) -> Result<(Self, XdpSender), Box<dyn Error>> {
         use caps::{
             CapSet,
             Capability::{CAP_BPF, CAP_NET_ADMIN, CAP_NET_RAW, CAP_PERFMON},
@@ -187,7 +199,7 @@ impl XdpRetransmitter {
                             QueueId(i as u64),
                             config.zero_copy,
                             None,
-                            None,
+                            src_ip,
                             src_port,
                             None,
                             receiver,
@@ -208,3 +220,28 @@ impl XdpRetransmitter {
         Ok(())
     }
 }
+
+/// Returns the IPv4 address of the master interface if the given interface is part of a bond.
+#[cfg(target_os = "linux")]
+pub fn master_ip_if_bonded(interface: &str) -> Option<Ipv4Addr> {
+    let master_ifindex_path = format!("/sys/class/net/{interface}/master/ifindex");
+    if let Ok(contents) = std::fs::read_to_string(&master_ifindex_path) {
+        let idx = contents.trim().parse().unwrap();
+        return Some(
+            NetworkDevice::new_from_index(idx)
+                .and_then(|dev| dev.ipv4_addr())
+                .unwrap_or_else(|e| {
+                    panic!(
+                        "failed to open bond master interface for {interface}: master index \
+                         {idx}: {e}"
+                    )
+                }),
+        );
+    }
+    None
+}
+
+#[cfg(not(target_os = "linux"))]
+pub fn master_ip_if_bonded(_interface: &str) -> Option<Ipv4Addr> {
+    None
+}

+ 4 - 20
xdp/src/tx_loop.rs

@@ -52,6 +52,7 @@ pub fn tx_loop<T: AsRef<[u8]>, A: AsRef<[SocketAddr]>>(
         dev.mac_addr()
             .expect("no src_mac provided, device must have a MAC address")
     });
+
     let src_ip = src_ip.unwrap_or_else(|| {
         // if no source IP is provided, use the device's IPv4 address
         dev.ipv4_addr()
@@ -204,36 +205,19 @@ pub fn tx_loop<T: AsRef<[u8]>, A: AsRef<[SocketAddr]>>(
                 } else {
                     let next_hop = router.route(addr.ip()).unwrap();
 
-                    let mut skip = false;
-
-                    // sanity check that the address is routable through our NIC
-                    if next_hop.if_index != dev.if_index() {
-                        log::warn!(
-                            "dropping packet: turbine peer {addr} must be routed through \
-                             if_index: {} our if_index: {}",
-                            next_hop.if_index,
-                            dev.if_index()
-                        );
-                        skip = true;
-                    }
-
                     // we need the MAC address to send the packet
-                    if next_hop.mac_addr.is_none() {
+                    let Some(dest_mac) = next_hop.mac_addr else {
                         log::warn!(
                             "dropping packet: turbine peer {addr} must be routed through {} which \
                              has no known MAC address",
                             next_hop.ip_addr
                         );
-                        skip = true;
-                    };
-
-                    if skip {
                         batched_packets -= 1;
                         umem.release(frame.offset());
                         continue;
-                    }
+                    };
 
-                    next_hop.mac_addr.unwrap()
+                    dest_mac
                 };
 
                 const PACKET_HEADER_SIZE: usize =