crds_gossip.rs 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836
  1. #![allow(clippy::arithmetic_side_effects)]
  2. use {
  3. bincode::serialized_size,
  4. itertools::Itertools,
  5. log::*,
  6. rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
  7. serial_test::serial,
  8. solana_gossip::{
  9. cluster_info_metrics::GossipStats,
  10. contact_info::ContactInfo,
  11. crds::GossipRoute,
  12. crds_data::CrdsData,
  13. crds_gossip::*,
  14. crds_gossip_error::CrdsGossipError,
  15. crds_gossip_pull::{
  16. CrdsTimeouts, ProcessPullStats, PullRequest, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
  17. },
  18. crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
  19. crds_value::{CrdsValue, CrdsValueLabel},
  20. },
  21. solana_keypair::Keypair,
  22. solana_net_utils::SocketAddrSpace,
  23. solana_pubkey::Pubkey,
  24. solana_rayon_threadlimit::get_thread_count,
  25. solana_sha256_hasher::hash,
  26. solana_signer::Signer,
  27. solana_time_utils::timestamp,
  28. std::{
  29. collections::{HashMap, HashSet},
  30. net::{Ipv4Addr, SocketAddr},
  31. ops::Deref,
  32. sync::{Arc, Mutex},
  33. time::{Duration, Instant},
  34. },
  35. };
