tx_loop.rs

#![allow(clippy::arithmetic_side_effects)]
use {
    crate::{
        device::{NetworkDevice, QueueId, RingSizes},
        netlink::MacAddress,
        packet::{
            write_eth_header, write_ip_header, write_udp_header, ETH_HEADER_SIZE, IP_HEADER_SIZE,
            UDP_HEADER_SIZE,
        },
        route::Router,
        set_cpu_affinity,
        socket::{Socket, Tx, TxRing},
        umem::{Frame as _, PageAlignedMemory, SliceUmem, SliceUmemFrame, Umem as _},
    },
    caps::{
        CapSet,
        Capability::{CAP_NET_ADMIN, CAP_NET_RAW},
    },
    crossbeam_channel::{Receiver, Sender, TryRecvError},
    libc::{sysconf, _SC_PAGESIZE},
    std::{
        net::{IpAddr, Ipv4Addr, SocketAddr},
        thread,
        time::Duration,
    },
};
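
/// Runs the transmit loop for one AF_XDP socket: each item received over `receiver` is written
/// to the NIC as a raw eth/ip/udp packet, once per destination address, then handed back over
/// `drop_sender`.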
#[allow(clippy::too_many_arguments)]
pub fn tx_loop<T: AsRef<[u8]>, A: AsRef<[SocketAddr]>>(
    cpu_id: usize,
    dev: &NetworkDevice,
    queue_id: QueueId,
    zero_copy: bool,
    src_mac: Option<MacAddress>,
    src_ip: Option<Ipv4Addr>,
    src_port: u16,
    dest_mac: Option<MacAddress>,
    receiver: Receiver<(A, T)>,
    drop_sender: Sender<(A, T)>,
) {
    log::info!(
        "starting xdp loop on {} queue {queue_id:?} cpu {cpu_id}",
        dev.name()
    );
    // each queue is bound to its own CPU core
    set_cpu_affinity([cpu_id]).unwrap();

    let src_mac = src_mac.unwrap_or_else(|| {
        // if no source MAC is provided, use the device's MAC address
        dev.mac_addr()
            .expect("no src_mac provided, device must have a MAC address")
    });
    let src_ip = src_ip.unwrap_or_else(|| {
        // if no source IP is provided, use the device's IPv4 address
        dev.ipv4_addr()
            .expect("no src_ip provided, device must have an IPv4 address")
    });

    // some drivers require frame_size=page_size
    let frame_size = unsafe { sysconf(_SC_PAGESIZE) } as usize;
    let queue = dev
        .open_queue(queue_id)
        .expect("failed to open queue for AF_XDP socket");
    let RingSizes {
        rx: rx_size,
        tx: tx_size,
    } = queue.ring_sizes().unwrap_or_else(|| {
        log::info!(
            "using default ring sizes for {} queue {queue_id:?}",
            dev.name()
        );
        RingSizes::default()
    });
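    // size the umem at twice the combined ring capacity, so we don't run out of frames while
    // completions are still in flight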
    let frame_count = (rx_size + tx_size) * 2;

    // try to allocate huge pages first, then fall back to regular pages
    const HUGE_2MB: usize = 2 * 1024 * 1024;
    let mut memory =
        PageAlignedMemory::alloc_with_page_size(frame_size, frame_count, HUGE_2MB, true)
            .or_else(|_| {
                log::warn!("huge page alloc failed, falling back to regular page size");
                PageAlignedMemory::alloc(frame_size, frame_count)
            })
            .unwrap();
    let umem = SliceUmem::new(&mut memory, frame_size as u32).unwrap();

    // we need NET_ADMIN and NET_RAW for the socket
    for cap in [CAP_NET_ADMIN, CAP_NET_RAW] {
        caps::raise(None, CapSet::Effective, cap).unwrap();
    }
    let Ok((mut socket, tx)) = Socket::tx(queue, umem, zero_copy, tx_size * 2, tx_size) else {
        panic!("failed to create AF_XDP socket on queue {queue_id:?}");
    };
    let umem = socket.umem();
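    // number of frames free while nothing is in flight; used at the end to tell when the ring
    // has fully drained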
    let umem_tx_capacity = umem.available();
    let Tx {
        // this is where we'll queue frames
        ring,
        // this is where we'll get completion events once frames have been picked up by the NIC
        mut completion,
    } = tx;
    let mut ring = ring.unwrap();

    // get the routing table from netlink
    let router = Router::new().expect("failed to create router");

    // we don't need higher caps anymore
    for cap in [CAP_NET_ADMIN, CAP_NET_RAW] {
        caps::drop(None, CapSet::Effective, cap).unwrap();
    }

    // How long we sleep waiting to receive shreds from the channel.
    const RECV_TIMEOUT: Duration = Duration::from_nanos(1000);
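    // After this many consecutive empty polls, flush anything still queued in the ring.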
    const MAX_TIMEOUTS: usize = 500;
    // We try to collect _at least_ BATCH_SIZE packets before queueing into the NIC. This is to
    // avoid introducing too much per-packet overhead, and to give the NIC time to complete work
    // before we queue the next chunk of packets.
    const BATCH_SIZE: usize = 64;
    // Local buffer where we store packets before sending them.
    let mut batched_items = Vec::with_capacity(BATCH_SIZE);
    // How many packets we've batched. This is _not_ batched_items.len(), but items * peers. For
    // example, if we have 3 packets to transmit to 2 destination addresses each, we have 6
    // batched packets.
    let mut batched_packets = 0;
    let mut timeouts = 0;
    loop {
        match receiver.try_recv() {
            Ok((addrs, payload)) => {
                batched_packets += addrs.as_ref().len();
                batched_items.push((addrs, payload));
                timeouts = 0;
                if batched_packets < BATCH_SIZE {
                    continue;
                }
            }
            Err(TryRecvError::Empty) => {
                if timeouts < MAX_TIMEOUTS {
                    timeouts += 1;
                    thread::sleep(RECV_TIMEOUT);
                } else {
                    timeouts = 0;
                    // we haven't received anything in a while, kick the driver
                    ring.commit();
                    kick(&ring);
                }
            }
            Err(TryRecvError::Disconnected) => {
                // keep looping until we've flushed all the packets
                if batched_packets == 0 {
                    break;
                }
            }
        };

        // this is the number of packets after which we commit the ring and kick the driver if
        // necessary
        let mut chunk_remaining = BATCH_SIZE.min(batched_packets);
        for (addrs, payload) in batched_items.drain(..) {
            for addr in addrs.as_ref() {
                if ring.available() == 0 || umem.available() == 0 {
                    // loop until we have space for the next packet
                    loop {
                        completion.sync(true);
                        // we haven't written any frames so we only need to sync the consumer
                        // position
                        ring.sync(false);
                        // check if any frames were completed
                        while let Some(frame_offset) = completion.read() {
                            umem.release(frame_offset);
                        }
                        if ring.available() > 0 && umem.available() > 0 {
                            // we have space for the next packet, break out of the loop
                            break;
                        }
                        // queues are full; if NEEDS_WAKEUP is set, kick the driver so hopefully
                        // it'll complete some work
                        kick(&ring);
                    }
                }
                // at this point we're guaranteed to have a frame to write the next packet into
                // and a slot in the ring to submit it
                let mut frame = umem.reserve().unwrap();
                let IpAddr::V4(dst_ip) = addr.ip() else {
                    panic!("IPv6 not supported");
                };
                let dest_mac = if let Some(mac) = dest_mac {
                    mac
                } else {
                    let next_hop = router.route(addr.ip()).unwrap();
                    let mut skip = false;
                    // sanity check that the address is routable through our NIC
                    if next_hop.if_index != dev.if_index() {
                        log::warn!(
                            "dropping packet: turbine peer {addr} must be routed through \
                             if_index: {} our if_index: {}",
                            next_hop.if_index,
                            dev.if_index()
                        );
                        skip = true;
                    }
                    // we need the MAC address to send the packet
                    if next_hop.mac_addr.is_none() {
                        log::warn!(
                            "dropping packet: turbine peer {addr} must be routed through {} \
                             which has no known MAC address",
                            next_hop.ip_addr
                        );
                        skip = true;
                    };
                    if skip {
                        batched_packets -= 1;
                        umem.release(frame.offset());
                        continue;
                    }
                    next_hop.mac_addr.unwrap()
                };
                const PACKET_HEADER_SIZE: usize =
                    ETH_HEADER_SIZE + IP_HEADER_SIZE + UDP_HEADER_SIZE;
                let len = payload.as_ref().len();
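                // size the frame to hold the eth/ip/udp headers followed by the payload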
                frame.set_len(PACKET_HEADER_SIZE + len);
                let packet = umem.map_frame_mut(&frame);
                // write the payload first as it's needed for checksum calculation (if enabled)
                packet[PACKET_HEADER_SIZE..][..len].copy_from_slice(payload.as_ref());
                write_eth_header(packet, &src_mac.0, &dest_mac.0);
                write_ip_header(
                    &mut packet[ETH_HEADER_SIZE..],
                    &src_ip,
                    &dst_ip,
                    (UDP_HEADER_SIZE + len) as u16,
                );
                write_udp_header(
                    &mut packet[ETH_HEADER_SIZE + IP_HEADER_SIZE..],
                    &src_ip,
                    src_port,
                    &dst_ip,
                    addr.port(),
                    len as u16,
                    // don't do checksums
                    false,
                );
                // write the packet into the ring
                ring.write(frame, 0)
                    .map_err(|_| "ring full")
                    // this should never happen as we check for available slots above
                    .expect("failed to write to ring");
                batched_packets -= 1;
                chunk_remaining -= 1;
                // check if it's time to commit the ring and kick the driver
                if chunk_remaining == 0 {
                    chunk_remaining = BATCH_SIZE.min(batched_packets);
                    // commit new frames
                    ring.commit();
                    kick(&ring);
                }
            }
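            // hand the item back over the drop channel; failures (receiver gone or channel
            // full) are deliberately ignored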
            let _ = drop_sender.try_send((addrs, payload));
        }
        debug_assert_eq!(batched_packets, 0);
    }
    assert_eq!(batched_packets, 0);

    // drain the ring
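    // keep processing completions until every frame we submitted has been released back to the
    // umem and the tx ring is empty again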
    while umem.available() < umem_tx_capacity || ring.available() < ring.capacity() {
        log::debug!(
            "draining xdp ring umem {}/{} ring {}/{}",
            umem.available(),
            umem_tx_capacity,
            ring.available(),
            ring.capacity()
        );
        completion.sync(true);
        while let Some(frame_offset) = completion.read() {
            umem.release(frame_offset);
        }
        ring.sync(false);
        kick(&ring);
    }
}

// With some drivers, or always when we work in SKB mode, we need to explicitly kick the driver
// once we want the NIC to do something.
#[inline(always)]
fn kick(ring: &TxRing<SliceUmemFrame<'_>>) {
    if !ring.needs_wakeup() {
        return;
    }
    if let Err(e) = ring.wake() {
        kick_error(e);
    }
}
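
// Cold path: kept out of line so the hot `kick` path stays small enough to inline.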
#[inline(never)]
fn kick_error(e: std::io::Error) {
    match e.raw_os_error() {
        // these are non-fatal errors
        Some(libc::EBUSY | libc::ENOBUFS | libc::EAGAIN) => {}
        // this can temporarily happen with some drivers when changing
        // settings (eg with ethtool)
        Some(libc::ENETDOWN) => {
            log::warn!("network interface is down")
        }
        // we should never get here, hopefully the driver recovers?
        _ => {
            log::error!("network interface driver error: {e:?}");
        }
    }
}