// vortexor.rs — integration tests for the Vortexor QUIC service and its stake updater.
  1. use {
  2. crossbeam_channel::unbounded,
  3. log::info,
  4. solana_keypair::Keypair,
  5. solana_local_cluster::{
  6. cluster::ClusterValidatorInfo,
  7. local_cluster::{ClusterConfig, LocalCluster},
  8. },
  9. solana_native_token::LAMPORTS_PER_SOL,
  10. solana_net_utils::{SocketAddrSpace, VALIDATOR_PORT_RANGE},
  11. solana_pubkey::Pubkey,
  12. solana_signer::Signer,
  13. solana_streamer::{
  14. nonblocking::testing_utilities::check_multiple_streams,
  15. quic::{
  16. DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STAKED_CONNECTIONS,
  17. DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS,
  18. },
  19. streamer::StakedNodes,
  20. },
  21. solana_vortexor::{
  22. cli::{DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER, DEFAULT_NUM_QUIC_ENDPOINTS},
  23. rpc_load_balancer,
  24. stake_updater::StakeUpdater,
  25. vortexor::Vortexor,
  26. },
  27. std::{
  28. collections::HashMap,
  29. sync::{
  30. atomic::{AtomicBool, Ordering},
  31. Arc, RwLock,
  32. },
  33. time::Duration,
  34. },
  35. tokio_util::sync::CancellationToken,
  36. url::Url,
  37. };
  38. #[tokio::test(flavor = "multi_thread")]
  39. async fn test_vortexor() {
  40. agave_logger::setup();
  41. let bind_address = solana_net_utils::parse_host("127.0.0.1").expect("invalid bind_address");
  42. let keypair = Keypair::new();
  43. let cancel = CancellationToken::new();
  44. let (tpu_sender, tpu_receiver) = unbounded();
  45. let (tpu_fwd_sender, tpu_fwd_receiver) = unbounded();
  46. let tpu_sockets = Vortexor::create_tpu_sockets(
  47. bind_address,
  48. VALIDATOR_PORT_RANGE,
  49. None, // tpu_address
  50. None, // tpu_forward_address
  51. DEFAULT_NUM_QUIC_ENDPOINTS,
  52. );
  53. let tpu_address = tpu_sockets.tpu_quic[0].local_addr().unwrap();
  54. let tpu_fwd_address = tpu_sockets.tpu_quic_fwd[0].local_addr().unwrap();
  55. let stakes = HashMap::from([(keypair.pubkey(), 10000)]);
  56. let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
  57. Arc::new(stakes),
  58. HashMap::<Pubkey, u64>::default(), // overrides
  59. )));
  60. let vortexor = Vortexor::create_vortexor(
  61. tpu_sockets,
  62. staked_nodes,
  63. tpu_sender,
  64. tpu_fwd_sender,
  65. DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER,
  66. DEFAULT_MAX_STAKED_CONNECTIONS,
  67. DEFAULT_MAX_UNSTAKED_CONNECTIONS,
  68. DEFAULT_MAX_STAKED_CONNECTIONS.saturating_add(DEFAULT_MAX_UNSTAKED_CONNECTIONS), // max_fwd_staked_connections
  69. 0, // max_fwd_unstaked_connections
  70. DEFAULT_MAX_STREAMS_PER_MS,
  71. DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
  72. &keypair,
  73. cancel.clone(),
  74. );
  75. check_multiple_streams(tpu_receiver, tpu_address, Some(&keypair)).await;
  76. check_multiple_streams(tpu_fwd_receiver, tpu_fwd_address, Some(&keypair)).await;
  77. cancel.cancel();
  78. vortexor.join().unwrap();
  79. }
  80. fn get_server_urls(validator: &ClusterValidatorInfo) -> (Url, Url) {
  81. let rpc_addr = validator.info.contact_info.rpc().unwrap();
  82. let rpc_pubsub_addr = validator.info.contact_info.rpc_pubsub().unwrap();
  83. let rpc_url = Url::parse(format!("http://{rpc_addr}").as_str()).unwrap();
  84. let ws_url = Url::parse(format!("ws://{rpc_pubsub_addr}").as_str()).unwrap();
  85. (rpc_url, ws_url)
  86. }
  87. #[test]
  88. fn test_stake_update() {
  89. agave_logger::setup();
  90. // Create a local cluster with 3 validators
  91. let default_node_stake = 10 * LAMPORTS_PER_SOL; // Define a default value for node stake
  92. let mint_lamports = 100 * LAMPORTS_PER_SOL;
  93. let mut config = ClusterConfig::new_with_equal_stakes(3, mint_lamports, default_node_stake);
  94. let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
  95. info!(
  96. "Cluster created with {} validators",
  97. cluster.validators.len()
  98. );
  99. assert_eq!(cluster.validators.len(), 3);
  100. let pubkey = cluster.entry_point_info.pubkey();
  101. let validator = &cluster.validators[pubkey];
  102. let mut servers = vec![get_server_urls(validator)];
  103. // add one more RPC subscription to another validator
  104. for validator in cluster.validators.values() {
  105. if validator.info.keypair.pubkey() != *pubkey {
  106. servers.push(get_server_urls(validator));
  107. break;
  108. }
  109. }
  110. let exit = Arc::new(AtomicBool::new(false));
  111. let (rpc_load_balancer, slot_receiver) =
  112. rpc_load_balancer::RpcLoadBalancer::new(&servers, &exit);
  113. // receive 2 slot updates
  114. let mut i = 0;
  115. let slot_receive_timeout = Duration::from_secs(5); // conservative timeout to ensure stable test
  116. while i < 2 {
  117. let slot = slot_receiver
  118. .recv_timeout(slot_receive_timeout)
  119. .unwrap_or_else(|_| panic!("Expected a slot within {slot_receive_timeout:?}"));
  120. i += 1;
  121. info!("Received a slot update: {slot}");
  122. }
  123. let rpc_load_balancer = Arc::new(rpc_load_balancer);
  124. // Now create a stake updater service
  125. let shared_staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
  126. let staked_nodes_updater_service = StakeUpdater::new(
  127. exit.clone(),
  128. rpc_load_balancer.clone(),
  129. shared_staked_nodes.clone(),
  130. Duration::from_millis(100), // short sleep to speed up the test for service exit
  131. );
  132. // Waiting for the stake map to be populated by the stake updater service
  133. let start_of_stake_updater = std::time::Instant::now();
  134. let stake_updater_timeout = Duration::from_secs(10); // conservative timeout to ensure stable test
  135. loop {
  136. let stakes = shared_staked_nodes.read().unwrap();
  137. if let Some(stake) = stakes.get_node_stake(pubkey) {
  138. info!("Stake for {pubkey}: {stake}");
  139. assert_eq!(stake, default_node_stake);
  140. let total_stake = stakes.total_stake();
  141. info!("total_stake: {total_stake}");
  142. assert!(total_stake >= default_node_stake);
  143. break;
  144. }
  145. info!("Waiting for stake map to be populated for {pubkey:?}...");
  146. drop(stakes); // Drop the read lock before sleeping so the writer side can proceed
  147. std::thread::sleep(std::time::Duration::from_millis(100));
  148. if start_of_stake_updater.elapsed() > stake_updater_timeout {
  149. panic!("Timeout waiting for stake map to be populated");
  150. }
  151. }
  152. info!("Test done, exiting stake updater service");
  153. exit.store(true, Ordering::Relaxed);
  154. staked_nodes_updater_service.join().unwrap();
  155. info!("Stake updater service exited successfully, shutting down cluster");
  156. cluster.exit();
  157. info!("Cluster exited successfully");
  158. }