Przeglądaj źródła

remove connection-cache for TPU txs from validator (#9099)

Since UDP is no longer needed for TPU, we are free to remove also ConnectionCache support for TPU from validator.
kirill lykov 1 dzień temu
rodzic
commit
6a8374ca50

+ 1 - 17
bench-tps/src/cli.rs

@@ -10,7 +10,7 @@ use {
     solana_keypair::{read_keypair_file, Keypair},
     solana_pubkey::Pubkey,
     solana_streamer::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
-    solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
+    solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
     std::{
         net::{IpAddr, Ipv4Addr},
         time::Duration,
@@ -65,7 +65,6 @@ pub struct Config {
     pub num_lamports_per_account: u64,
     pub target_slots_per_epoch: u64,
     pub external_client_type: ExternalClientType,
-    pub use_quic: bool,
     pub tpu_connection_pool_size: usize,
     pub tpu_max_connections_per_ipaddr_per_minute: u64,
     pub compute_unit_price: Option<ComputeUnitPrice>,
@@ -101,7 +100,6 @@ impl Default for Config {
             num_lamports_per_account: NUM_LAMPORTS_PER_ACCOUNT_DEFAULT,
             target_slots_per_epoch: 0,
             external_client_type: ExternalClientType::default(),
-            use_quic: DEFAULT_TPU_USE_QUIC,
             tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
             tpu_max_connections_per_ipaddr_per_minute:
                 DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
@@ -282,15 +280,6 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> {
                 .takes_value(false)
                 .help("Submit transactions with a TpuClient"),
         )
-        .arg(
-            Arg::with_name("tpu_disable_quic")
-                .long("tpu-disable-quic")
-                .takes_value(false)
-                .help(
-                    "DEPRECATED: Do not submit transactions via QUIC; only affects TpuClient \
-                     (default) sends",
-                ),
-        )
         .arg(
             Arg::with_name("tpu_connection_pool_size")
                 .long("tpu-connection-pool-size")
@@ -448,11 +437,6 @@ pub fn parse_args(matches: &ArgMatches) -> Result<Config, &'static str> {
         args.external_client_type = ExternalClientType::RpcClient;
     }
 
-    if matches.is_present("tpu_disable_quic") {
-        eprintln!("Warning: TPU over UDP is deprecated");
-        args.use_quic = false;
-    }
-
     if let Some(v) = matches.value_of("tpu_connection_pool_size") {
         args.tpu_connection_pool_size = v
             .to_string()

+ 0 - 9
bench-tps/src/main.rs

@@ -76,17 +76,10 @@ fn find_node_activated_stake(
 fn create_connection_cache(
     json_rpc_url: &str,
     tpu_connection_pool_size: usize,
-    use_quic: bool,
     bind_address: IpAddr,
     client_node_id: Option<&Keypair>,
     commitment_config: CommitmentConfig,
 ) -> ConnectionCache {
-    if !use_quic {
-        return ConnectionCache::with_udp(
-            "bench-tps-connection_cache_udp",
-            tpu_connection_pool_size,
-        );
-    }
     if client_node_id.is_none() {
         return ConnectionCache::new_quic(
             "bench-tps-connection_cache_quic",
@@ -193,7 +186,6 @@ fn main() {
         target_lamports_per_signature,
         num_lamports_per_account,
         external_client_type,
-        use_quic,
         tpu_connection_pool_size,
         skip_tx_account_data_size,
         compute_unit_price,
@@ -240,7 +232,6 @@ fn main() {
     let connection_cache = create_connection_cache(
         json_rpc_url,
         *tpu_connection_pool_size,
-        *use_quic,
         *bind_address,
         client_node_id.as_ref(),
         *commitment_config,

+ 1 - 3
core/src/tvu.rs

@@ -160,7 +160,6 @@ impl Tvu {
         wait_to_vote_slot: Option<Slot>,
         snapshot_controller: Option<Arc<SnapshotController>>,
         log_messages_bytes_limit: Option<usize>,
-        connection_cache: Option<&Arc<ConnectionCache>>,
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
         banking_tracer: Arc<BankingTracer>,
         turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
@@ -361,7 +360,7 @@ impl Tvu {
         );
 
         let warm_quic_cache_service = create_cache_warmer_if_needed(
-            connection_cache,
+            None,
             vote_connection_cache,
             cluster_info,
             poh_recorder,
@@ -603,7 +602,6 @@ pub mod tests {
             None,
             None, // snapshot_controller
             None,
-            Some(&Arc::new(ConnectionCache::new("connection_cache_test"))),
             &ignored_prioritization_fee_cache,
             BankingTracer::new_disabled(),
             turbine_quic_endpoint_sender,

+ 16 - 76
core/src/validator.rs

@@ -140,9 +140,7 @@ use {
         streamer::StakedNodes,
     },
     solana_time_utils::timestamp,
-    solana_tpu_client::tpu_client::{
-        DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
-    },
+    solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
     solana_turbine::{
         self,
         broadcast_stage::BroadcastStageType,
@@ -381,7 +379,6 @@ pub struct ValidatorConfig {
     pub replay_transactions_threads: NonZeroUsize,
     pub tvu_shred_sigverify_threads: NonZeroUsize,
     pub delay_leader_block_for_pending_fork: bool,
-    pub use_tpu_client_next: bool,
     pub retransmit_xdp: Option<XdpConfig>,
     pub repair_handler_type: RepairHandlerType,
 }
@@ -465,7 +462,6 @@ impl ValidatorConfig {
             tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
                 .expect("thread count is non-zero"),
             delay_leader_block_for_pending_fork: false,
-            use_tpu_client_next: true,
             retransmit_xdp: None,
             repair_handler_type: RepairHandlerType::default(),
         }
@@ -557,8 +553,6 @@ struct TransactionHistoryServices {
 
 /// A struct easing passing Validator TPU Configurations
 pub struct ValidatorTpuConfig {
-    /// Controls if to use QUIC for sending regular TPU transaction
-    pub use_quic: bool,
     /// Controls if to use QUIC for sending TPU votes
     pub vote_use_quic: bool,
     /// Controls the connection cache pool size
@@ -606,7 +600,6 @@ impl ValidatorTpuConfig {
         };
 
         ValidatorTpuConfig {
-            use_quic: DEFAULT_TPU_USE_QUIC,
             vote_use_quic: DEFAULT_VOTE_USE_QUIC,
             tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
             tpu_enable_udp,
@@ -685,7 +678,6 @@ impl Validator {
         info!("debug-assertion status: {DEBUG_ASSERTION_STATUS}");
 
         let ValidatorTpuConfig {
-            use_quic,
             vote_use_quic,
             tpu_connection_pool_size,
             tpu_enable_udp,
@@ -1130,38 +1122,6 @@ impl Validator {
 
         let mut tpu_transactions_forwards_client_sockets =
             Some(node.sockets.tpu_transaction_forwarding_clients);
-        let connection_cache = match (config.use_tpu_client_next, use_quic) {
-            (false, true) => Some(Arc::new(ConnectionCache::new_with_client_options(
-                "connection_cache_tpu_quic",
-                tpu_connection_pool_size,
-                Some({
-                    // this conversion is not beautiful but rust does not allow popping single
-                    // elements from a boxed slice
-                    let socketbox: Box<[_; 1]> = tpu_transactions_forwards_client_sockets
-                        .take()
-                        .unwrap()
-                        .try_into()
-                        .expect("Multihoming support for connection cache is not available");
-                    let [sock] = *socketbox;
-                    sock
-                }),
-                Some((
-                    &identity_keypair,
-                    node.info
-                        .tpu(Protocol::UDP)
-                        .ok_or_else(|| {
-                            ValidatorError::Other(String::from("Invalid UDP address for TPU"))
-                        })?
-                        .ip(),
-                )),
-                Some((&staked_nodes, &identity_keypair.pubkey())),
-            ))),
-            (false, false) => Some(Arc::new(ConnectionCache::with_udp(
-                "connection_cache_tpu_udp",
-                tpu_connection_pool_size,
-            ))),
-            (true, _) => None,
-        };
 
         let vote_connection_cache = if vote_use_quic {
             let vote_connection_cache = ConnectionCache::new_with_client_options(
@@ -1193,15 +1153,14 @@ impl Validator {
         // always need a tokio runtime (and the respective handle) to initialize
         // the turbine QUIC endpoint.
         let current_runtime_handle = tokio::runtime::Handle::try_current();
-        let tpu_client_next_runtime =
-            (current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
-                tokio::runtime::Builder::new_multi_thread()
-                    .enable_all()
-                    .worker_threads(2)
-                    .thread_name("solTpuClientRt")
-                    .build()
-                    .unwrap()
-            });
+        let tpu_client_next_runtime = current_runtime_handle.is_err().then(|| {
+            tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .worker_threads(2)
+                .thread_name("solTpuClientRt")
+                .build()
+                .unwrap()
+        });
 
         let rpc_override_health_check =
             Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
@@ -1228,7 +1187,7 @@ impl Validator {
                 None
             };
 
-            let client_option = if config.use_tpu_client_next {
+            let client_option = {
                 let runtime_handle = tpu_client_next_runtime
                     .as_ref()
                     .map(TokioRuntime::handle)
@@ -1239,11 +1198,6 @@ impl Validator {
                     runtime_handle.clone(),
                     cancel.clone(),
                 )
-            } else {
-                let Some(connection_cache) = &connection_cache else {
-                    panic!("ConnectionCache should exist by construction.");
-                };
-                ClientOption::ConnectionCache(connection_cache.clone())
             };
             let rpc_svc_config = JsonRpcServiceConfig {
                 rpc_addr,
@@ -1565,13 +1519,6 @@ impl Validator {
             )
         });
 
-        // If RPC is supported and ConnectionCache is used, pass ConnectionCache for being warmup inside Tvu.
-        let connection_cache_for_warmup =
-            if json_rpc_service.is_some() && connection_cache.is_some() {
-                connection_cache.as_ref()
-            } else {
-                None
-            };
         let (xdp_retransmitter, xdp_sender) =
             if let Some(xdp_config) = config.retransmit_xdp.clone() {
                 let src_port = node.sockets.retransmit_sockets[0]
@@ -1651,7 +1598,6 @@ impl Validator {
             config.wait_to_vote_slot,
             Some(snapshot_controller.clone()),
             config.runtime_config.log_messages_bytes_limit,
-            connection_cache_for_warmup,
             &prioritization_fee_cache,
             banking_tracer.clone(),
             turbine_quic_endpoint_sender.clone(),
@@ -1689,9 +1635,7 @@ impl Validator {
         }
 
         let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default()));
-        let forwarding_tpu_client = if let Some(connection_cache) = &connection_cache {
-            ForwardingClientOption::ConnectionCache(connection_cache.clone())
-        } else {
+        let forwarding_tpu_client = {
             let runtime_handle = tpu_client_next_runtime
                 .as_ref()
                 .map(TokioRuntime::handle)
@@ -1778,15 +1722,11 @@ impl Validator {
         );
 
         *start_progress.write().unwrap() = ValidatorStartProgress::Running;
-        if config.use_tpu_client_next {
-            if let Some(json_rpc_service) = &json_rpc_service {
-                key_notifiers.write().unwrap().add(
-                    KeyUpdaterType::RpcService,
-                    json_rpc_service.get_client_key_updater(),
-                );
-            }
-            // note, that we don't need to add ConnectionClient to key_notifiers
-            // because it is added inside Tpu.
+        if let Some(json_rpc_service) = &json_rpc_service {
+            key_notifiers.write().unwrap().add(
+                KeyUpdaterType::RpcService,
+                json_rpc_service.get_client_key_updater(),
+            );
         }
 
         *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {

+ 15 - 24
local-cluster/src/local_cluster.rs

@@ -48,7 +48,7 @@ use {
     solana_system_transaction as system_transaction,
     solana_tpu_client::tpu_client::{
         TpuClient, TpuClientConfig, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP,
-        DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
+        DEFAULT_VOTE_USE_QUIC,
     },
     solana_transaction::Transaction,
     solana_transaction_error::TransportError,
@@ -94,7 +94,6 @@ pub struct ClusterConfig {
     pub cluster_type: ClusterType,
     pub poh_config: PohConfig,
     pub additional_accounts: Vec<(Pubkey, AccountSharedData)>,
-    pub tpu_use_quic: bool,
     pub tpu_connection_pool_size: usize,
     pub vote_use_quic: bool,
 }
@@ -133,7 +132,6 @@ impl Default for ClusterConfig {
             poh_config: PohConfig::default(),
             skip_warmup_slots: false,
             additional_accounts: vec![],
-            tpu_use_quic: DEFAULT_TPU_USE_QUIC,
             tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
             vote_use_quic: DEFAULT_VOTE_USE_QUIC,
         }
@@ -153,7 +151,7 @@ pub struct LocalCluster {
     pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
     pub genesis_config: GenesisConfig,
     pub connection_cache: Arc<ConnectionCache>,
-    quic_connection_cache_config: Option<QuicConnectionCacheConfig>,
+    quic_connection_cache_config: QuicConnectionCacheConfig,
     tpu_connection_pool_size: usize,
     shred_version: u16,
 }
@@ -195,7 +193,7 @@ impl LocalCluster {
     pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self {
         assert_eq!(config.validator_configs.len(), config.node_stakes.len());
 
-        let quic_connection_cache_config = config.tpu_use_quic.then(|| {
+        let quic_connection_cache_config = {
             let client_keypair = Keypair::new();
             let stake = DEFAULT_NODE_STAKE;
 
@@ -219,7 +217,7 @@ impl LocalCluster {
                 client_keypair,
                 staked_nodes,
             }
-        });
+        };
 
         let connection_cache = create_connection_cache(
             &quic_connection_cache_config,
@@ -1180,26 +1178,19 @@ impl LocalCluster {
 }
 
 fn create_connection_cache(
-    quic_connection_cache_config: &Option<QuicConnectionCacheConfig>,
+    config: &QuicConnectionCacheConfig,
     tpu_connection_pool_size: usize,
 ) -> Arc<ConnectionCache> {
-    if let Some(config) = quic_connection_cache_config {
-        Arc::new(ConnectionCache::new_with_client_options(
-            "connection_cache_local_cluster_quic_staked",
-            tpu_connection_pool_size,
-            Some(solana_net_utils::sockets::bind_to_localhost_unique().unwrap()),
-            Some((
-                &config.client_keypair,
-                IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
-            )),
-            Some((&config.staked_nodes, &config.client_keypair.pubkey())),
-        ))
-    } else {
-        Arc::new(ConnectionCache::with_udp(
-            "connection_cache_local_cluster_udp",
-            tpu_connection_pool_size,
-        ))
-    }
+    Arc::new(ConnectionCache::new_with_client_options(
+        "connection_cache_local_cluster_quic_staked",
+        tpu_connection_pool_size,
+        Some(solana_net_utils::sockets::bind_to_localhost_unique().unwrap()),
+        Some((
+            &config.client_keypair,
+            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
+        )),
+        Some((&config.staked_nodes, &config.client_keypair.pubkey())),
+    ))
 }
 
 impl Cluster for LocalCluster {

+ 0 - 1
local-cluster/src/validator_configs.rs

@@ -80,7 +80,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         replay_transactions_threads: config.replay_transactions_threads,
         tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads,
         delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork,
-        use_tpu_client_next: config.use_tpu_client_next,
         retransmit_xdp: config.retransmit_xdp.clone(),
         repair_handler_type: config.repair_handler_type.clone(),
     }

+ 0 - 1
tpu-client/src/tpu_client.rs

@@ -26,7 +26,6 @@ use {
 };
 
 pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
-pub const DEFAULT_TPU_USE_QUIC: bool = true;
 pub const DEFAULT_VOTE_USE_QUIC: bool = false;
 
 /// The default connection count is set to 1 -- it should

+ 0 - 9
validator/src/commands/run/args.rs

@@ -1299,15 +1299,6 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a,
             .requires("retransmit_xdp_cpu_cores")
             .help("EXPERIMENTAL: Enable XDP zero copy. Requires hardware support"),
     )
-    .arg(
-        Arg::with_name("use_connection_cache")
-            .long("use-connection-cache")
-            .takes_value(false)
-            .help(
-                "Use connection-cache crate to send transactions over TPU ports. If not \
-                 set,tpu-client-next is used by default.",
-            ),
-    )
     .args(&pub_sub_config::args(/*test_validator:*/ false))
     .args(&json_rpc_config::args())
     .args(&rpc_bigtable_config::args())

+ 0 - 6
validator/src/commands/run/execute.rs

@@ -212,10 +212,6 @@ pub fn execute(
 
     if bind_addresses.len() > 1 {
         for (flag, msg) in [
-            (
-                "use_connection_cache",
-                "Connection cache can not be used in a multihoming context",
-            ),
             (
                 "advertised_ip",
                 "--advertised-ip cannot be used in a multihoming context. In multihoming, the \
@@ -587,7 +583,6 @@ pub fn execute(
         turbine_disabled: Arc::<AtomicBool>::default(),
         retransmit_xdp,
         broadcast_stage_type: BroadcastStageType::Standard,
-        use_tpu_client_next: !matches.is_present("use_connection_cache"),
         block_verification_method: value_t_or_exit!(
             matches,
             "block_verification_method",
@@ -998,7 +993,6 @@ pub fn execute(
         start_progress,
         run_args.socket_addr_space,
         ValidatorTpuConfig {
-            use_quic: true,
             vote_use_quic,
             tpu_connection_pool_size,
             tpu_enable_udp,