Răsfoiți Sursa

v2.0: reworks max number of outgoing push messages (backport of #3016) (#3038)

reworks max number of outgoing push messages (#3016)

max_bytes for outgoing push messages is pretty outdated and does not
allow gossip to function properly with current testnet cluster size.

In particular it does not allow to clear out queue of pending push
messages unless the new_push_messages function is called very frequently
which involves repeatedly locking/unlocking CRDS table.

Additionally leaving gossip entries in the queue for the next round will
add delay to propagating push messages which can compound as messages go
through several hops.

(cherry picked from commit 489f483e1d7b30ef114e0123994818b2accfa389)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
mergify[bot] 1 an în urmă
părinte
comite
4f423a512a
1 a modificat fișierele cu 5 adăugiri și 13 ștergeri
  1. 5 13
      gossip/src/crds_gossip_push.rs

+ 5 - 13
gossip/src/crds_gossip_push.rs

@@ -21,10 +21,8 @@ use {
         push_active_set::PushActiveSet,
         received_cache::ReceivedCache,
     },
-    bincode::serialized_size,
     itertools::Itertools,
     solana_sdk::{
-        packet::PACKET_DATA_SIZE,
         pubkey::Pubkey,
         signature::{Keypair, Signer},
         timing::timestamp,
@@ -53,8 +51,6 @@ const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
 const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT + 3;
 
 pub struct CrdsGossipPush {
-    /// Max bytes per message
-    max_bytes: usize,
     /// Active set of validators for push
     active_set: RwLock<PushActiveSet>,
     /// Cursor into the crds table for values to push.
@@ -74,8 +70,6 @@ pub struct CrdsGossipPush {
 impl Default for CrdsGossipPush {
     fn default() -> Self {
         Self {
-            // Allow upto 64 Crds Values per PUSH
-            max_bytes: PACKET_DATA_SIZE * 64,
             active_set: RwLock::default(),
             crds_cursor: Mutex::default(),
             received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)),
@@ -180,10 +174,10 @@ impl CrdsGossipPush {
         usize, // number of values
         usize, // number of push messages
     ) {
+        const MAX_NUM_PUSHES: usize = 1 << 12;
         let active_set = self.active_set.read().unwrap();
         let mut num_pushes = 0;
         let mut num_values = 0;
-        let mut total_bytes: usize = 0;
         let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
         let wallclock_window = self.wallclock_window(now);
         let mut crds_cursor = self.crds_cursor.lock().unwrap();
@@ -193,12 +187,7 @@ impl CrdsGossipPush {
             .get_entries(crds_cursor.deref_mut())
             .map(|entry| &entry.value)
             .filter(|value| wallclock_window.contains(&value.wallclock()));
-        for value in entries {
-            let serialized_size = serialized_size(&value).unwrap();
-            total_bytes = total_bytes.saturating_add(serialized_size as usize);
-            if total_bytes > self.max_bytes {
-                break;
-            }
+        'outer: for value in entries {
             num_values += 1;
             let origin = value.pubkey();
             let nodes = active_set.get_nodes(
@@ -210,6 +199,9 @@ impl CrdsGossipPush {
             for node in nodes.take(self.push_fanout) {
                 push_messages.entry(*node).or_default().push(value.clone());
                 num_pushes += 1;
+                if num_pushes >= MAX_NUM_PUSHES {
+                    break 'outer;
+                }
             }
         }
         drop(crds);