gossip.rs 13 KB


  1. #![allow(clippy::arithmetic_side_effects)]
  2. #[macro_use]
  3. extern crate log;
  4. use {
  5. rayon::iter::*,
  6. solana_gossip::{
  7. cluster_info::ClusterInfo,
  8. contact_info::{ContactInfo, Protocol},
  9. crds::Cursor,
  10. gossip_service::GossipService,
  11. node::Node,
  12. },
  13. solana_hash::Hash,
  14. solana_keypair::Keypair,
  15. solana_perf::packet::Packet,
  16. solana_pubkey::Pubkey,
  17. solana_runtime::bank_forks::BankForks,
  18. solana_signer::Signer,
  19. solana_streamer::{
  20. sendmmsg::{multi_target_send, SendPktsError},
  21. socket::SocketAddrSpace,
  22. },
  23. solana_time_utils::timestamp,
  24. solana_transaction::Transaction,
  25. solana_vote_program::{vote_instruction, vote_state::Vote},
  26. std::{
  27. net::UdpSocket,
  28. sync::{
  29. atomic::{AtomicBool, Ordering},
  30. Arc, RwLock,
  31. },
  32. thread::sleep,
  33. time::Duration,
  34. },
  35. };
  36. fn test_node(exit: Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
  37. let keypair = Arc::new(Keypair::new());
  38. let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey());
  39. let cluster_info = Arc::new(ClusterInfo::new(
  40. test_node.info.clone(),
  41. keypair,
  42. SocketAddrSpace::Unspecified,
  43. ));
  44. let gossip_service = GossipService::new(
  45. &cluster_info,
  46. None,
  47. test_node.sockets.gossip,
  48. None,
  49. true, // should_check_duplicate_instance
  50. None,
  51. exit,
  52. );
  53. let _ = cluster_info.my_contact_info();
  54. (
  55. cluster_info,
  56. gossip_service,
  57. test_node.sockets.tvu.pop().unwrap(),
  58. )
  59. }
  60. fn test_node_with_bank(
  61. node_keypair: Arc<Keypair>,
  62. exit: Arc<AtomicBool>,
  63. bank_forks: Arc<RwLock<BankForks>>,
  64. ) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
  65. let mut test_node = Node::new_localhost_with_pubkey(&node_keypair.pubkey());
  66. let cluster_info = Arc::new(ClusterInfo::new(
  67. test_node.info.clone(),
  68. node_keypair,
  69. SocketAddrSpace::Unspecified,
  70. ));
  71. let gossip_service = GossipService::new(
  72. &cluster_info,
  73. Some(bank_forks),
  74. test_node.sockets.gossip,
  75. None,
  76. true, // should_check_duplicate_instance
  77. None,
  78. exit,
  79. );
  80. let _ = cluster_info.my_contact_info();
  81. (
  82. cluster_info,
  83. gossip_service,
  84. test_node.sockets.tvu.pop().unwrap(),
  85. )
  86. }
  87. /// Test that the network converges.
  88. /// Run until every node in the network has a full ContactInfo set.
  89. /// Check that nodes stop sending updates after all the ContactInfo has been shared.
  90. /// tests that actually use this function are below
  91. fn run_gossip_topo<F>(num: usize, topo: F)
  92. where
  93. F: Fn(&Vec<(Arc<ClusterInfo>, GossipService, UdpSocket)>),
  94. {
  95. let exit = Arc::new(AtomicBool::new(false));
  96. let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect();
  97. topo(&listen);
  98. let mut done = false;
  99. for i in 0..(num * 32) {
  100. let total: usize = listen.iter().map(|v| v.0.gossip_peers().len()).sum();
  101. if (total + num) * 10 > num * num * 9 {
  102. done = true;
  103. break;
  104. } else {
  105. trace!("not converged {} {} {}", i, total + num, num * num);
  106. }
  107. sleep(Duration::from_secs(1));
  108. }
  109. exit.store(true, Ordering::Relaxed);
  110. for (_, dr, _) in listen {
  111. dr.join().unwrap();
  112. }
  113. assert!(done);
  114. }
  115. /// retransmit messages to a list of nodes
  116. fn retransmit_to(
  117. peers: &[&ContactInfo],
  118. data: &[u8],
  119. socket: &UdpSocket,
  120. forwarded: bool,
  121. socket_addr_space: &SocketAddrSpace,
  122. ) {
  123. trace!("retransmit orders {}", peers.len());
  124. let dests: Vec<_> = if forwarded {
  125. peers
  126. .iter()
  127. .filter_map(|peer| peer.tvu(Protocol::UDP))
  128. .filter(|addr| socket_addr_space.check(addr))
  129. .collect()
  130. } else {
  131. peers
  132. .iter()
  133. .filter_map(|peer| peer.tvu(Protocol::UDP))
  134. .filter(|addr| socket_addr_space.check(addr))
  135. .collect()
  136. };
  137. match multi_target_send(socket, data, &dests) {
  138. Ok(()) => (),
  139. Err(SendPktsError::IoError(ioerr, num_failed)) => {
  140. error!(
  141. "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
  142. ioerr,
  143. num_failed,
  144. dests.len(),
  145. );
  146. }
  147. }
  148. }
  149. /// ring a -> b -> c -> d -> e -> a
  150. #[test]
  151. fn gossip_ring() {
  152. agave_logger::setup();
  153. run_gossip_topo(40, |listen| {
  154. let num = listen.len();
  155. for n in 0..num {
  156. let y = n % listen.len();
  157. let x = (n + 1) % listen.len();
  158. let yv = &listen[y].0;
  159. let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap();
  160. d.set_wallclock(timestamp());
  161. listen[x].0.insert_info(d);
  162. }
  163. });
  164. }
  165. /// ring a -> b -> c -> d -> e -> a
  166. #[test]
  167. #[ignore]
  168. fn gossip_ring_large() {
  169. agave_logger::setup();
  170. run_gossip_topo(600, |listen| {
  171. let num = listen.len();
  172. for n in 0..num {
  173. let y = n % listen.len();
  174. let x = (n + 1) % listen.len();
  175. let yv = &listen[y].0;
  176. let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap();
  177. d.set_wallclock(timestamp());
  178. listen[x].0.insert_info(d);
  179. }
  180. });
  181. }
  182. /// star a -> (b,c,d,e)
  183. #[test]
  184. fn gossip_star() {
  185. agave_logger::setup();
  186. run_gossip_topo(10, |listen| {
  187. let num = listen.len();
  188. for n in 0..(num - 1) {
  189. let x = 0;
  190. let y = (n + 1) % listen.len();
  191. let yv = &listen[y].0;
  192. let mut yd = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap();
  193. yd.set_wallclock(timestamp());
  194. let xv = &listen[x].0;
  195. xv.insert_info(yd);
  196. trace!("star leader {}", &xv.id());
  197. }
  198. });
  199. }
  200. /// rstar a <- (b,c,d,e)
  201. #[test]
  202. fn gossip_rstar() {
  203. agave_logger::setup();
  204. run_gossip_topo(10, |listen| {
  205. let num = listen.len();
  206. let xd = {
  207. let xv = &listen[0].0;
  208. xv.lookup_contact_info(&xv.id(), |ci| ci.clone()).unwrap()
  209. };
  210. trace!("rstar leader {}", xd.pubkey());
  211. for n in 0..(num - 1) {
  212. let y = (n + 1) % listen.len();
  213. let yv = &listen[y].0;
  214. yv.insert_info(xd.clone());
  215. trace!("rstar insert {} into {}", xd.pubkey(), yv.id());
  216. }
  217. });
  218. }
  219. #[test]
  220. pub fn cluster_info_retransmit() {
  221. agave_logger::setup();
  222. let exit = Arc::new(AtomicBool::new(false));
  223. trace!("c1:");
  224. let (c1, dr1, tn1) = test_node(exit.clone());
  225. trace!("c2:");
  226. let (c2, dr2, tn2) = test_node(exit.clone());
  227. trace!("c3:");
  228. let (c3, dr3, tn3) = test_node(exit.clone());
  229. let c1_contact_info = c1.my_contact_info();
  230. c2.insert_info(c1_contact_info.clone());
  231. c3.insert_info(c1_contact_info);
  232. let num = 3;
  233. //wait to converge
  234. trace!("waiting to converge:");
  235. let mut done = false;
  236. for _ in 0..30 {
  237. done = c1.gossip_peers().len() == num - 1
  238. && c2.gossip_peers().len() == num - 1
  239. && c3.gossip_peers().len() == num - 1;
  240. if done {
  241. break;
  242. }
  243. sleep(Duration::from_secs(1));
  244. }
  245. assert!(done);
  246. let mut p = Packet::default();
  247. p.meta_mut().size = 10;
  248. let peers = c1.tvu_peers(ContactInfo::clone);
  249. let retransmit_peers: Vec<_> = peers.iter().collect();
  250. retransmit_to(
  251. &retransmit_peers,
  252. p.data(..).unwrap(),
  253. &tn1,
  254. false,
  255. &SocketAddrSpace::Unspecified,
  256. );
  257. let res: Vec<_> = [tn1, tn2, tn3]
  258. .into_par_iter()
  259. .map(|s| {
  260. let mut p = Packet::default();
  261. s.set_read_timeout(Some(Duration::from_secs(1))).unwrap();
  262. let res = s.recv_from(p.buffer_mut());
  263. res.is_err() //true if failed to receive the retransmit packet
  264. })
  265. .collect();
  266. //true if failed receive the retransmit packet, r2, and r3 should succeed
  267. //r1 was the sender, so it should fail to receive the packet
  268. assert_eq!(res, [true, false, false]);
  269. exit.store(true, Ordering::Relaxed);
  270. dr1.join().unwrap();
  271. dr2.join().unwrap();
  272. dr3.join().unwrap();
  273. }
  274. #[test]
  275. #[ignore]
  276. pub fn cluster_info_scale() {
  277. use {
  278. solana_measure::measure::Measure,
  279. solana_perf::test_tx::test_tx,
  280. solana_runtime::{
  281. bank::Bank,
  282. genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs},
  283. },
  284. };
  285. agave_logger::setup();
  286. let exit = Arc::new(AtomicBool::new(false));
  287. let num_nodes: usize = std::env::var("NUM_NODES")
  288. .unwrap_or_else(|_| "10".to_string())
  289. .parse()
  290. .expect("could not parse NUM_NODES as a number");
  291. let vote_keypairs: Vec<_> = (0..num_nodes)
  292. .map(|_| ValidatorVoteKeypairs::new_rand())
  293. .collect();
  294. let genesis_config_info = create_genesis_config_with_vote_accounts(
  295. 10_000,
  296. &vote_keypairs,
  297. vec![100; vote_keypairs.len()],
  298. );
  299. let bank0 = Bank::new_for_tests(&genesis_config_info.genesis_config);
  300. let bank_forks = BankForks::new_rw_arc(bank0);
  301. let nodes: Vec<_> = vote_keypairs
  302. .into_iter()
  303. .map(|keypairs| {
  304. test_node_with_bank(
  305. Arc::new(keypairs.node_keypair),
  306. exit.clone(),
  307. bank_forks.clone(),
  308. )
  309. })
  310. .collect();
  311. let ci0 = nodes[0].0.my_contact_info();
  312. for node in &nodes[1..] {
  313. node.0.insert_info(ci0.clone());
  314. }
  315. let mut time = Measure::start("time");
  316. let mut done;
  317. let mut success = false;
  318. for _ in 0..30 {
  319. done = true;
  320. for (i, node) in nodes.iter().enumerate() {
  321. warn!("node {} peers: {}", i, node.0.gossip_peers().len());
  322. if node.0.gossip_peers().len() != num_nodes - 1 {
  323. done = false;
  324. break;
  325. }
  326. }
  327. if done {
  328. success = true;
  329. break;
  330. }
  331. sleep(Duration::from_secs(1));
  332. }
  333. time.stop();
  334. warn!("found {num_nodes} nodes in {time} success: {success}");
  335. for num_votes in 1..1000 {
  336. let mut time = Measure::start("votes");
  337. let tx = test_tx();
  338. warn!("tx.message.account_keys: {:?}", tx.message.account_keys);
  339. let vote = Vote::new(
  340. vec![1, 3, num_votes + 5], // slots
  341. Hash::default(),
  342. );
  343. let ix = vote_instruction::vote(
  344. &Pubkey::new_unique(), // vote_pubkey
  345. &Pubkey::new_unique(), // authorized_voter_pubkey
  346. vote,
  347. );
  348. let tx = Transaction::new_with_payer(
  349. &[ix], // instructions
  350. None, // payer
  351. );
  352. let tower = vec![num_votes + 5];
  353. nodes[0].0.push_vote(&tower, tx.clone());
  354. let mut success = false;
  355. for _ in 0..(30 * 5) {
  356. let mut not_done = 0;
  357. let mut num_old = 0;
  358. let mut num_push_total = 0;
  359. let mut num_pushes = 0;
  360. let mut num_pulls = 0;
  361. for (node, _, _) in nodes.iter() {
  362. //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) {
  363. let has_tx = node
  364. .get_votes(&mut Cursor::default())
  365. .iter()
  366. .filter(|v| v.message.account_keys == tx.message.account_keys)
  367. .count();
  368. num_old += node.gossip.push.num_old.load(Ordering::Relaxed);
  369. num_push_total += node.gossip.push.num_total.load(Ordering::Relaxed);
  370. num_pushes += node.gossip.push.num_pushes.load(Ordering::Relaxed);
  371. num_pulls += node.gossip.pull.num_pulls.load(Ordering::Relaxed);
  372. if has_tx == 0 {
  373. not_done += 1;
  374. }
  375. }
  376. warn!("not_done: {}/{}", not_done, nodes.len());
  377. warn!("num_old: {num_old}");
  378. warn!("num_push_total: {num_push_total}");
  379. warn!("num_pushes: {num_pushes}");
  380. warn!("num_pulls: {num_pulls}");
  381. success = not_done < (nodes.len() / 20);
  382. if success {
  383. break;
  384. }
  385. sleep(Duration::from_millis(200));
  386. }
  387. time.stop();
  388. warn!("propagated vote {num_votes} in {time} success: {success}");
  389. sleep(Duration::from_millis(200));
  390. for (node, _, _) in nodes.iter() {
  391. node.gossip.push.num_old.store(0, Ordering::Relaxed);
  392. node.gossip.push.num_total.store(0, Ordering::Relaxed);
  393. node.gossip.push.num_pushes.store(0, Ordering::Relaxed);
  394. node.gossip.pull.num_pulls.store(0, Ordering::Relaxed);
  395. }
  396. }
  397. exit.store(true, Ordering::Relaxed);
  398. for node in nodes {
  399. node.1.join().unwrap();
  400. }
  401. }