// cluster_nodes.rs

use {
    crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
    agave_feature_set::{self as feature_set},
    itertools::Either,
    lazy_lru::LruCache,
    rand::{seq::SliceRandom, Rng, RngCore, SeedableRng},
    rand_chacha::{ChaCha8Rng, ChaChaRng},
    solana_clock::{Epoch, Slot},
    solana_cluster_type::ClusterType,
    solana_gossip::{
        cluster_info::ClusterInfo,
        contact_info::{ContactInfo as GossipContactInfo, Protocol},
        crds::GossipRoute,
        crds_data::CrdsData,
        crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
        crds_value::CrdsValue,
        weighted_shuffle::WeightedShuffle,
    },
    solana_keypair::Keypair,
    solana_ledger::shred::ShredId,
    solana_native_token::LAMPORTS_PER_SOL,
    solana_net_utils::SocketAddrSpace,
    solana_pubkey::Pubkey,
    solana_runtime::bank::Bank,
    solana_signer::Signer,
    solana_time_utils::timestamp,
    std::{
        any::TypeId,
        cell::RefCell,
        cmp::Ordering,
        collections::{HashMap, HashSet},
        iter::repeat_with,
        marker::PhantomData,
        net::SocketAddr,
        sync::{Arc, OnceLock, RwLock},
        time::{Duration, Instant},
    },
    thiserror::Error,
};

thread_local! {
    static THREAD_LOCAL_WEIGHTED_SHUFFLE: RefCell<WeightedShuffle<u64>> = RefCell::new(
        WeightedShuffle::new::<[u64; 0]>("get_retransmit_addrs", []),
    );
}

const DATA_PLANE_FANOUT: usize = 200;
pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;
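// Capacity check (see get_root_distance below): with a fanout of 200, shuffle
// indices at root distance 0..=2 number 1 + 200 + 200 * 200 = 40_201, and the
// third layer extends the reach by up to 200^3 = 8_000_000 more nodes, so four
// hops from the leader comfortably cover current cluster sizes.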

#[derive(Debug, Error)]
pub enum Error {
    #[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
    Loopback { leader: Pubkey, shred: ShredId },
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum NodeId {
    // TVU node obtained through gossip (staked or not).
    ContactInfo(ContactInfo),
    // Staked node with no contact-info in gossip table.
    Pubkey(Pubkey),
}

// A lite version of the gossip ContactInfo, local to turbine, which holds only
// the few fields needed from the gossip ContactInfo.
#[derive(Clone, Debug)]
pub(crate) struct ContactInfo {
    pubkey: Pubkey,
    wallclock: u64,
    tvu_quic: Option<SocketAddr>,
    tvu_udp: Option<SocketAddr>,
}

pub struct Node {
    node: NodeId,
    stake: u64,
}

pub struct ClusterNodes<T> {
    pubkey: Pubkey, // The local node itself.
    // All staked nodes + other known tvu-peers + the node itself;
    // sorted by (stake, pubkey) in descending order.
    nodes: Vec<Node>,
    // Reverse index from nodes' pubkey to their index in self.nodes.
    index: HashMap<Pubkey, /*index:*/ usize>,
    weighted_shuffle: WeightedShuffle</*stake:*/ u64>,
    use_cha_cha_8: bool,
    _phantom: PhantomData<T>,
}

// Cache entries are wrapped in Arc<OnceLock<...>> so that, when needed, only a
// single thread initializes the entry for the epoch without holding a lock on
// the entire cache.
type LruCacheOnce<K, V> = RwLock<LruCache<K, Arc<OnceLock<V>>>>;

pub struct ClusterNodesCache<T> {
    cache: LruCacheOnce<Epoch, (/*as of:*/ Instant, Arc<ClusterNodes<T>>)>,
    ttl: Duration, // Time to live.
}

impl Node {
    #[inline]
    fn pubkey(&self) -> &Pubkey {
        match &self.node {
            NodeId::Pubkey(pubkey) => pubkey,
            NodeId::ContactInfo(node) => node.pubkey(),
        }
    }

    #[inline]
    fn contact_info(&self) -> Option<&ContactInfo> {
        match &self.node {
            NodeId::Pubkey(_) => None,
            NodeId::ContactInfo(node) => Some(node),
        }
    }

    #[inline]
    fn contact_info_mut(&mut self) -> Option<&mut ContactInfo> {
        match &mut self.node {
            NodeId::Pubkey(_) => None,
            NodeId::ContactInfo(node) => Some(node),
        }
    }
}

impl ContactInfo {
    #[inline]
    pub(crate) fn pubkey(&self) -> &Pubkey {
        &self.pubkey
    }

    #[inline]
    pub(crate) fn wallclock(&self) -> u64 {
        self.wallclock
    }

    #[inline]
    pub(crate) fn tvu(&self, protocol: Protocol) -> Option<SocketAddr> {
        match protocol {
            Protocol::QUIC => self.tvu_quic,
            Protocol::UDP => self.tvu_udp,
        }
    }

    // Removes the respective TVU address from the ContactInfo so that no more
    // shreds are sent to that socket address.
    #[inline]
    fn remove_tvu_addr(&mut self, protocol: Protocol) {
        match protocol {
            Protocol::QUIC => {
                self.tvu_quic = None;
            }
            Protocol::UDP => {
                self.tvu_udp = None;
            }
        }
    }
}

impl<T> ClusterNodes<T> {
    pub(crate) fn submit_metrics(&self, name: &'static str, now: u64) {
        let mut epoch_stakes = 0;
        let mut num_nodes_dead = 0;
        let mut num_nodes_staked = 0;
        let mut num_nodes_stale = 0;
        let mut stake_dead = 0;
        let mut stake_stale = 0;
        for node in &self.nodes {
            epoch_stakes += node.stake;
            if node.stake != 0u64 {
                num_nodes_staked += 1;
            }
            match node.contact_info().map(ContactInfo::wallclock) {
                None => {
                    num_nodes_dead += 1;
                    stake_dead += node.stake;
                }
                Some(wallclock) => {
                    let age = now.saturating_sub(wallclock);
                    if age > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
                        num_nodes_stale += 1;
                        stake_stale += node.stake;
                    }
                }
            }
        }
        num_nodes_stale += num_nodes_dead;
        stake_stale += stake_dead;
        datapoint_info!(
            name,
            ("epoch_stakes", epoch_stakes / LAMPORTS_PER_SOL, i64),
            ("num_nodes", self.nodes.len(), i64),
            ("num_nodes_dead", num_nodes_dead, i64),
            ("num_nodes_staked", num_nodes_staked, i64),
            ("num_nodes_stale", num_nodes_stale, i64),
            ("stake_dead", stake_dead / LAMPORTS_PER_SOL, i64),
            ("stake_stale", stake_stale / LAMPORTS_PER_SOL, i64),
        );
    }
}

/// Encapsulates the possible RNG implementations for turbine.
/// This was implemented for the transition from ChaCha20 to ChaCha8.
enum TurbineRng {
    Legacy(ChaChaRng),
    ChaCha8(ChaCha8Rng),
}

impl TurbineRng {
    /// Create a new seeded TurbineRng of the correct implementation.
    fn new_seeded(leader: &Pubkey, shred: &ShredId, use_cha_cha_8: bool) -> Self {
        let seed = shred.seed(leader);
        if use_cha_cha_8 {
            TurbineRng::ChaCha8(ChaCha8Rng::from_seed(seed))
        } else {
            TurbineRng::Legacy(ChaChaRng::from_seed(seed))
        }
    }
}

impl RngCore for TurbineRng {
    fn next_u32(&mut self) -> u32 {
        match self {
            TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.next_u32(),
            TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.next_u32(),
        }
    }

    fn next_u64(&mut self) -> u64 {
        match self {
            TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.next_u64(),
            TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.next_u64(),
        }
    }

    fn fill_bytes(&mut self, dest: &mut [u8]) {
        match self {
            TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.fill_bytes(dest),
            TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.fill_bytes(dest),
        }
    }

    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
        match self {
            TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.try_fill_bytes(dest),
            TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.try_fill_bytes(dest),
        }
    }
}

impl ClusterNodes<BroadcastStage> {
    pub fn new(
        cluster_info: &ClusterInfo,
        cluster_type: ClusterType,
        stakes: &HashMap<Pubkey, u64>,
        use_cha_cha_8: bool,
    ) -> Self {
        new_cluster_nodes(cluster_info, cluster_type, stakes, use_cha_cha_8)
    }

    pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
        let mut rng = TurbineRng::new_seeded(&self.pubkey, shred, self.use_cha_cha_8);
        let index = self.weighted_shuffle.first(&mut rng)?;
        self.nodes[index].contact_info()
    }
}

impl ClusterNodes<RetransmitStage> {
    pub fn get_retransmit_addrs(
        &self,
        slot_leader: &Pubkey,
        shred: &ShredId,
        fanout: usize,
        socket_addr_space: &SocketAddrSpace,
    ) -> Result<(/*root_distance:*/ u8, Vec<SocketAddr>), Error> {
        // Exclude slot leader from list of nodes.
        if slot_leader == &self.pubkey {
            return Err(Error::Loopback {
                leader: *slot_leader,
                shred: *shred,
            });
        }
        THREAD_LOCAL_WEIGHTED_SHUFFLE.with_borrow_mut(|weighted_shuffle| {
            weighted_shuffle.clone_from(&self.weighted_shuffle);
            if let Some(index) = self.index.get(slot_leader) {
                weighted_shuffle.remove_index(*index);
            }
            let mut rng = TurbineRng::new_seeded(slot_leader, shred, self.use_cha_cha_8);
            let (index, peers) = get_retransmit_peers(
                fanout,
                |k| self.nodes[k].pubkey() == &self.pubkey,
                weighted_shuffle.shuffle(&mut rng),
            );
            let protocol = get_broadcast_protocol(shred);
            let peers = peers
                .filter_map(|k| self.nodes[k].contact_info()?.tvu(protocol))
                .filter(|addr| socket_addr_space.check(addr))
                .collect();
            let root_distance = get_root_distance(index, fanout);
            Ok((root_distance, peers))
        })
    }

    // Returns the parent node in the turbine broadcast tree.
    // Returns None if the node is the root of the tree or if it is not staked.
    pub(crate) fn get_retransmit_parent(
        &self,
        leader: &Pubkey,
        shred: &ShredId,
        fanout: usize,
    ) -> Result<Option<Pubkey>, Error> {
        // Exclude slot leader from list of nodes.
        if leader == &self.pubkey {
            return Err(Error::Loopback {
                leader: *leader,
                shred: *shred,
            });
        }
        // Unstaked nodes' position in the turbine tree is not deterministic
        // and depends on gossip propagation of contact-infos. Therefore, if
        // this node is not staked return None.
        {
            // dedup_tvu_addrs might exclude a non-staked node from self.nodes
            // due to duplicate socket/IP addresses.
            let Some(&index) = self.index.get(&self.pubkey) else {
                return Ok(None);
            };
            if self.nodes[index].stake == 0 {
                return Ok(None);
            }
        }
        let mut weighted_shuffle = self.weighted_shuffle.clone();
        if let Some(index) = self.index.get(leader).copied() {
            weighted_shuffle.remove_index(index);
        }
        let mut rng = TurbineRng::new_seeded(leader, shred, self.use_cha_cha_8);
        // Only need shuffled nodes until this node itself.
        let nodes: Vec<_> = weighted_shuffle
            .shuffle(&mut rng)
            .map(|index| &self.nodes[index])
            .take_while(|node| node.pubkey() != &self.pubkey)
            .collect();
        let parent = get_retransmit_parent(fanout, nodes.len(), &nodes);
        Ok(parent.map(Node::pubkey).copied())
    }
}

pub fn new_cluster_nodes<T: 'static>(
    cluster_info: &ClusterInfo,
    cluster_type: ClusterType,
    stakes: &HashMap<Pubkey, u64>,
    use_cha_cha_8: bool,
) -> ClusterNodes<T> {
    let self_pubkey = cluster_info.id();
    let nodes = get_nodes(cluster_info, cluster_type, stakes);
    let index: HashMap<_, _> = nodes
        .iter()
        .enumerate()
        .map(|(ix, node)| (*node.pubkey(), ix))
        .collect();
    let broadcast = TypeId::of::<T>() == TypeId::of::<BroadcastStage>();
    let stakes = nodes.iter().map(|node| node.stake);
    let mut weighted_shuffle = WeightedShuffle::new("cluster-nodes", stakes);
    if broadcast {
        weighted_shuffle.remove_index(index[&self_pubkey]);
    }
    ClusterNodes {
        pubkey: self_pubkey,
        nodes,
        index,
        weighted_shuffle,
        _phantom: PhantomData,
        use_cha_cha_8,
    }
}

// All staked nodes + other known tvu-peers + the node itself;
// sorted by (stake, pubkey) in descending order.
fn get_nodes(
    cluster_info: &ClusterInfo,
    cluster_type: ClusterType,
    stakes: &HashMap<Pubkey, u64>,
) -> Vec<Node> {
    let self_pubkey = cluster_info.id();
    let should_dedup_tvu_addrs = match cluster_type {
        ClusterType::Development => false,
        ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true,
    };
    let mut nodes: Vec<Node> = std::iter::once({
        // The local node itself.
        let stake = stakes.get(&self_pubkey).copied().unwrap_or_default();
        let node = ContactInfo::from(&cluster_info.my_contact_info());
        let node = NodeId::from(node);
        Node { node, stake }
    })
    // All known tvu-peers from gossip.
    .chain(
        cluster_info
            .tvu_peers(|node| ContactInfo::from(node))
            .into_iter()
            .map(|node| {
                let stake = stakes.get(node.pubkey()).copied().unwrap_or_default();
                let node = NodeId::from(node);
                Node { node, stake }
            }),
    )
    // All staked nodes.
    .chain(
        stakes
            .iter()
            .filter(|(_, stake)| **stake > 0)
            .map(|(&pubkey, &stake)| Node {
                node: NodeId::from(pubkey),
                stake,
            }),
    )
    .collect();
    sort_and_dedup_nodes(&mut nodes);
    if should_dedup_tvu_addrs {
        dedup_tvu_addrs(&mut nodes);
    };
    nodes
}

// Sorts nodes by highest stake first and dedups by pubkey.
fn sort_and_dedup_nodes(nodes: &mut Vec<Node>) {
    nodes.sort_unstable_by(|a, b| cmp_nodes_stake(b, a));
    // dedup_by keeps the first of consecutive elements which compare equal.
    // Because the sort above puts NodeId::ContactInfo before NodeId::Pubkey
    // when all else is equal, this keeps the nodes with contact-info.
    nodes.dedup_by(|a, b| a.pubkey() == b.pubkey());
}

// Compares nodes by stake and tie-breaks by pubkey.
// For the same pubkey, NodeId::ContactInfo is considered > NodeId::Pubkey.
#[inline]
fn cmp_nodes_stake(a: &Node, b: &Node) -> Ordering {
    a.stake
        .cmp(&b.stake)
        .then_with(|| a.pubkey().cmp(b.pubkey()))
        .then_with(|| match (&a.node, &b.node) {
            (NodeId::ContactInfo(_), NodeId::ContactInfo(_)) => Ordering::Equal,
            (NodeId::ContactInfo(_), NodeId::Pubkey(_)) => Ordering::Greater,
            (NodeId::Pubkey(_), NodeId::ContactInfo(_)) => Ordering::Less,
            (NodeId::Pubkey(_), NodeId::Pubkey(_)) => Ordering::Equal,
        })
}
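
// Example of the resulting order after sort_and_dedup_nodes: higher stake
// sorts first; for equal stakes the larger pubkey sorts first; and for the
// same (stake, pubkey) the NodeId::ContactInfo entry precedes the bare
// NodeId::Pubkey entry, so the dedup keeps the entry with contact-info.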

/// If set > 1, it allows nodes to run behind a NAT.
/// This use case is currently not supported.
const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 1;

/// Dedups socket addresses so that if there are 2 nodes in the cluster with
/// the same TVU socket-addr, we only send shreds to one of them.
/// Additionally limits the number of nodes at the same IP address to 1.
fn dedup_tvu_addrs(nodes: &mut Vec<Node>) {
    const TVU_PROTOCOLS: [Protocol; 2] = [Protocol::UDP, Protocol::QUIC];
    let capacity = nodes.len().saturating_mul(2);
    // Tracks (Protocol, SocketAddr) tuples already observed.
    let mut addrs = HashSet::with_capacity(capacity);
    // Maps IP addresses to the number of nodes at that IP address.
    let mut counts = HashMap::with_capacity(capacity);
    nodes.retain_mut(|node| {
        let node_stake = node.stake;
        let Some(node) = node.contact_info_mut() else {
            // Need to keep staked identities without gossip ContactInfo for
            // deterministic shuffle.
            return node_stake > 0u64;
        };
        // Dedup socket addresses and limit nodes at the same IP address.
        for protocol in TVU_PROTOCOLS {
            let Some(addr) = node.tvu(protocol) else {
                continue;
            };
            let count: usize = *counts
                .entry((protocol, addr.ip()))
                .and_modify(|count| *count += 1)
                .or_insert(1);
            if !addrs.insert((protocol, addr)) || count > MAX_NUM_NODES_PER_IP_ADDRESS {
                // Remove the respective TVU address so that no more shreds are
                // sent to this socket address.
                node.remove_tvu_addr(protocol);
            }
        }
        // Always keep staked nodes for deterministic shuffle,
        // but drop non-staked nodes if they have no valid TVU address.
        node_stake > 0u64
            || TVU_PROTOCOLS
                .into_iter()
                .any(|protocol| node.tvu(protocol).is_some())
    })
}
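
// For example, if two gossip entries advertise the same UDP TVU address, only
// the first one encountered above (the higher-staked one, given the preceding
// sort) keeps the address; the later entry has that address removed and is
// dropped entirely if it is unstaked and left with no TVU address at all.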

// root     : [0]
// 1st layer: [1, 2, ..., fanout]
// 2nd layer: [[fanout + 1, ..., fanout * 2],
//             [fanout * 2 + 1, ..., fanout * 3],
//             ...
//             [fanout * fanout + 1, ..., fanout * (fanout + 1)]]
// 3rd layer: ...
// ...
// The leader node broadcasts shreds to the root node.
// The root node retransmits the shreds to all nodes in the 1st layer.
// Each other node retransmits shreds to fanout many nodes in the next layer.
// For example the node k in the 1st layer will retransmit to nodes:
// fanout + k, 2 * fanout + k, ..., fanout * fanout + k
fn get_retransmit_peers<T>(
    fanout: usize,
    // Predicate fn which identifies this node in the shuffle.
    pred: impl Fn(T) -> bool,
    nodes: impl IntoIterator<Item = T>,
) -> (/*this node's index:*/ usize, impl Iterator<Item = T>) {
    let mut nodes = nodes.into_iter();
    // This node's index within shuffled nodes.
    let Some(index) = nodes.by_ref().position(pred) else {
        // dedup_tvu_addrs might exclude a non-staked node from self.nodes due
        // to duplicate socket/IP addresses.
        return (usize::MAX, Either::Right(std::iter::empty()));
    };
    // Node's index within its neighborhood.
    let offset = index.saturating_sub(1) % fanout;
    // First node in the neighborhood.
    let anchor = index - offset;
    let step = if index == 0 { 1 } else { fanout };
    let peers = (anchor * fanout + offset + 1..)
        .step_by(step)
        .take(fanout)
        .scan(index, move |state, k| -> Option<T> {
            let peer = nodes.by_ref().nth(k - *state - 1)?;
            *state = k;
            Some(peer)
        });
    (index, Either::Left(peers))
}
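
// Worked example with fanout = 2: the root (index 0) sends to indices 1 and 2;
// the node at index 1 (offset 0, anchor 1) sends to 1 * 2 + 0 + 1 = 3 and,
// stepping by fanout, 5; the node at index 2 (offset 1, anchor 1) sends to
// indices 4 and 6. This matches the fanout-2 vectors in the tests below.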

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree.
fn get_retransmit_parent<T: Copy>(
    fanout: usize,
    index: usize, // Local node's index within the nodes slice.
    nodes: &[T],
) -> Option<T> {
    // Node's index within its neighborhood.
    let offset = index.saturating_sub(1) % fanout;
    let index = index.checked_sub(1)? / fanout;
    let index = index - index.saturating_sub(1) % fanout;
    let index = if index == 0 { index } else { index + offset };
    nodes.get(index).copied()
}
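
// Worked example with fanout = 2: for the child at index 4, offset is
// (4 - 1) % 2 = 1 and the computed parent index is 2, which inverts the peer
// computation above (the node at index 2 retransmits to indices 4 and 6).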

impl<T> ClusterNodesCache<T> {
    pub fn new(
        // Capacity of underlying LRU-cache in terms of number of epochs.
        cap: usize,
        // A time-to-live eviction policy is enforced to refresh entries in
        // case gossip contact-infos are updated.
        ttl: Duration,
    ) -> Self {
        Self {
            cache: RwLock::new(LruCache::new(cap)),
            ttl,
        }
    }
}
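
// A typical construction keeps one long-lived cache per stage, e.g. something
// like ClusterNodesCache::<RetransmitStage>::new(/*cap:*/ 8, Duration::from_secs(5));
// the capacity and TTL values here are illustrative, not prescriptive.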

impl<T: 'static> ClusterNodesCache<T> {
    pub(crate) fn get(
        &self,
        shred_slot: Slot,
        root_bank: &Bank,
        working_bank: &Bank,
        cluster_info: &ClusterInfo,
    ) -> Arc<ClusterNodes<T>> {
        // Returns the cached entry for the epoch if it is either uninitialized
        // or not expired yet. Discards the entry if it is already initialized
        // but also expired.
        let get_epoch_entry = |cache: &LruCache<Epoch, _>, epoch, ttl| {
            let entry: &Arc<OnceLock<(Instant, _)>> = cache.get(&epoch)?;
            let Some((asof, _)) = entry.get() else {
                return Some(entry.clone()); // not initialized yet
            };
            (asof.elapsed() < ttl).then(|| entry.clone())
        };
        let epoch_schedule = root_bank.epoch_schedule();
        let epoch = epoch_schedule.get_epoch(shred_slot);
        // Read from the cache with a shared lock.
        let entry = {
            let cache = self.cache.read().unwrap();
            get_epoch_entry(&cache, epoch, self.ttl)
        };
        let use_cha_cha_8 = check_feature_activation(
            &feature_set::switch_to_chacha8_turbine::ID,
            shred_slot,
            root_bank,
        );
        // Fall back to exclusive lock if there is a cache miss or the cached
        // entry has already expired.
        let entry: Arc<OnceLock<_>> = entry.unwrap_or_else(|| {
            let mut cache = self.cache.write().unwrap();
            get_epoch_entry(&cache, epoch, self.ttl).unwrap_or_else(|| {
                // Either a cache miss here or the existing entry has already
                // expired. Upsert and return an uninitialized entry.
                let entry = Arc::<OnceLock<_>>::default();
                cache.put(epoch, Arc::clone(&entry));
                entry
            })
        });
        // Initialize if needed by only a single thread outside locks.
        let (_, nodes) = entry.get_or_init(|| {
            let epoch_staked_nodes = [root_bank, working_bank]
                .iter()
                .find_map(|bank| bank.epoch_staked_nodes(epoch))
                .unwrap_or_else(|| {
                    error!(
                        "ClusterNodesCache::get: unknown Bank::epoch_staked_nodes for epoch: \
                         {epoch}, slot: {shred_slot}"
                    );
                    inc_new_counter_error!("cluster_nodes-unknown_epoch_staked_nodes", 1);
                    Arc::<HashMap<Pubkey, /*stake:*/ u64>>::default()
                });
            let nodes = new_cluster_nodes::<T>(
                cluster_info,
                root_bank.cluster_type(),
                &epoch_staked_nodes,
                use_cha_cha_8,
            );
            (Instant::now(), Arc::new(nodes))
        });
        nodes.clone()
    }
}

impl From<ContactInfo> for NodeId {
    #[inline]
    fn from(node: ContactInfo) -> Self {
        NodeId::ContactInfo(node)
    }
}

impl From<Pubkey> for NodeId {
    #[inline]
    fn from(pubkey: Pubkey) -> Self {
        NodeId::Pubkey(pubkey)
    }
}

impl From<&GossipContactInfo> for ContactInfo {
    #[inline]
    fn from(node: &GossipContactInfo) -> Self {
        Self {
            pubkey: *node.pubkey(),
            wallclock: node.wallclock(),
            tvu_quic: node.tvu(Protocol::QUIC),
            tvu_udp: node.tvu(Protocol::UDP),
        }
    }
}

#[inline]
pub(crate) fn get_broadcast_protocol(_: &ShredId) -> Protocol {
    Protocol::UDP
}

#[inline]
fn get_root_distance(index: usize, fanout: usize) -> u8 {
    if index == 0 {
        0
    } else if index <= fanout {
        1
    } else if index <= fanout.saturating_add(1).saturating_mul(fanout) {
        2
    } else {
        3 // If changed, update MAX_NUM_TURBINE_HOPS.
    }
}
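
// With fanout = DATA_PLANE_FANOUT = 200 this maps shuffle indices as follows:
// 0 => distance 0 (root), 1..=200 => 1, 201..=40_200 => 2, anything larger => 3.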

pub fn make_test_cluster<R: Rng>(
    rng: &mut R,
    num_nodes: usize,
    unstaked_ratio: Option<(u32, u32)>,
) -> (
    Vec<GossipContactInfo>,
    HashMap<Pubkey, u64>, // stakes
    ClusterInfo,
) {
    let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7));
    let mut nodes: Vec<_> = repeat_with(|| {
        let pubkey = solana_pubkey::new_rand();
        GossipContactInfo::new_localhost(&pubkey, /*wallclock:*/ timestamp())
    })
    .take(num_nodes)
    .collect();
    nodes.shuffle(rng);
    let keypair = Arc::new(Keypair::new());
    nodes[0] = GossipContactInfo::new_localhost(&keypair.pubkey(), /*wallclock:*/ timestamp());
    let this_node = nodes[0].clone();
    let mut stakes: HashMap<Pubkey, u64> = nodes
        .iter()
        .filter_map(|node| {
            if rng.gen_ratio(unstaked_numerator, unstaked_denominator) {
                None // No stake for some of the nodes.
            } else {
                Some((*node.pubkey(), rng.gen_range(0..20)))
            }
        })
        .collect();
    // Add some staked nodes with no contact-info.
    stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0..20))).take(100));
    let cluster_info = ClusterInfo::new(this_node, keypair, SocketAddrSpace::Unspecified);
    {
        let now = timestamp();
        let keypair = Keypair::new();
        let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
        // First node is pushed to crds table by ClusterInfo constructor.
        for node in nodes.iter().skip(1) {
            let node = CrdsData::from(node);
            let node = CrdsValue::new(node, &keypair);
            assert_eq!(
                gossip_crds.insert(node, now, GossipRoute::LocalMessage),
                Ok(())
            );
        }
    }
    (nodes, stakes, cluster_info)
}

pub(crate) fn get_data_plane_fanout(shred_slot: Slot, root_bank: &Bank) -> usize {
    if check_feature_activation(
        &feature_set::disable_turbine_fanout_experiments::id(),
        shred_slot,
        root_bank,
    ) {
        DATA_PLANE_FANOUT
    } else if check_feature_activation(
        &feature_set::enable_turbine_extended_fanout_experiments::id(),
        shred_slot,
        root_bank,
    ) {
        // Allocate ~2% of slots to turbine fanout experiments.
        match shred_slot % 359 {
            11 => 1152,
            61 => 1280,
            111 => 1024,
            161 => 1408,
            211 => 896,
            261 => 1536,
            311 => 768,
            _ => DATA_PLANE_FANOUT,
        }
    } else {
        // feature_set::enable_turbine_fanout_experiments
        // is already activated on all clusters.
        match shred_slot % 359 {
            11 => 64,
            61 => 768,
            111 => 128,
            161 => 640,
            211 => 256,
            261 => 512,
            311 => 384,
            _ => DATA_PLANE_FANOUT,
        }
    }
}

// Returns true if the feature is effective for the shred slot.
#[must_use]
pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
    match root_bank.feature_set.activated_slot(feature) {
        None => false,
        Some(feature_slot) => {
            let epoch_schedule = root_bank.epoch_schedule();
            let feature_epoch = epoch_schedule.get_epoch(feature_slot);
            let shred_epoch = epoch_schedule.get_epoch(shred_slot);
            feature_epoch < shred_epoch
        }
    }
}
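
// Example: a feature activated at some slot in epoch E takes effect for shreds
// of epoch E + 1 onward; shreds from the activation epoch itself keep the old
// behavior because the comparison above is strict (feature_epoch < shred_epoch).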

#[cfg(test)]
mod tests {
    use {
        super::*,
        itertools::Itertools,
        solana_hash::Hash as SolanaHash,
        solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
        std::{collections::VecDeque, fmt::Debug, hash::Hash},
        test_case::test_case,
    };

    #[test_case(true /* chacha8 */)]
    #[test_case(false /* chacha20 */)]
    /// Test that the weighted-shuffle turbine tree provides complete coverage
    /// of all nodes.
    fn test_complete_cluster_coverage(use_cha_cha_8: bool) {
        let fanout = 10;
        let mut rng = rand::thread_rng();
        let (_nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 20, Some((0, 1)));
        let slot_leader = cluster_info.id();
        // Create a test cluster.
        let cluster_nodes = new_cluster_nodes::<BroadcastStage>(
            &cluster_info,
            ClusterType::Development,
            &stakes,
            use_cha_cha_8,
        );
        let shred = Shredder::new(2, 1, 0, 0)
            .unwrap()
            .entries_to_merkle_shreds_for_tests(
                &Keypair::new(),
                &[],
                true,
                SolanaHash::default(),
                0,
                0,
                &ReedSolomonCache::default(),
                &mut ProcessShredsStats::default(),
            )
            .0
            .pop()
            .unwrap();
        let mut weighted_shuffle = cluster_nodes.weighted_shuffle.clone();
        let mut chacha_rng = TurbineRng::new_seeded(&slot_leader, &shred.id(), use_cha_cha_8);
        let shuffled_nodes: Vec<&Node> = weighted_shuffle
            .shuffle(&mut chacha_rng)
            .map(|i| &cluster_nodes.nodes[i])
            .collect();
        // The slot leader obviously has the shred.
        let mut covered: HashSet<Pubkey> = HashSet::from([slot_leader]);
        // The root node has the shred sent to it initially.
        let mut queue = VecDeque::from([*shuffled_nodes[0].pubkey()]);
        // Traverse the turbine tree using the queue of nodes to visit (BFS).
        while let Some(addr) = queue.pop_front() {
            if !covered.insert(addr) {
                panic!("Should not send to already covered nodes, instead sending to {addr}");
            }
            let (_, peers) = get_retransmit_peers(
                fanout,
                |n: &Node| n.pubkey() == &addr,
                shuffled_nodes.clone(),
            );
            // Visit all child nodes.
            for peer in peers {
                trace!("{} is child of {addr}", peer.pubkey());
                queue.push_back(*peer.pubkey());
                if stakes[peer.pubkey()] == 0 {
                    continue; // No check of retransmit parents for unstaked nodes.
                }
                // Luckily for us, ClusterNodes<RetransmitStage> does not do
                // anything special with its own identity.
                let mut peer_cluster_nodes = new_cluster_nodes::<RetransmitStage>(
                    &cluster_info,
                    ClusterType::Development,
                    &stakes,
                    use_cha_cha_8,
                );
                peer_cluster_nodes.pubkey = *peer.pubkey();
                // Check that the parent computed by the child matches the actual parent.
                let parent = peer_cluster_nodes
                    .get_retransmit_parent(&slot_leader, &shred.id(), fanout)
                    .unwrap();
                assert_eq!(
                    Some(addr),
                    parent,
                    "Found incorrect parent for node {}",
                    peer_cluster_nodes.pubkey
                );
            }
        }
        // Convert cluster_nodes into a hashset of pubkeys.
        let all_nodes: HashSet<_> = cluster_nodes.nodes.iter().map(|n| *n.pubkey()).collect();
        assert_eq!(all_nodes, covered, "All nodes must be covered");
    }

    #[test]
    fn test_cluster_nodes_retransmit() {
        let mut rng = rand::thread_rng();
        let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
        // ClusterInfo::tvu_peers excludes the node itself.
        assert_eq!(
            cluster_info.tvu_peers(GossipContactInfo::clone).len(),
            nodes.len() - 1
        );
        let cluster_nodes = new_cluster_nodes::<RetransmitStage>(
            &cluster_info,
            ClusterType::Development,
            &stakes,
            false,
        );
        // All nodes with contact-info should be in the index.
        // Staked nodes with no contact-info should be included.
        assert!(cluster_nodes.nodes.len() > nodes.len());
        // Assert that all nodes keep their contact-info and that all staked
        // nodes are also included.
        {
            let cluster_nodes: HashMap<_, _> = cluster_nodes
                .nodes
                .iter()
                .map(|node| (node.pubkey(), node))
                .collect();
            for node in &nodes {
                assert_eq!(
                    cluster_nodes[node.pubkey()]
                        .contact_info()
                        .unwrap()
                        .pubkey(),
                    node.pubkey()
                );
            }
            for (pubkey, stake) in &stakes {
                if *stake > 0 {
                    assert_eq!(cluster_nodes[pubkey].stake, *stake);
                }
            }
        }
    }

    #[test_case(true /* ChaCha8 */)]
    #[test_case(false /* ChaCha20 */)]
    fn test_cluster_nodes_broadcast(use_cha_cha_8: bool) {
        let mut rng = rand::thread_rng();
        let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
        // ClusterInfo::tvu_peers excludes the node itself.
        assert_eq!(
            cluster_info.tvu_peers(GossipContactInfo::clone).len(),
            nodes.len() - 1
        );
        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
            &cluster_info,
            ClusterType::Development,
            &stakes,
            use_cha_cha_8,
        );
        // All nodes with contact-info should be in the index,
        // excluding this node itself.
        // Staked nodes with no contact-info should be included.
        assert!(cluster_nodes.nodes.len() > nodes.len());
        // Assert that all nodes keep their contact-info and that all staked
        // nodes are also included.
        {
            let cluster_nodes: HashMap<_, _> = cluster_nodes
                .nodes
                .iter()
                .map(|node| (node.pubkey(), node))
                .collect();
            for node in &nodes {
                assert_eq!(
                    cluster_nodes[node.pubkey()]
                        .contact_info()
                        .unwrap()
                        .pubkey(),
                    node.pubkey()
                );
            }
            for (pubkey, stake) in &stakes {
                if *stake > 0 {
                    assert_eq!(cluster_nodes[pubkey].stake, *stake);
                }
            }
        }
    }

    // Checks (1) computed retransmit children against expected children and
    // (2) computed parent of each child against the expected parent.
    fn check_retransmit_nodes<T>(fanout: usize, nodes: &[T], peers: Vec<Vec<T>>)
    where
        T: Copy + Eq + PartialEq + Debug + Hash,
    {
        // Map node identities to their index within the shuffled tree.
        let cache: HashMap<_, _> = nodes
            .iter()
            .copied()
            .enumerate()
            .map(|(k, node)| (node, k))
            .collect();
        let offset = peers.len();
        // Root node's parent is None.
        assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None);
        for (k, peers) in peers.into_iter().enumerate() {
            {
                let (index, retransmit_peers) =
                    get_retransmit_peers(fanout, |node| node == &nodes[k], nodes);
                assert_eq!(peers, retransmit_peers.copied().collect::<Vec<_>>());
                assert_eq!(index, k);
            }
            let parent = Some(nodes[k]);
            for peer in peers {
                assert_eq!(get_retransmit_parent(fanout, cache[&peer], nodes), parent);
            }
        }
        // Remaining nodes have no children.
        for k in offset..nodes.len() {
            let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], nodes);
            assert_eq!(peers.next(), None);
            assert_eq!(index, k);
        }
    }

    #[test]
    fn test_get_retransmit_nodes() {
        // fanout 2
        let nodes = [
            7, // root
            6, 10, // 1st layer
            // 2nd layer
            5, 19, // 1st neighborhood
            0, 14, // 2nd
            // 3rd layer
            3, 1, // 1st neighborhood
            12, 2, // 2nd
            11, 4, // 3rd
            15, 18, // 4th
            // 4th layer
            13, 16, // 1st neighborhood
            17, 9, // 2nd
            8, // 3rd
        ];
        let peers = vec![
            vec![6, 10],
            vec![5, 0],
            vec![19, 14],
            vec![3, 12],
            vec![1, 2],
            vec![11, 15],
            vec![4, 18],
            vec![13, 17],
            vec![16, 9],
            vec![8],
        ];
        check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers);
        // fanout 3
        let nodes = [
            19, // root
            14, 15, 28, // 1st layer
            // 2nd layer
            29, 4, 5, // 1st neighborhood
            9, 16, 7, // 2nd
            26, 23, 2, // 3rd
            // 3rd layer
            31, 3, 17, // 1st neighborhood
            20, 25, 0, // 2nd
            13, 30, 18, // 3rd
            35, 21, 22, // 4th
            6, 8, 11, // 5th
            27, 1, 10, // 6th
            12, 24, 34, // 7th
            33, 32, // 8th
        ];
        let peers = vec![
            vec![14, 15, 28],
            vec![29, 9, 26],
            vec![4, 16, 23],
            vec![5, 7, 2],
            vec![31, 20, 13],
            vec![3, 25, 30],
            vec![17, 0, 18],
            vec![35, 6, 27],
            vec![21, 8, 1],
            vec![22, 11, 10],
            vec![12, 33],
            vec![24, 32],
            vec![34],
        ];
        check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
        let nodes = [
            5, // root
            34, 52, 8, // 1st layer
            // 2nd layer
            44, 18, 2, // 1st neighborhood
            42, 47, 46, // 2nd
            11, 26, 28, // 3rd
            // 3rd layer
            53, 23, 37, // 1st neighborhood
            40, 13, 7, // 2nd
            50, 35, 22, // 3rd
            3, 27, 31, // 4th
            10, 48, 15, // 5th
            19, 6, 30, // 6th
            36, 45, 1, // 7th
            38, 12, 17, // 8th
            4, 32, 16, // 9th
            // 4th layer
            41, 49, 24, // 1st neighborhood
            14, 9, 0, // 2nd
            29, 21, 39, // 3rd
            43, 51, 33, // 4th
            25, 20, // 5th
        ];
        let peers = vec![
            vec![34, 52, 8],
            vec![44, 42, 11],
            vec![18, 47, 26],
            vec![2, 46, 28],
            vec![53, 40, 50],
            vec![23, 13, 35],
            vec![37, 7, 22],
            vec![3, 10, 19],
            vec![27, 48, 6],
            vec![31, 15, 30],
            vec![36, 38, 4],
            vec![45, 12, 32],
            vec![1, 17, 16],
            vec![41, 14, 29],
            vec![49, 9, 21],
            vec![24, 0, 39],
            vec![43, 25],
            vec![51, 20],
            vec![33],
        ];
        check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
    }

    #[test_case(2, 1_347)]
    #[test_case(3, 1_359)]
    #[test_case(4, 4_296)]
    #[test_case(5, 3_925)]
    #[test_case(6, 8_778)]
    #[test_case(7, 9_879)]
    fn test_get_retransmit_nodes_round_trip(fanout: usize, size: usize) {
        let mut rng = rand::thread_rng();
        let mut nodes: Vec<_> = (0..size).collect();
        nodes.shuffle(&mut rng);
        // Map node identities to their index within the shuffled tree.
        let cache: HashMap<_, _> = nodes
            .iter()
            .copied()
            .enumerate()
            .map(|(k, node)| (node, k))
            .collect();
        // Root node's parent is None.
        assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None);
        for k in 1..size {
            let parent = get_retransmit_parent(fanout, k, &nodes).unwrap();
            let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &parent, &nodes);
            assert_eq!(index, cache[&parent]);
            assert_eq!(peers.find(|&&peer| peer == nodes[k]), Some(&nodes[k]));
        }
        for k in 0..size {
            let parent = Some(nodes[k]);
            let (index, peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes);
            assert_eq!(index, k);
            for peer in peers {
                assert_eq!(get_retransmit_parent(fanout, cache[peer], &nodes), parent);
            }
        }
    }

    #[test]
    fn test_sort_and_dedup_nodes() {
        let mut rng = rand::thread_rng();
        let pubkeys: Vec<Pubkey> = std::iter::repeat_with(|| Pubkey::from(rng.gen::<[u8; 32]>()))
            .take(50)
            .collect();
        let stakes = std::iter::repeat_with(|| rng.gen_range(0..100u64));
        let stakes: HashMap<Pubkey, u64> = pubkeys.iter().copied().zip(stakes).collect();
        let mut nodes: Vec<Node> = std::iter::repeat_with(|| {
            let pubkey = pubkeys.choose(&mut rng).copied().unwrap();
            let stake = stakes[&pubkey];
            let node = GossipContactInfo::new_localhost(&pubkey, /*wallclock:*/ timestamp());
            [
                Node {
                    node: NodeId::from(ContactInfo::from(&node)),
                    stake,
                },
                Node {
                    node: NodeId::from(pubkey),
                    stake,
                },
            ]
        })
        .flatten()
        .take(10_000)
        .collect();
        let mut unique_pubkeys: HashSet<Pubkey> = nodes.iter().map(Node::pubkey).copied().collect();
        nodes.shuffle(&mut rng);
        sort_and_dedup_nodes(&mut nodes);
        // Assert that stakes are in non-increasing (descending) order.
        for (a, b) in nodes.iter().tuple_windows() {
            assert!(a.stake >= b.stake);
        }
        // Assert that the larger pubkey tie-breaks equal stakes.
        for (a, b) in nodes.iter().tuple_windows() {
            if a.stake == b.stake {
                assert!(a.pubkey() > b.pubkey());
            }
        }
        // Assert that NodeId::Pubkey are dropped in favor of
        // NodeId::ContactInfo.
        for node in &nodes {
            assert_matches!(node.node, NodeId::ContactInfo(_));
        }
        // Assert that unique pubkeys are preserved.
        for node in &nodes {
            assert!(unique_pubkeys.remove(node.pubkey()))
        }
        assert!(unique_pubkeys.is_empty());
    }
}