Sfoglia il codice sorgente

v1.18: BankingStage Forwarding Filter (backport of #685) (#697)

* BankingStage Forwarding Filter (#685)

* add PacketFlags::FROM_STAKED_NODE

* Only forward packets from staked node

* fix local-cluster test forwarding

* review comment

* tpu_votes get marked as from_staked_node

(cherry picked from commit 1744e9efd74d83aeb15b384a8174949dbe753172)

# Conflicts:
#	sdk/src/packet.rs

* resolve conflict

* revert: local-cluster test changes

---------

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
Co-authored-by: Trent Nelson <trent@solana.com>
mergify[bot] 1 anno fa
parent
commit
7e9a53ee79

+ 1 - 0
bench-streamer/src/main.rs

@@ -116,6 +116,7 @@ fn main() -> Result<()> {
             Duration::from_millis(1), // coalesce
             true,
             None,
+            false,
         ));
     }
 

+ 1 - 0
core/src/banking_stage/forwarder.rs

@@ -161,6 +161,7 @@ impl Forwarder {
         self.update_data_budget();
         let packet_vec: Vec<_> = forwardable_packets
             .filter(|p| !p.meta().forwarded())
+            .filter(|p| p.meta().is_from_staked_node())
             .filter(|p| self.data_budget.take(p.meta().size))
             .filter_map(|p| p.data(..).map(|data| data.to_vec()))
             .collect();

+ 3 - 0
core/src/fetch_stage.rs

@@ -169,6 +169,7 @@ impl FetchStage {
                         coalesce,
                         true,
                         in_vote_only_mode.clone(),
+                        false, // unstaked connections
                     )
                 })
                 .collect()
@@ -190,6 +191,7 @@ impl FetchStage {
                         coalesce,
                         true,
                         in_vote_only_mode.clone(),
+                        false, // unstaked connections
                     )
                 })
                 .collect()
@@ -210,6 +212,7 @@ impl FetchStage {
                     coalesce,
                     true,
                     None,
+                    true, // only staked connections should be voting
                 )
             })
             .collect();

+ 2 - 0
core/src/repair/ancestor_hashes_service.rs

@@ -170,6 +170,7 @@ impl AncestorHashesService {
             Duration::from_millis(1), // coalesce
             false,                    // use_pinned_memory
             None,                     // in_vote_only_mode
+            false,                    //  is_staked_service
         );
 
         let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
@@ -1302,6 +1303,7 @@ mod test {
                 Duration::from_millis(1), // coalesce
                 false,
                 None,
+                false,
             );
             let (remote_request_sender, remote_request_receiver) = unbounded();
             let t_packet_adapter = Builder::new()

+ 1 - 0
core/src/repair/serve_repair_service.rs

@@ -46,6 +46,7 @@ impl ServeRepairService {
             Duration::from_millis(1), // coalesce
             false,                    // use_pinned_memory
             None,                     // in_vote_only_mode
+            false,                    // is_staked_service
         );
         let t_packet_adapter = Builder::new()
             .name(String::from("solServRAdapt"))

+ 1 - 0
core/src/shred_fetch_stage.rs

@@ -171,6 +171,7 @@ impl ShredFetchStage {
                     PACKET_COALESCE_DURATION,
                     true, // use_pinned_memory
                     None, // in_vote_only_mode
+                    false,
                 )
             })
             .collect();

+ 1 - 0
gossip/src/gossip_service.rs

@@ -58,6 +58,7 @@ impl GossipService {
             Duration::from_millis(1), // coalesce
             false,
             None,
+            false,
         );
         let (consume_sender, listen_receiver) = unbounded();
         let t_socket_consume = cluster_info.clone().start_socket_consume_thread(

+ 12 - 0
sdk/src/packet.rs

@@ -33,6 +33,8 @@ bitflags! {
         /// the packet is built.
         /// This field can be removed when the above feature gate is adopted by mainnet-beta.
         const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
+        /// For marking packets from staked nodes
+        const FROM_STAKED_NODE = 0b1000_0000;
     }
 }
 
@@ -213,6 +215,11 @@ impl Meta {
         self.port = socket_addr.port();
     }
 
+    pub fn set_from_staked_node(&mut self, from_staked_node: bool) {
+        self.flags
+            .set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
+    }
+
     #[inline]
     pub fn discard(&self) -> bool {
         self.flags.contains(PacketFlags::DISCARD)
@@ -265,6 +272,11 @@ impl Meta {
     pub fn round_compute_unit_price(&self) -> bool {
         self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
     }
+
+    #[inline]
+    pub fn is_from_staked_node(&self) -> bool {
+        self.flags.contains(PacketFlags::FROM_STAKED_NODE)
+    }
 }
 
 impl Default for Meta {

+ 1 - 0
streamer/src/nonblocking/quic.rs

@@ -878,6 +878,7 @@ async fn handle_chunk(
                 if packet_accum.is_none() {
                     let mut meta = Meta::default();
                     meta.set_socket_addr(remote_addr);
+                    meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_)));
                     *packet_accum = Some(PacketAccumulator {
                         meta,
                         chunks: SmallVec::new(),

+ 8 - 1
streamer/src/streamer.rs

@@ -110,6 +110,7 @@ fn recv_loop(
     coalesce: Duration,
     use_pinned_memory: bool,
     in_vote_only_mode: Option<Arc<AtomicBool>>,
+    is_staked_service: bool,
 ) -> Result<()> {
     loop {
         let mut packet_batch = if use_pinned_memory {
@@ -147,7 +148,9 @@ fn recv_loop(
                     if len == PACKETS_PER_BATCH {
                         full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
                     }
-
+                    packet_batch
+                        .iter_mut()
+                        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
                     packet_batch_sender.send(packet_batch)?;
                 }
                 break;
@@ -156,6 +159,7 @@ fn recv_loop(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub fn receiver(
     socket: Arc<UdpSocket>,
     exit: Arc<AtomicBool>,
@@ -165,6 +169,7 @@ pub fn receiver(
     coalesce: Duration,
     use_pinned_memory: bool,
     in_vote_only_mode: Option<Arc<AtomicBool>>,
+    is_staked_service: bool,
 ) -> JoinHandle<()> {
     let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
     assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
@@ -180,6 +185,7 @@ pub fn receiver(
                 coalesce,
                 use_pinned_memory,
                 in_vote_only_mode,
+                is_staked_service,
             );
         })
         .unwrap()
@@ -488,6 +494,7 @@ mod test {
             Duration::from_millis(1), // coalesce
             true,
             None,
+            false,
         );
         const NUM_PACKETS: usize = 5;
         let t_responder = {