// connection_workers_scheduler_test.rs

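//! Integration tests for `ConnectionWorkersScheduler` and `ClientBuilder`.
//! Each test spins up a local QUIC server via `setup_quic_server` and drives
//! the client through one scenario: plain sending, denied/pruned/staked
//! connections, throttling, rate-limiting, identity updates, and shutdown.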
#[allow(deprecated)]
// Reason: this deprecated function internally creates a PinnedLeaderUpdater.
// We want to move that structure into the tests as soon as we can remove the
// create_leader_updater function.
use solana_tpu_client_next::leader_updater::create_leader_updater;
use {
    crossbeam_channel::Receiver as CrossbeamReceiver,
    futures::future::BoxFuture,
    solana_cli_config::ConfigInput,
    solana_commitment_config::CommitmentConfig,
    solana_keypair::Keypair,
    solana_net_utils::sockets::{
        bind_to, localhost_port_range_for_tests, unique_port_range_for_tests,
    },
    solana_pubkey::Pubkey,
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_signer::Signer,
    solana_streamer::{
        nonblocking::{
            swqos::SwQosConfig,
            testing_utilities::{make_client_endpoint, setup_quic_server, SpawnTestServerResult},
        },
        packet::PacketBatch,
        quic::QuicStreamerConfig,
        streamer::StakedNodes,
    },
    solana_tpu_client_next::{
        connection_workers_scheduler::{
            BindTarget, ConnectionWorkersSchedulerConfig, Fanout, NonblockingBroadcaster,
            StakeIdentity,
        },
        send_transaction_stats::SendTransactionStatsNonAtomic,
        transaction_batch::TransactionBatch,
        ClientBuilder, ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
        SendTransactionStats,
    },
    std::{
        collections::HashMap,
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
        num::Saturating,
        sync::{
            atomic::{AtomicU64, Ordering},
            Arc,
        },
        time::Duration,
    },
    tokio::{
        sync::{
            mpsc::{channel, Receiver},
            oneshot, watch,
        },
        task::JoinHandle,
        time::{interval, sleep, Instant},
    },
    tokio_util::sync::CancellationToken,
};
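/// Builds a scheduler config for the tests: binds to a unique localhost port,
/// uses a single connection per leader with a fanout of 1, and optionally
/// attaches a stake identity built from the provided keypair.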
fn test_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
    let address = SocketAddr::new(
        IpAddr::V4(Ipv4Addr::LOCALHOST),
        unique_port_range_for_tests(1).start,
    );
    ConnectionWorkersSchedulerConfig {
        bind: BindTarget::Address(address),
        stake_identity: stake_identity.map(|identity| StakeIdentity::new(&identity)),
        num_connections: 1,
        skip_check_transaction_age: false,
        // At the moment we have only one strategy for sending transactions: we
        // try to put the transaction batch into the worker channel and, in
        // case of failure, simply drop it. This requires using large channels
        // here. In the future, we plan to add an option to send with
        // backpressure at the speed of the fastest leader.
        worker_channel_size: 100,
        max_reconnect_attempts: 4,
        leaders_fanout: Fanout {
            send: 1,
            connect: 1,
        },
    }
}
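/// Spawns a `ConnectionWorkersScheduler` that forwards every batch received on
/// `transaction_receiver` to `tpu_address`. Returns the scheduler's join
/// handle, a watch channel for live identity updates, and the cancellation
/// token that stops the scheduler.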
async fn setup_connection_worker_scheduler(
    tpu_address: SocketAddr,
    transaction_receiver: Receiver<TransactionBatch>,
    stake_identity: Option<Keypair>,
) -> (
    JoinHandle<Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError>>,
    watch::Sender<Option<StakeIdentity>>,
    CancellationToken,
) {
    let json_rpc_url = "http://127.0.0.1:8899";
    let (_, websocket_url) = ConfigInput::compute_websocket_url_setting("", "", json_rpc_url, "");
    let rpc_client = Arc::new(RpcClient::new_with_commitment(
        json_rpc_url.to_string(),
        CommitmentConfig::confirmed(),
    ));
    let config = test_config(stake_identity);
    // Setup sending txs
    let cancel = CancellationToken::new();
    #[allow(deprecated)]
    let leader_updater = create_leader_updater(rpc_client, websocket_url, Some(tpu_address))
        .await
        .expect("Leader updater should be created successfully");
    let (update_identity_sender, update_identity_receiver) = watch::channel(None);
    let scheduler = ConnectionWorkersScheduler::new(
        leader_updater,
        transaction_receiver,
        update_identity_receiver,
        cancel.clone(),
    );
    let scheduler = tokio::spawn(scheduler.run(config));
    (scheduler, update_identity_sender, cancel)
}
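/// Waits for the scheduler to stop and returns a non-atomic snapshot of its
/// send-transaction statistics.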
async fn join_scheduler(
    scheduler_handle: JoinHandle<
        Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError>,
    >,
) -> SendTransactionStatsNonAtomic {
    let scheduler_stats = scheduler_handle
        .await
        .unwrap()
        .expect("Scheduler should stop successfully.");
    scheduler_stats.read_and_reset()
}
// Pessimistic time bound for finishing transaction generation and result checks.
const TEST_MAX_TIME: Duration = Duration::from_millis(2500);
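/// Handles returned by `spawn_tx_sender`: the receiver to wire into the
/// scheduler, a future that shuts the generator down, and a oneshot that fires
/// once all batches have been sent.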
struct SpawnTxGenerator {
    tx_receiver: Receiver<TransactionBatch>,
    tx_sender_shutdown: BoxFuture<'static, ()>,
    tx_sender_done: oneshot::Receiver<()>,
}
/// Generates `num_tx_batches` batches of transactions, each holding a single transaction of
/// `tx_size` bytes.
///
/// It will not close the returned `tx_receiver` until `tx_sender_shutdown` is invoked. Otherwise,
/// there is a race condition between the last transaction being scheduled for delivery and the
/// server connection being closed.
fn spawn_tx_sender(
    tx_size: usize,
    num_tx_batches: usize,
    time_per_tx: Duration,
) -> SpawnTxGenerator {
    let num_tx_batches: u32 = num_tx_batches
        .try_into()
        .expect("`num_tx_batches` fits into u32 for all the tests");
    let (tx_sender, tx_receiver) = channel(1);
    let cancel = CancellationToken::new();
    let (done_sender, tx_sender_done) = oneshot::channel();
    let sender = tokio::spawn({
        let start = Instant::now();
        let tx_sender = tx_sender.clone();
        let main_loop = async move {
            for i in 0..num_tx_batches {
                let txs = vec![vec![i as u8; tx_size]; 1];
                tx_sender
                    .send(TransactionBatch::new(txs))
                    .await
                    .expect("Receiver should not close their side");
                // Pretend the client runs at the specified TPS.
                let sleep_time = time_per_tx
                    .saturating_mul(i)
                    .saturating_sub(start.elapsed());
                if !sleep_time.is_zero() {
                    sleep(sleep_time).await;
                }
            }
            // It is OK if the receiver has disconnected.
            let _ = done_sender.send(());
        };
        let cancel = cancel.clone();
        async move {
            tokio::select! {
                () = main_loop => (),
                () = cancel.cancelled() => (),
            }
        }
    });
    let tx_sender_shutdown = Box::pin(async move {
        cancel.cancel();
        // This makes sure that the sender stays alive until the shutdown is invoked.
        drop(tx_sender);
        sender.await.unwrap();
    });
    SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        tx_sender_done,
    }
}
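// Send 100 single-byte transactions at ~100 TPS to a default test server and
// verify that each one arrives exactly once (consecutive payload ids).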
#[tokio::test]
async fn test_basic_transactions_sending() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig::default_for_tests(),
        SwQosConfig::default(),
    );
    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 100;
    // Pretend that we are running at ~100 TPS.
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(10));
    let (scheduler_handle, update_identity_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    // Dropping the sender will not stop the scheduler.
    drop(update_identity_sender);
    // Check results
    let mut received_data = Vec::with_capacity(expected_num_txs);
    let now = Instant::now();
    let mut actual_num_packets = 0;
    while actual_num_packets < expected_num_txs {
        {
            let elapsed = now.elapsed();
            assert!(
                elapsed < TEST_MAX_TIME,
                "Failed to send {expected_num_txs} transactions in {elapsed:?}. Only sent \
                 {actual_num_packets}",
            );
        }
        let Ok(packets) = receiver.try_recv() else {
            sleep(Duration::from_millis(10)).await;
            continue;
        };
        actual_num_packets += packets.len();
        for p in packets.iter() {
            let packet_id = p.data(0).expect("Data should not be lost by server.");
            received_data.push(*packet_id);
            assert_eq!(p.meta().size, 1);
        }
    }
    received_data.sort_unstable();
    for i in 1..received_data.len() {
        assert_eq!(received_data[i - 1] + 1, received_data[i]);
    }
    // Stop sending
    tx_sender_shutdown.await;
    let stats = join_scheduler(scheduler_handle).await;
    assert_eq!(stats.successfully_sent, expected_num_txs as u64);
    // Stop server
    cancel.cancel();
    server_handle.await.unwrap();
}
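/// Counts the packets arriving on `receiver` for `receive_duration`, asserting
/// that every packet matches the expected transaction size.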
async fn count_received_packets_for(
    receiver: CrossbeamReceiver<PacketBatch>,
    expected_tx_size: usize,
    receive_duration: Duration,
) -> usize {
    let now = Instant::now();
    let mut num_packets_received = Saturating(0usize);
    while now.elapsed() < receive_duration {
        if let Ok(packets) = receiver.try_recv() {
            num_packets_received += packets.len();
            for p in packets.iter() {
                assert_eq!(p.meta().size, expected_tx_size);
            }
        } else {
            sleep(Duration::from_millis(100)).await;
        }
    }
    num_packets_received.0
}
// Check that the client can create a connection even if the first several
// attempts were unsuccessful.
#[tokio::test]
async fn test_connection_denied_until_allowed() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig::default_for_tests(),
        SwQosConfig {
            // To prevent the server from accepting a new connection, we set
            // max_connections_per_unstaked_peer == 1
            max_connections_per_unstaked_peer: 1,
            ..Default::default()
        },
    );
    // If we create a blocking connection and then try to create connections to
    // send TXs, the new connections will be immediately closed. Since the
    // client does not retry sending failed transactions, this leads to packet
    // loss.
    let blocking_connection = make_client_endpoint(&server_address, None).await;
    let time_per_tx = Duration::from_millis(100);
    // Setup sending txs
    let tx_size = 1;
    let num_tx_batches: usize = 30;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, num_tx_batches, time_per_tx);
    let (scheduler_handle, _update_identity_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    // Check results
    let received_num_packets = count_received_packets_for(
        receiver.clone(),
        tx_size,
        time_per_tx * num_tx_batches as u32 / 2,
    )
    .await;
    assert_eq!(
        received_num_packets, 0,
        "Expected to lose all packets, got {received_num_packets} out of {num_tx_batches} sent"
    );
    // Drop the blocking connection, allowing packets to get in now.
    drop(blocking_connection);
    let received_num_packets =
        count_received_packets_for(receiver, tx_size, time_per_tx * num_tx_batches as u32 / 2)
            .await;
    assert!(
        received_num_packets > 0,
        "Expected to get some packets, got {received_num_packets} out of {num_tx_batches} sent"
    );
    // Wait for the exchange to finish.
    tx_sender_shutdown.await;
    let stats = join_scheduler(scheduler_handle).await;
    // Expect at least some errors.
    assert!(
        stats.connection_error_application_closed > 0,
        "Expected at least 1 connection error, connection_error_application_closed: {}",
        stats.connection_error_application_closed
    );
    // Exit server
    cancel.cancel();
    server_handle.await.unwrap();
}
// Check that if the client connection has been pruned, the client manages to
// reestablish it. With more packets, we can observe the impact of pruning
// even with proactive detection.
#[tokio::test]
async fn test_connection_pruned_and_reopened() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig::default_for_tests(),
        SwQosConfig {
            max_connections_per_unstaked_peer: 100,
            max_unstaked_connections: 1,
            ..Default::default()
        },
    );
    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 48;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
    let (scheduler_handle, _update_identity_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    sleep(Duration::from_millis(400)).await;
    let _connection_to_prune_client = make_client_endpoint(&server_address, None).await;
    // Check results
    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
    assert!(actual_num_packets < expected_num_txs);
    // Wait for the exchange to finish.
    tx_sender_shutdown.await;
    let stats = join_scheduler(scheduler_handle).await;
    // Proactive detection catches pruning immediately; expect multiple retries.
    assert!(
        stats.connection_error_application_closed + stats.write_error_connection_lost >= 1,
        "Expected at least 1 connection error from pruning and retries. Stats: {stats:?}"
    );
    // Exit server
    cancel.cancel();
    server_handle.await.unwrap();
}
/// Check that the client creates a staked connection. To do that, prohibit
/// unstaked connections and verify that all the txs have been received.
#[tokio::test]
async fn test_staked_connection() {
    let stake_identity = Keypair::new();
    let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
    let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        Some(staked_nodes),
        QuicStreamerConfig::default(),
        SwQosConfig {
            // Must use at least the number of endpoints (10) because
            // `max_staked_connections` and `max_unstaked_connections` are
            // cumulative for all the endpoints.
            max_staked_connections: 10,
            max_unstaked_connections: 0,
            ..Default::default()
        },
    );
    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 10;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
    let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, Some(stake_identity)).await;
    // Check results
    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
    assert_eq!(actual_num_packets, expected_num_txs);
    // Wait for the exchange to finish.
    tx_sender_shutdown.await;
    let stats = join_scheduler(scheduler_handle).await;
    assert_eq!(
        stats,
        SendTransactionStatsNonAtomic {
            successfully_sent: expected_num_txs as u64,
            ..Default::default()
        }
    );
    // Exit server
    cancel.cancel();
    server_handle.await.unwrap();
}
// Check that if the client sends transactions at a rate higher than what the
// server accepts, all the transactions are nevertheless delivered and there
// are no errors on the client side.
#[tokio::test]
async fn test_connection_throttling() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig::default_for_tests(),
        SwQosConfig::default(),
    );
    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 50;
    // Send at 1000 TPS, 10x more than the 10ms throttling interval used in
    // other tests allows.
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1));
    let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    // Check results
    let actual_num_packets =
        count_received_packets_for(receiver, tx_size, Duration::from_secs(1)).await;
    assert_eq!(actual_num_packets, expected_num_txs);
    // Stop sending
    tx_sender_shutdown.await;
    let stats = join_scheduler(scheduler_handle).await;
    assert_eq!(
        stats,
        SendTransactionStatsNonAtomic {
            successfully_sent: expected_num_txs as u64,
            ..Default::default()
        }
    );
    // Exit server
    cancel.cancel();
    server_handle.await.unwrap();
}
// Check that when the host cannot be reached, the client exits gracefully.
#[tokio::test]
async fn test_no_host() {
    // A "black hole" address for the TPU.
    let server_ip = IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 1));
    let server_address = SocketAddr::new(server_ip, 49151);
    // Setup sending side.
    let tx_size = 1;
    let max_send_attempts: usize = 10;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        tx_sender_done,
        ..
    } = spawn_tx_sender(tx_size, max_send_attempts, Duration::from_millis(10));
    let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    // Wait for all the transactions to be sent, and some extra time for the delivery to be
    // attempted.
    tx_sender_done.await.unwrap();
    sleep(Duration::from_millis(100)).await;
    // Wait for the generator to finish.
    tx_sender_shutdown.await;
    // For each transaction, the scheduler checks whether a worker exists and
    // is active. In this case, a worker will never be active because we stop
    // it as soon as connection creation fails. So the scheduler will try
    // `max_send_attempts` times to create a worker and fail each time.
    let stats = join_scheduler(scheduler_handle).await;
    assert_eq!(
        stats.connect_error_invalid_remote_address,
        max_send_attempts as u64
    );
}
// Check that when the client is rate-limited by the server, we update the
// counters accordingly. To implement it we:
// * set the connection limit per minute to 1
// * create a dummy connection to reach the limit and immediately close it
// * set up a client which will try to create a new connection and be
//   rate-limited. This test doesn't check what happens when the rate-limiting
//   period ends, because that takes too long for a test (1 min).
#[tokio::test]
async fn test_rate_limiting() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig {
            max_connections_per_ipaddr_per_min: 1,
            ..QuicStreamerConfig::default_for_tests()
        },
        SwQosConfig {
            max_connections_per_unstaked_peer: 100,
            ..Default::default()
        },
    );
    // Open a connection to consume the limit.
    let connection_to_reach_limit = make_client_endpoint(&server_address, None).await;
    drop(connection_to_reach_limit);
    // Setup sending txs which are full packets in size
    let tx_size = 1024;
    let expected_num_txs: usize = 16;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
    let (scheduler_handle, _update_certificate_sender, scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
    assert_eq!(actual_num_packets, 0);
    // Stop the sender.
    tx_sender_shutdown.await;
    // And the scheduler.
    scheduler_cancel.cancel();
    let stats = join_scheduler(scheduler_handle).await;
    assert_eq!(
        stats,
        SendTransactionStatsNonAtomic {
            connection_error_timed_out: 1,
            ..Default::default()
        }
    );
    // Stop the server.
    cancel.cancel();
    server_handle.await.unwrap();
}
// The same as test_rate_limiting, but here we wait for 1 min to check that the
// connection is eventually established.
#[tokio::test]
// TODO Provide an alternative testing interface for `streamer::nonblocking::quic::spawn_server`
// that would accept throttling at a granularity below 1 minute.
#[ignore = "takes 70s to complete"]
async fn test_rate_limiting_establish_connection() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig {
            max_connections_per_ipaddr_per_min: 1,
            ..QuicStreamerConfig::default_for_tests()
        },
        SwQosConfig {
            max_connections_per_unstaked_peer: 100,
            ..Default::default()
        },
    );
    let connection_to_reach_limit = make_client_endpoint(&server_address, None).await;
    drop(connection_to_reach_limit);
    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 65;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1000));
    let (scheduler_handle, _update_certificate_sender, scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    let actual_num_packets =
        count_received_packets_for(receiver, tx_size, Duration::from_secs(70)).await;
    assert!(
        actual_num_packets > 0,
        "As we wait longer than 1 minute, at least one transaction should be delivered. After 1 \
         minute the server is expected to accept our connection. Actual packets delivered: \
         {actual_num_packets}"
    );
    // Stop the sender.
    tx_sender_shutdown.await;
    // And the scheduler.
    scheduler_cancel.cancel();
    let mut stats = join_scheduler(scheduler_handle).await;
    assert!(
        stats.connection_error_timed_out > 0,
        "As the quinn timeout is below 1 minute, a few connections will fail to connect during \
         the 1 minute delay. Actual connection_error_timed_out: {}",
        stats.connection_error_timed_out
    );
    assert!(
        stats.successfully_sent > 0,
        "As we run the test for longer than 1 minute, we expect a connection to be established \
         and a number of transactions to be delivered.\nActual successfully_sent: {}",
        stats.successfully_sent
    );
    // All the rest of the error counters should be 0.
    stats.connection_error_timed_out = 0;
    stats.successfully_sent = 0;
    assert_eq!(stats, SendTransactionStatsNonAtomic::default());
    // Stop the server.
    cancel.cancel();
    server_handle.await.unwrap();
}
// Check that the identity is updated successfully using the corresponding channel.
//
// Since the identity update and the transactions are sent concurrently to
// their channels and the scheduler selects randomly which channel to handle
// first, we cannot guarantee in this test that the identity has been updated
// before we start sending transactions. Hence, instead of checking that all
// the transactions have been delivered, we check that at least some have been.
#[tokio::test]
async fn test_update_identity() {
    let stake_identity = Keypair::new();
    let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
    let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        Some(staked_nodes),
        QuicStreamerConfig::default_for_tests(),
        SwQosConfig {
            // Must use at least the number of endpoints (10) because
            // `max_staked_connections` and `max_unstaked_connections` are
            // cumulative for all the endpoints.
            max_staked_connections: 10,
            // Deny all unstaked connections.
            max_unstaked_connections: 0,
            ..Default::default()
        },
    );
    // Setup sending txs
    let tx_size = 1;
    let num_txs: usize = 100;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, num_txs, Duration::from_millis(50));
    let (scheduler_handle, update_identity_sender, scheduler_cancel) =
        setup_connection_worker_scheduler(
            server_address,
            tx_receiver,
            // Create the scheduler with an unstaked identity.
            None,
        )
        .await;
    // Update identity.
    update_identity_sender
        .send(Some(StakeIdentity::new(&stake_identity)))
        .unwrap();
    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
    assert!(actual_num_packets > 0);
    // Stop the sender.
    tx_sender_shutdown.await;
    // And the scheduler.
    scheduler_cancel.cancel();
    let stats = join_scheduler(scheduler_handle).await;
    assert!(stats.successfully_sent > 0);
    // Exit server
    cancel.cancel();
    server_handle.await.unwrap();
}
// Test that connection close events are detected immediately via
// connection.closed() monitoring, not only when send operations fail.
#[tokio::test]
#[ignore = "Enable after we introduce TaskTracker to streamer."]
async fn test_proactive_connection_close_detection() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig::default_for_tests(),
        SwQosConfig {
            max_connections_per_unstaked_peer: 1,
            max_unstaked_connections: 1,
            ..Default::default()
        },
    );
    // Setup controlled transaction sending
    let tx_size = 1;
    let (tx_sender, tx_receiver) = channel(10);
    let (scheduler_handle, _update_identity_sender, scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
    // Send first transaction to establish connection
    tx_sender
        .send(TransactionBatch::new(vec![vec![1u8; tx_size]]))
        .await
        .expect("Send first batch");
    // Verify first packet received
    let mut first_packet_received = false;
    let start = Instant::now();
    while !first_packet_received && start.elapsed() < Duration::from_secs(1) {
        if let Ok(packets) = receiver.try_recv() {
            if !packets.is_empty() {
                first_packet_received = true;
            }
        } else {
            sleep(Duration::from_millis(10)).await;
        }
    }
    assert!(first_packet_received, "First packet should be received");
    // Exit server
    cancel.cancel();
    server_handle.await.unwrap();
    tx_sender
        .send(TransactionBatch::new(vec![vec![2u8; tx_size]]))
        .await
        .expect("Send second batch");
    tx_sender
        .send(TransactionBatch::new(vec![vec![3u8; tx_size]]))
        .await
        .expect("Send third batch");
    // Clean up
    scheduler_cancel.cancel();
    let stats = join_scheduler(scheduler_handle).await;
    // Verify proactive close detection
    assert!(
        stats.connection_error_application_closed > 0 || stats.write_error_connection_lost > 0,
        "Should detect connection close proactively. Stats: {stats:?}"
    );
}
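// Check that a client assembled via `ClientBuilder` delivers transactions
// (both the awaiting and the non-blocking send paths) and reports them
// through the configured metric reporter.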
#[tokio::test]
async fn test_client_builder() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        receiver,
        server_address,
        stats: _stats,
        cancel,
    } = setup_quic_server(
        None,
        QuicStreamerConfig::default_for_tests(),
        SwQosConfig::default(),
    );
    let _drop_guard = cancel.clone().drop_guard();
    let successfully_sent = Arc::new(AtomicU64::new(0));
    let port_range = localhost_port_range_for_tests();
    let socket = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), port_range.0)
        .expect("Should be able to open UdpSocket for tests.");
    let json_rpc_url = "http://127.0.0.1:8899";
    let (_, websocket_url) = ConfigInput::compute_websocket_url_setting("", "", json_rpc_url, "");
    let rpc_client = Arc::new(RpcClient::new_with_commitment(
        json_rpc_url.to_string(),
        CommitmentConfig::confirmed(),
    ));
    #[allow(deprecated)]
    let leader_updater = create_leader_updater(rpc_client, websocket_url, Some(server_address))
        .await
        .unwrap();
    let builder = ClientBuilder::new(leader_updater)
        .cancel_token(cancel.child_token())
        .bind_socket(socket)
        .leader_send_fanout(1)
        .identity(None)
        .max_cache_size(1)
        .worker_channel_size(100)
        .metric_reporter({
            let successfully_sent = successfully_sent.clone();
            |stats: Arc<SendTransactionStats>, cancel: CancellationToken| async move {
                let mut interval = interval(Duration::from_millis(10));
                cancel
                    .run_until_cancelled(async {
                        loop {
                            interval.tick().await;
                            let view = stats.read_and_reset();
                            successfully_sent.fetch_add(view.successfully_sent, Ordering::Relaxed);
                        }
                    })
                    .await;
            }
        });
    let (tx_sender, client) = builder
        .build::<NonblockingBroadcaster>()
        .expect("Client should be built successfully.");
    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 2;
    let txs = vec![vec![0_u8; tx_size]; 1];
    tx_sender
        .send_transactions_in_batch(txs.clone())
        .await
        .expect("Client should accept the transaction batch");
    tx_sender
        .try_send_transactions_in_batch(txs.clone())
        .expect("Client should accept the transaction batch");
    // Check results
    let now = Instant::now();
    let mut actual_num_packets = 0;
    while actual_num_packets < expected_num_txs {
        {
            let elapsed = now.elapsed();
            assert!(
                elapsed < TEST_MAX_TIME,
                "Failed to send {expected_num_txs} transactions in {elapsed:?}. Only sent \
                 {actual_num_packets}",
            );
        }
        let Ok(packets) = receiver.try_recv() else {
            sleep(Duration::from_millis(10)).await;
            continue;
        };
        actual_num_packets += packets.len();
        for p in packets.iter() {
            assert_eq!(p.meta().size, 1);
        }
    }
    // Stop client
    client
        .shutdown()
        .await
        .expect("Client should shutdown successfully.");
    assert_eq!(
        successfully_sent.load(Ordering::Relaxed),
        expected_num_txs as u64,
    );
    // Stop server
    cancel.cancel();
    server_handle.await.unwrap();
}