@@ -310,53 +310,76 @@ async fn test_connection_denied_until_allowed() {
         cancel,
     } = setup_quic_server(
         None,
-        QuicStreamerConfig::default_for_tests(),
+        QuicStreamerConfig {
+            // To prevent the server from accepting a new connection, we
+            // set max_connections_per_unstaked_peer == 1.
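+            // (The clients in this test pass `None` for their identity, so they
+            // should count as unstaked peers and fall under this limit.)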
+            max_connections_per_unstaked_peer: 1,
+            ..QuicStreamerConfig::default_for_tests()
+        },
         SwQosConfig::default(),
     );
 
-    // To prevent server from accepting a new connection, we use the following observation.
-    // Since max_connections_per_peer == 1 (< max_unstaked_connections == 500), if we create a first
-    // connection and later try another one, the second connection will be immediately closed.
-    //
+    // If we create a blocking connection first and then try to open new connections
+    // to send transactions, the new connections will be immediately closed.
     // Since client is not retrying sending failed transactions, this leads to the packets loss.
-    // The connection has been created and closed when we already have sent the data.
-    let throttling_connection = make_client_endpoint(&server_address, None).await;
+    let blocking_connection = make_client_endpoint(&server_address, None).await;
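 
+    // time_per_tx paces the transaction generator below; each phase of the test then
+    // listens for half of the total send window (time_per_tx * num_tx_batches / 2).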
+    let time_per_tx = Duration::from_millis(100);
     // Setup sending txs
     let tx_size = 1;
-    let expected_num_txs: usize = 10;
+    let num_tx_batches: usize = 30;
     let SpawnTxGenerator {
         tx_receiver,
         tx_sender_shutdown,
         ..
-    } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
+    } = spawn_tx_sender(tx_size, num_tx_batches, time_per_tx);
 
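+    // The scheduler drains tx_receiver and forwards the generated transactions to
+    // server_address over QUIC.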
     let (scheduler_handle, _update_identity_sender, _scheduler_cancel) =
         setup_connection_worker_scheduler(server_address, tx_receiver, None).await;
 
     // Check results
-    let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
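+    // Phase 1: while blocking_connection is open, the server should reject every new
+    // connection, so we expect to receive nothing during the first half of the send
+    // window. receiver is cloned because it is consumed again in phase 2 below.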
+    let received_num_packets = count_received_packets_for(
+        receiver.clone(),
+        tx_size,
+        time_per_tx * num_tx_batches as u32 / 2,
+    )
+    .await;
+    assert_eq!(
+        received_num_packets, 0,
+        "Expected to lose all packets, got {received_num_packets} out of {num_tx_batches} sent"
+    );
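+    // Phase 2: once the blocking connection is dropped, the scheduler can connect
+    // again and at least some of the remaining transactions should arrive.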
+    // Drop the blocking connection, allowing packets to get in now.
+    drop(blocking_connection);
+    let received_num_packets =
+        count_received_packets_for(receiver, tx_size, time_per_tx * num_tx_batches as u32 / 2)
+            .await;
     assert!(
-        actual_num_packets < expected_num_txs,
-        "Expected to receive {expected_num_txs} packets in {TEST_MAX_TIME:?} Got packets: \
-         {actual_num_packets}"
+        received_num_packets > 0,
+        "Expected to get some packets, got {received_num_packets} out of {num_tx_batches} sent"
     );
-
     // Wait for the exchange to finish.
     tx_sender_shutdown.await;
     let stats = join_scheduler(scheduler_handle).await;
-    // With proactive detection, we detect rejection immediately and retry within test duration.
-    // Expect at least 2 errors: initial rejection + retry attempts.
+    // Expect at least some errors from the connections the server rejected.
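+    // The server closes the rejected connections with an application error code, so
+    // the client should observe them as ApplicationClosed errors in the stats.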
     assert!(
-        stats.write_error_connection_lost + stats.connection_error_application_closed >= 2,
-        "Expected at least 2 connection errors, got write_error_connection_lost: {}, \
-         connection_error_application_closed: {}",
-        stats.write_error_connection_lost,
+        stats.connection_error_application_closed > 0,
+        "Expected at least 1 connection error, connection_error_application_closed: {}",
         stats.connection_error_application_closed
     );
 
-    drop(throttling_connection);
-
     // Exit server
     cancel.cancel();
     server_handle.await.unwrap();