
tpu-client-next: add NodeAddressService to track leader updates on client side (#7533)

This PR introduces new services: `LeaderTpuCacheService`,
`SlotUpdateService`, and `NodeAddressService` (which wraps the first
two).

These are deep refactorings of the structures in tpu-client that were
previously used for leader estimation.

These changes solve the following problems:

1. Separates the service that updates the current slot from the service that
   updates node info. This later allows using different slot info providers
   (gRPC, RPC, etc.) instead of the default WebSocket implementation, as
   sketched below.
2. Simplifies the code by using tokio properly.
3. Reduces the number of RPC calls.
4. Opens a path to removing the dependency of tpu-client-next on
   tpu-client/connection-cache. In this PR I can only mark the old code as
   deprecated, because removing it would break the API (the function
   `create_leader_updater` is public).
5. Changes how current leaders are estimated: if the current slot is the last
   of a leader's consecutive slots, consider both the current and next slot
   leaders when they differ, even if fanout=1.
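
Not part of the diff, but to illustrate point 1: a minimal sketch (the RPC endpoint and slot numbers are placeholders) of plugging an arbitrary slot-event stream into `NodeAddressService`, with `RpcClient` serving as the `ClusterInfoProvider`:

```rust
use {
    futures::stream,
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_tpu_client_next::{
        leader_updater::LeaderUpdater,
        node_address_service::{LeaderTpuCacheServiceConfig, NodeAddressService, SlotEvent},
    },
    std::sync::Arc,
    tokio_util::sync::CancellationToken,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // RpcClient implements ClusterInfoProvider; the endpoint is a placeholder.
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    // Any Stream<Item = SlotEvent> can drive the service; a fixed in-memory
    // stream stands in here for a gRPC/WebSocket/etc. slot update provider.
    let slot_events = stream::iter(vec![SlotEvent::Start(100), SlotEvent::End(100)]);

    let cancel = CancellationToken::new();
    let mut service = NodeAddressService::run(
        rpc_client,
        slot_events,
        LeaderTpuCacheServiceConfig::default(),
        cancel.clone(),
    )
    .await?;

    // NodeAddressService implements LeaderUpdater.
    let leaders = service.next_leaders(1);
    println!("upcoming leader TPU sockets: {leaders:?}");

    cancel.cancel();
    service.stop().await;
    Ok(())
}
```
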
kirill lykov 1 week ago
parent
commit
bf768b7c1d

+ 6 - 0
Cargo.lock

@@ -11364,21 +11364,26 @@ dependencies = [
  "async-trait",
  "crossbeam-channel",
  "futures 0.3.31",
+ "futures-util",
  "log",
  "lru",
  "quinn",
  "rustls 0.23.34",
+ "serde_json",
  "solana-cli-config",
  "solana-clock",
  "solana-commitment-config",
  "solana-connection-cache",
+ "solana-epoch-schedule",
  "solana-keypair",
  "solana-measure",
  "solana-metrics",
  "solana-net-utils",
  "solana-pubkey",
+ "solana-pubsub-client",
  "solana-quic-definitions",
  "solana-rpc-client",
+ "solana-rpc-client-api",
  "solana-signer",
  "solana-streamer",
  "solana-time-utils",
@@ -11387,6 +11392,7 @@ dependencies = [
  "solana-tpu-client-next",
  "thiserror 2.0.17",
  "tokio",
+ "tokio-stream",
  "tokio-util 0.7.17",
  "tracing",
 ]

+ 6 - 0
dev-bins/Cargo.lock

@@ -9393,17 +9393,23 @@ name = "solana-tpu-client-next"
 version = "4.0.0-alpha.0"
 dependencies = [
  "async-trait",
+ "futures 0.3.31",
+ "futures-util",
  "log",
  "lru",
  "quinn",
  "rustls 0.23.34",
  "solana-clock",
+ "solana-commitment-config",
  "solana-connection-cache",
+ "solana-epoch-schedule",
  "solana-keypair",
  "solana-measure",
  "solana-metrics",
+ "solana-pubkey",
  "solana-quic-definitions",
  "solana-rpc-client",
+ "solana-rpc-client-api",
  "solana-streamer",
  "solana-time-utils",
  "solana-tls-utils",

+ 6 - 0
programs/sbf/Cargo.lock

@@ -9916,17 +9916,23 @@ name = "solana-tpu-client-next"
 version = "4.0.0-alpha.0"
 dependencies = [
  "async-trait",
+ "futures 0.3.31",
+ "futures-util",
  "log",
  "lru",
  "quinn",
  "rustls 0.23.34",
  "solana-clock",
+ "solana-commitment-config",
  "solana-connection-cache",
+ "solana-epoch-schedule",
  "solana-keypair",
  "solana-measure",
  "solana-metrics",
+ "solana-pubkey",
  "solana-quic-definitions",
  "solana-rpc-client",
+ "solana-rpc-client-api",
  "solana-streamer",
  "solana-time-utils",
  "solana-tls-utils",

+ 11 - 1
tpu-client-next/Cargo.toml

@@ -17,36 +17,46 @@ agave-unstable-api = []
 log = ["dep:log"]
 metrics = ["dep:solana-metrics"]
 tracing = ["dep:tracing"]
+websocket-node-address-service = ["dep:solana-pubsub-client", "dep:tokio-stream"]
 
 [dependencies]
 async-trait = { workspace = true }
+futures = { workspace = true }
+futures-util = { workspace = true }
 log = { workspace = true, optional = true }
 lru = { workspace = true }
 quinn = { workspace = true }
 rustls = { workspace = true }
 solana-clock = { workspace = true }
+solana-commitment-config = { workspace = true }
 solana-connection-cache = { workspace = true }
+solana-epoch-schedule = { workspace = true }
 solana-keypair = { workspace = true }
 solana-measure = { workspace = true }
 solana-metrics = { workspace = true, optional = true }
+solana-pubkey = { workspace = true }
+solana-pubsub-client = { workspace = true, optional = true }
 solana-quic-definitions = { workspace = true }
 solana-rpc-client = { workspace = true }
+solana-rpc-client-api = { workspace = true }
 solana-streamer = { workspace = true }
 solana-time-utils = { workspace = true }
 solana-tls-utils = { workspace = true }
 solana-tpu-client = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
+tokio-stream = { workspace = true, optional = true }
 tokio-util = { workspace = true }
 tracing = { workspace = true, optional = true }
 
 [dev-dependencies]
 crossbeam-channel = { workspace = true }
 futures = { workspace = true }
+serde_json = { workspace = true }
 solana-cli-config = { workspace = true }
 solana-commitment-config = { workspace = true }
 solana-net-utils = { workspace = true }
 solana-pubkey = { workspace = true }
 solana-signer = { workspace = true }
 solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }
-solana-tpu-client-next = { path = ".", features = ["agave-unstable-api"] }
+solana-tpu-client-next = { path = ".", features = ["agave-unstable-api", "websocket-node-address-service"] }

+ 5 - 1
tpu-client-next/src/leader_updater.rs

@@ -68,6 +68,10 @@ impl fmt::Debug for LeaderUpdaterError {
 /// always returns the provided address instead of checking leader schedule.
 /// Otherwise, it creates a `LeaderUpdaterService` which dynamically updates the
 /// leaders by connecting to the network via the [`LeaderTpuService`].
+#[deprecated(
+    since = "3.1.0",
+    note = "Use create_leader_updater_with_config instead."
+)]
 pub async fn create_leader_updater(
     rpc_client: Arc<RpcClient>,
     websocket_url: String,
@@ -118,7 +122,7 @@ impl LeaderUpdater for LeaderUpdaterService {
 /// `PinnedLeaderUpdater` is an implementation of [`LeaderUpdater`] that always
 /// returns a fixed, "pinned" leader address. It is mainly used for testing.
 struct PinnedLeaderUpdater {
-    address: Vec<SocketAddr>,
+    pub address: Vec<SocketAddr>,
 }
 
 #[async_trait]

+ 6 - 0
tpu-client-next/src/lib.rs

@@ -16,6 +16,8 @@
 //! - **`log`**: Enables logging using `log` crate. It is enabled by default.
 //! - **`tracing`**: Enables logging using `tracing` crate instead of `log`. This feature is
 //!   mutually exclusive with `log`.
+//! - **`websocket-node-address-service`**: Enables implementation of
+//!   `WebsocketNodeAddressService` that provides slot updates via WebSocket interface.
 
 pub(crate) mod connection_worker;
 pub mod connection_workers_scheduler;
@@ -35,3 +37,7 @@ pub mod metrics;
 
 // Logging abstraction module
 pub(crate) mod logging;
+
+pub mod node_address_service;
+#[cfg(feature = "websocket-node-address-service")]
+pub mod websocket_node_address_service;

+ 3 - 3
tpu-client-next/src/logging.rs

@@ -5,9 +5,9 @@
 //! The features are mutually exclusive - only one can be enabled at a time.
 
 #[cfg(feature = "log")]
-pub use log::{debug, error, trace, warn};
+pub use log::{debug, error, info, trace, warn};
 #[cfg(feature = "tracing")]
-pub use tracing::{debug, error, trace, warn};
+pub use tracing::{debug, error, info, trace, warn};
 
 #[cfg(not(any(feature = "log", feature = "tracing")))]
 compile_error!("Either 'log' or 'tracing' feature must be enabled");
@@ -25,7 +25,7 @@ mod tests {
         // and can be called without errors
         debug!("Test debug message");
         error!("Test error message");
-        trace!("Test trace message");
+        info!("Test info message");
         warn!("Test warn message");
     }
 }

+ 187 - 0
tpu-client-next/src/node_address_service.rs

@@ -0,0 +1,187 @@
+//! This module provides [`NodeAddressService`] structure that implements [`LeaderUpdater`] trait to
+//! track upcoming leaders and maintains an up-to-date mapping of leader id to TPU socket address.
+//!
+//! # Examples
+//!
+//! This example shows how to use [`NodeAddressService`] to implement [`LeaderUpdater`] with a
+//! custom slot update provider. Typically, this can be done as a zero-cost abstraction, as shown
+//! below. In contrast, `WebsocketNodeAddressService` requires introducing a task and a channel
+//! due to specifics of the PubsubClient API implementation.
+//!
+//! For the sake of the example, let's assume we have some custom slot updates that we receive by
+//! UDP.
+//!
+//! ```ignore
+//!  use async_stream::stream;
+//!  use tokio::net::UdpSocket;
+//!
+//!  pub struct SlotUpdaterNodeAddressService {
+//!    service: NodeAddressService,
+//! }
+//!
+//! impl SlotUpdaterNodeAddressService {
+//!    pub async fn run(
+//!        rpc_client: Arc<RpcClient>,
+//!        bind_address: SocketAddr,
+//!        config: LeaderTpuCacheServiceConfig,
+//!        cancel: CancellationToken,
+//!    ) -> Result<Self, NodeAddressServiceError> {
+//!        let socket = UdpSocket::bind(bind_address)
+//!            .await
+//!            .map_err(|_e| NodeAddressServiceError::InitializationFailed)?;
+//!        let stream = Self::udp_slot_event_stream(socket);
+//!        let service = NodeAddressService::run(rpc_client, stream, config, cancel).await?;
+//!
+//!        Ok(Self { service })
+//!    }
+//!
+//!    fn udp_slot_event_stream(socket: UdpSocket) -> impl Stream<Item = SlotEvent> + Send + 'static {
+//!        stream! {
+//!            let mut buf = vec![0u8; 2048];
+//!
+//!            loop {
+//!                match socket.recv_from(&mut buf).await {
+//!                    Ok((len, from)) => {
+//!                        let data = &buf[..len];
+//!                        match serde_json::from_slice::<SlotMessage>(data) {
+//!                            Ok(msg) => {
+//!                                match msg.status {
+//!                                    SlotStatus::FirstShredReceived => yield SlotEvent::Start(msg.slot),
+//!                                    SlotStatus::Completed => yield SlotEvent::End(msg.slot),
+//!                                    _ => continue,
+//!                                };
+//!                            }
+//!                            Err(e) => error!("Failed to parse SlotMessage from {from}: {e}"),
+//!                        }
+//!                    }
+//!                    Err(e) => {
+//!                        error!("UDP receive failed: {e}");
+//!                        break;
+//!                    }
+//!                }
+//!            }
+//!        }
+//!    }
+//! }
+//! ```
+//!
+use {
+    crate::{
+        leader_updater::LeaderUpdater,
+        logging::error,
+        node_address_service::{
+            leader_tpu_cache_service::{Error as LeaderTpuCacheServiceError, LeaderUpdateReceiver},
+            slot_update_service::Error as SlotUpdateServiceError,
+        },
+    },
+    async_trait::async_trait,
+    futures::StreamExt,
+    solana_clock::Slot,
+    std::{net::SocketAddr, sync::Arc},
+    thiserror::Error,
+    tokio::join,
+    tokio_util::sync::CancellationToken,
+};
+
+pub mod leader_tpu_cache_service;
+pub mod recent_leader_slots;
+pub mod slot_event;
+pub mod slot_receiver;
+pub mod slot_update_service;
+pub use {
+    leader_tpu_cache_service::{
+        ClusterInfoProvider, Config as LeaderTpuCacheServiceConfig, LeaderTpuCacheService,
+    },
+    recent_leader_slots::RecentLeaderSlots,
+    slot_event::SlotEvent,
+    slot_receiver::SlotReceiver,
+    slot_update_service::SlotUpdateService,
+};
+
+/// [`NodeAddressService`] is a convenience wrapper for [`SlotUpdateService`] and
+/// [`LeaderTpuCacheService`] to track upcoming leaders and maintains an up-to-date mapping of
+/// leader id to TPU socket address.
+pub struct NodeAddressService {
+    leaders_receiver: LeaderUpdateReceiver,
+    slot_receiver: SlotReceiver,
+    slot_update_service: SlotUpdateService,
+    leader_cache_service: LeaderTpuCacheService,
+}
+
+impl NodeAddressService {
+    /// Run the [`NodeAddressService`].
+    ///
+    /// On success it starts [`SlotUpdateService`] together with [`LeaderTpuCacheService`] and
+    /// returns [`NodeAddressService`] instance which provides method to fetch next leaders. To run
+    /// mentioned services, it takes `cluster_info_provider` which abstracts access to information
+    /// about the cluster (see [`ClusterInfoProvider`]) and also supplies the initial slot,
+    /// `slot_update_stream` which provides a stream of slot updates, `config` which provides
+    /// configuration for the leader TPU cache service, and finally `cancel`, a cancellation token
+    /// to stop the service.
+    ///
+    /// On failure, it will return appropriate error.
+    pub async fn run(
+        cluster_info_provider: Arc<impl ClusterInfoProvider + 'static>,
+        slot_update_stream: impl StreamExt<Item = SlotEvent> + Send + 'static,
+        config: LeaderTpuCacheServiceConfig,
+        cancel: CancellationToken,
+    ) -> Result<Self, NodeAddressServiceError> {
+        let initial_slot = cluster_info_provider
+            .initial_slot()
+            .await
+            .map_err(NodeAddressServiceError::from)?;
+        let (slot_receiver, slot_update_service) =
+            SlotUpdateService::run(initial_slot, slot_update_stream, cancel.clone())?;
+        let (leaders_receiver, leader_cache_service) = LeaderTpuCacheService::run(
+            cluster_info_provider,
+            slot_receiver.clone(),
+            config,
+            cancel,
+        )
+        .await?;
+
+        Ok(Self {
+            leaders_receiver,
+            slot_receiver,
+            slot_update_service,
+            leader_cache_service,
+        })
+    }
+
+    pub async fn shutdown(&mut self) -> Result<(), NodeAddressServiceError> {
+        let (slot_update_service_res, leader_cache_service_res) = join!(
+            self.slot_update_service.shutdown(),
+            self.leader_cache_service.shutdown(),
+        );
+        slot_update_service_res?;
+        leader_cache_service_res?;
+        Ok(())
+    }
+
+    /// Returns the estimated current slot.
+    pub fn estimated_current_slot(&self) -> Slot {
+        self.slot_receiver.slot()
+    }
+}
+
+#[async_trait]
+impl LeaderUpdater for NodeAddressService {
+    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
+        self.leaders_receiver.leaders(lookahead_leaders)
+    }
+
+    async fn stop(&mut self) {
+        if let Err(e) = self.shutdown().await {
+            error!("Failed to shutdown NodeAddressService: {e}");
+        }
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum NodeAddressServiceError {
+    #[error(transparent)]
+    SlotUpdateServiceError(#[from] SlotUpdateServiceError),
+
+    #[error(transparent)]
+    LeaderTpuCacheServiceError(#[from] LeaderTpuCacheServiceError),
+}

+ 548 - 0
tpu-client-next/src/node_address_service/leader_tpu_cache_service.rs

@@ -0,0 +1,548 @@
+//! This module provides [`LeaderTpuCacheService`] structure along with [`LeaderUpdateReceiver`],
+//! [`Config`]. [`LeaderTpuCacheService`] tracks the current and upcoming Solana leader nodes and
+//! their TPU socket addresses.
+#![allow(clippy::arithmetic_side_effects)]
+use {
+    crate::{
+        connection_workers_scheduler::extract_send_leaders,
+        logging::{debug, error, info, warn},
+        node_address_service::SlotReceiver,
+    },
+    async_trait::async_trait,
+    solana_clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
+    solana_commitment_config::CommitmentConfig,
+    solana_pubkey::Pubkey,
+    solana_quic_definitions::QUIC_PORT_OFFSET,
+    solana_rpc_client::nonblocking::rpc_client::RpcClient,
+    solana_rpc_client_api::{client_error::Error as ClientError, response::RpcContactInfo},
+    std::{
+        collections::HashMap, future::Future, net::SocketAddr, str::FromStr, sync::Arc,
+        time::Instant,
+    },
+    thiserror::Error,
+    tokio::{
+        sync::watch,
+        task::JoinHandle,
+        time::{interval, Duration},
+    },
+    tokio_util::sync::CancellationToken,
+};
+
+/// Maximum number of slots used to build TPU socket fanout set
+const MAX_FANOUT_SLOTS: u64 = 100;
+
+/// Configuration for the [`LeaderTpuCacheService`].
+#[derive(Debug, Clone)]
+pub struct Config {
+    /// max number of leaders to look ahead for, not necessary unique.
+    pub lookahead_leaders: u8,
+    /// how often to refresh cluster nodes info.
+    pub refresh_nodes_info_every: Duration,
+    /// maximum number of consecutive failures to tolerate.
+    pub max_consecutive_failures: usize,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            lookahead_leaders: 1,
+            refresh_nodes_info_every: Duration::from_secs(5 * 60),
+            max_consecutive_failures: 10,
+        }
+    }
+}
+
+/// [`LeaderTpuCacheService`] is a background task that tracks the current and upcoming Solana
+/// leader nodes and updates their TPU socket addresses encapsulated in [`LeaderUpdateReceiver`] for
+/// downstream consumers.
+pub struct LeaderTpuCacheService {
+    handle: Option<JoinHandle<Result<(), Error>>>,
+    cancel: CancellationToken,
+}
+
+/// Receiver for leader TPU socket address updates from
+/// [`LeaderTpuCacheService`].
+#[derive(Clone)]
+pub struct LeaderUpdateReceiver {
+    receiver: watch::Receiver<NodesTpuInfo>,
+}
+
+impl LeaderUpdateReceiver {
+    pub fn leaders(&self, lookahead_leaders: usize) -> Vec<SocketAddr> {
+        let NodesTpuInfo { leaders, extend } = self.receiver.borrow().clone();
+        let lookahead_leaders = if extend {
+            lookahead_leaders.saturating_add(1)
+        } else {
+            lookahead_leaders
+        };
+        extract_send_leaders(&leaders, lookahead_leaders)
+    }
+}
+
+/// [`NodesTpuInfo`] holds the TPU addresses of the nodes scheduled to be leaders for upcoming
+/// slots. The `extend` flag indicates whether the list of leaders was extended by one to account
+/// for the case when the current slot is the last slot in a leader's consecutive slots.
+#[derive(Clone)]
+struct NodesTpuInfo {
+    leaders: Vec<SocketAddr>,
+    extend: bool,
+}
+
+impl LeaderTpuCacheService {
+    /// Run the [`LeaderTpuCacheService`], returning receiver and the service.
+    pub async fn run(
+        cluster_info: Arc<impl ClusterInfoProvider + 'static>,
+        slot_receiver: SlotReceiver,
+        config: Config,
+        cancel: CancellationToken,
+    ) -> Result<(LeaderUpdateReceiver, Self), Error> {
+        let (leader_tpu_map, epoch_info, slot_leaders) = initialize_state(
+            cluster_info.as_ref(),
+            slot_receiver.clone(),
+            config.max_consecutive_failures,
+        )
+        .await?;
+        let current_slot = slot_receiver.slot();
+        let lookahead_leaders =
+            adjust_lookahead(current_slot, &slot_leaders, config.lookahead_leaders);
+        let leaders = leader_sockets(
+            current_slot,
+            lookahead_leaders,
+            &slot_leaders,
+            &leader_tpu_map,
+        );
+
+        let (leaders_sender, leaders_receiver) = watch::channel(NodesTpuInfo {
+            leaders,
+            extend: config.lookahead_leaders != lookahead_leaders,
+        });
+
+        let handle = tokio::spawn(Self::run_loop(
+            cluster_info,
+            slot_receiver,
+            epoch_info,
+            slot_leaders,
+            leader_tpu_map,
+            config,
+            leaders_sender,
+            cancel.clone(),
+        ));
+
+        Ok((
+            LeaderUpdateReceiver {
+                receiver: leaders_receiver,
+            },
+            Self {
+                handle: Some(handle),
+                cancel,
+            },
+        ))
+    }
+
+    /// Gracefully shutdown the [`LeaderTpuCacheService`].
+    pub async fn shutdown(&mut self) -> Result<(), Error> {
+        self.cancel.cancel();
+        if let Some(handle) = self.handle.take() {
+            handle.await??;
+        }
+        Ok(())
+    }
+
+    async fn run_loop(
+        cluster_info: Arc<impl ClusterInfoProvider + 'static>,
+        mut slot_receiver: SlotReceiver,
+        mut epoch_info: EpochInfo,
+        mut slot_leaders: SlotLeaders,
+        mut leader_tpu_map: LeaderTpuMap,
+        config: Config,
+        leaders_sender: watch::Sender<NodesTpuInfo>,
+        cancel: CancellationToken,
+    ) -> Result<(), Error> {
+        let mut num_consecutive_failures: usize = 0;
+        let mut refresh_tpu_interval = interval(config.refresh_nodes_info_every);
+        loop {
+            tokio::select! {
+                _ = refresh_tpu_interval.tick() => {
+                    try_update(
+                        "cluster TPU ports",
+                        &mut leader_tpu_map,
+                        || LeaderTpuMap::new(cluster_info.as_ref()),
+                        &mut num_consecutive_failures,
+                        config.max_consecutive_failures,
+                    ).await?;
+                    debug!("Updated cluster TPU ports");
+                }
+                res = slot_receiver.changed() => {
+                    debug!("Changed slot receiver");
+                    if let Err(e) = res {
+                        warn!("Slot receiver channel closed: {e}");
+                        break;
+                    }
+
+                    let estimated_current_slot = slot_receiver.slot();
+                    update_leader_info(
+                        estimated_current_slot,
+                        cluster_info.as_ref(),
+                        &mut epoch_info,
+                        &mut slot_leaders,
+                        &mut num_consecutive_failures,
+                        config.max_consecutive_failures,
+                    ).await?;
+                    let current_slot = slot_receiver.slot();
+                    let lookahead_leaders = adjust_lookahead(
+                        current_slot,
+                        &slot_leaders,
+                        config.lookahead_leaders,
+                    );
+                    let leaders = leader_sockets(current_slot, lookahead_leaders, &slot_leaders, &leader_tpu_map);
+
+                    if let Err(e) = leaders_sender.send(NodesTpuInfo { leaders, extend: config.lookahead_leaders != lookahead_leaders }) {
+                        warn!("Unexpectedly dropped leaders_sender: {e}");
+                        return Err(Error::ChannelClosed);
+                    }
+                }
+
+                _ = cancel.cancelled() => {
+                    info!("Cancel signal received, stopping LeaderTpuCacheService.");
+                    break;
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error(transparent)]
+    RpcError(#[from] ClientError),
+
+    #[error("Failed to get slot leaders connecting to: {0}")]
+    SlotLeadersConnectionFailed(String),
+
+    #[error("Failed find any cluster node info for upcoming leaders, timeout: {0}")]
+    ClusterNodeNotFound(String),
+
+    #[error(transparent)]
+    JoinError(#[from] tokio::task::JoinError),
+
+    #[error("Unexpectedly dropped a channel.")]
+    ChannelClosed,
+
+    #[error("Failed to initialize LeaderTpuCacheService.")]
+    InitializationFailed,
+}
+
+/// [`ClusterInfoProvider`] provides information about the cluster such as epoch info, node tpu
+/// addresses, and leader schedule. Besides that, it also provides the initial slot to start from,
+/// which is queried once during initialization. All this information is required by
+/// [`LeaderTpuCacheService`] to estimate the next leader.
+#[async_trait]
+pub trait ClusterInfoProvider: Send + Sync {
+    async fn initial_slot(&self) -> Result<Slot, Error>;
+    async fn tpu_socket_map(&self) -> Result<HashMap<Pubkey, SocketAddr>, Error>;
+    async fn epoch_info(&self, first_slot: Slot) -> Result<(Slot, Slot), Error>;
+    async fn slot_leaders(&self, first_slot: Slot, slots_limit: u64) -> Result<Vec<Pubkey>, Error>;
+}
+
+async fn update_leader_info(
+    estimated_current_slot: Slot,
+    cluster_info: &impl ClusterInfoProvider,
+    epoch_info: &mut EpochInfo,
+    slot_leaders: &mut SlotLeaders,
+    num_consecutive_failures: &mut usize,
+    max_consecutive_failures: usize,
+) -> Result<(), Error> {
+    if estimated_current_slot > epoch_info.last_slot_in_epoch {
+        try_update(
+            "epoch info",
+            epoch_info,
+            || EpochInfo::new(cluster_info, estimated_current_slot),
+            num_consecutive_failures,
+            max_consecutive_failures,
+        )
+        .await?;
+    }
+    if estimated_current_slot.saturating_add(MAX_FANOUT_SLOTS) > slot_leaders.last_slot() {
+        try_update(
+            "slot leaders",
+            slot_leaders,
+            || {
+                SlotLeaders::new(
+                    cluster_info,
+                    estimated_current_slot,
+                    epoch_info.slots_in_epoch,
+                )
+            },
+            num_consecutive_failures,
+            max_consecutive_failures,
+        )
+        .await?;
+    }
+    Ok(())
+}
+
+/// Get the TPU sockets for slots starting from `first_slot` and until `first_slot +
+/// lookahead_leaders * NUM_CONSECUTIVE_LEADER_SLOTS`.
+///
+/// If it returns an empty vector, it might mean that we overran the local leader schedule cache or,
+/// less probable, that there is no TPU info available for corresponding slot leaders.
+fn leader_sockets(
+    first_slot: Slot,
+    lookahead_leaders: u8,
+    slot_leaders: &SlotLeaders,
+    leader_tpu_map: &LeaderTpuMap,
+) -> Vec<SocketAddr> {
+    let fanout_slots = (lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS);
+    let mut leader_sockets = Vec::with_capacity(lookahead_leaders as usize);
+    // `slot_leaders.first_slot` might have been advanced since caller last read it. Take the
+    // greater of the two values to ensure we are reading from the latest leader schedule.
+    let current_slot = std::cmp::max(first_slot, slot_leaders.first_slot);
+    for leader_slot in
+        (current_slot..current_slot + fanout_slots).step_by(NUM_CONSECUTIVE_LEADER_SLOTS as usize)
+    {
+        if let Some(leader) = slot_leaders.slot_leader(leader_slot) {
+            if let Some(tpu_socket) = leader_tpu_map.get(leader) {
+                leader_sockets.push(*tpu_socket);
+                debug!("Pushed leader {leader} TPU socket: {tpu_socket}");
+            } else {
+                // The leader is probably delinquent
+                debug!("TPU not available for leader {leader}");
+            }
+        } else {
+            // Overran the local leader schedule cache
+            warn!(
+                "Leader not known for slot {}; cache holds slots [{},{}]",
+                leader_slot,
+                slot_leaders.first_slot,
+                slot_leaders.last_slot()
+            );
+        }
+    }
+
+    leader_sockets
+}
+
+async fn initialize_state(
+    cluster_info: &impl ClusterInfoProvider,
+    slot_receiver: SlotReceiver,
+    max_attempts: usize,
+) -> Result<(LeaderTpuMap, EpochInfo, SlotLeaders), Error> {
+    const ATTEMPTS_SLEEP_DURATION: Duration = Duration::from_millis(100);
+    let mut leader_tpu_map = None;
+    let mut epoch_info = None;
+    let mut slot_leaders = None;
+    let mut num_attempts: usize = 0;
+    while num_attempts < max_attempts {
+        let iteration_start = Instant::now();
+        if leader_tpu_map.is_none() {
+            leader_tpu_map = LeaderTpuMap::new(cluster_info).await.ok();
+        }
+        if epoch_info.is_none() {
+            epoch_info = EpochInfo::new(cluster_info, slot_receiver.slot())
+                .await
+                .ok();
+        }
+
+        if let Some(epoch_info) = &epoch_info {
+            if slot_leaders.is_none() {
+                slot_leaders = SlotLeaders::new(
+                    cluster_info,
+                    slot_receiver.slot(),
+                    epoch_info.slots_in_epoch,
+                )
+                .await
+                .ok();
+            }
+        }
+        if leader_tpu_map.is_some() && epoch_info.is_some() && slot_leaders.is_some() {
+            return Ok((
+                leader_tpu_map.take().unwrap(),
+                epoch_info.take().unwrap(),
+                slot_leaders.take().unwrap(),
+            ));
+        }
+        num_attempts += 1;
+
+        let elapsed = iteration_start.elapsed();
+        if elapsed < ATTEMPTS_SLEEP_DURATION {
+            tokio::time::sleep(ATTEMPTS_SLEEP_DURATION - elapsed).await;
+        }
+    }
+    Err(Error::InitializationFailed)
+}
+
+fn adjust_lookahead(slot: Slot, slot_leaders: &SlotLeaders, lookahead_leaders: u8) -> u8 {
+    if slot_leaders
+        .is_leader_last_consecutive_slot(slot)
+        .unwrap_or(true)
+    {
+        lookahead_leaders.saturating_add(1)
+    } else {
+        lookahead_leaders
+    }
+}
+
+async fn try_update<F, Fut, T>(
+    label: &str,
+    data: &mut T,
+    make_call: F,
+    num_failures: &mut usize,
+    max_failures: usize,
+) -> Result<(), Error>
+where
+    F: FnOnce() -> Fut,
+    Fut: Future<Output = Result<T, Error>>,
+{
+    match make_call().await {
+        Ok(result) => {
+            *num_failures = 0;
+            debug!("{label} updated successfully");
+            *data = result;
+            Ok(())
+        }
+        Err(e) => {
+            *num_failures = num_failures.saturating_add(1);
+            warn!("Failed to update {label}: {e} ({num_failures} consecutive failures)",);
+
+            if *num_failures >= max_failures {
+                error!("Max consecutive failures for {label}, giving up.");
+                Err(e)
+            } else {
+                Ok(())
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+struct LeaderTpuMap {
+    leader_tpu_map: HashMap<Pubkey, SocketAddr>,
+}
+
+impl LeaderTpuMap {
+    async fn new(cluster_info: &impl ClusterInfoProvider) -> Result<Self, Error> {
+        let leader_tpu_map = cluster_info.tpu_socket_map().await?;
+        Ok(Self { leader_tpu_map })
+    }
+
+    fn get(&self, leader: &Pubkey) -> Option<&SocketAddr> {
+        self.leader_tpu_map.get(leader)
+    }
+}
+
+/// Structure [`SlotLeaders`] provides a view on the leaders schedule starting from `first_slot`.
+#[derive(PartialEq, Debug)]
+struct SlotLeaders {
+    first_slot: Slot,
+    leaders: Vec<Pubkey>,
+}
+
+impl SlotLeaders {
+    /// Creates a new [`SlotLeaders`] instance by fetching slot leaders up to `slots_limit`.
+    ///
+    /// Note that if it manages to fetch fewer slot leaders than requested, it will still succeed.
+    async fn new(
+        cluster_info: &impl ClusterInfoProvider,
+        first_slot: Slot,
+        slots_limit: u64,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            first_slot,
+            leaders: cluster_info.slot_leaders(first_slot, slots_limit).await?,
+        })
+    }
+
+    fn last_slot(&self) -> Slot {
+        self.first_slot + self.leaders.len().saturating_sub(1) as u64
+    }
+
+    fn slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
+        slot.checked_sub(self.first_slot)
+            .and_then(|index| self.leaders.get(index as usize))
+    }
+
+    /// Returns `Some(true)` if the given `slot` is the last slot in the leader consecutive slots.
+    fn is_leader_last_consecutive_slot(&self, slot: Slot) -> Option<bool> {
+        slot.checked_sub(self.first_slot).and_then(|index| {
+            let index = index as usize;
+            if index + 1 < self.leaders.len() {
+                Some(self.leaders[index] != self.leaders[index + 1])
+            } else {
+                None
+            }
+        })
+    }
+}
+
+#[derive(PartialEq, Debug)]
+struct EpochInfo {
+    slots_in_epoch: Slot,
+    last_slot_in_epoch: Slot,
+}
+
+impl EpochInfo {
+    async fn new(cluster_info: &impl ClusterInfoProvider, first_slot: Slot) -> Result<Self, Error> {
+        let (slots_in_epoch, last_slot_in_epoch) = cluster_info.epoch_info(first_slot).await?;
+        Ok(Self {
+            slots_in_epoch,
+            last_slot_in_epoch,
+        })
+    }
+}
+
+#[async_trait]
+impl ClusterInfoProvider for RpcClient {
+    async fn initial_slot(&self) -> Result<Slot, Error> {
+        self.get_slot_with_commitment(CommitmentConfig::processed())
+            .await
+            .map_err(Error::RpcError)
+    }
+
+    async fn tpu_socket_map(&self) -> Result<HashMap<Pubkey, SocketAddr>, Error> {
+        let cluster_nodes = self.get_cluster_nodes().await.map_err(Error::RpcError)?;
+        Ok(extract_cluster_tpu_sockets(cluster_nodes))
+    }
+
+    async fn epoch_info(&self, first_slot: Slot) -> Result<(Slot, Slot), Error> {
+        let epoch_schedule = self.get_epoch_schedule().await.map_err(Error::RpcError)?;
+        let epoch = epoch_schedule.get_epoch(first_slot);
+        let slots_in_epoch = epoch_schedule.get_slots_in_epoch(epoch);
+        let last_slot_in_epoch = epoch_schedule.get_last_slot_in_epoch(epoch);
+        debug!(
+            "Updated slots in epoch: {slots_in_epoch}, last slot in epoch: {last_slot_in_epoch}",
+        );
+        Ok((slots_in_epoch, last_slot_in_epoch))
+    }
+
+    /// Returns the slot leaders starting from `first_slot` until `first_slot + slots_limit`.
+    ///
+    /// Partial results may be returned if `slots_limit` exceeds the maximum number of slots.
+    async fn slot_leaders(&self, first_slot: Slot, slots_limit: u64) -> Result<Vec<Pubkey>, Error> {
+        // `2` is used to avoid refetching the leaders until the middle of the requested range.
+        let max_slots_to_fetch = (2 * MAX_FANOUT_SLOTS).min(slots_limit);
+        let slot_leaders = self.get_slot_leaders(first_slot, max_slots_to_fetch).await;
+        debug!("Fetched slot leaders from slot {first_slot} for {slots_limit}. ");
+        slot_leaders.map_err(Error::RpcError)
+    }
+}
+
+fn extract_cluster_tpu_sockets(
+    cluster_contact_info: Vec<RpcContactInfo>,
+) -> HashMap<Pubkey, SocketAddr> {
+    cluster_contact_info
+        .into_iter()
+        .filter_map(|contact_info| {
+            let pubkey = Pubkey::from_str(&contact_info.pubkey).ok()?;
+            let socket = {
+                contact_info.tpu_quic.or_else(|| {
+                    let mut socket = contact_info.tpu?;
+                    let port = socket.port().checked_add(QUIC_PORT_OFFSET)?;
+                    socket.set_port(port);
+                    Some(socket)
+                })
+            }?;
+            Some((pubkey, socket))
+        })
+        .collect()
+}
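
`ClusterInfoProvider` is the seam that lets tests or alternative backends replace RPC entirely. Below is a minimal sketch (not part of the diff) of a hypothetical static provider; the type `StaticClusterInfo` and its fixed values are made up for illustration:

```rust
use {
    async_trait::async_trait,
    solana_clock::Slot,
    solana_pubkey::Pubkey,
    solana_tpu_client_next::node_address_service::leader_tpu_cache_service::{
        ClusterInfoProvider, Error,
    },
    std::{collections::HashMap, net::SocketAddr},
};

/// Hypothetical provider for tests: a single leader serving every slot.
struct StaticClusterInfo {
    leader: Pubkey,
    tpu: SocketAddr,
}

#[async_trait]
impl ClusterInfoProvider for StaticClusterInfo {
    async fn initial_slot(&self) -> Result<Slot, Error> {
        Ok(0)
    }

    async fn tpu_socket_map(&self) -> Result<HashMap<Pubkey, SocketAddr>, Error> {
        Ok(HashMap::from([(self.leader, self.tpu)]))
    }

    async fn epoch_info(&self, _first_slot: Slot) -> Result<(Slot, Slot), Error> {
        // (slots_in_epoch, last_slot_in_epoch) for a single toy epoch.
        Ok((432_000, 431_999))
    }

    async fn slot_leaders(&self, _first_slot: Slot, slots_limit: u64) -> Result<Vec<Pubkey>, Error> {
        Ok(vec![self.leader; slots_limit as usize])
    }
}
```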

+ 136 - 0
tpu-client-next/src/node_address_service/recent_leader_slots.rs

@@ -0,0 +1,136 @@
+//! This module provides [`RecentLeaderSlots`] to track recent leader slots.
+use {crate::node_address_service::SlotEvent, solana_clock::Slot, std::collections::VecDeque};
+
+// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
+const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
+
+const RECENT_LEADER_SLOTS_CAPACITY: usize = 48;
+
+#[derive(Debug)]
+pub struct RecentLeaderSlots(VecDeque<SlotEvent>);
+
+impl RecentLeaderSlots {
+    pub fn new() -> Self {
+        Self(VecDeque::with_capacity(RECENT_LEADER_SLOTS_CAPACITY))
+    }
+}
+
+impl Default for RecentLeaderSlots {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RecentLeaderSlots {
+    pub fn record(&mut self, slot_event: SlotEvent) {
+        while self.0.len() > RECENT_LEADER_SLOTS_CAPACITY.saturating_sub(1) {
+            self.0.pop_front();
+        }
+        self.0.push_back(slot_event);
+    }
+
+    // Estimate the current slot from recent slot notifications.
+    #[allow(clippy::arithmetic_side_effects)]
+    pub fn estimate_current_slot(&self) -> Slot {
+        let mut recent_slots: Vec<SlotEvent> = self.0.iter().cloned().collect();
+        assert!(
+            !recent_slots.is_empty(),
+            "method must be called after at least one record."
+        );
+        recent_slots.sort_by(|a, b| {
+            a.slot()
+                .cmp(&b.slot())
+                .then_with(|| b.is_start().cmp(&a.is_start())) // true before false
+        });
+
+        // Validators can broadcast invalid blocks that are far in the future so check if the
+        // current slot is in line with the recent progression.
+        let max_index = recent_slots.len() - 1;
+        let median_index = max_index / 2;
+        let median_recent_slot = recent_slots[median_index].slot();
+        let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
+        let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
+
+        let idx = recent_slots
+            .iter()
+            .rposition(|e| e.slot() <= max_reasonable_current_slot)
+            .expect("no reasonable slot");
+
+        let slot_event = &recent_slots[idx];
+        if slot_event.is_start() {
+            slot_event.slot()
+        } else {
+            slot_event.slot().saturating_add(1)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {super::*, solana_clock::Slot};
+
+    impl From<Vec<Slot>> for RecentLeaderSlots {
+        fn from(recent_slots: Vec<Slot>) -> Self {
+            use std::collections::VecDeque;
+            assert!(!recent_slots.is_empty());
+
+            let mut events = VecDeque::with_capacity(recent_slots.len());
+
+            for slot in recent_slots {
+                events.push_back(SlotEvent::Start(slot));
+                events.push_back(SlotEvent::End(slot));
+            }
+
+            Self(events)
+        }
+    }
+
+    #[test]
+    fn test_recent_leader_slots() {
+        let mut recent_slots: Vec<Slot> = (1..=12).collect();
+        assert_eq!(
+            RecentLeaderSlots::from(recent_slots.clone()).estimate_current_slot(),
+            13
+        );
+
+        recent_slots.reverse();
+        assert_eq!(
+            RecentLeaderSlots::from(recent_slots).estimate_current_slot(),
+            13
+        );
+
+        let mut recent_slots = RecentLeaderSlots::new();
+        recent_slots.record(SlotEvent::Start(13));
+        assert_eq!(recent_slots.estimate_current_slot(), 13);
+        recent_slots.record(SlotEvent::Start(14));
+        assert_eq!(recent_slots.estimate_current_slot(), 14);
+        recent_slots.record(SlotEvent::Start(15));
+        assert_eq!(recent_slots.estimate_current_slot(), 15);
+
+        assert_eq!(
+            RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]).estimate_current_slot(),
+            2 + MAX_SLOT_SKIP_DISTANCE,
+        );
+        assert_eq!(
+            RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]).estimate_current_slot(),
+            3 + MAX_SLOT_SKIP_DISTANCE,
+        );
+
+        assert_eq!(
+            RecentLeaderSlots::from(vec![1, 100]).estimate_current_slot(),
+            2
+        );
+        assert_eq!(
+            RecentLeaderSlots::from(vec![1, 2, 100]).estimate_current_slot(),
+            3
+        );
+        assert_eq!(
+            RecentLeaderSlots::from(vec![1, 2, 3, 100]).estimate_current_slot(),
+            4
+        );
+        assert_eq!(
+            RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]).estimate_current_slot(),
+            4
+        );
+    }
+}

+ 26 - 0
tpu-client-next/src/node_address_service/slot_event.rs

@@ -0,0 +1,26 @@
+//! This module provides [`SlotEvent`] enum that encapsulates slot start and end events.
+//!
+//! The implementation of the slot update provider defines the semantics of these events, but
+//! typically `SlotEvent::Start` means `FirstShredReceived` while `SlotEvent::End` means `Completed`.
+use solana_clock::Slot;
+
+/// [`SlotEvent`] represents slot start and end events.
+#[derive(Debug, Clone)]
+pub enum SlotEvent {
+    Start(Slot),
+    End(Slot),
+}
+
+impl SlotEvent {
+    /// Get the slot associated with the event.
+    pub fn slot(&self) -> Slot {
+        match self {
+            SlotEvent::Start(slot) | SlotEvent::End(slot) => *slot,
+        }
+    }
+
+    /// Check if the event is a start event.
+    pub fn is_start(&self) -> bool {
+        matches!(self, SlotEvent::Start(_))
+    }
+}

+ 29 - 0
tpu-client-next/src/node_address_service/slot_receiver.rs

@@ -0,0 +1,29 @@
+//! This module provides [`SlotReceiver`] structure.
+use {solana_clock::Slot, thiserror::Error, tokio::sync::watch};
+
+/// Receiver for slot updates from slot update services.
+#[derive(Clone)]
+pub struct SlotReceiver(watch::Receiver<Slot>);
+
+impl SlotReceiver {
+    pub fn new(receiver: watch::Receiver<Slot>) -> Self {
+        Self(receiver)
+    }
+
+    pub fn slot(&self) -> Slot {
+        *self.0.borrow()
+    }
+
+    pub async fn changed(&mut self) -> Result<(), SlotReceiverError> {
+        self.0
+            .changed()
+            .await
+            .map_err(|_| SlotReceiverError::ChannelClosed)
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum SlotReceiverError {
+    #[error("Unexpectedly dropped a channel.")]
+    ChannelClosed,
+}

+ 87 - 0
tpu-client-next/src/node_address_service/slot_update_service.rs

@@ -0,0 +1,87 @@
+//! This module provides [`SlotUpdateService`] that is used to get slot updates using provided
+//! stream.
+use {
+    crate::{
+        logging::info,
+        node_address_service::{RecentLeaderSlots, SlotEvent, SlotReceiver},
+    },
+    futures::StreamExt,
+    solana_clock::Slot,
+    std::pin::pin,
+    thiserror::Error,
+    tokio::{sync::watch, task::JoinHandle},
+    tokio_util::sync::CancellationToken,
+};
+
+/// [`SlotUpdateService`] updates the current slot by subscribing to the slot updates using provided
+/// stream.
+pub struct SlotUpdateService {
+    handle: Option<JoinHandle<Result<(), Error>>>,
+    cancel: CancellationToken,
+}
+
+impl SlotUpdateService {
+    /// Run the [`SlotUpdateService`].
+    pub fn run(
+        initial_current_slot: Slot,
+        slot_update_stream: impl StreamExt<Item = SlotEvent> + Send + 'static,
+        cancel: CancellationToken,
+    ) -> Result<(SlotReceiver, Self), Error> {
+        let mut recent_slots = RecentLeaderSlots::new();
+        let (slot_sender, slot_receiver) = watch::channel(initial_current_slot);
+        let cancel_clone = cancel.clone();
+
+        let main_loop = async move {
+            let mut slot_update_stream = pin!(slot_update_stream);
+            let mut cached_estimated_slot = initial_current_slot;
+            loop {
+                tokio::select! {
+                    Some(slot_event) = slot_update_stream.next() => {
+                        recent_slots.record(slot_event);
+                        let estimated_slots = recent_slots.estimate_current_slot();
+                        // Send update only if the estimated slot has advanced.
+                        if estimated_slots > cached_estimated_slot && slot_sender.send(estimated_slots).is_err() {
+                            info!("Stop SlotUpdateService: all slot receivers have been dropped.");
+                            break;
+                        }
+                        cached_estimated_slot = estimated_slots;
+                    }
+
+                    _ = cancel.cancelled() => {
+                        info!("LeaderTracker cancelled, exiting slot watcher.");
+                        break;
+                    }
+                }
+            }
+            Ok(())
+        };
+
+        let handle = tokio::spawn(main_loop);
+
+        Ok((
+            SlotReceiver::new(slot_receiver),
+            Self {
+                handle: Some(handle),
+                cancel: cancel_clone,
+            },
+        ))
+    }
+
+    /// Shutdown the [`SlotUpdateService`].
+    pub async fn shutdown(&mut self) -> Result<(), Error> {
+        self.cancel.cancel();
+        if let Some(handle) = self.handle.take() {
+            handle.await??;
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error(transparent)]
+    JoinError(#[from] tokio::task::JoinError),
+
+    #[error("Failed to initialize WebsocketSlotUpdateService.")]
+    InitializationFailed,
+}
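
A minimal sketch (not part of the diff) of driving `SlotUpdateService` with an in-memory stream, e.g. in a test; the slot numbers are arbitrary:

```rust
use {
    futures::stream,
    solana_tpu_client_next::node_address_service::{SlotEvent, SlotUpdateService},
    tokio_util::sync::CancellationToken,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A fixed stream of slot events stands in for a real slot update provider.
    let events = stream::iter(vec![
        SlotEvent::Start(100),
        SlotEvent::End(100),
        SlotEvent::Start(101),
    ]);

    let cancel = CancellationToken::new();
    let (slot_receiver, mut service) = SlotUpdateService::run(100, events, cancel.clone())?;

    // SlotReceiver exposes the latest estimated slot via a watch channel.
    println!("estimated slot: {}", slot_receiver.slot());

    cancel.cancel();
    service.shutdown().await?;
    Ok(())
}
```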

+ 144 - 0
tpu-client-next/src/websocket_node_address_service.rs

@@ -0,0 +1,144 @@
+//! This module provides [`WebsocketNodeAddressService`] that is used to get slot
+//! updates via WebSocket interface.
+use {
+    crate::{
+        leader_updater::LeaderUpdater,
+        logging::{error, info},
+        node_address_service::{
+            LeaderTpuCacheServiceConfig, NodeAddressService, NodeAddressServiceError, SlotEvent,
+        },
+    },
+    async_trait::async_trait,
+    futures::Stream,
+    futures_util::stream::StreamExt,
+    solana_clock::Slot,
+    solana_pubsub_client::nonblocking::pubsub_client::{PubsubClient, PubsubClientError},
+    solana_rpc_client::nonblocking::rpc_client::RpcClient,
+    solana_rpc_client_api::{client_error::Error as ClientError, response::SlotUpdate},
+    std::{net::SocketAddr, sync::Arc, time::Duration},
+    thiserror::Error,
+    tokio::{
+        sync::mpsc::{self, error::SendTimeoutError},
+        task::JoinHandle,
+    },
+    tokio_stream::wrappers::ReceiverStream,
+    tokio_util::sync::CancellationToken,
+};
+
+/// [`WebsocketNodeAddressService`] provides node updates using WebSocket Pubsub
+/// client for the slot updates.
+pub struct WebsocketNodeAddressService {
+    service: NodeAddressService,
+    ws_task_handle: Option<JoinHandle<Result<(), Error>>>,
+}
+
+impl WebsocketNodeAddressService {
+    pub async fn run(
+        rpc_client: Arc<RpcClient>,
+        websocket_url: String,
+        config: LeaderTpuCacheServiceConfig,
+        cancel: CancellationToken,
+    ) -> Result<Self, Error> {
+        let (websocket_slot_event_stream, ws_task_handle) =
+            websocket_slot_event_stream(websocket_url);
+        let service =
+            NodeAddressService::run(rpc_client, websocket_slot_event_stream, config, cancel)
+                .await?;
+
+        Ok(Self {
+            service,
+            ws_task_handle: Some(ws_task_handle),
+        })
+    }
+
+    pub async fn shutdown(&mut self) -> Result<(), Error> {
+        self.service.shutdown().await?;
+        if let Some(handle) = self.ws_task_handle.take() {
+            handle.await??;
+        }
+        Ok(())
+    }
+
+    /// Returns the estimated current slot.
+    pub fn current_slot(&self) -> Slot {
+        self.service.estimated_current_slot()
+    }
+}
+
+#[async_trait]
+impl LeaderUpdater for WebsocketNodeAddressService {
+    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
+        self.service.next_leaders(lookahead_leaders)
+    }
+
+    async fn stop(&mut self) {
+        if let Err(e) = self.shutdown().await {
+            error!("Failed to shutdown WebsocketNodeAddressService: {e}");
+        }
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error(transparent)]
+    RpcError(#[from] ClientError),
+
+    #[error(transparent)]
+    PubsubError(#[from] PubsubClientError),
+
+    #[error(transparent)]
+    JoinError(#[from] tokio::task::JoinError),
+
+    #[error(transparent)]
+    NodeAddressServiceError(#[from] NodeAddressServiceError),
+}
+
+fn websocket_slot_event_stream(
+    websocket_url: String,
+) -> (impl Stream<Item = SlotEvent>, JoinHandle<Result<(), Error>>) {
+    const SEND_TIMEOUT: Duration = Duration::from_millis(100);
+    let (tx, rx) = mpsc::channel::<SlotEvent>(256);
+
+    let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
+        let pubsub_client = PubsubClient::new(websocket_url).await?;
+        let (mut notifications, unsubscribe) = pubsub_client.slot_updates_subscribe().await?;
+
+        while let Some(event) = notifications.next().await {
+            let Some(event) = map_websocket_update_to_slot_event(event) else {
+                continue;
+            };
+            let Err(send_error) = tx.send_timeout(event, SEND_TIMEOUT).await else {
+                continue;
+            };
+            match send_error {
+                SendTimeoutError::Closed(_) => {
+                    info!("Slot event receiver dropped, exiting websocket slot event stream.");
+                    break;
+                }
+                SendTimeoutError::Timeout(_) => {
+                    info!(
+                        "Timed out sending slot event: stream is not consumed fast enough, \
+                         continuing."
+                    );
+                }
+            }
+        }
+        // `notifications` requires a valid reference to `pubsub_client`, so
+        // `notifications` must be dropped before moving `pubsub_client` via
+        // `shutdown()`.
+        drop(notifications);
+        unsubscribe().await;
+        pubsub_client.shutdown().await?;
+        Ok(())
+    });
+
+    (ReceiverStream::new(rx), handle)
+}
+
+fn map_websocket_update_to_slot_event(update: SlotUpdate) -> Option<SlotEvent> {
+    match update {
+        SlotUpdate::FirstShredReceived { slot, .. } => Some(SlotEvent::Start(slot)),
+        SlotUpdate::Completed { slot, .. } => Some(SlotEvent::End(slot)),
+        _ => None,
+    }
+}
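
A minimal sketch (not part of the diff) of using `WebsocketNodeAddressService` as a drop-in `LeaderUpdater`; it assumes the `websocket-node-address-service` feature is enabled, and the endpoints are placeholders:

```rust
use {
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_tpu_client_next::{
        leader_updater::LeaderUpdater,
        node_address_service::LeaderTpuCacheServiceConfig,
        websocket_node_address_service::WebsocketNodeAddressService,
    },
    std::sync::Arc,
    tokio_util::sync::CancellationToken,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoints; substitute the cluster you actually target.
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let websocket_url = "ws://127.0.0.1:8900".to_string();

    let cancel = CancellationToken::new();
    let mut updater = WebsocketNodeAddressService::run(
        rpc_client,
        websocket_url,
        LeaderTpuCacheServiceConfig::default(),
        cancel.clone(),
    )
    .await?;

    // Ask for the TPU sockets of the upcoming leader(s), as a scheduler would.
    let leaders = updater.next_leaders(1);
    println!("upcoming leader TPU sockets: {leaders:?}");

    cancel.cancel();
    updater.stop().await;
    Ok(())
}
```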

+ 9 - 3
tpu-client-next/tests/connection_workers_scheduler_test.rs

@@ -1,3 +1,8 @@
+#[allow(deprecated)]
+// Reason: this deprecated function internally creates a
+// PinnedLeaderUpdater, a structure we want to move into tests as soon as
+// we can remove the create_leader_updater function.
+use solana_tpu_client_next::leader_updater::create_leader_updater;
 use {
     crossbeam_channel::Receiver as CrossbeamReceiver,
     futures::future::BoxFuture,
@@ -21,7 +26,6 @@ use {
         connection_workers_scheduler::{
             BindTarget, ConnectionWorkersSchedulerConfig, Fanout, StakeIdentity,
         },
-        leader_updater::create_leader_updater,
         send_transaction_stats::SendTransactionStatsNonAtomic,
         transaction_batch::TransactionBatch,
         ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats,
@@ -85,12 +89,15 @@ async fn setup_connection_worker_scheduler(
         CommitmentConfig::confirmed(),
     ));
 
+    let config = test_config(stake_identity);
+
     // Setup sending txs
+    let cancel = CancellationToken::new();
+    #[allow(deprecated)]
     let leader_updater = create_leader_updater(rpc_client, websocket_url, Some(tpu_address))
         .await
         .expect("Leader updates was successfully created");
 
-    let cancel = CancellationToken::new();
     let (update_identity_sender, update_identity_receiver) = watch::channel(None);
     let scheduler = ConnectionWorkersScheduler::new(
         leader_updater,
@@ -98,7 +105,6 @@ async fn setup_connection_worker_scheduler(
         update_identity_receiver,
         cancel.clone(),
     );
-    let config = test_config(stake_identity);
     let scheduler = tokio::spawn(scheduler.run(config));
 
     (scheduler, update_identity_sender, cancel)