Browse Source

tpu-client-next: make some functions and structs public (#6954)

This PR fixes the problem that a user cannot create a custom scheduler implementation using provided components (Worker, WorkerCache, etc) because part of the functionality is private. To fix this, we mark them public with `agave-unstable-api` feature.
kirill lykov 2 months ago
parent
commit
ad09804432

+ 1 - 0
Cargo.lock

@@ -11339,6 +11339,7 @@ dependencies = [
  "futures 0.3.31",
  "futures 0.3.31",
  "log",
  "log",
  "lru",
  "lru",
+ "qualifier_attr",
  "quinn",
  "quinn",
  "rustls 0.23.31",
  "rustls 0.23.31",
  "solana-cli-config",
  "solana-cli-config",

+ 3 - 1
tpu-client-next/Cargo.toml

@@ -12,15 +12,17 @@ edition = { workspace = true }
 targets = ["x86_64-unknown-linux-gnu"]
 targets = ["x86_64-unknown-linux-gnu"]
 
 
 [features]
 [features]
+agave-unstable-api = ["dep:qualifier_attr"]
 default = ["log"]
 default = ["log"]
 log = ["dep:log"]
 log = ["dep:log"]
-tracing = ["dep:tracing"]
 metrics = ["dep:solana-metrics"]
 metrics = ["dep:solana-metrics"]
+tracing = ["dep:tracing"]
 
 
 [dependencies]
 [dependencies]
 async-trait = { workspace = true }
 async-trait = { workspace = true }
 log = { workspace = true, optional = true }
 log = { workspace = true, optional = true }
 lru = { workspace = true }
 lru = { workspace = true }
+qualifier_attr = { workspace = true, optional = true }
 quinn = { workspace = true }
 quinn = { workspace = true }
 rustls = { workspace = true }
 rustls = { workspace = true }
 solana-clock = { workspace = true }
 solana-clock = { workspace = true }

+ 6 - 1
tpu-client-next/src/connection_workers_scheduler.rs

@@ -1,6 +1,8 @@
 //! This module defines [`ConnectionWorkersScheduler`] which sends transactions
 //! This module defines [`ConnectionWorkersScheduler`] which sends transactions
 //! to the upcoming leaders.
 //! to the upcoming leaders.
 
 
+#[cfg(feature = "agave-unstable-api")]
+use qualifier_attr::qualifiers;
 use {
 use {
     super::leader_updater::LeaderUpdater,
     super::leader_updater::LeaderUpdater,
     crate::{
     crate::{
@@ -62,6 +64,7 @@ pub enum ConnectionWorkersSchedulerError {
 /// The idea of having a separate `connect` parameter is to create a set of
 /// The idea of having a separate `connect` parameter is to create a set of
 /// nodes to connect to in advance in order to hide the latency of opening new
 /// nodes to connect to in advance in order to hide the latency of opening new
 /// connection. Hence, `connect` must be greater or equal to `send`
 /// connection. Hence, `connect` must be greater or equal to `send`
+#[derive(Debug, Clone)]
 pub struct Fanout {
 pub struct Fanout {
     /// The number of leaders to target for sending transactions.
     /// The number of leaders to target for sending transactions.
     pub send: usize,
     pub send: usize,
@@ -305,6 +308,7 @@ impl ConnectionWorkersScheduler {
 }
 }
 
 
 /// Sets up the QUIC endpoint for the scheduler to handle connections.
 /// Sets up the QUIC endpoint for the scheduler to handle connections.
+#[cfg_attr(feature = "agave-unstable-api", qualifiers(pub))]
 fn setup_endpoint(
 fn setup_endpoint(
     bind: BindTarget,
     bind: BindTarget,
     stake_identity: Option<StakeIdentity>,
     stake_identity: Option<StakeIdentity>,
@@ -367,7 +371,8 @@ impl WorkersBroadcaster for NonblockingBroadcaster {
 ///
 ///
 /// This function selects up to `send_fanout` addresses from the `leaders` list, ensuring that
 /// This function selects up to `send_fanout` addresses from the `leaders` list, ensuring that
 /// only unique addresses are included while maintaining their original order.
 /// only unique addresses are included while maintaining their original order.
-fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec<SocketAddr> {
+#[cfg_attr(feature = "agave-unstable-api", qualifiers(pub))]
+pub fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec<SocketAddr> {
     let send_count = send_fanout.min(leaders.len());
     let send_count = send_fanout.min(leaders.len());
     remove_duplicates(&leaders[..send_count])
     remove_duplicates(&leaders[..send_count])
 }
 }

+ 6 - 0
tpu-client-next/src/workers_cache.rs

@@ -2,6 +2,8 @@
 //! structures provide mechanisms for caching workers, sending transaction
 //! structures provide mechanisms for caching workers, sending transaction
 //! batches, and gathering send transaction statistics.
 //! batches, and gathering send transaction statistics.
 
 
+#[cfg(feature = "agave-unstable-api")]
+use qualifier_attr::qualifiers;
 use {
 use {
     crate::{
     crate::{
         connection_worker::ConnectionWorker, logging::debug, transaction_batch::TransactionBatch,
         connection_worker::ConnectionWorker, logging::debug, transaction_batch::TransactionBatch,
@@ -71,6 +73,7 @@ impl WorkerInfo {
 }
 }
 
 
 /// Spawns a worker to handle communication with a given peer.
 /// Spawns a worker to handle communication with a given peer.
+#[cfg_attr(feature = "agave-unstable-api", qualifiers(pub))]
 pub(crate) fn spawn_worker(
 pub(crate) fn spawn_worker(
     endpoint: &Endpoint,
     endpoint: &Endpoint,
     peer: &SocketAddr,
     peer: &SocketAddr,
@@ -127,6 +130,7 @@ pub enum WorkersCacheError {
 }
 }
 
 
 impl WorkersCache {
 impl WorkersCache {
+    #[cfg_attr(feature = "agave-unstable-api", qualifiers(pub))]
     pub(crate) fn new(capacity: usize, cancel: CancellationToken) -> Self {
     pub(crate) fn new(capacity: usize, cancel: CancellationToken) -> Self {
         Self {
         Self {
             workers: LruCache::new(capacity),
             workers: LruCache::new(capacity),
@@ -140,6 +144,7 @@ impl WorkersCache {
         self.workers.contains(peer)
         self.workers.contains(peer)
     }
     }
 
 
+    #[cfg_attr(feature = "agave-unstable-api", qualifiers(pub))]
     pub(crate) fn push(
     pub(crate) fn push(
         &mut self,
         &mut self,
         leader: SocketAddr,
         leader: SocketAddr,
@@ -266,6 +271,7 @@ impl WorkersCache {
     ///
     ///
     /// The method awaits the completion of all shutdown tasks, ensuring that
     /// The method awaits the completion of all shutdown tasks, ensuring that
     /// each worker is properly terminated.
     /// each worker is properly terminated.
+    #[cfg_attr(feature = "agave-unstable-api", qualifiers(pub))]
     pub(crate) async fn shutdown(&mut self) {
     pub(crate) async fn shutdown(&mut self) {
         // Interrupt any outstanding `send_transactions()` calls.
         // Interrupt any outstanding `send_transactions()` calls.
         self.cancel.cancel();
         self.cancel.cancel();