// connection_workers_scheduler_test.rs — integration tests for the
// tpu-client-next connection workers scheduler.
  1. use {
  2. crossbeam_channel::Receiver as CrossbeamReceiver,
  3. futures::future::BoxFuture,
  4. solana_cli_config::ConfigInput,
  5. solana_commitment_config::CommitmentConfig,
  6. solana_keypair::Keypair,
  7. solana_net_utils::sockets::unique_port_range_for_tests,
  8. solana_pubkey::Pubkey,
  9. solana_rpc_client::nonblocking::rpc_client::RpcClient,
  10. solana_signer::Signer,
  11. solana_streamer::{
  12. nonblocking::testing_utilities::{
  13. make_client_endpoint, setup_quic_server, SpawnTestServerResult,
  14. },
  15. packet::PacketBatch,
  16. quic::QuicServerParams,
  17. streamer::StakedNodes,
  18. },
  19. solana_tpu_client_next::{
  20. connection_workers_scheduler::{
  21. BindTarget, ConnectionWorkersSchedulerConfig, Fanout, StakeIdentity,
  22. },
  23. leader_updater::create_leader_updater,
  24. send_transaction_stats::SendTransactionStatsNonAtomic,
  25. transaction_batch::TransactionBatch,
  26. ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats,
  27. },
  28. std::{
  29. collections::HashMap,
  30. net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
  31. num::Saturating,
  32. sync::{atomic::Ordering, Arc},
  33. time::Duration,
  34. },
  35. tokio::{
  36. sync::{
  37. mpsc::{channel, Receiver},
  38. oneshot, watch,
  39. },
  40. task::JoinHandle,
  41. time::{sleep, Instant},
  42. },
  43. tokio_util::sync::CancellationToken,
  44. };
  45. fn test_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
  46. let address = SocketAddr::new(
  47. IpAddr::V4(Ipv4Addr::LOCALHOST),
  48. unique_port_range_for_tests(1).start,
  49. );
  50. ConnectionWorkersSchedulerConfig {
  51. bind: BindTarget::Address(address),
  52. stake_identity: stake_identity.map(|identity| StakeIdentity::new(&identity)),
  53. num_connections: 1,
  54. skip_check_transaction_age: false,
  55. // At the moment we have only one strategy to send transactions: we try
  56. // to put to worker channel transaction batch and in case of failure
  57. // just drop it. This requires to use large channels here. In the
  58. // future, we are planning to add an option to send with backpressure at
  59. // the speed of fastest leader.
  60. worker_channel_size: 100,
  61. max_reconnect_attempts: 4,
  62. leaders_fanout: Fanout {
  63. send: 1,
  64. connect: 1,
  65. },
  66. }
  67. }
  68. async fn setup_connection_worker_scheduler(
  69. tpu_address: SocketAddr,
  70. transaction_receiver: Receiver<TransactionBatch>,
  71. stake_identity: Option<Keypair>,
  72. ) -> (
  73. JoinHandle<Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError>>,
  74. watch::Sender<Option<StakeIdentity>>,
  75. CancellationToken,
  76. ) {
  77. let json_rpc_url = "http://127.0.0.1:8899";
  78. let (_, websocket_url) = ConfigInput::compute_websocket_url_setting("", "", json_rpc_url, "");
  79. let rpc_client = Arc::new(RpcClient::new_with_commitment(
  80. json_rpc_url.to_string(),
  81. CommitmentConfig::confirmed(),
  82. ));
  83. // Setup sending txs
  84. let leader_updater = create_leader_updater(rpc_client, websocket_url, Some(tpu_address))
  85. .await
  86. .expect("Leader updates was successfully created");
  87. let cancel = CancellationToken::new();
  88. let (update_identity_sender, update_identity_receiver) = watch::channel(None);
  89. let scheduler = ConnectionWorkersScheduler::new(
  90. leader_updater,
  91. transaction_receiver,
  92. update_identity_receiver,
  93. cancel.clone(),
  94. );
  95. let config = test_config(stake_identity);
  96. let scheduler = tokio::spawn(scheduler.run(config));
  97. (scheduler, update_identity_sender, cancel)
  98. }
  99. async fn join_scheduler(
  100. scheduler_handle: JoinHandle<
  101. Result<Arc<SendTransactionStats>, ConnectionWorkersSchedulerError>,
  102. >,
  103. ) -> SendTransactionStatsNonAtomic {
  104. let scheduler_stats = scheduler_handle
  105. .await
  106. .unwrap()
  107. .expect("Scheduler should stop successfully.");
  108. scheduler_stats.read_and_reset()
  109. }
