Răsfoiți Sursa

Restore vortexor receiver -- renamed to agave-verified-packet-receiver (#6542)

* Revert "Revert "Support receiving verified transactions from the vortexor (#5321)" (#6525)"

This reverts commit 63cf093fbe350204b47adb128560bc5a37ce1e85.

* publish vortexor-receiver as it is used by core

* Rename solana-vortexor-receiver to agave-verified-packet-receiver

* Missed Cargo.lock update
Lijun Wang 5 luni în urmă
părinte
comite
18238bdfcb

+ 10 - 0
Cargo.lock

@@ -488,6 +488,15 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "agave-verified-packet-receiver"
+version = "3.0.0"
+dependencies = [
+ "assert_matches",
+ "solana-perf",
+ "solana-streamer",
+]
+
 [[package]]
 name = "agave-watchtower"
 version = "3.0.0"
@@ -7783,6 +7792,7 @@ dependencies = [
  "agave-feature-set",
  "agave-reserved-account-keys",
  "agave-transaction-view",
+ "agave-verified-packet-receiver",
  "ahash 0.8.11",
  "anyhow",
  "arrayvec",

+ 2 - 0
Cargo.toml

@@ -133,6 +133,7 @@ members = [
     "unified-scheduler-pool",
     "upload-perf",
     "validator",
+    "verified-packet-receiver",
     "version",
     "vortexor",
     "vote",
@@ -185,6 +186,7 @@ agave-precompiles = { path = "precompiles", version = "=3.0.0" }
 agave-reserved-account-keys = { path = "reserved-account-keys", version = "=3.0.0" }
 agave-thread-manager = { path = "thread-manager", version = "=3.0.0" }
 agave-transaction-view = { path = "transaction-view", version = "=3.0.0" }
+agave-verified-packet-receiver = { path = "verified-packet-receiver", version = "=3.0.0" }
 agave-xdp = { path = "xdp", version = "=3.0.0" }
 ahash = "0.8.11"
 anyhow = "1.0.98"

+ 1 - 0
core/Cargo.toml

@@ -44,6 +44,7 @@ frozen-abi = [
 agave-banking-stage-ingress-types = { workspace = true }
 agave-feature-set = { workspace = true }
 agave-transaction-view = { workspace = true }
+agave-verified-packet-receiver = { workspace = true }
 ahash = { workspace = true }
 anyhow = { workspace = true }
 arrayvec = { workspace = true }

+ 1 - 0
core/src/lib.rs

@@ -41,6 +41,7 @@ mod tpu_entry_notifier;
 pub mod tvu;
 pub mod unfrozen_gossip_verified_vote_hashes;
 pub mod validator;
+mod vortexor_receiver_adapter;
 pub mod vote_simulator;
 pub mod voting_service;
 pub mod warm_quic_cache_service;

+ 94 - 43
core/src/tpu.rs

@@ -28,6 +28,7 @@ use {
         staked_nodes_updater_service::StakedNodesUpdaterService,
         tpu_entry_notifier::TpuEntryNotifier,
         validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
+        vortexor_receiver_adapter::VortexorReceiverAdapter,
     },
     bytes::Bytes,
     crossbeam_channel::{bounded, unbounded, Receiver},
@@ -79,18 +80,34 @@ pub struct TpuSockets {
     pub vote_quic: Vec<UdpSocket>,
     /// Client-side socket for the forwarding votes.
     pub vote_forwarding_client: UdpSocket,
+    pub vortexor_receivers: Option<Vec<UdpSocket>>,
+}
+
+/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier.
+enum SigVerifier {
+    Local(SigVerifyStage),
+    Remote(VortexorReceiverAdapter),
+}
+
+impl SigVerifier {
+    fn join(self) -> thread::Result<()> {
+        match self {
+            SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
+            SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
+        }
+    }
 }
 
 pub struct Tpu {
     fetch_stage: FetchStage,
-    sigverify_stage: SigVerifyStage,
+    sig_verifier: SigVerifier,
     vote_sigverify_stage: SigVerifyStage,
     banking_stage: BankingStage,
     forwarding_stage: JoinHandle<()>,
     cluster_info_vote_listener: ClusterInfoVoteListener,
     broadcast_stage: BroadcastStage,
-    tpu_quic_t: thread::JoinHandle<()>,
-    tpu_forwards_quic_t: thread::JoinHandle<()>,
+    tpu_quic_t: Option<thread::JoinHandle<()>>,
+    tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
     tpu_entry_notifier: Option<TpuEntryNotifier>,
     staked_nodes_updater_service: StakedNodesUpdaterService,
     tracer_thread_hdl: TracerThread,
@@ -150,6 +167,7 @@ impl Tpu {
             transactions_forwards_quic: transactions_forwards_quic_sockets,
             vote_quic: tpu_vote_quic_sockets,
             vote_forwarding_client: vote_forwarding_client_socket,
+            vortexor_receivers,
         } = sockets;
 
         let (packet_sender, packet_receiver) = unbounded();
@@ -203,47 +221,75 @@ impl Tpu {
         )
         .unwrap();
 
-        // Streamer for TPU
-        let SpawnServerResult {
-            endpoints: _,
-            thread: tpu_quic_t,
-            key_updater,
-        } = spawn_server_multi(
-            "solQuicTpu",
-            "quic_streamer_tpu",
-            transactions_quic_sockets,
-            keypair,
-            packet_sender,
-            exit.clone(),
-            staked_nodes.clone(),
-            tpu_quic_server_config,
-        )
-        .unwrap();
+        let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
+            // Streamer for TPU
+            let SpawnServerResult {
+                endpoints: _,
+                thread: tpu_quic_t,
+                key_updater,
+            } = spawn_server_multi(
+                "solQuicTpu",
+                "quic_streamer_tpu",
+                transactions_quic_sockets,
+                keypair,
+                packet_sender,
+                exit.clone(),
+                staked_nodes.clone(),
+                tpu_quic_server_config,
+            )
+            .unwrap();
+            (Some(tpu_quic_t), Some(key_updater))
+        } else {
+            (None, None)
+        };
 
-        // Streamer for TPU forward
-        let SpawnServerResult {
-            endpoints: _,
-            thread: tpu_forwards_quic_t,
-            key_updater: forwards_key_updater,
-        } = spawn_server_multi(
-            "solQuicTpuFwd",
-            "quic_streamer_tpu_forwards",
-            transactions_forwards_quic_sockets,
-            keypair,
-            forwarded_packet_sender,
-            exit.clone(),
-            staked_nodes.clone(),
-            tpu_fwd_quic_server_config,
-        )
-        .unwrap();
+        let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
+            // Streamer for TPU forward
+            let SpawnServerResult {
+                endpoints: _,
+                thread: tpu_forwards_quic_t,
+                key_updater: forwards_key_updater,
+            } = spawn_server_multi(
+                "solQuicTpuFwd",
+                "quic_streamer_tpu_forwards",
+                transactions_forwards_quic_sockets,
+                keypair,
+                forwarded_packet_sender,
+                exit.clone(),
+                staked_nodes.clone(),
+                tpu_fwd_quic_server_config,
+            )
+            .unwrap();
+            (Some(tpu_forwards_quic_t), Some(forwards_key_updater))
+        } else {
+            (None, None)
+        };
 
         let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
-        let sigverify_stage = {
+        let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
+            info!("starting vortexor adapter");
+            let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
+            let adapter = VortexorReceiverAdapter::new(
+                sockets,
+                Duration::from_millis(5),
+                tpu_coalesce,
+                non_vote_sender,
+                enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
+                exit.clone(),
+            );
+            SigVerifier::Remote(adapter)
+        } else {
+            info!("starting regular sigverify stage");
             let verifier = TransactionSigVerifier::new(
                 non_vote_sender,
                 enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
             );
-            SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
+            SigVerifier::Local(SigVerifyStage::new(
+                packet_receiver,
+                verifier,
+                "solSigVerTpu",
+                "tpu-verifier",
+            ))
         };
 
         let vote_sigverify_stage = {
@@ -329,14 +375,19 @@ impl Tpu {
         );
 
         let mut key_notifiers = key_notifiers.write().unwrap();
-        key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
-        key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
+        if let Some(key_updater) = key_updater {
+            key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
+        }
+        if let Some(forwards_key_updater) = forwards_key_updater {
+            key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
+        }
         key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);
+
         key_notifiers.add(KeyUpdaterType::Forward, client_updater);
 
         Self {
             fetch_stage,
-            sigverify_stage,
+            sig_verifier,
             vote_sigverify_stage,
             banking_stage,
             forwarding_stage,
@@ -354,14 +405,14 @@ impl Tpu {
     pub fn join(self) -> thread::Result<()> {
         let results = vec![
             self.fetch_stage.join(),
-            self.sigverify_stage.join(),
+            self.sig_verifier.join(),
             self.vote_sigverify_stage.join(),
             self.cluster_info_vote_listener.join(),
             self.banking_stage.join(),
             self.forwarding_stage.join(),
             self.staked_nodes_updater_service.join(),
-            self.tpu_quic_t.join(),
-            self.tpu_forwards_quic_t.join(),
+            self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
+            self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
             self.tpu_vote_quic_t.join(),
         ];
         let broadcast_result = self.broadcast_stage.join();

+ 1 - 0
core/src/validator.rs

@@ -1612,6 +1612,7 @@ impl Validator {
                 transactions_forwards_quic: node.sockets.tpu_forwards_quic,
                 vote_quic: node.sockets.tpu_vote_quic,
                 vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
+                vortexor_receivers: node.sockets.vortexor_receivers,
             },
             &rpc_subscriptions,
             transaction_status_sender,

+ 131 - 0
core/src/vortexor_receiver_adapter.rs

@@ -0,0 +1,131 @@
+//! Vortexor receiver adapter which wraps the VerifiedPacketReceiver
+//! to receive packet batches from the remote and sends the packets to the
+//! banking stage.
+
+use {
+    crate::banking_trace::TracedSender,
+    agave_banking_stage_ingress_types::BankingPacketBatch,
+    agave_verified_packet_receiver::receiver::VerifiedPacketReceiver,
+    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
+    solana_perf::packet::PacketBatch,
+    std::{
+        net::UdpSocket,
+        sync::{atomic::AtomicBool, Arc},
+        thread::{self, Builder, JoinHandle},
+        time::{Duration, Instant},
+    },
+};
+
+#[inline]
+fn send(sender: &TracedSender, batch: Arc<Vec<PacketBatch>>, count: usize) -> Result<(), String> {
+    match sender.send(batch) {
+        Ok(_) => {
+            trace!("Sent batch: {count} received from vortexor successfully");
+            Ok(())
+        }
+        Err(err) => Err(format!("Failed to send batch {count} down {err:?}")),
+    }
+}
+
+pub struct VortexorReceiverAdapter {
+    thread_hdl: JoinHandle<()>,
+    receiver: VerifiedPacketReceiver,
+}
+
+const MAX_PACKET_BATCH_SIZE: usize = 8;
+
+impl VortexorReceiverAdapter {
+    pub fn new(
+        sockets: Vec<Arc<UdpSocket>>,
+        recv_timeout: Duration,
+        tpu_coalesce: Duration,
+        packets_sender: TracedSender,
+        forward_stage_sender: Option<Sender<(BankingPacketBatch, bool)>>,
+        exit: Arc<AtomicBool>,
+    ) -> Self {
+        let (batch_sender, batch_receiver) = unbounded();
+
+        let receiver =
+            VerifiedPacketReceiver::new(sockets, &batch_sender, tpu_coalesce, None, exit.clone());
+
+        let thread_hdl = Builder::new()
+            .name("vtxRcvAdptr".to_string())
+            .spawn(move || {
+                if let Err(msg) = Self::recv_send(
+                    batch_receiver,
+                    recv_timeout,
+                    MAX_PACKET_BATCH_SIZE,
+                    packets_sender,
+                    forward_stage_sender,
+                ) {
+                    info!("Quiting VortexorReceiverAdapter: {msg}");
+                }
+            })
+            .unwrap();
+        Self {
+            thread_hdl,
+            receiver,
+        }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        self.thread_hdl.join()?;
+        self.receiver.join()
+    }
+
+    fn recv_send(
+        packet_batch_receiver: Receiver<PacketBatch>,
+        recv_timeout: Duration,
+        batch_size: usize,
+        traced_sender: TracedSender,
+        forward_stage_sender: Option<Sender<(BankingPacketBatch, bool)>>,
+    ) -> Result<(), String> {
+        loop {
+            match Self::receive_until(packet_batch_receiver.clone(), recv_timeout, batch_size) {
+                Ok(packet_batch) => {
+                    let count = packet_batch.len();
+                    // Send out packet batches
+                    if let Some(forward_stage_sender) = &forward_stage_sender {
+                        send(&traced_sender, packet_batch.clone(), count)?;
+                        // Send out packet batches to forward stage
+                        let _ = forward_stage_sender
+                            .try_send((packet_batch, false /* reject non-vote */));
+                    } else {
+                        send(&traced_sender, packet_batch, count)?;
+                    }
+                }
+                Err(err) => match err {
+                    RecvTimeoutError::Timeout => {
+                        continue;
+                    }
+                    RecvTimeoutError::Disconnected => {
+                        return Err("Disconnected from the input channel".to_string());
+                    }
+                },
+            }
+        }
+    }
+
+    /// Receives packet batches from VerifiedPacketReceiver with a timeout
+    fn receive_until(
+        packet_batch_receiver: Receiver<PacketBatch>,
+        recv_timeout: Duration,
+        batch_size: usize,
+    ) -> Result<BankingPacketBatch, RecvTimeoutError> {
+        let start = Instant::now();
+
+        let message = packet_batch_receiver.recv_timeout(recv_timeout)?;
+        let mut packet_batches = Vec::new();
+        packet_batches.push(message);
+
+        while let Ok(message) = packet_batch_receiver.try_recv() {
+            packet_batches.push(message);
+
+            if start.elapsed() >= recv_timeout || packet_batches.len() >= batch_size {
+                break;
+            }
+        }
+
+        Ok(Arc::new(packet_batches))
+    }
+}

+ 26 - 0
gossip/src/cluster_info.rs

@@ -2331,6 +2331,7 @@ pub struct Sockets {
     pub quic_vote_client: UdpSocket,
     /// Client-side socket for RPC/SendTransactionService.
     pub rpc_sts_client: UdpSocket,
+    pub vortexor_receivers: Option<Vec<UdpSocket>>,
 }
 
 pub struct NodeConfig {
@@ -2343,6 +2344,8 @@ pub struct NodeConfig {
     pub bind_ip_addr: IpAddr,
     pub public_tpu_addr: Option<SocketAddr>,
     pub public_tpu_forwards_addr: Option<SocketAddr>,
+    pub vortexor_receiver_addr: Option<SocketAddr>,
+
     /// The number of TVU receive sockets to create
     pub num_tvu_receive_sockets: NonZeroUsize,
     /// The number of TVU retransmit sockets to create
@@ -2504,6 +2507,7 @@ impl Node {
                 tpu_transaction_forwarding_client,
                 quic_vote_client,
                 rpc_sts_client,
+                vortexor_receivers: None,
             },
         }
     }
@@ -2651,6 +2655,7 @@ impl Node {
                 quic_vote_client,
                 tpu_transaction_forwarding_client,
                 rpc_sts_client,
+                vortexor_receivers: None,
             },
         }
     }
@@ -2666,6 +2671,7 @@ impl Node {
             num_tvu_receive_sockets,
             num_tvu_retransmit_sockets,
             num_quic_endpoints,
+            vortexor_receiver_addr,
         } = config;
 
         let gossip_addr = SocketAddr::new(advertised_ip, gossip_port);
@@ -2781,6 +2787,23 @@ impl Node {
         info.set_serve_repair(QUIC, (advertised_ip, serve_repair_quic_port))
             .unwrap();
 
+        let vortexor_receivers = vortexor_receiver_addr.map(|vortexor_receiver_addr| {
+            multi_bind_in_range_with_config(
+                vortexor_receiver_addr.ip(),
+                (
+                    vortexor_receiver_addr.port(),
+                    vortexor_receiver_addr.port() + 1,
+                ),
+                socket_config_reuseport,
+                32,
+            )
+            .unwrap_or_else(|_| {
+                panic!("Could not bind to the set vortexor_receiver_addr {vortexor_receiver_addr}")
+            })
+            .1
+        });
+
+        info!("vortexor_receivers is {vortexor_receivers:?}");
         trace!("new ContactInfo: {:?}", info);
         let sockets = Sockets {
             gossip,
@@ -2805,6 +2828,7 @@ impl Node {
             quic_vote_client,
             tpu_transaction_forwarding_client,
             rpc_sts_client,
+            vortexor_receivers,
         };
         info!("Bound all network sockets as follows: {:#?}", &sockets);
         Node { info, sockets }
@@ -3284,6 +3308,7 @@ mod tests {
             num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS,
             num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS,
             num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS,
+            vortexor_receiver_addr: None,
         };
 
         let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config);
@@ -3308,6 +3333,7 @@ mod tests {
             num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS,
             num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS,
             num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS,
+            vortexor_receiver_addr: None,
         };
 
         let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config);

+ 9 - 0
programs/sbf/Cargo.lock

@@ -229,6 +229,14 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "agave-verified-packet-receiver"
+version = "3.0.0"
+dependencies = [
+ "solana-perf",
+ "solana-streamer",
+]
+
 [[package]]
 name = "agave-xdp"
 version = "3.0.0"
@@ -6036,6 +6044,7 @@ dependencies = [
  "agave-banking-stage-ingress-types",
  "agave-feature-set",
  "agave-transaction-view",
+ "agave-verified-packet-receiver",
  "ahash 0.8.11",
  "anyhow",
  "arrayvec",

+ 9 - 0
svm/examples/Cargo.lock

@@ -150,6 +150,14 @@ dependencies = [
  "solana-svm-transaction",
 ]
 
+[[package]]
+name = "agave-verified-packet-receiver"
+version = "3.0.0"
+dependencies = [
+ "solana-perf",
+ "solana-streamer",
+]
+
 [[package]]
 name = "agave-xdp"
 version = "3.0.0"
@@ -5883,6 +5891,7 @@ dependencies = [
  "agave-banking-stage-ingress-types",
  "agave-feature-set",
  "agave-transaction-view",
+ "agave-verified-packet-receiver",
  "ahash 0.8.11",
  "anyhow",
  "arrayvec",

+ 9 - 0
validator/src/commands/run/args.rs

@@ -407,6 +407,15 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a,
                  --entrypoint or localhostwhen --entrypoint is not provided]",
             ),
     )
+    .arg(
+        Arg::with_name("tpu_vortexor_receiver_address")
+            .long("tpu-vortexor-receiver-address")
+            .value_name("HOST:PORT")
+            .takes_value(true)
+            .hidden(hidden_unless_forced())
+            .validator(solana_net_utils::is_host_port)
+            .help("TPU Vortexor Receiver address to which verified transaction packet will be forwarded."),
+    )
     .arg(
         Arg::with_name("public_rpc_addr")
             .long("public-rpc-address")

+ 14 - 0
validator/src/commands/run/execute.rs

@@ -1122,6 +1122,19 @@ pub fn execute(
         })
         .transpose()?;
 
+    let tpu_vortexor_receiver_address =
+        matches
+            .value_of("tpu_vortexor_receiver_address")
+            .map(|tpu_vortexor_receiver_address| {
+                solana_net_utils::parse_host_port(tpu_vortexor_receiver_address).unwrap_or_else(
+                    |err| {
+                        eprintln!("Failed to parse --tpu-vortexor-receiver-address: {err}");
+                        exit(1);
+                    },
+                )
+            });
+
+    info!("tpu_vortexor_receiver_address is {tpu_vortexor_receiver_address:?}");
     let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
 
     let tpu_max_connections_per_peer =
@@ -1149,6 +1162,7 @@ pub fn execute(
         num_tvu_receive_sockets: tvu_receive_threads,
         num_tvu_retransmit_sockets: tvu_retransmit_threads,
         num_quic_endpoints,
+        vortexor_receiver_addr: tpu_vortexor_receiver_address,
     };
 
     let cluster_entrypoints = entrypoint_addrs

+ 26 - 0
verified-packet-receiver/Cargo.toml

@@ -0,0 +1,26 @@
+[package]
+name = "agave-verified-packet-receiver"
+description = "Agave Verified Packet Receiver Receiver"
+documentation = "https://docs.rs/agave-verified-packet-receiver"
+publish = true
+version = { workspace = true }
+authors = { workspace = true }
+repository = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+edition = { workspace = true }
+
+[package.metadata.docs.rs]
+targets = ["x86_64-unknown-linux-gnu"]
+
+[lib]
+crate-type = ["lib"]
+name = "agave_verified_packet_receiver"
+
+[dependencies]
+solana-perf = { workspace = true }
+solana-streamer = { workspace = true }
+
+[dev-dependencies]
+assert_matches = { workspace = true }
+solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }

+ 60 - 0
verified-packet-receiver/Readme.md

@@ -0,0 +1,60 @@
+# Introduction
+The Vortexor is a service that can offload the tasks of receiving transactions
+from the public, performing signature verifications, and deduplications from the
+core validator, enabling it to focus on processing and executing the
+transactions. The verified and filtered transactions will then be forwarded to
+the validators linked with the Vortexor. This setup makes the TPU transaction
+ingestion and verification more scalable compared to a single-node solution.
+
+This module implements the VerifiedPacketReceiver in the below architecture
+which encapsulates the functionality of receiving the verified packet batches
+from the vortexor. In the first impelementation, we use UDP to receive the
+verified packets from the vortexor. It is designed to support other protocol
+option such as using QUIC.
+
+# Architecture
+Figure 1 describes the architecture diagram of the Vortexor and its
+relationship with the validator.
+
+                     +---------------------+
+                     |   Solana            |
+                     |   RPC / Web Socket  |
+                     |   Service           |
+                     +---------------------+
+                                |
+                                v
+                    +--------------------- VORTEXOR ------------------------+
+                    |           |                                           |
+                    |   +------------------+                                |
+                    |   | StakedKeyUpdater |                                |
+                    |   +------------------+                                |
+                    |           |                                           |
+                    |           v                                           |
+                    |   +-------------+        +--------------------+       |
+        TPU -->     |   | TPU Streamer| -----> | SigVerifier/Dedup  |       |
+        /QUIC       |   +-------------+        +--------------------+       |
+                    |        |                          |                   |
+                    |        v                          v                   |
+                    |  +----------------+     +------------------------+    |
+                    |  | Subscription   |<----| VerifiedPacketForwarder|    |
+                    |  | Management     |     +------------------------+    |
+                    |  +----------------+            |                      |
+                    +--------------------------------|----------------------+
+                                ^                    | (UDP/QUIC)
+    Heartbeat/subscriptions     |                    |
+                                |                    v
+                    +-------------------- AGAVE VALIDATOR ------------------+
+                    |                                                       |
+                    |  +----------------+      +-----------------------+    |
+          Config->  |  | Subscription   |      | VerifiedPacketReceiver|    |
+      Admin RPC     |  | Management     |      |                       |    |
+                    |  +----------------+      +-----------------------+    |
+                    |        |                           |                  |
+                    |        |                           v                  |
+                    |        v                      +-----------+           |
+                    |  +--------------------+       | Banking   |           |
+    Gossip <--------|--| Gossip/Contact Info|       | Stage     |           |
+                    |  +--------------------+       +-----------+           |
+                    +-------------------------------------------------------+
+
+                                       Figure 1.

+ 1 - 0
verified-packet-receiver/src/lib.rs

@@ -0,0 +1 @@
+pub mod receiver;

+ 58 - 0
verified-packet-receiver/src/receiver.rs

@@ -0,0 +1,58 @@
+/// This is responsible for receiving the verified and deduplicated transactions
+/// from the vortexor and sending down to the banking stage.
+use {
+    solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
+    solana_streamer::streamer::{self, PacketBatchSender, StreamerReceiveStats},
+    std::{
+        net::UdpSocket,
+        sync::{atomic::AtomicBool, Arc},
+        thread::{self, JoinHandle},
+        time::Duration,
+    },
+};
+
+pub struct VerifiedPacketReceiver {
+    thread_hdls: Vec<JoinHandle<()>>,
+}
+
+impl VerifiedPacketReceiver {
+    pub fn new(
+        sockets: Vec<Arc<UdpSocket>>,
+        sender: &PacketBatchSender,
+        coalesce: Duration,
+        in_vote_only_mode: Option<Arc<AtomicBool>>,
+        exit: Arc<AtomicBool>,
+    ) -> Self {
+        let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);
+
+        let tpu_stats = Arc::new(StreamerReceiveStats::new("vortexor_receiver"));
+
+        let thread_hdls = sockets
+            .into_iter()
+            .enumerate()
+            .map(|(i, socket)| {
+                streamer::receiver(
+                    format!("solVtxRcvr{i:02}"),
+                    socket,
+                    exit.clone(),
+                    sender.clone(),
+                    recycler.clone(),
+                    tpu_stats.clone(),
+                    Some(coalesce),
+                    true,
+                    in_vote_only_mode.clone(),
+                    false, // is_staked_service
+                )
+            })
+            .collect();
+
+        Self { thread_hdls }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        for thread_hdl in self.thread_hdls {
+            thread_hdl.join()?;
+        }
+        Ok(())
+    }
+}

+ 3 - 1
vortexor/src/sender.rs

@@ -83,7 +83,9 @@ impl PacketBatchSender {
                     for batch in &packet_batches {
                         for packet_batch in batch.iter() {
                             for packet in packet_batch {
-                                packets.push(packet.data(0..).unwrap());
+                                if let Some(data) = packet.data(0..) {
+                                    packets.push(data);
+                                }
                             }
                         }
                     }