|
|
@@ -23,6 +23,7 @@ use {
|
|
|
staked_nodes_updater_service::StakedNodesUpdaterService,
|
|
|
tpu_entry_notifier::TpuEntryNotifier,
|
|
|
validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
|
|
|
+ vortexor_receiver_adapter::VortexorReceiverAdapter,
|
|
|
},
|
|
|
bytes::Bytes,
|
|
|
crossbeam_channel::{bounded, unbounded, Receiver},
|
|
|
@@ -73,18 +74,34 @@ pub struct TpuSockets {
|
|
|
pub vote_quic: Vec<UdpSocket>,
|
|
|
/// Client-side socket for the forwarding votes.
|
|
|
pub vote_forwards_client: UdpSocket,
|
|
|
+ pub vortexor_receivers: Option<Vec<UdpSocket>>,
|
|
|
+}
|
|
|
+
|
|
|
+/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier.
|
|
|
+enum SigVerifier {
|
|
|
+ Local(SigVerifyStage),
|
|
|
+ Remote(VortexorReceiverAdapter),
|
|
|
+}
|
|
|
+
|
|
|
+impl SigVerifier {
|
|
|
+ fn join(self) -> thread::Result<()> {
|
|
|
+ match self {
|
|
|
+ SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
|
|
|
+ SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
pub struct Tpu {
|
|
|
fetch_stage: FetchStage,
|
|
|
- sigverify_stage: SigVerifyStage,
|
|
|
+ sig_verifier: SigVerifier,
|
|
|
vote_sigverify_stage: SigVerifyStage,
|
|
|
banking_stage: BankingStage,
|
|
|
forwarding_stage: JoinHandle<()>,
|
|
|
cluster_info_vote_listener: ClusterInfoVoteListener,
|
|
|
broadcast_stage: BroadcastStage,
|
|
|
- tpu_quic_t: thread::JoinHandle<()>,
|
|
|
- tpu_forwards_quic_t: thread::JoinHandle<()>,
|
|
|
+ tpu_quic_t: Option<thread::JoinHandle<()>>,
|
|
|
+ tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
|
|
|
tpu_entry_notifier: Option<TpuEntryNotifier>,
|
|
|
staked_nodes_updater_service: StakedNodesUpdaterService,
|
|
|
tracer_thread_hdl: TracerThread,
|
|
|
@@ -143,6 +160,7 @@ impl Tpu {
|
|
|
transactions_forwards_quic: transactions_forwards_quic_sockets,
|
|
|
vote_quic: tpu_vote_quic_sockets,
|
|
|
vote_forwards_client: vote_forwards_client_socket,
|
|
|
+ vortexor_receivers,
|
|
|
} = sockets;
|
|
|
|
|
|
let (packet_sender, packet_receiver) = unbounded();
|
|
|
@@ -196,47 +214,75 @@ impl Tpu {
|
|
|
)
|
|
|
.unwrap();
|
|
|
|
|
|
- // Streamer for TPU
|
|
|
- let SpawnServerResult {
|
|
|
- endpoints: _,
|
|
|
- thread: tpu_quic_t,
|
|
|
- key_updater,
|
|
|
- } = spawn_server_multi(
|
|
|
- "solQuicTpu",
|
|
|
- "quic_streamer_tpu",
|
|
|
- transactions_quic_sockets,
|
|
|
- keypair,
|
|
|
- packet_sender,
|
|
|
- exit.clone(),
|
|
|
- staked_nodes.clone(),
|
|
|
- tpu_quic_server_config,
|
|
|
- )
|
|
|
- .unwrap();
|
|
|
+ let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
|
|
|
+ // Streamer for TPU
|
|
|
+ let SpawnServerResult {
|
|
|
+ endpoints: _,
|
|
|
+ thread: tpu_quic_t,
|
|
|
+ key_updater,
|
|
|
+ } = spawn_server_multi(
|
|
|
+ "solQuicTpu",
|
|
|
+ "quic_streamer_tpu",
|
|
|
+ transactions_quic_sockets,
|
|
|
+ keypair,
|
|
|
+ packet_sender,
|
|
|
+ exit.clone(),
|
|
|
+ staked_nodes.clone(),
|
|
|
+ tpu_quic_server_config,
|
|
|
+ )
|
|
|
+ .unwrap();
|
|
|
+ (Some(tpu_quic_t), Some(key_updater))
|
|
|
+ } else {
|
|
|
+ (None, None)
|
|
|
+ };
|
|
|
|
|
|
- // Streamer for TPU forward
|
|
|
- let SpawnServerResult {
|
|
|
- endpoints: _,
|
|
|
- thread: tpu_forwards_quic_t,
|
|
|
- key_updater: forwards_key_updater,
|
|
|
- } = spawn_server_multi(
|
|
|
- "solQuicTpuFwd",
|
|
|
- "quic_streamer_tpu_forwards",
|
|
|
- transactions_forwards_quic_sockets,
|
|
|
- keypair,
|
|
|
- forwarded_packet_sender,
|
|
|
- exit.clone(),
|
|
|
- staked_nodes.clone(),
|
|
|
- tpu_fwd_quic_server_config,
|
|
|
- )
|
|
|
- .unwrap();
|
|
|
+ let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
|
|
|
+ // Streamer for TPU forward
|
|
|
+ let SpawnServerResult {
|
|
|
+ endpoints: _,
|
|
|
+ thread: tpu_forwards_quic_t,
|
|
|
+ key_updater: forwards_key_updater,
|
|
|
+ } = spawn_server_multi(
|
|
|
+ "solQuicTpuFwd",
|
|
|
+ "quic_streamer_tpu_forwards",
|
|
|
+ transactions_forwards_quic_sockets,
|
|
|
+ keypair,
|
|
|
+ forwarded_packet_sender,
|
|
|
+ exit.clone(),
|
|
|
+ staked_nodes.clone(),
|
|
|
+ tpu_fwd_quic_server_config,
|
|
|
+ )
|
|
|
+ .unwrap();
|
|
|
+ (Some(tpu_forwards_quic_t), Some(forwards_key_updater))
|
|
|
+ } else {
|
|
|
+ (None, None)
|
|
|
+ };
|
|
|
|
|
|
let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
|
|
|
- let sigverify_stage = {
|
|
|
+ let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
|
|
|
+ info!("starting vortexor adapter");
|
|
|
+ let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
|
|
|
+ let adapter = VortexorReceiverAdapter::new(
|
|
|
+ sockets,
|
|
|
+ Duration::from_millis(5),
|
|
|
+ tpu_coalesce,
|
|
|
+ non_vote_sender,
|
|
|
+ enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
|
|
|
+ exit.clone(),
|
|
|
+ );
|
|
|
+ SigVerifier::Remote(adapter)
|
|
|
+ } else {
|
|
|
+ info!("starting regular sigverify stage");
|
|
|
let verifier = TransactionSigVerifier::new(
|
|
|
non_vote_sender,
|
|
|
enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
|
|
|
);
|
|
|
- SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
|
|
|
+ SigVerifier::Local(SigVerifyStage::new(
|
|
|
+ packet_receiver,
|
|
|
+ verifier,
|
|
|
+ "solSigVerTpu",
|
|
|
+ "tpu-verifier",
|
|
|
+ ))
|
|
|
};
|
|
|
|
|
|
let vote_sigverify_stage = {
|
|
|
@@ -320,10 +366,18 @@ impl Tpu {
|
|
|
turbine_quic_endpoint_sender,
|
|
|
);
|
|
|
|
|
|
+ let mut key_updaters: Vec<Arc<dyn NotifyKeyUpdate + Send + Sync>> = Vec::new();
|
|
|
+ if let Some(key_updater) = key_updater {
|
|
|
+ key_updaters.push(key_updater);
|
|
|
+ }
|
|
|
+ if let Some(forwards_key_updater) = forwards_key_updater {
|
|
|
+ key_updaters.push(forwards_key_updater);
|
|
|
+ }
|
|
|
+ key_updaters.push(vote_streamer_key_updater);
|
|
|
(
|
|
|
Self {
|
|
|
fetch_stage,
|
|
|
- sigverify_stage,
|
|
|
+ sig_verifier,
|
|
|
vote_sigverify_stage,
|
|
|
banking_stage,
|
|
|
forwarding_stage,
|
|
|
@@ -336,21 +390,21 @@ impl Tpu {
|
|
|
tracer_thread_hdl,
|
|
|
tpu_vote_quic_t,
|
|
|
},
|
|
|
- vec![key_updater, forwards_key_updater, vote_streamer_key_updater],
|
|
|
+ key_updaters,
|
|
|
)
|
|
|
}
|
|
|
|
|
|
pub fn join(self) -> thread::Result<()> {
|
|
|
let results = vec![
|
|
|
self.fetch_stage.join(),
|
|
|
- self.sigverify_stage.join(),
|
|
|
+ self.sig_verifier.join(),
|
|
|
self.vote_sigverify_stage.join(),
|
|
|
self.cluster_info_vote_listener.join(),
|
|
|
self.banking_stage.join(),
|
|
|
self.forwarding_stage.join(),
|
|
|
self.staked_nodes_updater_service.join(),
|
|
|
- self.tpu_quic_t.join(),
|
|
|
- self.tpu_forwards_quic_t.join(),
|
|
|
+ self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
|
|
|
+ self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
|
|
|
self.tpu_vote_quic_t.join(),
|
|
|
];
|
|
|
let broadcast_result = self.broadcast_stage.join();
|