// Specify the pessimistic time to finish generation and result checks.
const TEST_MAX_TIME: Duration = Duration::from_millis(2500);

/// Handles returned by `spawn_tx_sender` for driving and observing the
/// background transaction generator.
struct SpawnTxGenerator {
    // Receiving end for the generated transaction batches.
    tx_receiver: Receiver<TransactionBatch>,
    // Awaiting this future cancels the generator task and waits for it to exit.
    tx_sender_shutdown: BoxFuture<'static, ()>,
    // Resolves once the generator has pushed all requested batches.
    tx_sender_done: oneshot::Receiver<()>,
}
  117. /// Generates `num_tx_batches` batches of transactions, each holding a single transaction of
  118. /// `tx_size` bytes.
  119. ///
  120. /// It will not close the returned `tx_receiver` until `tx_sender_shutdown` is invoked. Otherwise,
  121. /// there is a race condition, that exists between the last transaction being scheduled for delivery
  122. /// and the server connection being closed.
  123. fn spawn_tx_sender(
  124. tx_size: usize,
  125. num_tx_batches: usize,
  126. time_per_tx: Duration,
  127. ) -> SpawnTxGenerator {
  128. let num_tx_batches: u32 = num_tx_batches
  129. .try_into()
  130. .expect("`num_tx_batches` fits into u32 for all the tests");
  131. let (tx_sender, tx_receiver) = channel(1);
  132. let cancel = CancellationToken::new();
  133. let (done_sender, tx_sender_done) = oneshot::channel();
  134. let sender = tokio::spawn({
  135. let start = Instant::now();
  136. let tx_sender = tx_sender.clone();
  137. let main_loop = async move {
  138. for i in 0..num_tx_batches {
  139. let txs = vec![vec![i as u8; tx_size]; 1];
  140. tx_sender
  141. .send(TransactionBatch::new(txs))
  142. .await
  143. .expect("Receiver should not close their side");
  144. // Pretend the client runs at the specified TPS.
  145. let sleep_time = time_per_tx
  146. .saturating_mul(i)
  147. .saturating_sub(start.elapsed());
  148. if !sleep_time.is_zero() {
  149. sleep(sleep_time).await;
  150. }
  151. }
  152. // It is OK if the receiver has disconnected.
  153. let _ = done_sender.send(());
  154. };
  155. let cancel = cancel.clone();
  156. async move {
  157. tokio::select! {
  158. () = main_loop => (),
  159. () = cancel.cancelled() => (),
  160. }
  161. }
  162. });
  163. let tx_sender_shutdown = Box::pin(async move {
  164. cancel.cancel();
  165. // This makes sure that the sender exists up until the shutdown is invoked.
  166. drop(tx_sender);
  167. sender.await.unwrap();
  168. });
  169. SpawnTxGenerator {
  170. tx_receiver,
  171. tx_sender_shutdown,
  172. tx_sender_done,
  173. }
  174. }
  175. #[tokio::test]
  176. async fn test_basic_transactions_sending() {
  177. let SpawnTestServerResult {
  178. join_handle: server_handle,
  179. exit,
  180. receiver,
  181. server_address,
  182. stats: _stats,
  183. } = setup_quic_server(None, QuicServerParams::default_for_tests());
  184. // Setup sending txs
  185. let tx_size = 1;
  186. let expected_num_txs: usize = 100;
  187. // Pretend that we are running at ~100 TPS.
  188. let SpawnTxGenerator {
  189. tx_receiver,
  190. tx_sender_shutdown,
  191. ..
  192. } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(10));
  193. let (scheduler_handle, update_identity_sender, _scheduler_cancel) =
  194. setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
  195. // dropping sender will not lead to stop the scheduler.
  196. drop(update_identity_sender);
  197. // Check results
  198. let mut received_data = Vec::with_capacity(expected_num_txs);
  199. let now = Instant::now();
  200. let mut actual_num_packets = 0;
  201. while actual_num_packets < expected_num_txs {
  202. {
  203. let elapsed = now.elapsed();
  204. assert!(
  205. elapsed < TEST_MAX_TIME,
  206. "Failed to send {expected_num_txs} transaction in {elapsed:?}. Only sent \
  207. {actual_num_packets}",
  208. );
  209. }
  210. let Ok(packets) = receiver.try_recv() else {
  211. sleep(Duration::from_millis(10)).await;
  212. continue;
  213. };
  214. actual_num_packets += packets.len();
  215. for p in packets.iter() {
  216. let packet_id = p.data(0).expect("Data should not be lost by server.");
  217. received_data.push(*packet_id);
  218. assert_eq!(p.meta().size, 1);
  219. }
  220. }
  221. received_data.sort_unstable();
  222. for i in 1..received_data.len() {
  223. assert_eq!(received_data[i - 1] + 1, received_data[i]);
  224. }
  225. // Stop sending
  226. tx_sender_shutdown.await;
  227. let stats = join_scheduler(scheduler_handle).await;
  228. assert_eq!(stats.successfully_sent, expected_num_txs as u64,);
  229. // Stop server
  230. exit.store(true, Ordering::Relaxed);
  231. server_handle.await.unwrap();
  232. }
  233. async fn count_received_packets_for(
  234. receiver: CrossbeamReceiver<PacketBatch>,
  235. expected_tx_size: usize,
  236. receive_duration: Duration,
  237. ) -> usize {
  238. let now = Instant::now();
  239. let mut num_packets_received = Saturating(0usize);
  240. while now.elapsed() < receive_duration {
  241. if let Ok(packets) = receiver.try_recv() {
  242. num_packets_received += packets.len();
  243. for p in packets.iter() {
  244. assert_eq!(p.meta().size, expected_tx_size);
  245. }
  246. } else {
  247. sleep(Duration::from_millis(100)).await;
  248. }
  249. }
  250. num_packets_received.0
  251. }