// Local shorthand for the gossip ping cache type, instantiated with a const
// generic argument of 32.
type PingCache = solana_gossip::ping_pong::PingCache<32>;
  37. #[derive(Clone)]
  38. struct Node {
  39. keypair: Arc<Keypair>,
  40. contact_info: ContactInfo,
  41. gossip: Arc<CrdsGossip>,
  42. ping_cache: Arc<Mutex<PingCache>>,
  43. stake: u64,
  44. }
  45. impl Node {
  46. fn new(keypair: Arc<Keypair>, contact_info: ContactInfo, gossip: Arc<CrdsGossip>) -> Self {
  47. Self::staked(keypair, contact_info, gossip, 0)
  48. }
  49. fn staked(
  50. keypair: Arc<Keypair>,
  51. contact_info: ContactInfo,
  52. gossip: Arc<CrdsGossip>,
  53. stake: u64,
  54. ) -> Self {
  55. let ping_cache = Arc::new(new_ping_cache());
  56. Node {
  57. keypair,
  58. contact_info,
  59. gossip,
  60. ping_cache,
  61. stake,
  62. }
  63. }
  64. }
  65. struct Network {
  66. nodes: HashMap<Pubkey, Node>,
  67. stake_pruned: u64,
  68. connections_pruned: HashSet<(Pubkey, Pubkey)>,
  69. }
  70. impl Network {
  71. fn new(nodes: HashMap<Pubkey, Node>) -> Self {
  72. Network {
  73. nodes,
  74. connections_pruned: HashSet::new(),
  75. stake_pruned: 0,
  76. }
  77. }
  78. }
  79. impl Deref for Network {
  80. type Target = HashMap<Pubkey, Node>;
  81. fn deref(&self) -> &Self::Target {
  82. &self.nodes
  83. }
  84. }
  85. fn stakes(network: &Network) -> HashMap<Pubkey, u64> {
  86. let mut stakes = HashMap::new();
  87. for (key, Node { stake, .. }) in network.iter() {
  88. stakes.insert(*key, *stake);
  89. }
  90. stakes
  91. }
  92. fn star_network_create(num: usize) -> Network {
  93. let gossip_port_offset = 9000;
  94. let node_keypair = Arc::new(Keypair::new());
  95. let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
  96. let entry = CrdsValue::new(CrdsData::from(&contact_info), &node_keypair);
  97. let mut network: HashMap<_, _> = (1..num)
  98. .map(|k| {
  99. let node_keypair = Arc::new(Keypair::new());
  100. // Need unique gossip addresses, otherwise nodes will be deduped by
  101. // crds_gossip::dedup_gossip_addresses before peer sampling.
  102. let mut contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
  103. let gossip_port = gossip_port_offset + u16::try_from(k).unwrap();
  104. contact_info
  105. .set_gossip((Ipv4Addr::LOCALHOST, gossip_port))
  106. .unwrap();
  107. let new = CrdsValue::new(CrdsData::from(&contact_info), &node_keypair);
  108. let node = CrdsGossip::default();
  109. {
  110. let mut node_crds = node.crds.write().unwrap();
  111. node_crds
  112. .insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
  113. .unwrap();
  114. node_crds
  115. .insert(entry.clone(), timestamp(), GossipRoute::LocalMessage)
  116. .unwrap();
  117. }
  118. let node = Node::new(node_keypair, contact_info, Arc::new(node));
  119. (new.label().pubkey(), node)
  120. })
  121. .collect();
  122. let node = CrdsGossip::default();
  123. let id = entry.label().pubkey();
  124. node.crds
  125. .write()
  126. .unwrap()
  127. .insert(entry, timestamp(), GossipRoute::LocalMessage)
  128. .unwrap();
  129. let node = Node::new(node_keypair, contact_info, Arc::new(node));
  130. network.insert(id, node);
  131. Network::new(network)
  132. }
  133. fn rstar_network_create(num: usize) -> Network {
  134. let node_keypair = Arc::new(Keypair::new());
  135. let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
  136. let entry = CrdsValue::new(CrdsData::from(&contact_info), &node_keypair);
  137. let origin = CrdsGossip::default();
  138. let id = entry.label().pubkey();
  139. origin
  140. .crds
  141. .write()
  142. .unwrap()
  143. .insert(entry, timestamp(), GossipRoute::LocalMessage)
  144. .unwrap();
  145. let mut network: HashMap<_, _> = (1..num)
  146. .map(|_| {
  147. let node_keypair = Arc::new(Keypair::new());
  148. let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
  149. let new = CrdsValue::new(CrdsData::from(&contact_info), &node_keypair);
  150. let node = CrdsGossip::default();
  151. node.crds
  152. .write()
  153. .unwrap()
  154. .insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
  155. .unwrap();
  156. origin
  157. .crds
  158. .write()
  159. .unwrap()
  160. .insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
  161. .unwrap();
  162. let node = Node::new(node_keypair, contact_info, Arc::new(node));
  163. (new.label().pubkey(), node)
  164. })
  165. .collect();
  166. let node = Node::new(node_keypair, contact_info, Arc::new(origin));
  167. network.insert(id, node);
  168. Network::new(network)
  169. }
  170. fn ring_network_create(num: usize) -> Network {
  171. let mut network: HashMap<_, _> = (0..num)
  172. .map(|_| {
  173. let node_keypair = Arc::new(Keypair::new());
  174. let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
  175. let new = CrdsValue::new(CrdsData::from(&contact_info), &node_keypair);
  176. let node = CrdsGossip::default();
  177. node.crds
  178. .write()
  179. .unwrap()
  180. .insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
  181. .unwrap();
  182. let node = Node::new(node_keypair, contact_info, Arc::new(node));
  183. (new.label().pubkey(), node)
  184. })
  185. .collect();
  186. let keys: Vec<Pubkey> = network.keys().cloned().collect();
  187. for k in 0..keys.len() {
  188. let start_info = {
  189. let start = &network[&keys[k]];
  190. let start_id = keys[k];
  191. let label = CrdsValueLabel::ContactInfo(start_id);
  192. let gossip_crds = start.gossip.crds.read().unwrap();
  193. gossip_crds.get::<&CrdsValue>(&label).unwrap().clone()
  194. };
  195. let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
  196. let mut end_crds = end.gossip.crds.write().unwrap();
  197. end_crds
  198. .insert(start_info, timestamp(), GossipRoute::LocalMessage)
  199. .unwrap();
  200. }
  201. Network::new(network)
  202. }
  203. fn connected_staked_network_create(stakes: &[u64]) -> Network {
  204. let num = stakes.len();
  205. let mut network: HashMap<_, _> = (0..num)
  206. .map(|n| {
  207. let node_keypair = Arc::new(Keypair::new());
  208. let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
  209. let new = CrdsValue::new(CrdsData::from(&contact_info), &node_keypair);
  210. let node = CrdsGossip::default();
  211. node.crds
  212. .write()
  213. .unwrap()
  214. .insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
  215. .unwrap();
  216. let node = Node::staked(node_keypair, contact_info, Arc::new(node), stakes[n]);
  217. (new.label().pubkey(), node)
  218. })
  219. .collect();
  220. let keys: Vec<Pubkey> = network.keys().cloned().collect();
  221. let start_entries: Vec<_> = keys
  222. .iter()
  223. .map(|k| {
  224. let start = &network[k];
  225. let start_label = CrdsValueLabel::ContactInfo(*k);
  226. let gossip_crds = start.gossip.crds.read().unwrap();
  227. gossip_crds.get::<&CrdsValue>(&start_label).unwrap().clone()
  228. })
  229. .collect();
  230. for (end_pubkey, end) in network.iter_mut() {
  231. let mut end_crds = end.gossip.crds.write().unwrap();
  232. for k in 0..keys.len() {
  233. if keys[k] != *end_pubkey {
  234. let start_info = start_entries[k].clone();
  235. end_crds
  236. .insert(start_info, timestamp(), GossipRoute::LocalMessage)
  237. .unwrap();
  238. }
  239. }
  240. }
  241. Network::new(network)
  242. }
  243. fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &Network) {
  244. let num = network.len();
  245. // In absence of gossip push messages, a pull only network with star
  246. // topology will not converge because it forms a DAG. We add additional
  247. // edges so that there is a directed path between every two pair of nodes.
  248. let (pubkeys, mut entries): (Vec<_>, Vec<_>) = network
  249. .nodes
  250. .iter()
  251. .map(|(&pubkey, node)| {
  252. let label = CrdsValueLabel::ContactInfo(pubkey);
  253. let crds = node.gossip.crds.read().unwrap();
  254. let entry = crds.get::<&CrdsValue>(&label).unwrap().clone();
  255. (pubkey, entry)
  256. })
  257. .unzip();
  258. entries.rotate_right(1);
  259. for (pubkey, entry) in pubkeys.into_iter().zip(entries) {
  260. let mut crds = network.nodes[&pubkey].gossip.crds.write().unwrap();
  261. let _ = crds.insert(entry, timestamp(), GossipRoute::LocalMessage);
  262. }
  263. let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, num * 2, 0.9);
  264. trace!("network_simulator_pull_{num}: converged: {converged} total_bytes: {bytes_tx}");
  265. assert!(converged >= 0.9);
  266. }
  267. fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_convergance: f64) {
  268. let num = network.len();
  269. // run for a small amount of time
  270. let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, 10, 1.0);
  271. trace!("network_simulator_push_{num}: converged: {converged}");
  272. // make sure there is someone in the active set
  273. let network_values: Vec<Node> = network.values().cloned().collect();
  274. network_values.par_iter().for_each(|node| {
  275. node.gossip.refresh_push_active_set(
  276. &node.keypair,
  277. 0, // shred version
  278. &HashMap::new(), // stakes
  279. None, // gossip validators
  280. &node.ping_cache,
  281. &mut Vec::new(), // pings
  282. &SocketAddrSpace::Unspecified,
  283. );
  284. });
  285. let mut total_bytes = bytes_tx;
  286. let mut ts = timestamp();
  287. for _ in 1..num {
  288. let start = ts.div_ceil(100) as usize;
  289. let end = start + 10;
  290. let now = (start * 100) as u64;
  291. ts += 1000;
  292. // push a message to the network
  293. network_values.par_iter().for_each(|node| {
  294. let node_pubkey = node.keypair.pubkey();
  295. let mut m = {
  296. let node_crds = node.gossip.crds.read().unwrap();
  297. node_crds.get::<&ContactInfo>(node_pubkey).cloned().unwrap()
  298. };
  299. m.set_wallclock(now);
  300. node.gossip.process_push_message(
  301. vec![(
  302. Pubkey::default(),
  303. vec![CrdsValue::new(CrdsData::ContactInfo(m), &Keypair::new())],
  304. )],
  305. now,
  306. );
  307. });
  308. // push for a bit
  309. let (queue_size, bytes_tx) = network_run_push(thread_pool, network, start, end);
  310. total_bytes += bytes_tx;
  311. trace!("network_simulator_push_{num}: queue_size: {queue_size} bytes: {bytes_tx}");
  312. // pull for a bit
  313. let (converged, bytes_tx) = network_run_pull(thread_pool, network, start, end, 1.0);
  314. total_bytes += bytes_tx;
  315. trace!(
  316. "network_simulator_push_{num}: converged: {converged} bytes: {bytes_tx} total_bytes: \
  317. {total_bytes}"
  318. );
  319. if converged > max_convergance {
  320. break;
  321. }
  322. }
  323. }
