v1.18: limits number of nodes per IP address in Turbine (backport of #864) (#978)

limits number of nodes per IP address in Turbine (#864)

(cherry picked from commit c87b8303c7296d6dd7471da8f8f0fb83db8813fd)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
mergify[bot] 1 year ago
Parent
Commit
a8c261b029
2 files changed with 64 additions and 30 deletions
  1. turbine/benches/cluster_nodes.rs (+3 −2)
  2. turbine/src/cluster_nodes.rs (+61 −28)
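
At a glance, the change replaces the old per-shred SocketAddr dedup with a one-time cap of MAX_NUM_NODES_PER_IP_ADDRESS (10) nodes per IP address, applied while the node list is built in get_nodes. Below is a minimal, self-contained sketch of that mechanism, under stated assumptions: Node here is a hypothetical stand-in (the real code uses the crate's Node/NodeId and ContactInfo::tvu), and the ClusterType gate is ignored (in the diff the filter is active only on Devnet, Testnet, and MainnetBeta).

    use std::collections::HashMap;
    use std::net::IpAddr;

    // Mirrors MAX_NUM_NODES_PER_IP_ADDRESS from the diff below.
    const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

    // Hypothetical stand-in for the crate's Node: identity, stake, and an
    // optional TVU IP address (None when there is no contact-info).
    struct Node {
        pubkey: [u8; 32],
        stake: u64,
        tvu_ip: Option<IpAddr>,
    }

    // Keep at most MAX_NUM_NODES_PER_IP_ADDRESS nodes per IP address.
    // Beyond the cap: unstaked nodes are dropped entirely; staked nodes keep
    // their pubkey and stake (so the weighted shuffle stays deterministic)
    // but lose their address, so no more shreds are sent to them.
    fn cap_nodes_per_ip(nodes: Vec<Node>) -> Vec<Node> {
        let mut counts = HashMap::<IpAddr, usize>::new();
        nodes
            .into_iter()
            .filter_map(|node| {
                let within_cap = match node.tvu_ip {
                    None => true, // nodes without an address are never counted
                    Some(ip) => {
                        let count = counts.entry(ip).and_modify(|c| *c += 1).or_insert(1);
                        *count <= MAX_NUM_NODES_PER_IP_ADDRESS
                    }
                };
                if within_cap {
                    Some(node)
                } else if node.stake > 0 {
                    Some(Node { tvu_ip: None, ..node })
                } else {
                    None
                }
            })
            .collect()
    }
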

turbine/benches/cluster_nodes.rs (+3 −2)

@@ -6,7 +6,7 @@ use {
     rand::{seq::SliceRandom, Rng},
     solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo,
     solana_ledger::shred::{Shred, ShredFlags},
-    solana_sdk::{clock::Slot, pubkey::Pubkey},
+    solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey},
     solana_turbine::{
         cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
         retransmit_stage::RetransmitStage,
@@ -21,7 +21,8 @@ fn make_cluster_nodes<R: Rng>(
     unstaked_ratio: Option<(u32, u32)>,
 ) -> (Vec<ContactInfo>, ClusterNodes<RetransmitStage>) {
     let (nodes, stakes, cluster_info) = make_test_cluster(rng, 5_000, unstaked_ratio);
-    let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
+    let cluster_nodes =
+        new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
     (nodes, cluster_nodes)
 }
 

turbine/src/cluster_nodes.rs (+61 −28)

@@ -17,6 +17,7 @@ use {
     solana_sdk::{
         clock::{Epoch, Slot},
         feature_set,
+        genesis_config::ClusterType,
         native_token::LAMPORTS_PER_SOL,
         pubkey::Pubkey,
         signature::{Keypair, Signer},
@@ -29,7 +30,7 @@ use {
         collections::HashMap,
         iter::repeat_with,
         marker::PhantomData,
-        net::SocketAddr,
+        net::{IpAddr, SocketAddr},
         sync::{Arc, Mutex, RwLock},
         time::{Duration, Instant},
     },
@@ -39,6 +40,9 @@ use {
 const DATA_PLANE_FANOUT: usize = 200;
 pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;
 
+// Limit number of nodes per IP address.
+const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;
+
 #[derive(Debug, Error)]
 pub enum Error {
     #[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
@@ -81,9 +85,6 @@ pub struct ClusterNodesCache<T> {
 pub struct RetransmitPeers<'a> {
     root_distance: usize, // distance from the root node
     children: Vec<&'a Node>,
-    // Maps tvu addresses to the first node
-    // in the shuffle with the same address.
-    addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
 }
 
 impl Node {
@@ -147,8 +148,12 @@ impl<T> ClusterNodes<T> {
 }
 
 impl ClusterNodes<BroadcastStage> {
-    pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
-        new_cluster_nodes(cluster_info, stakes)
+    pub fn new(
+        cluster_info: &ClusterInfo,
+        cluster_type: ClusterType,
+        stakes: &HashMap<Pubkey, u64>,
+    ) -> Self {
+        new_cluster_nodes(cluster_info, cluster_type, stakes)
     }
 
     pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
@@ -168,16 +173,13 @@ impl ClusterNodes<RetransmitStage> {
         let RetransmitPeers {
             root_distance,
             children,
-            addrs,
         } = self.get_retransmit_peers(slot_leader, shred, fanout)?;
         let protocol = get_broadcast_protocol(shred);
-        let peers = children.into_iter().filter_map(|node| {
-            node.contact_info()?
-                .tvu(protocol)
-                .ok()
-                .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
-        });
-        Ok((root_distance, peers.collect()))
+        let peers = children
+            .into_iter()
+            .filter_map(|node| node.contact_info()?.tvu(protocol).ok())
+            .collect();
+        Ok((root_distance, peers))
     }
 
     pub fn get_retransmit_peers(
@@ -197,19 +199,10 @@ impl ClusterNodes<RetransmitStage> {
         if let Some(index) = self.index.get(slot_leader) {
             weighted_shuffle.remove_index(*index);
         }
-        let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
         let mut rng = get_seeded_rng(slot_leader, shred);
-        let protocol = get_broadcast_protocol(shred);
         let nodes: Vec<_> = weighted_shuffle
             .shuffle(&mut rng)
             .map(|index| &self.nodes[index])
-            .inspect(|node| {
-                if let Some(node) = node.contact_info() {
-                    if let Ok(addr) = node.tvu(protocol) {
-                        addrs.entry(addr).or_insert(*node.pubkey());
-                    }
-                }
-            })
             .collect();
         let self_index = nodes
             .iter()
@@ -228,7 +221,6 @@ impl ClusterNodes<RetransmitStage> {
         Ok(RetransmitPeers {
             root_distance,
             children: peers.collect(),
-            addrs,
         })
     }
 
@@ -272,10 +264,11 @@ impl ClusterNodes<RetransmitStage> {
 
 pub fn new_cluster_nodes<T: 'static>(
     cluster_info: &ClusterInfo,
+    cluster_type: ClusterType,
     stakes: &HashMap<Pubkey, u64>,
 ) -> ClusterNodes<T> {
     let self_pubkey = cluster_info.id();
-    let nodes = get_nodes(cluster_info, stakes);
+    let nodes = get_nodes(cluster_info, cluster_type, stakes);
     let index: HashMap<_, _> = nodes
         .iter()
         .enumerate()
@@ -298,8 +291,21 @@ pub fn new_cluster_nodes<T: 'static>(
 
 // All staked nodes + other known tvu-peers + the node itself;
 // sorted by (stake, pubkey) in descending order.
-fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
+fn get_nodes(
+    cluster_info: &ClusterInfo,
+    cluster_type: ClusterType,
+    stakes: &HashMap<Pubkey, u64>,
+) -> Vec<Node> {
     let self_pubkey = cluster_info.id();
+    let should_dedup_addrs = match cluster_type {
+        ClusterType::Development => false,
+        ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true,
+    };
+    // Maps IP addresses to number of nodes at that IP address.
+    let mut counts = {
+        let capacity = if should_dedup_addrs { stakes.len() } else { 0 };
+        HashMap::<IpAddr, usize>::with_capacity(capacity)
+    };
     // The local node itself.
     std::iter::once({
         let stake = stakes.get(&self_pubkey).copied().unwrap_or_default();
@@ -328,6 +334,30 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
     // Since sorted_by_key is stable, in case of duplicates, this
     // will keep nodes with contact-info.
     .dedup_by(|a, b| a.pubkey() == b.pubkey())
+    .filter_map(|node| {
+        if !should_dedup_addrs
+            || node
+                .contact_info()
+                .and_then(|node| node.tvu(Protocol::UDP).ok())
+                .map(|addr| {
+                    *counts
+                        .entry(addr.ip())
+                        .and_modify(|count| *count += 1)
+                        .or_insert(1)
+                })
+                <= Some(MAX_NUM_NODES_PER_IP_ADDRESS)
+        {
+            Some(node)
+        } else {
+            // If the node is not staked, drop it entirely. Otherwise, keep the
+            // pubkey for deterministic shuffle, but strip the contact-info so
+            // that no more packets are sent to this node.
+            (node.stake > 0u64).then(|| Node {
+                node: NodeId::from(node.pubkey()),
+                stake: node.stake,
+            })
+        }
+    })
     .collect()
 }
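
One subtle point in the filter above: the per-IP count is an Option<usize>, and the check `count <= Some(MAX_NUM_NODES_PER_IP_ADDRESS)` relies on Option's derived ordering, where None sorts before any Some. Nodes without contact-info (or without a resolvable TVU address) therefore map to None and always pass the check. A tiny std-only illustration of that comparison (names here are hypothetical):

    fn main() {
        const MAX: usize = 10;
        let no_tvu_addr: Option<usize> = None;        // node with no contact-info / TVU address
        let eleventh_at_ip: Option<usize> = Some(11);  // 11th node seen at some IP address
        assert!(no_tvu_addr <= Some(MAX));       // kept: None < Some(_) in Option's ordering
        assert!(!(eleventh_at_ip <= Some(MAX))); // over the cap: falls into the else branch
    }
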
 
@@ -445,6 +475,7 @@ impl<T: 'static> ClusterNodesCache<T> {
         }
         let nodes = Arc::new(new_cluster_nodes::<T>(
             cluster_info,
+            root_bank.cluster_type(),
             &epoch_staked_nodes.unwrap_or_default(),
         ));
         *entry = Some((Instant::now(), Arc::clone(&nodes)));
@@ -594,7 +625,8 @@ mod tests {
         let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
         // ClusterInfo::tvu_peers excludes the node itself.
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-        let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
+        let cluster_nodes =
+            new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
         // All nodes with contact-info should be in the index.
         // Staked nodes with no contact-info should be included.
         assert!(cluster_nodes.nodes.len() > nodes.len());
@@ -629,7 +661,8 @@ mod tests {
         let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
         // ClusterInfo::tvu_peers excludes the node itself.
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(&cluster_info, ClusterType::Development, &stakes);
         // All nodes with contact-info should be in the index.
         // Excluding this node itself.
         // Staked nodes with no contact-info should be included.
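
The updated tests pass ClusterType::Development, for which should_dedup_addrs is false, so the existing assertions about node counts are unaffected. A minimal, self-contained sketch of a test one might write for the cap itself, using bare std types instead of the real ContactInfo plumbing (test name and setup are hypothetical):

    use std::{
        collections::HashMap,
        net::{IpAddr, Ipv4Addr},
    };

    const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

    #[test]
    fn caps_nodes_sharing_one_ip_address() {
        let shared_ip = IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1));
        let mut counts = HashMap::<IpAddr, usize>::new();
        // Simulate 25 unstaked nodes all advertising the same TVU IP address.
        let kept = (0..25)
            .filter(|_| {
                let count = counts.entry(shared_ip).and_modify(|c| *c += 1).or_insert(1);
                *count <= MAX_NUM_NODES_PER_IP_ADDRESS
            })
            .count();
        // Only the first MAX_NUM_NODES_PER_IP_ADDRESS nodes survive the filter.
        assert_eq!(kept, MAX_NUM_NODES_PER_IP_ADDRESS);
    }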