// Check that client can create connection even if the first several attempts were unsuccessful.
#[tokio::test]
async fn test_connection_denied_until_allowed() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        exit,
        receiver,
        server_address,
        stats: _stats,
    } = setup_quic_server(None, QuicServerParams::default_for_tests());

    // To prevent server from accepting a new connection, we use the following observation.
    // Since max_connections_per_peer == 1 (< max_unstaked_connections == 500), if we create a first
    // connection and later try another one, the second connection will be immediately closed.
    //
    // Since client is not retrying sending failed transactions, this leads to the packets loss.
    // The connection has been created and closed when we already have sent the data.
    let throttling_connection = make_client_endpoint(&server_address, None).await;

    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 10;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));

    let (scheduler_handle, _update_identity_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;

    // Check results: while `throttling_connection` occupies the peer slot,
    // some of the client's packets are expected to be lost.
    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
    assert!(
        actual_num_packets < expected_num_txs,
        "Expected to receive {expected_num_txs} packets in {TEST_MAX_TIME:?} Got packets: \
         {actual_num_packets}"
    );

    // Wait for the exchange to finish.
    tx_sender_shutdown.await;
    let stats = join_scheduler(scheduler_handle).await;
    // With proactive detection, we detect rejection immediately and retry within test duration.
    // Expect at least 2 errors: initial rejection + retry attempts.
    assert!(
        stats.write_error_connection_lost + stats.connection_error_application_closed >= 2,
        "Expected at least 2 connection errors, got write_error_connection_lost: {}, \
         connection_error_application_closed: {}",
        stats.write_error_connection_lost,
        stats.connection_error_application_closed
    );
    drop(throttling_connection);

    // Exit server
    exit.store(true, Ordering::Relaxed);
    server_handle.await.unwrap();
}
// Check that if the client connection has been pruned, client manages to
// reestablish it. With more packets, we can observe the impact of pruning
// even with proactive detection.
#[tokio::test]
async fn test_connection_pruned_and_reopened() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        exit,
        receiver,
        server_address,
        stats: _stats,
    } = setup_quic_server(
        None,
        QuicServerParams {
            // A single unstaked slot: opening a second client endpoint below
            // forces the server to prune the scheduler's connection.
            max_connections_per_peer: 100,
            max_unstaked_connections: 1,
            ..QuicServerParams::default_for_tests()
        },
    );

    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 48;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));

    let (scheduler_handle, _update_identity_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;

    // Give the scheduler time to connect and send before triggering pruning.
    sleep(Duration::from_millis(400)).await;
    let _connection_to_prune_client = make_client_endpoint(&server_address, None).await;

    // Check results
    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
    assert!(actual_num_packets < expected_num_txs);

    // Wait for the exchange to finish.
    tx_sender_shutdown.await;
    let stats = join_scheduler(scheduler_handle).await;
    // Proactive detection catches pruning immediately, expect multiple retries.
    assert!(
        stats.connection_error_application_closed + stats.write_error_connection_lost >= 1,
        "Expected at least 1 connection error from pruning and retries. Stats: {stats:?}"
    );

    // Exit server
    exit.store(true, Ordering::Relaxed);
    server_handle.await.unwrap();
}
  349. /// Check that client creates staked connection. To do that prohibit unstaked
  350. /// connection and verify that all the txs has been received.
  351. #[tokio::test]
  352. async fn test_staked_connection() {
  353. let stake_identity = Keypair::new();
  354. let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
  355. let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
  356. let SpawnTestServerResult {
  357. join_handle: server_handle,
  358. exit,
  359. receiver,
  360. server_address,
  361. stats: _stats,
  362. } = setup_quic_server(
  363. Some(staked_nodes),
  364. QuicServerParams {
  365. // Must use at least the number of endpoints (10) because
  366. // `max_staked_connections` and `max_unstaked_connections` are
  367. // cumulative for all the endpoints.
  368. max_staked_connections: 10,
  369. max_unstaked_connections: 0,
  370. ..QuicServerParams::default_for_tests()
  371. },
  372. );
  373. // Setup sending txs
  374. let tx_size = 1;
  375. let expected_num_txs: usize = 10;
  376. let SpawnTxGenerator {
  377. tx_receiver,
  378. tx_sender_shutdown,
  379. ..
  380. } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
  381. let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
  382. setup_connection_worker_scheduler(server_address, tx_receiver, Some(stake_identity)).await;
  383. // Check results
  384. let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
  385. assert_eq!(actual_num_packets, expected_num_txs);
  386. // Wait for the exchange to finish.
  387. tx_sender_shutdown.await;
  388. let stats = join_scheduler(scheduler_handle).await;
  389. assert_eq!(
  390. stats,
  391. SendTransactionStatsNonAtomic {
  392. successfully_sent: expected_num_txs as u64,
  393. ..Default::default()
  394. }
  395. );
  396. // Exit server
  397. exit.store(true, Ordering::Relaxed);
  398. server_handle.await.unwrap();
  399. }
  400. // Check that if client sends transactions at a reasonably high rate that is
  401. // higher than what the server accepts, nevertheless all the transactions are
  402. // delivered and there are no errors on the client side.
  403. #[tokio::test]
  404. async fn test_connection_throttling() {
  405. let SpawnTestServerResult {
  406. join_handle: server_handle,
  407. exit,
  408. receiver,
  409. server_address,
  410. stats: _stats,
  411. } = setup_quic_server(None, QuicServerParams::default_for_tests());
  412. // Setup sending txs
  413. let tx_size = 1;
  414. let expected_num_txs: usize = 50;
  415. // Send at 1000 TPS - x10 more than the throttling interval of 10ms used in other tests allows.
  416. let SpawnTxGenerator {
  417. tx_receiver,
  418. tx_sender_shutdown,
  419. ..
  420. } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1));
  421. let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
  422. setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
  423. // Check results
  424. let actual_num_packets =
  425. count_received_packets_for(receiver, tx_size, Duration::from_secs(1)).await;
  426. assert_eq!(actual_num_packets, expected_num_txs);
  427. // Stop sending
  428. tx_sender_shutdown.await;
  429. let stats = join_scheduler(scheduler_handle).await;
  430. assert_eq!(
  431. stats,
  432. SendTransactionStatsNonAtomic {
  433. successfully_sent: expected_num_txs as u64,
  434. ..Default::default()
  435. }
  436. );
  437. // Exit server
  438. exit.store(true, Ordering::Relaxed);
  439. server_handle.await.unwrap();
  440. }