/// Simulates push gossip over `network` for the ticks `start..end`.
///
/// The simulated clock for tick `t` is `t * 100`. Each tick has two phases:
///
/// 1. In parallel, every node purges its table and generates its outgoing
///    push messages.
/// 2. In parallel over senders, every message is delivered to its recipient;
///    the recipient's resulting prune decisions are delivered straight back
///    to the pruned peers.
///
/// Newly seen pruned pairs are recorded in `network.connections_pruned`, and
/// the stake accumulated for them is added to `network.stake_pruned` once the
/// loop has finished.
///
/// Returns `(total, bytes)`: `total` is the sum of `push.num_pending(..)` over
/// all nodes as computed in the last tick (0 if no tick ran), and `bytes` is
/// the byte count accumulated over all ticks.
fn network_run_push(
    thread_pool: &ThreadPool,
    network: &mut Network,
    start: usize,
    end: usize,
) -> (usize, usize) {
    let mut bytes: usize = 0;
    let mut num_msgs: usize = 0;
    let mut total: usize = 0;
    let num = network.len();
    let mut prunes: usize = 0;
    let mut delivered: usize = 0;
    let mut stake_pruned: u64 = 0;
    // Snapshot of the nodes; clones share gossip state through `Arc`.
    let network_values: Vec<Node> = network.values().cloned().collect();
    let stakes = stakes(network);
    for t in start..end {
        let now = t as u64 * 100;
        // Phase 1: each node purges and produces `(sender, messages)`, where
        // every message is `(recipient, values)`.
        let requests: Vec<_> = network_values
            .par_iter()
            .map(|node| {
                let node_pubkey = node.keypair.pubkey();
                let timeouts = node.gossip.make_timeouts(
                    node_pubkey,
                    &stakes,
                    Duration::from_millis(node.gossip.pull.crds_timeout),
                );
                node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
                let (entries, messages, _) = node.gossip.new_push_messages(
                    &node_pubkey,
                    now,
                    &stakes,
                    |_| true, // should_retain_crds_value
                );
                // `messages` refers to `entries` by index; resolve the
                // indices into owned values.
                let messages = messages
                    .into_iter()
                    .map(|(pubkey, indices)| {
                        let values = indices.into_iter().map(|k| entries[k].clone()).collect();
                        (pubkey, values)
                    })
                    .collect::<Vec<(_, Vec<_>)>>();
                (node_pubkey, messages)
            })
            .collect();
        // Phase 2: deliver the messages and the prunes they trigger. Each
        // sender yields `(bytes, delivered, num_msgs, pruned)`.
        let transfered: Vec<_> = requests
            .into_par_iter()
            .map(|(from, push_messages)| {
                let mut bytes: usize = 0;
                let mut delivered: usize = 0;
                let mut num_msgs: usize = 0;
                let mut pruned: HashSet<(Pubkey, Pubkey)> = HashSet::new();
                for (to, msgs) in push_messages {
                    // 8 bytes for encoding the length of the vector.
                    bytes += 8 + msgs
                        .iter()
                        .map(CrdsValue::bincode_serialized_size)
                        .sum::<usize>();
                    num_msgs += 1;
                    // Whatever the recipient returns from processing the
                    // message is fed to its `prune_received_cache` below.
                    let origins: HashSet<_> = network
                        .get(&to)
                        .unwrap()
                        .gossip
                        .process_push_message(vec![(from, msgs.clone())], now)
                        .into_iter()
                        .collect();
                    let prunes_map = network
                        .get(&to)
                        .map(|node| {
                            let node_pubkey = node.keypair.pubkey();
                            node.gossip
                                .prune_received_cache(&node_pubkey, origins, &stakes)
                        })
                        .unwrap();
                    // NOTE: `from` is shadowed inside this loop; here it is
                    // the key of the prune map entry, not the push sender.
                    for (from, prune_set) in prunes_map {
                        let prune_keys: Vec<_> = prune_set.into_iter().collect();
                        for prune_key in &prune_keys {
                            pruned.insert((from, *prune_key));
                        }
                        bytes += serialized_size(&prune_keys).unwrap() as usize;
                        delivered += 1;
                        network
                            .get(&from)
                            .map(|node| {
                                let node_pubkey = node.keypair.pubkey();
                                let destination = node_pubkey;
                                // NOTE: shadows the simulated `now` with the
                                // real `timestamp()` for the prune message.
                                let now = timestamp();
                                node.gossip
                                    .process_prune_msg(
                                        &node_pubkey,
                                        &to,
                                        &destination,
                                        &prune_keys,
                                        now,
                                        now,
                                        &stakes,
                                    )
                                    .unwrap()
                            })
                            .unwrap();
                    }
                }
                (bytes, delivered, num_msgs, pruned)
            })
            .collect();
        // Fold the per-sender results into the running totals.
        for (b, d, m, p) in transfered {
            bytes += b;
            delivered += d;
            num_msgs += m;
            for (from, to) in p {
                let from_stake = stakes.get(&from).unwrap();
                // Count a pair (and the stake of its first element) only the
                // first time it is recorded.
                if network.connections_pruned.insert((from, to)) {
                    prunes += 1;
                    stake_pruned += *from_stake;
                }
            }
        }
        // Refresh the active sets whenever the simulated clock hits a
        // non-zero multiple of the push message timeout.
        if now.is_multiple_of(CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS) && now > 0 {
            network_values.par_iter().for_each(|node| {
                node.gossip.refresh_push_active_set(
                    &node.keypair,
                    0,               // shred version
                    &HashMap::new(), // stakes
                    None,            // gossip validators
                    &node.ping_cache,
                    &mut Vec::new(), // pings
                    &SocketAddrSpace::Unspecified,
                );
            });
        }
        total = network_values
            .par_iter()
            .map(|node| node.gossip.push.num_pending(&node.gossip.crds))
            .sum();
        trace!(
            "network_run_push_{num}: now: {now} queue: {total} bytes: {bytes} num_msgs: \
             {num_msgs} prunes: {prunes} stake_pruned: {stake_pruned} delivered: {delivered}"
        );
    }
    network.stake_pruned += stake_pruned;
    (total, bytes)
}
/// Simulates pull gossip over `network` for the ticks `start..end`.
///
/// The simulated clock for tick `t` is `t * 100`. In each tick every node
/// builds its pull requests (in parallel). The requests are then served one
/// by one: the target node generates responses and the requesting node
/// filters and processes them.
///
/// Convergence is the total number of table entries across all nodes divided
/// by `num * num`, where `num` is the node count. The loop stops early as
/// soon as convergence exceeds `max_convergance`.
///
/// Returns `(convergance, bytes)`: the convergence from the last tick that
/// ran (0.0 if none ran) and the byte count accumulated over all ticks.
fn network_run_pull(
    thread_pool: &ThreadPool,
    network: &Network,
    start: usize,
    end: usize,
    max_convergance: f64,
) -> (f64, usize) {
    let mut bytes: usize = 0;
    let mut msgs: usize = 0;
    let mut overhead: usize = 0;
    let mut convergance = 0f64;
    let num = network.len();
    // Snapshot of the nodes; clones share gossip state through `Arc`.
    let network_values: Vec<Node> = network.values().cloned().collect();
    let stakes = stakes(network);
    // Record a mock pong from every other node in each node's ping cache.
    // NOTE(review): presumably so that peers count as ping-verified when pull
    // requests are built — confirm against `PingCache`.
    for node in &network_values {
        let mut ping_cache = node.ping_cache.lock().unwrap();
        for other in &network_values {
            if node.keypair.pubkey() != other.keypair.pubkey() {
                ping_cache.mock_pong(
                    other.keypair.pubkey(),
                    other.contact_info.gossip().unwrap(),
                    Instant::now(),
                );
            }
        }
    }
    // Gossip address -> contact info, used below to turn the address of a
    // pull request back into the peer it targets.
    let nodes: HashMap<SocketAddr, ContactInfo> = network
        .nodes
        .values()
        .map(|node| {
            let node = &node.contact_info;
            (node.gossip().unwrap(), node.clone())
        })
        .collect();
    for t in start..end {
        let now = t as u64 * 100;
        // Each element is `(target pubkey, filters, requester's own entry)`.
        let requests: Vec<_> = {
            network_values
                .par_iter()
                .flat_map_iter(|from| {
                    let mut pings = Vec::new();
                    // A failed `new_pull_request` yields no requests for this
                    // node in this tick (`unwrap_or_default`).
                    let requests = from
                        .gossip
                        .new_pull_request(
                            thread_pool,
                            from.keypair.deref(),
                            0, // shred version.
                            now,
                            None,
                            &HashMap::new(),
                            992, // max_bloom_filter_bytes
                            from.ping_cache.deref(),
                            &mut pings,
                            &SocketAddrSpace::Unspecified,
                        )
                        .map(|requests| {
                            // Group the filters by target address and look up
                            // the target's contact info.
                            requests
                                .into_group_map()
                                .into_iter()
                                .map(|(addr, filters)| {
                                    (nodes.get(&addr).cloned().unwrap(), filters)
                                })
                                .collect::<Vec<_>>()
                        })
                        .unwrap_or_default();
                    let from_pubkey = from.keypair.pubkey();
                    let label = CrdsValueLabel::ContactInfo(from_pubkey);
                    let gossip_crds = from.gossip.crds.read().unwrap();
                    let self_info = gossip_crds.get::<&CrdsValue>(&label).unwrap().clone();
                    requests
                        .into_iter()
                        .map(move |(peer, filters)| (*peer.pubkey(), filters, self_info.clone()))
                })
                .collect()
        };
        // Serve the requests sequentially. Each request yields
        // `(bytes, msgs, overhead)`.
        let transfered: Vec<_> = requests
            .into_iter()
            .map(|(to, filters, caller_info)| {
                let mut bytes: usize = 0;
                let mut msgs: usize = 0;
                let mut overhead: usize = 0;
                let from = caller_info.label().pubkey();
                // Request size estimate: filter keys, filter bits (in bytes)
                // and the requester's own entry.
                bytes += filters.iter().map(|f| f.filter.keys.len()).sum::<usize>();
                bytes += filters
                    .iter()
                    .map(|f| f.filter.bits.len() as usize / 8)
                    .sum::<usize>();
                bytes += caller_info.bincode_serialized_size();
                let requests: Vec<_> = filters
                    .into_iter()
                    .map(|filter| PullRequest {
                        pubkey: from,
                        // Placeholder address 0.0.0.0:0; responses are handed
                        // to the requester directly further down.
                        addr: SocketAddr::from(([0; 4], 0)),
                        wallclock: now,
                        filter,
                    })
                    .collect();
                // The target node answers all requests; the per-request
                // responses are flattened into a single list.
                let rsp: Vec<_> = network
                    .get(&to)
                    .map(|node| {
                        node.gossip
                            .generate_pull_responses(
                                thread_pool,
                                &requests,
                                usize::MAX, // output_size_limit
                                now,
                                |_| true, // should_retain_crds_value
                                0,        // network shred version
                                &GossipStats::default(),
                            )
                            .into_iter()
                            .flatten()
                            .collect()
                    })
                    .unwrap();
                // 8 bytes for encoding the length of the vector.
                bytes += 8 + rsp
                    .iter()
                    .map(CrdsValue::bincode_serialized_size)
                    .sum::<usize>();
                msgs += rsp.len();
                // The requester filters and then applies the responses.
                if let Some(node) = network.get(&from) {
                    let mut stats = ProcessPullStats::default();
                    let timeouts = CrdsTimeouts::new(
                        node.keypair.pubkey(),
                        CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, // default_timeout
                        Duration::from_secs(48 * 3600),   // epoch_duration
                        &stakes,
                    );
                    let (vers, vers_expired_timeout, failed_inserts) = node
                        .gossip
                        .filter_pull_responses(&timeouts, rsp, now, &mut stats);
                    node.gossip.process_pull_responses(
                        vers,
                        vers_expired_timeout,
                        failed_inserts,
                        now,
                        &mut stats,
                    );
                    // Overhead is counted from the failed-insert and
                    // failed-timeout statistics.
                    overhead += stats.failed_insert;
                    overhead += stats.failed_timeout;
                }
                (bytes, msgs, overhead)
            })
            .collect();
        for (b, m, o) in transfered {
            bytes += b;
            msgs += m;
            overhead += o;
        }
        // Total number of table entries held across all nodes.
        let total: usize = network_values
            .par_iter()
            .map(|v| v.gossip.crds.read().unwrap().len())
            .sum();
        convergance = total as f64 / ((num * num) as f64);
        // Note: breaking here skips the trace for the final tick.
        if convergance > max_convergance {
            break;
        }
        trace!(
            "network_run_pull_{num}: now: {now} connections: {total} convergance: {convergance} \
             bytes: {bytes} msgs: {msgs} overhead: {overhead}"
        );
    }
    (convergance, bytes)
}
  629. fn build_gossip_thread_pool() -> ThreadPool {
  630. ThreadPoolBuilder::new()
  631. .num_threads(get_thread_count().min(2))
  632. .thread_name(|i| format!("gossipTest{i:02}"))
  633. .build()
  634. .unwrap()
  635. }
  636. fn new_ping_cache() -> Mutex<PingCache> {
  637. let ping_cache = PingCache::new(
  638. &mut rand::thread_rng(),
  639. Instant::now(),
  640. Duration::from_secs(20 * 60), // ttl
  641. Duration::from_secs(20 * 60) / 64, // rate_limit_delay
  642. 2048, // capacity
  643. );
  644. Mutex::new(ping_cache)
  645. }
  646. #[test]
  647. #[serial]
  648. fn test_star_network_pull_50() {
  649. let network = star_network_create(50);
  650. let thread_pool = build_gossip_thread_pool();
  651. network_simulator_pull_only(&thread_pool, &network);
  652. }
  653. #[test]
  654. #[serial]
  655. fn test_star_network_pull_100() {
  656. let network = star_network_create(100);
  657. let thread_pool = build_gossip_thread_pool();
  658. network_simulator_pull_only(&thread_pool, &network);
  659. }
  660. #[test]
  661. #[serial]
  662. fn test_star_network_push_star_200() {
  663. let mut network = star_network_create(200);
  664. let thread_pool = build_gossip_thread_pool();
  665. network_simulator(&thread_pool, &mut network, 0.9);
  666. }
  667. #[ignore]
  668. #[test]
  669. fn test_star_network_push_rstar_200() {
  670. let mut network = rstar_network_create(200);
  671. let thread_pool = build_gossip_thread_pool();
  672. network_simulator(&thread_pool, &mut network, 0.9);
  673. }
  674. #[test]
  675. #[serial]
  676. fn test_star_network_push_ring_200() {
  677. let mut network = ring_network_create(200);
  678. let thread_pool = build_gossip_thread_pool();
  679. network_simulator(&thread_pool, &mut network, 0.9);
  680. }
  681. // With the new pruning logic, this test is no longer valid and can be deleted.
  682. // Ignoring it for now until the pruning code is stable.
  683. #[test]
  684. #[ignore]
  685. #[serial]
  686. fn test_connected_staked_network() {
  687. agave_logger::setup();
  688. let thread_pool = build_gossip_thread_pool();
  689. let stakes = [
  690. [1000; 2].to_vec(),
  691. [100; 3].to_vec(),
  692. [10; 5].to_vec(),
  693. [1; 15].to_vec(),
  694. ]
  695. .concat();
  696. let mut network = connected_staked_network_create(&stakes);
  697. network_simulator(&thread_pool, &mut network, 1.0);
  698. let stake_sum: u64 = stakes.iter().sum();
  699. let avg_stake: u64 = stake_sum / stakes.len() as u64;
  700. let avg_stake_pruned = network.stake_pruned / network.connections_pruned.len() as u64;
  701. trace!(
  702. "connected staked networks, connections_pruned: {}, avg_stake: {}, avg_stake_pruned: {}",
  703. network.connections_pruned.len(),
  704. avg_stake,
  705. avg_stake_pruned
  706. );
  707. assert!(
  708. avg_stake_pruned < avg_stake,
  709. "network should prune lower stakes more often"
  710. )
  711. }
  712. #[test]
  713. #[ignore]
  714. fn test_star_network_large_pull() {
  715. agave_logger::setup();
  716. let network = star_network_create(2000);
  717. let thread_pool = build_gossip_thread_pool();
  718. network_simulator_pull_only(&thread_pool, &network);
  719. }
  720. #[test]
  721. #[ignore]
  722. fn test_rstar_network_large_push() {
  723. agave_logger::setup();
  724. let mut network = rstar_network_create(4000);
  725. let thread_pool = build_gossip_thread_pool();
  726. network_simulator(&thread_pool, &mut network, 0.9);
  727. }
  728. #[test]
  729. #[ignore]
  730. fn test_ring_network_large_push() {
  731. agave_logger::setup();
  732. let mut network = ring_network_create(4001);
  733. let thread_pool = build_gossip_thread_pool();
  734. network_simulator(&thread_pool, &mut network, 0.9);
  735. }
  736. #[test]
  737. #[ignore]
  738. fn test_star_network_large_push() {
  739. agave_logger::setup();
  740. let mut network = star_network_create(4002);
  741. let thread_pool = build_gossip_thread_pool();
  742. network_simulator(&thread_pool, &mut network, 0.9);
  743. }
  744. #[test]
  745. fn test_prune_errors() {
  746. let crds_gossip = CrdsGossip::default();
  747. let keypair = Keypair::new();
  748. let id = keypair.pubkey();
  749. let ci = ContactInfo::new_localhost(&Pubkey::from([1; 32]), 0);
  750. let prune_pubkey = Pubkey::from([2; 32]);
  751. crds_gossip
  752. .crds
  753. .write()
  754. .unwrap()
  755. .insert(
  756. CrdsValue::new(CrdsData::from(&ci), &Keypair::new()),
  757. 0,
  758. GossipRoute::LocalMessage,
  759. )
  760. .unwrap();
  761. let ping_cache = new_ping_cache();
  762. crds_gossip.refresh_push_active_set(
  763. &keypair,
  764. 0, // shred version
  765. &HashMap::new(), // stakes
  766. None, // gossip validators
  767. &ping_cache,
  768. &mut Vec::new(), // pings
  769. &SocketAddrSpace::Unspecified,
  770. );
  771. let now = timestamp();
  772. let stakes = HashMap::<Pubkey, u64>::default();
  773. //incorrect dest
  774. let mut res = crds_gossip.process_prune_msg(
  775. &id, // self_pubkey
  776. ci.pubkey(), // peer
  777. &Pubkey::from(hash(&[1; 32]).to_bytes()), // destination
  778. &[prune_pubkey], // origins
  779. now,
  780. now,
  781. &stakes,
  782. );
  783. assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
  784. //correct dest
  785. res = crds_gossip.process_prune_msg(
  786. &id, // self_pubkey
  787. ci.pubkey(), // peer
  788. &id, // destination
  789. &[prune_pubkey], // origins
  790. now,
  791. now,
  792. &stakes,
  793. );
  794. res.unwrap();
  795. //test timeout
  796. let timeout = now + crds_gossip.push.prune_timeout * 2;
  797. res = crds_gossip.process_prune_msg(
  798. &id, // self_pubkey
  799. ci.pubkey(), // peer
  800. &id, // destination
  801. &[prune_pubkey], // origins
  802. now,
  803. timeout,
  804. &stakes,
  805. );
  806. assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
  807. }