quic_client.rs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. //! Simple nonblocking client that connects to a given UDP port with the QUIC protocol
  2. //! and provides an interface for sending data which is restricted by the
  3. //! server's flow control.
  4. use {
  5. async_lock::Mutex,
  6. async_trait::async_trait,
  7. futures::future::TryFutureExt,
  8. log::*,
  9. quinn::{
  10. crypto::rustls::QuicClientConfig, ClientConfig, ClosedStream, ConnectError, Connection,
  11. ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig,
  12. WriteError,
  13. },
  14. solana_connection_cache::{
  15. client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
  16. nonblocking::client_connection::ClientConnection,
  17. },
  18. solana_keypair::Keypair,
  19. solana_measure::measure::Measure,
  20. solana_net_utils::sockets,
  21. solana_quic_definitions::{
  22. QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, QUIC_SEND_FAIRNESS,
  23. },
  24. solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
  25. solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID,
  26. solana_tls_utils::{
  27. new_dummy_x509_certificate, socket_addr_to_quic_server_name, tls_client_config_builder,
  28. QuicClientCertificate,
  29. },
  30. solana_transaction_error::TransportResult,
  31. std::{
  32. net::{SocketAddr, UdpSocket},
  33. sync::{atomic::Ordering, Arc},
  34. thread,
  35. },
  36. thiserror::Error,
  37. tokio::{sync::OnceCell, time::timeout},
  38. };
  39. /// A lazy-initialized Quic Endpoint
  40. pub struct QuicLazyInitializedEndpoint {
  41. endpoint: OnceCell<Arc<Endpoint>>,
  42. client_certificate: Arc<QuicClientCertificate>,
  43. client_endpoint: Option<Endpoint>,
  44. }
  45. #[derive(Error, Debug)]
  46. pub enum QuicError {
  47. #[error(transparent)]
  48. WriteError(#[from] WriteError),
  49. #[error(transparent)]
  50. ConnectionError(#[from] ConnectionError),
  51. #[error(transparent)]
  52. ConnectError(#[from] ConnectError),
  53. #[error(transparent)]
  54. ClosedStream(#[from] ClosedStream),
  55. }
  56. impl From<QuicError> for ClientErrorKind {
  57. fn from(quic_error: QuicError) -> Self {
  58. Self::Custom(format!("{quic_error:?}"))
  59. }
  60. }
  61. impl QuicLazyInitializedEndpoint {
  62. pub fn new(
  63. client_certificate: Arc<QuicClientCertificate>,
  64. client_endpoint: Option<Endpoint>,
  65. ) -> Self {
  66. Self {
  67. endpoint: OnceCell::<Arc<Endpoint>>::new(),
  68. client_certificate,
  69. client_endpoint,
  70. }
  71. }
  72. fn create_endpoint(&self) -> Endpoint {
  73. let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
  74. endpoint.clone()
  75. } else {
  76. // This will bind to random ports, but VALIDATOR_PORT_RANGE is outside
  77. // of the range for CI tests when this is running in CI
  78. let client_socket = sockets::bind_in_range_with_config(
  79. std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
  80. solana_net_utils::VALIDATOR_PORT_RANGE,
  81. sockets::SocketConfiguration::default(),
  82. )
  83. .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
  84. .1;
  85. info!("Local endpoint is : {client_socket:?}");
  86. QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
  87. };
  88. let mut crypto = tls_client_config_builder()
  89. .with_client_auth_cert(
  90. vec![self.client_certificate.certificate.clone()],
  91. self.client_certificate.key.clone_key(),
  92. )
  93. .expect("Failed to set QUIC client certificates");
  94. crypto.enable_early_data = true;
  95. crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
  96. let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).unwrap()));
  97. let mut transport_config = TransportConfig::default();
  98. let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
  99. transport_config.max_idle_timeout(Some(timeout));
  100. transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
  101. transport_config.send_fairness(QUIC_SEND_FAIRNESS);
  102. config.transport_config(Arc::new(transport_config));
  103. endpoint.set_default_client_config(config);
  104. endpoint
  105. }
  106. async fn get_endpoint(&self) -> Arc<Endpoint> {
  107. self.endpoint
  108. .get_or_init(|| async { Arc::new(self.create_endpoint()) })
  109. .await
  110. .clone()
  111. }
  112. }
  113. impl Default for QuicLazyInitializedEndpoint {
  114. fn default() -> Self {
  115. let (cert, priv_key) = new_dummy_x509_certificate(&Keypair::new());
  116. Self::new(
  117. Arc::new(QuicClientCertificate {
  118. certificate: cert,
  119. key: priv_key,
  120. }),
  121. None,
  122. )
  123. }
  124. }
  125. /// A wrapper over NewConnection with additional capability to create the endpoint as part
  126. /// of creating a new connection.
  127. #[derive(Clone)]
  128. struct QuicNewConnection {
  129. endpoint: Arc<Endpoint>,
  130. connection: Arc<Connection>,
  131. }
  132. impl QuicNewConnection {
  133. /// Create a QuicNewConnection given the remote address 'addr'.
  134. async fn make_connection(
  135. endpoint: Arc<QuicLazyInitializedEndpoint>,
  136. addr: SocketAddr,
  137. stats: &ClientStats,
  138. ) -> Result<Self, QuicError> {
  139. let mut make_connection_measure = Measure::start("make_connection_measure");
  140. let endpoint = endpoint.get_endpoint().await;
  141. let server_name = socket_addr_to_quic_server_name(addr);
  142. let connecting = endpoint.connect(addr, &server_name)?;
  143. stats.total_connections.fetch_add(1, Ordering::Relaxed);
  144. if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
  145. {
  146. if connecting_result.is_err() {
  147. stats.connection_errors.fetch_add(1, Ordering::Relaxed);
  148. }
  149. make_connection_measure.stop();
  150. stats
  151. .make_connection_ms
  152. .fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed);
  153. let connection = connecting_result?;
  154. Ok(Self {
  155. endpoint,
  156. connection: Arc::new(connection),
  157. })
  158. } else {
  159. Err(ConnectionError::TimedOut.into())
  160. }
  161. }
  162. fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
  163. quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
  164. .expect("QuicNewConnection::create_endpoint quinn::Endpoint::new")
  165. }
  166. // Attempts to make a faster connection by taking advantage of pre-existing key material.
  167. // Only works if connection to this endpoint was previously established.
  168. async fn make_connection_0rtt(
  169. &mut self,
  170. addr: SocketAddr,
  171. stats: &ClientStats,
  172. ) -> Result<Arc<Connection>, QuicError> {
  173. let server_name = socket_addr_to_quic_server_name(addr);
  174. let connecting = self.endpoint.connect(addr, &server_name)?;
  175. stats.total_connections.fetch_add(1, Ordering::Relaxed);
  176. let connection = match connecting.into_0rtt() {
  177. Ok((connection, zero_rtt)) => {
  178. if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
  179. if zero_rtt {
  180. stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
  181. } else {
  182. stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
  183. }
  184. connection
  185. } else {
  186. return Err(ConnectionError::TimedOut.into());
  187. }
  188. }
  189. Err(connecting) => {
  190. stats.connection_errors.fetch_add(1, Ordering::Relaxed);
  191. if let Ok(connecting_result) =
  192. timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
  193. {
  194. connecting_result?
  195. } else {
  196. return Err(ConnectionError::TimedOut.into());
  197. }
  198. }
  199. };
  200. self.connection = Arc::new(connection);
  201. Ok(self.connection.clone())
  202. }
  203. }
  204. pub struct QuicClient {
  205. endpoint: Arc<QuicLazyInitializedEndpoint>,
  206. connection: Arc<Mutex<Option<QuicNewConnection>>>,
  207. addr: SocketAddr,
  208. stats: Arc<ClientStats>,
  209. }
  210. const CONNECTION_CLOSE_CODE_APPLICATION_CLOSE: u32 = 0u32;
  211. const CONNECTION_CLOSE_REASON_APPLICATION_CLOSE: &[u8] = b"dropped";
  212. impl QuicClient {
  213. /// Explicitly close the connection. Must be called manually if cleanup is needed.
  214. pub async fn close(&self) {
  215. let mut conn_guard = self.connection.lock().await;
  216. if let Some(conn) = conn_guard.take() {
  217. debug!(
  218. "Closing connection to {} connection_id: {:?}",
  219. self.addr, conn.connection
  220. );
  221. conn.connection.close(
  222. CONNECTION_CLOSE_CODE_APPLICATION_CLOSE.into(),
  223. CONNECTION_CLOSE_REASON_APPLICATION_CLOSE,
  224. );
  225. }
  226. }
  227. }
  228. impl QuicClient {
  229. pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
  230. Self {
  231. endpoint,
  232. connection: Arc::new(Mutex::new(None)),
  233. addr,
  234. stats: Arc::new(ClientStats::default()),
  235. }
  236. }
  237. async fn _send_buffer_using_conn(
  238. data: &[u8],
  239. connection: &Connection,
  240. ) -> Result<(), QuicError> {
  241. let mut send_stream = connection.open_uni().await?;
  242. send_stream.write_all(data).await?;
  243. Ok(())
  244. }
  245. // Attempts to send data, connecting/reconnecting as necessary
  246. // On success, returns the connection used to successfully send the data
  247. async fn _send_buffer(
  248. &self,
  249. data: &[u8],
  250. stats: &ClientStats,
  251. connection_stats: Arc<ConnectionCacheStats>,
  252. ) -> Result<Arc<Connection>, QuicError> {
  253. let mut measure_send_packet = Measure::start("send_packet_us");
  254. let mut measure_prepare_connection = Measure::start("prepare_connection");
  255. let mut connection_try_count = 0;
  256. let mut last_connection_id = 0;
  257. let mut last_error = None;
  258. while connection_try_count < 2 {
  259. let connection = {
  260. let mut conn_guard = self.connection.lock().await;
  261. let maybe_conn = conn_guard.as_mut();
  262. match maybe_conn {
  263. Some(conn) => {
  264. if conn.connection.stable_id() == last_connection_id {
  265. // this is the problematic connection we had used before, create a new one
  266. let conn = conn.make_connection_0rtt(self.addr, stats).await;
  267. match conn {
  268. Ok(conn) => {
  269. info!(
  270. "Made 0rtt connection to {} with id {} try_count {}, \
  271. last_connection_id: {}, last_error: {:?}",
  272. self.addr,
  273. conn.stable_id(),
  274. connection_try_count,
  275. last_connection_id,
  276. last_error,
  277. );
  278. connection_try_count += 1;
  279. conn
  280. }
  281. Err(err) => {
  282. info!(
  283. "Cannot make 0rtt connection to {}, error {:}",
  284. self.addr, err
  285. );
  286. return Err(err);
  287. }
  288. }
  289. } else {
  290. stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
  291. conn.connection.clone()
  292. }
  293. }
  294. None => {
  295. let conn = QuicNewConnection::make_connection(
  296. self.endpoint.clone(),
  297. self.addr,
  298. stats,
  299. )
  300. .await;
  301. match conn {
  302. Ok(conn) => {
  303. *conn_guard = Some(conn.clone());
  304. info!(
  305. "Made connection to {} id {} try_count {}, from connection \
  306. cache warming?: {}",
  307. self.addr,
  308. conn.connection.stable_id(),
  309. connection_try_count,
  310. data.is_empty(),
  311. );
  312. connection_try_count += 1;
  313. conn.connection.clone()
  314. }
  315. Err(err) => {
  316. info!(
  317. "Cannot make connection to {}, error {:}, from connection \
  318. cache warming?: {}",
  319. self.addr,
  320. err,
  321. data.is_empty()
  322. );
  323. return Err(err);
  324. }
  325. }
  326. }
  327. }
  328. };
  329. let new_stats = connection.stats();
  330. connection_stats
  331. .total_client_stats
  332. .congestion_events
  333. .update_stat(
  334. &self.stats.congestion_events,
  335. new_stats.path.congestion_events,
  336. );
  337. connection_stats
  338. .total_client_stats
  339. .streams_blocked_uni
  340. .update_stat(
  341. &self.stats.streams_blocked_uni,
  342. new_stats.frame_tx.streams_blocked_uni,
  343. );
  344. connection_stats
  345. .total_client_stats
  346. .data_blocked
  347. .update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked);
  348. connection_stats
  349. .total_client_stats
  350. .acks
  351. .update_stat(&self.stats.acks, new_stats.frame_tx.acks);
  352. if data.is_empty() {
  353. // no need to send packet as it is only for warming connections
  354. return Ok(connection);
  355. }
  356. last_connection_id = connection.stable_id();
  357. measure_prepare_connection.stop();
  358. match Self::_send_buffer_using_conn(data, &connection).await {
  359. Ok(()) => {
  360. measure_send_packet.stop();
  361. stats.successful_packets.fetch_add(1, Ordering::Relaxed);
  362. stats
  363. .send_packets_us
  364. .fetch_add(measure_send_packet.as_us(), Ordering::Relaxed);
  365. stats
  366. .prepare_connection_us
  367. .fetch_add(measure_prepare_connection.as_us(), Ordering::Relaxed);
  368. trace!(
  369. "Successfully sent to {} with id {}, thread: {:?}, data len: {}, \
  370. send_packet_us: {} prepare_connection_us: {}",
  371. self.addr,
  372. connection.stable_id(),
  373. thread::current().id(),
  374. data.len(),
  375. measure_send_packet.as_us(),
  376. measure_prepare_connection.as_us(),
  377. );
  378. return Ok(connection);
  379. }
  380. Err(err) => match err {
  381. QuicError::ConnectionError(_) => {
  382. last_error = Some(err);
  383. }
  384. _ => {
  385. info!(
  386. "Error sending to {} with id {}, error {:?} thread: {:?}",
  387. self.addr,
  388. connection.stable_id(),
  389. err,
  390. thread::current().id(),
  391. );
  392. return Err(err);
  393. }
  394. },
  395. }
  396. }
  397. // if we come here, that means we have exhausted maximum retries, return the error
  398. info!(
  399. "Ran into an error sending data {:?}, exhausted retries to {}",
  400. last_error, self.addr
  401. );
  402. // If we get here but last_error is None, then we have a logic error
  403. // in this function, so panic here with an expect to help debugging
  404. Err(last_error.expect("QuicClient::_send_buffer last_error.expect"))
  405. }
  406. pub async fn send_buffer<T>(
  407. &self,
  408. data: T,
  409. stats: &ClientStats,
  410. connection_stats: Arc<ConnectionCacheStats>,
  411. ) -> Result<(), ClientErrorKind>
  412. where
  413. T: AsRef<[u8]>,
  414. {
  415. self._send_buffer(data.as_ref(), stats, connection_stats)
  416. .await
  417. .map_err(Into::<ClientErrorKind>::into)?;
  418. Ok(())
  419. }
  420. pub async fn send_batch<T>(
  421. &self,
  422. buffers: &[T],
  423. stats: &ClientStats,
  424. connection_stats: Arc<ConnectionCacheStats>,
  425. ) -> Result<(), ClientErrorKind>
  426. where
  427. T: AsRef<[u8]>,
  428. {
  429. // Start off by "testing" the connection by sending the first buffer
  430. // This will also connect to the server if not already connected
  431. // and reconnect and retry if the first send attempt failed
  432. // (for example due to a timed out connection), returning an error
  433. // or the connection that was used to successfully send the buffer.
  434. // We will use the returned connection to send the rest of the buffers in the batch
  435. // to avoid touching the mutex in self, and not bother reconnecting if we fail along the way
  436. // since testing even in the ideal GCE environment has found no cases
  437. // where reconnecting and retrying in the middle of a batch send
  438. // (i.e. we encounter a connection error in the middle of a batch send, which presumably cannot
  439. // be due to a timed out connection) has succeeded
  440. if buffers.is_empty() {
  441. return Ok(());
  442. }
  443. let connection = self
  444. ._send_buffer(buffers[0].as_ref(), stats, connection_stats)
  445. .await
  446. .map_err(Into::<ClientErrorKind>::into)?;
  447. for data in buffers[1..buffers.len()].iter() {
  448. Self::_send_buffer_using_conn(data.as_ref(), &connection).await?;
  449. }
  450. Ok(())
  451. }
  452. pub fn server_addr(&self) -> &SocketAddr {
  453. &self.addr
  454. }
  455. pub fn stats(&self) -> Arc<ClientStats> {
  456. self.stats.clone()
  457. }
  458. }
  459. pub struct QuicClientConnection {
  460. pub client: Arc<QuicClient>,
  461. pub connection_stats: Arc<ConnectionCacheStats>,
  462. }
  463. impl QuicClientConnection {
  464. pub fn base_stats(&self) -> Arc<ClientStats> {
  465. self.client.stats()
  466. }
  467. pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
  468. self.connection_stats.clone()
  469. }
  470. pub fn new(
  471. endpoint: Arc<QuicLazyInitializedEndpoint>,
  472. addr: SocketAddr,
  473. connection_stats: Arc<ConnectionCacheStats>,
  474. ) -> Self {
  475. let client = Arc::new(QuicClient::new(endpoint, addr));
  476. Self::new_with_client(client, connection_stats)
  477. }
  478. pub fn new_with_client(
  479. client: Arc<QuicClient>,
  480. connection_stats: Arc<ConnectionCacheStats>,
  481. ) -> Self {
  482. Self {
  483. client,
  484. connection_stats,
  485. }
  486. }
  487. }
  488. #[async_trait]
  489. impl ClientConnection for QuicClientConnection {
  490. fn server_addr(&self) -> &SocketAddr {
  491. self.client.server_addr()
  492. }
  493. async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
  494. let stats = ClientStats::default();
  495. let len = buffers.len();
  496. let res = self
  497. .client
  498. .send_batch(buffers, &stats, self.connection_stats.clone())
  499. .await;
  500. self.connection_stats
  501. .add_client_stats(&stats, len, res.is_ok());
  502. res?;
  503. Ok(())
  504. }
  505. async fn send_data(&self, data: &[u8]) -> TransportResult<()> {
  506. let stats = Arc::new(ClientStats::default());
  507. // When data is empty which is from cache warmer, we are not sending packets actually, do not count it in
  508. let num_packets = if data.is_empty() { 0 } else { 1 };
  509. self.client
  510. .send_buffer(data, &stats, self.connection_stats.clone())
  511. .map_ok(|v| {
  512. self.connection_stats
  513. .add_client_stats(&stats, num_packets, true);
  514. v
  515. })
  516. .map_err(|e| {
  517. warn!(
  518. "Failed to send data async to {}, error: {:?} ",
  519. self.server_addr(),
  520. e
  521. );
  522. datapoint_warn!("send-wire-async", ("failure", 1, i64),);
  523. self.connection_stats
  524. .add_client_stats(&stats, num_packets, false);
  525. e.into()
  526. })
  527. .await
  528. }
  529. }