// Check that when the host cannot be reached, the client exits gracefully.
#[tokio::test]
async fn test_no_host() {
    // A "black hole" address for the TPU.
    // (100::/64 is the IPv6 discard-only prefix, RFC 6666, so connection
    // attempts can never succeed.)
    let server_ip = IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 1));
    let server_address = SocketAddr::new(server_ip, 49151);

    // Setup sending side.
    let tx_size = 1;
    let max_send_attempts: usize = 10;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        tx_sender_done,
        ..
    } = spawn_tx_sender(tx_size, max_send_attempts, Duration::from_millis(10));

    let (scheduler_handle, _update_certificate_sender, _scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;

    // Wait for all the transactions to be sent, and some extra time for the delivery to be
    // attempted.
    tx_sender_done.await.unwrap();
    sleep(Duration::from_millis(100)).await;

    // Wait for the generator to finish.
    tx_sender_shutdown.await;

    // While attempting to establish a connection with a nonexistent host, we fill the worker's
    // channel.
    let stats = join_scheduler(scheduler_handle).await;
    // `5` because `config.max_reconnect_attempts` is 4
    assert_eq!(stats.connect_error_invalid_remote_address, 5);
}
  470. // Check that when the client is rate-limited by server, we update counters
  471. // accordingly. To implement it we:
  472. // * set the connection limit per minute to 1
  473. // * create a dummy connection to reach the limit and immediately close it
  474. // * set up client which will try to create a new connection which it will be
  475. // rate-limited. This test doesn't check what happens when the rate-limiting
  476. // period ends because it too long for test (1min).
  477. #[tokio::test]
  478. async fn test_rate_limiting() {
  479. let SpawnTestServerResult {
  480. join_handle: server_handle,
  481. exit,
  482. receiver,
  483. server_address,
  484. stats: _stats,
  485. } = setup_quic_server(
  486. None,
  487. QuicServerParams {
  488. max_connections_per_peer: 100,
  489. max_connections_per_ipaddr_per_min: 1,
  490. ..QuicServerParams::default_for_tests()
  491. },
  492. );
  493. // open a connection to consume the limit
  494. let connection_to_reach_limit = make_client_endpoint(&server_address, None).await;
  495. drop(connection_to_reach_limit);
  496. // Setup sending txs which are full packets in size
  497. let tx_size = 1024;
  498. let expected_num_txs: usize = 16;
  499. let SpawnTxGenerator {
  500. tx_receiver,
  501. tx_sender_shutdown,
  502. ..
  503. } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
  504. let (scheduler_handle, _update_certificate_sender, scheduler_cancel) =
  505. setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
  506. let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
  507. assert_eq!(actual_num_packets, 0);
  508. // Stop the sender.
  509. tx_sender_shutdown.await;
  510. // And the scheduler.
  511. scheduler_cancel.cancel();
  512. let stats = join_scheduler(scheduler_handle).await;
  513. // we get 2 transactions registered as sent (but not acked) because of how QUIC works
  514. // before ratelimiter kicks in.
  515. assert!(
  516. stats
  517. == SendTransactionStatsNonAtomic {
  518. successfully_sent: 2,
  519. write_error_connection_lost: 2,
  520. ..Default::default()
  521. }
  522. );
  523. // Stop the server.
  524. exit.store(true, Ordering::Relaxed);
  525. server_handle.await.unwrap();
  526. }
