Browse Source

use dedicated runtime for tpu_client_next (#6021)

Use dedicated runtime for tpu_client_next instead of relying on RPC runtime
kirill lykov 6 tháng trước
mục cha
commit
3452b825d1
2 tập tin đã thay đổi với 39 bổ sung và 18 xóa
  1. 35 14
      core/src/validator.rs
  2. 4 4
      rpc/src/rpc_service.rs

+ 35 - 14
core/src/validator.rs

@@ -589,6 +589,10 @@ pub struct Validator {
     repair_quic_endpoints: Option<[Endpoint; 3]>,
     repair_quic_endpoints_runtime: Option<TokioRuntime>,
     repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
+    // This runtime is used to run the client owned by SendTransactionService.
+    // We don't wait for its JoinHandle here because ownership and shutdown
+    // are managed elsewhere. This variable is intentionally unused.
+    _tpu_client_next_runtime: Option<TokioRuntime>,
 }
 
 impl Validator {
@@ -1139,6 +1143,22 @@ impl Validator {
             ))
         };
 
+        // test-validator crate may start the validator in a tokio runtime
+        // context which forces us to use the same runtime because a nested
+        // runtime will cause panic at drop. Outside test-validator crate, we
+        // always need a tokio runtime (and the respective handle) to initialize
+        // the turbine QUIC endpoint.
+        let current_runtime_handle = tokio::runtime::Handle::try_current();
+        let tpu_client_next_runtime =
+            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
+                tokio::runtime::Builder::new_multi_thread()
+                    .enable_all()
+                    .worker_threads(2)
+                    .thread_name("solTpuClientRt")
+                    .build()
+                    .unwrap()
+            });
+
         let rpc_override_health_check =
             Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
         let (
@@ -1163,6 +1183,19 @@ impl Validator {
                 None
             };
 
+            let client_option = if config.use_tpu_client_next {
+                let runtime_handle = tpu_client_next_runtime
+                    .as_ref()
+                    .map(TokioRuntime::handle)
+                    .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
+                ClientOption::TpuClientNext(
+                    Arc::as_ref(&identity_keypair),
+                    node.sockets.rpc_sts_client,
+                    runtime_handle.clone(),
+                )
+            } else {
+                ClientOption::ConnectionCache(connection_cache.clone())
+            };
             let rpc_svc_config = JsonRpcServiceConfig {
                 rpc_addr,
                 rpc_config: config.rpc_config.clone(),
@@ -1185,14 +1218,7 @@ impl Validator {
                 max_complete_transaction_status_slot,
                 max_complete_rewards_slot,
                 prioritization_fee_cache: prioritization_fee_cache.clone(),
-                client_option: if config.use_tpu_client_next {
-                    ClientOption::TpuClientNext(
-                        Arc::as_ref(&identity_keypair),
-                        node.sockets.rpc_sts_client,
-                    )
-                } else {
-                    ClientOption::ConnectionCache(connection_cache.clone())
-                },
+                client_option,
             };
             let json_rpc_service =
                 JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
@@ -1363,12 +1389,6 @@ impl Validator {
             .as_ref()
             .map(|service| service.sender_cloned());
 
-        // test-validator crate may start the validator in a tokio runtime
-        // context which forces us to use the same runtime because a nested
-        // runtime will cause panic at drop.
-        // Outside test-validator crate, we always need a tokio runtime (and
-        // the respective handle) to initialize the turbine QUIC endpoint.
-        let current_runtime_handle = tokio::runtime::Handle::try_current();
         let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
             && genesis_config.cluster_type != ClusterType::MainnetBeta)
             .then(|| {
@@ -1686,6 +1706,7 @@ impl Validator {
             repair_quic_endpoints,
             repair_quic_endpoints_runtime,
             repair_quic_endpoints_join_handle,
+            _tpu_client_next_runtime: tpu_client_next_runtime,
         })
     }
 

+ 4 - 4
rpc/src/rpc_service.rs

@@ -52,7 +52,7 @@ use {
         },
         thread::{self, Builder, JoinHandle},
     },
-    tokio::runtime::{Builder as TokioBuilder, Runtime as TokioRuntime},
+    tokio::runtime::{Builder as TokioBuilder, Handle as RuntimeHandle, Runtime as TokioRuntime},
     tokio_util::codec::{BytesCodec, FramedRead},
 };
 
@@ -409,7 +409,7 @@ pub struct JsonRpcServiceConfig<'a> {
 ///   requires a reference to a [`Keypair`].
 pub enum ClientOption<'a> {
     ConnectionCache(Arc<ConnectionCache>),
-    TpuClientNext(&'a Keypair, UdpSocket),
+    TpuClientNext(&'a Keypair, UdpSocket, RuntimeHandle),
 }
 
 impl JsonRpcService {
@@ -466,7 +466,7 @@ impl JsonRpcService {
                 )?;
                 Ok(json_rpc_service)
             }
-            ClientOption::TpuClientNext(identity_keypair, tpu_client_socket) => {
+            ClientOption::TpuClientNext(identity_keypair, tpu_client_socket, client_runtime) => {
                 let my_tpu_address = config
                     .cluster_info
                     .my_contact_info()
@@ -476,7 +476,7 @@ impl JsonRpcService {
                         Protocol::QUIC
                     ))?;
                 let client = TpuClientNextClient::new(
-                    runtime.handle().clone(),
+                    client_runtime,
                     my_tpu_address,
                     config.send_transaction_service_config.tpu_peers.clone(),
                     leader_info,