// The same as test_rate_limiting but here we wait for 1 min to check that the
// connection has been established.
#[tokio::test]
// TODO Provide an alternative testing interface for `streamer::nonblocking::quic::spawn_server`
// that would accept throttling at a granularity below 1 minute.
#[ignore = "takes 70s to complete"]
async fn test_rate_limiting_establish_connection() {
    let SpawnTestServerResult {
        join_handle: server_handle,
        exit,
        receiver,
        server_address,
        stats: _stats,
    } = setup_quic_server(
        None,
        QuicServerParams {
            max_connections_per_peer: 100,
            max_connections_per_ipaddr_per_min: 1,
            ..QuicServerParams::default_for_tests()
        },
    );

    // Consume the one-per-minute connection budget before the client starts.
    let connection_to_reach_limit = make_client_endpoint(&server_address, None).await;
    drop(connection_to_reach_limit);

    // Setup sending txs
    let tx_size = 1;
    let expected_num_txs: usize = 65;
    let SpawnTxGenerator {
        tx_receiver,
        tx_sender_shutdown,
        ..
    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1000));

    let (scheduler_handle, _update_certificate_sender, scheduler_cancel) =
        setup_connection_worker_scheduler(server_address, tx_receiver, None).await;

    let actual_num_packets =
        count_received_packets_for(receiver, tx_size, Duration::from_secs(70)).await;
    assert!(
        actual_num_packets > 0,
        "As we wait longer than 1 minute, at least one transaction should be delivered. After 1 \
         minute the server is expected to accept our connection. Actual packets delivered: \
         {actual_num_packets}"
    );

    // Stop the sender.
    tx_sender_shutdown.await;
    // And the scheduler.
    scheduler_cancel.cancel();

    let mut stats = join_scheduler(scheduler_handle).await;
    assert!(
        stats.connection_error_timed_out > 0,
        "As the quinn timeout is below 1 minute, a few connections will fail to connect during \
         the 1 minute delay. Actual connection_error_timed_out: {}",
        stats.connection_error_timed_out
    );
    assert!(
        stats.successfully_sent > 0,
        "As we run the test for longer than 1 minute, we expect a connection to be established, \
         and a number of transactions to be delivered.\nActual successfully_sent: {}",
        stats.successfully_sent
    );
    // All the rest of the error counters should be 0.
    // (Zero out the two counters checked above, then compare against a
    // default — i.e. all-zero — stats value.)
    stats.connection_error_timed_out = 0;
    stats.successfully_sent = 0;
    assert_eq!(stats, SendTransactionStatsNonAtomic::default());

    // Stop the server.
    exit.store(true, Ordering::Relaxed);
    server_handle.await.unwrap();
}
  593. // Check that identity is updated successfully using corresponding channel.
  594. //
  595. // Since the identity update and the transactions are sent concurrently to their channels
  596. // and scheduler selects randomly which channel to handle first, we cannot
  597. // guarantee in this test that the identity has been updated before we start
  598. // sending transactions. Hence, instead of checking that all the transactions
  599. // have been delivered, we check that at least some have been.
  600. #[tokio::test]
  601. async fn test_update_identity() {
  602. let stake_identity = Keypair::new();
  603. let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
  604. let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
  605. let SpawnTestServerResult {
  606. join_handle: server_handle,
  607. exit,
  608. receiver,
  609. server_address,
  610. stats: _stats,
  611. } = setup_quic_server(
  612. Some(staked_nodes),
  613. QuicServerParams {
  614. // Must use at least the number of endpoints (10) because
  615. // `max_staked_connections` and `max_unstaked_connections` are
  616. // cumulative for all the endpoints.
  617. max_staked_connections: 10,
  618. // Deny all unstaked connections.
  619. max_unstaked_connections: 0,
  620. ..QuicServerParams::default_for_tests()
  621. },
  622. );
  623. // Setup sending txs
  624. let tx_size = 1;
  625. let num_txs: usize = 100;
  626. let SpawnTxGenerator {
  627. tx_receiver,
  628. tx_sender_shutdown,
  629. ..
  630. } = spawn_tx_sender(tx_size, num_txs, Duration::from_millis(50));
  631. let (scheduler_handle, update_identity_sender, scheduler_cancel) =
  632. setup_connection_worker_scheduler(
  633. server_address,
  634. tx_receiver,
  635. // Create scheduler with unstaked identity.
  636. None,
  637. )
  638. .await;
  639. // Update identity.
  640. update_identity_sender
  641. .send(Some(StakeIdentity::new(&stake_identity)))
  642. .unwrap();
  643. let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
  644. assert!(actual_num_packets > 0);
  645. // Stop the sender.
  646. tx_sender_shutdown.await;
  647. // And the scheduler.
  648. scheduler_cancel.cancel();
  649. let stats = join_scheduler(scheduler_handle).await;
  650. assert!(stats.successfully_sent > 0);
  651. // Exit server
  652. exit.store(true, Ordering::Relaxed);
  653. server_handle.await.unwrap();
  654. }
  655. // Test that connection close events are detected immediately via connection.closed()
  656. // monitoring, not only when send operations fail.
  657. #[tokio::test]
  658. async fn test_proactive_connection_close_detection() {
  659. let SpawnTestServerResult {
  660. join_handle: server_handle,
  661. exit,
  662. receiver,
  663. server_address,
  664. stats: _stats,
  665. } = setup_quic_server(
  666. None,
  667. QuicServerParams {
  668. max_connections_per_peer: 1,
  669. max_unstaked_connections: 1,
  670. ..QuicServerParams::default_for_tests()
  671. },
  672. );
  673. // Setup controlled transaction sending
  674. let tx_size = 1;
  675. let (tx_sender, tx_receiver) = channel(10);
  676. let sender_task = tokio::spawn(async move {
  677. // Send first transaction to establish connection
  678. tx_sender
  679. .send(TransactionBatch::new(vec![vec![1u8; tx_size]]))
  680. .await
  681. .expect("Send first batch");
  682. // Idle period where connection might be closed
  683. sleep(Duration::from_millis(500)).await;
  684. // Attempt another send
  685. drop(tx_sender.send(TransactionBatch::new(vec![vec![2u8; tx_size]])));
  686. });
  687. let (scheduler_handle, _update_identity_sender, scheduler_cancel) =
  688. setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
  689. // Verify first packet received
  690. let mut first_packet_received = false;
  691. let start = Instant::now();
  692. while !first_packet_received && start.elapsed() < Duration::from_secs(1) {
  693. if let Ok(packets) = receiver.try_recv() {
  694. if !packets.is_empty() {
  695. first_packet_received = true;
  696. }
  697. } else {
  698. sleep(Duration::from_millis(10)).await;
  699. }
  700. }
  701. assert!(first_packet_received, "First packet should be received");
  702. // Force connection close by exceeding max_connections_per_peer
  703. let _pruning_connection = make_client_endpoint(&server_address, None).await;
  704. // Allow time for proactive detection
  705. sleep(Duration::from_millis(200)).await;
  706. // Clean up
  707. scheduler_cancel.cancel();
  708. let _ = sender_task.await;
  709. let stats = join_scheduler(scheduler_handle).await;
  710. // Verify proactive close detection
  711. assert!(
  712. stats.connection_error_application_closed > 0 || stats.write_error_connection_lost > 0,
  713. "Should detect connection close proactively. Stats: {stats:?}"
  714. );
  715. // Exit server
  716. exit.store(true, Ordering::Relaxed);
  717. server_handle.await.unwrap();
  718. }