Sfoglia il codice sorgente

Only aggregate heaviest fork at the coordinator. (#3115)

* Only aggregate the heaviest fork at the coordinator; do not exit until asked.

* Fix a bad merge.
Wen 1 anno fa
parent
commit
2bde0fc99c

+ 3 - 8
wen-restart/proto/wen_restart.proto

@@ -39,17 +39,12 @@ message HeaviestForkRecord {
     uint64 total_active_stake = 3;
     uint32 shred_version = 4;
     uint64 wallclock = 5;
-}
-
-message HeaviestForkAggregateFinal {
-    uint64 total_active_stake = 1;
-    uint64 total_active_stake_seen_supermajority = 2;
-    uint64 total_active_stake_agreed_with_me = 3;
+    string from = 6;
 }
 
 message HeaviestForkAggregateRecord {
-    map<string, HeaviestForkRecord> received = 1;
-    optional HeaviestForkAggregateFinal final_result = 2;
+    repeated HeaviestForkRecord received = 1;
+    uint64 total_active_stake = 2;
 }
 
 message GenerateSnapshotRecord {

+ 39 - 90
wen-restart/src/heaviest_fork_aggregate.rs

@@ -12,7 +12,6 @@ use {
 };
 
 pub(crate) struct HeaviestForkAggregate {
-    supermajority_threshold: f64,
     my_shred_version: u16,
     my_pubkey: Pubkey,
     // We use the epoch_stakes of the Epoch our heaviest bank is in. Proceed and exit only if
@@ -21,14 +20,6 @@ pub(crate) struct HeaviestForkAggregate {
     heaviest_forks: HashMap<Pubkey, RestartHeaviestFork>,
     block_stake_map: HashMap<(Slot, Hash), u64>,
     active_peers: HashSet<Pubkey>,
-    active_peers_seen_supermajority: HashSet<Pubkey>,
-}
-
-#[derive(Clone, Debug, PartialEq)]
-pub struct HeaviestForkFinalResult {
-    pub block_stake_map: HashMap<(Slot, Hash), u64>,
-    pub total_active_stake: u64,
-    pub total_active_stake_seen_supermajority: u64,
 }
 
 #[derive(Debug, PartialEq)]
@@ -42,7 +33,6 @@ pub enum HeaviestForkAggregateResult {
 
 impl HeaviestForkAggregate {
     pub(crate) fn new(
-        wait_for_supermajority_threshold_percent: u64,
         my_shred_version: u16,
         epoch_stakes: &EpochStakes,
         my_heaviest_fork_slot: Slot,
@@ -57,23 +47,20 @@ impl HeaviestForkAggregate {
             epoch_stakes.node_id_to_stake(my_pubkey).unwrap_or(0),
         );
         Self {
-            supermajority_threshold: wait_for_supermajority_threshold_percent as f64 / 100.0,
             my_shred_version,
             my_pubkey: *my_pubkey,
             epoch_stakes: epoch_stakes.clone(),
             heaviest_forks: HashMap::new(),
             block_stake_map,
             active_peers,
-            active_peers_seen_supermajority: HashSet::new(),
         }
     }
 
     pub(crate) fn aggregate_from_record(
         &mut self,
-        key_string: &str,
         record: &HeaviestForkRecord,
     ) -> Result<HeaviestForkAggregateResult> {
-        let from = Pubkey::from_str(key_string)?;
+        let from = Pubkey::from_str(&record.from)?;
         let bankhash = Hash::from_str(&record.bankhash)?;
         let restart_heaviest_fork = RestartHeaviestFork {
             from,
@@ -100,7 +87,6 @@ impl HeaviestForkAggregate {
         }
         if current_heaviest_fork == new_heaviest_fork
             || current_heaviest_fork.wallclock > new_heaviest_fork.wallclock
-            || current_heaviest_fork.observed_stake == new_heaviest_fork.observed_stake
         {
             return HeaviestForkAggregateResult::AlreadyExists;
         }
@@ -110,6 +96,7 @@ impl HeaviestForkAggregate {
             total_active_stake: new_heaviest_fork.observed_stake,
             shred_version: new_heaviest_fork.shred_version as u32,
             wallclock: new_heaviest_fork.wallclock,
+            from: new_heaviest_fork.from.to_string(),
         })
     }
 
@@ -117,7 +104,6 @@ impl HeaviestForkAggregate {
         &mut self,
         received_heaviest_fork: RestartHeaviestFork,
     ) -> HeaviestForkAggregateResult {
-        let total_stake = self.epoch_stakes.total_stake();
         let from = &received_heaviest_fork.from;
         let sender_stake = self.epoch_stakes.node_id_to_stake(from).unwrap_or(0);
         if sender_stake == 0 {
@@ -161,22 +147,11 @@ impl HeaviestForkAggregate {
                 total_active_stake: received_heaviest_fork.observed_stake,
                 shred_version: received_heaviest_fork.shred_version as u32,
                 wallclock: received_heaviest_fork.wallclock,
+                from: from.to_string(),
             })
         };
         self.heaviest_forks
             .insert(*from, received_heaviest_fork.clone());
-        if received_heaviest_fork.observed_stake as f64 / total_stake as f64
-            >= self.supermajority_threshold
-        {
-            self.active_peers_seen_supermajority.insert(*from);
-        }
-        if !self
-            .active_peers_seen_supermajority
-            .contains(&self.my_pubkey)
-            && self.total_active_stake() as f64 / total_stake as f64 >= self.supermajority_threshold
-        {
-            self.active_peers_seen_supermajority.insert(self.my_pubkey);
-        }
         result
     }
 
@@ -186,14 +161,6 @@ impl HeaviestForkAggregate {
         })
     }
 
-    pub(crate) fn total_active_stake_seen_supermajority(&self) -> u64 {
-        self.active_peers_seen_supermajority
-            .iter()
-            .fold(0, |sum: u64, pubkey| {
-                sum.saturating_add(self.epoch_stakes.node_id_to_stake(pubkey).unwrap_or(0))
-            })
-    }
-
     pub(crate) fn block_stake_map(self) -> HashMap<(Slot, Hash), u64> {
         self.block_stake_map
     }
@@ -244,7 +211,6 @@ mod tests {
         let heaviest_hash = Hash::new_unique();
         TestAggregateInitResult {
             heaviest_fork_aggregate: HeaviestForkAggregate::new(
-                75,
                 SHRED_VERSION,
                 root_bank.epoch_stakes(root_bank.epoch()).unwrap(),
                 heaviest_slot,
@@ -285,6 +251,7 @@ mod tests {
                     total_active_stake: 100,
                     shred_version: SHRED_VERSION as u32,
                     wallclock: timestamp1,
+                    from: pubkey.to_string(),
                 }),
             );
         }
@@ -316,6 +283,7 @@ mod tests {
                 total_active_stake: 100,
                 shred_version: SHRED_VERSION as u32,
                 wallclock: now,
+                from: new_active_validator.to_string(),
             }),
         );
         let expected_total_active_stake = (initial_num_active_validators + 2) as u64 * 100;
@@ -399,6 +367,7 @@ mod tests {
                     total_active_stake: 1400,
                     shred_version: SHRED_VERSION as u32,
                     wallclock: now,
+                    from: pubkey.to_string(),
                 }),
             );
         }
@@ -406,12 +375,6 @@ mod tests {
             test_state.heaviest_fork_aggregate.total_active_stake(),
             1400
         );
-        assert_eq!(
-            test_state
-                .heaviest_fork_aggregate
-                .total_active_stake_seen_supermajority(),
-            0
-        );
 
         // test that when 75% of the stake is seeing supermajority,
         // the active percent seeing supermajority is 75%.
@@ -435,6 +398,7 @@ mod tests {
                     total_active_stake: 1500,
                     shred_version: SHRED_VERSION as u32,
                     wallclock: now,
+                    from: pubkey.to_string(),
                 }),
             );
         }
@@ -443,14 +407,6 @@ mod tests {
             test_state.heaviest_fork_aggregate.total_active_stake(),
             1500
         );
-        // I myself is seeing supermajority as well, with the 14 validators
-        // reporting 70%, the total active stake seeing supermajority is 1500 (75%).
-        assert_eq!(
-            test_state
-                .heaviest_fork_aggregate
-                .total_active_stake_seen_supermajority(),
-            1500
-        );
 
         // test that message from my pubkey is ignored.
         assert_eq!(
@@ -483,12 +439,13 @@ mod tests {
             bankhash: test_state.heaviest_hash.to_string(),
             shred_version: SHRED_VERSION as u32,
             total_active_stake: 100,
+            from: from.to_string(),
         };
         assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 100);
         assert_eq!(
             test_state
                 .heaviest_fork_aggregate
-                .aggregate_from_record(&from.to_string(), &record,)
+                .aggregate_from_record(&record)
                 .unwrap(),
             HeaviestForkAggregateResult::Inserted(record.clone()),
         );
@@ -528,6 +485,7 @@ mod tests {
                 bankhash: test_state.heaviest_hash.to_string(),
                 shred_version: SHRED_VERSION as u32,
                 total_active_stake: 200,
+                from: from.to_string(),
             }),
         );
 
@@ -576,19 +534,18 @@ mod tests {
         assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 200);
 
         // Record from validator with zero stake should be ignored.
+        let zero_stake_validator = Pubkey::new_unique();
         assert_eq!(
             test_state
                 .heaviest_fork_aggregate
-                .aggregate_from_record(
-                    &Pubkey::new_unique().to_string(),
-                    &HeaviestForkRecord {
-                        wallclock: timestamp(),
-                        slot: test_state.heaviest_slot,
-                        bankhash: test_state.heaviest_hash.to_string(),
-                        shred_version: SHRED_VERSION as u32,
-                        total_active_stake: 100,
-                    }
-                )
+                .aggregate_from_record(&HeaviestForkRecord {
+                    wallclock: timestamp(),
+                    slot: test_state.heaviest_slot,
+                    bankhash: test_state.heaviest_hash.to_string(),
+                    shred_version: SHRED_VERSION as u32,
+                    total_active_stake: 100,
+                    from: zero_stake_validator.to_string(),
+                })
                 .unwrap(),
             HeaviestForkAggregateResult::ZeroStakeIgnored,
         );
@@ -596,22 +553,20 @@ mod tests {
         assert_eq!(test_state.heaviest_fork_aggregate.total_active_stake(), 200);
 
         // Record from my pubkey should be ignored.
+        let my_pubkey = test_state.validator_voting_keypairs[MY_INDEX]
+            .node_keypair
+            .pubkey();
         assert_eq!(
             test_state
                 .heaviest_fork_aggregate
-                .aggregate_from_record(
-                    &test_state.validator_voting_keypairs[MY_INDEX]
-                        .node_keypair
-                        .pubkey()
-                        .to_string(),
-                    &HeaviestForkRecord {
-                        wallclock: timestamp(),
-                        slot: test_state.heaviest_slot,
-                        bankhash: test_state.heaviest_hash.to_string(),
-                        shred_version: SHRED_VERSION as u32,
-                        total_active_stake: 100,
-                    }
-                )
+                .aggregate_from_record(&HeaviestForkRecord {
+                    wallclock: timestamp(),
+                    slot: test_state.heaviest_slot,
+                    bankhash: test_state.heaviest_hash.to_string(),
+                    shred_version: SHRED_VERSION as u32,
+                    total_active_stake: 100,
+                    from: my_pubkey.to_string(),
+                })
                 .unwrap(),
             HeaviestForkAggregateResult::AlreadyExists,
         );
@@ -620,46 +575,40 @@ mod tests {
     #[test]
     fn test_aggregate_from_record_failures() {
         let mut test_state = test_aggregate_init();
+        let from = test_state.validator_voting_keypairs[0]
+            .node_keypair
+            .pubkey();
         let mut heaviest_fork_record = HeaviestForkRecord {
             wallclock: timestamp(),
             slot: test_state.heaviest_slot,
             bankhash: test_state.heaviest_hash.to_string(),
             shred_version: SHRED_VERSION as u32,
             total_active_stake: 100,
+            from: from.to_string(),
         };
         // First test that this is a valid record.
         assert_eq!(
             test_state
                 .heaviest_fork_aggregate
-                .aggregate_from_record(
-                    &test_state.validator_voting_keypairs[0]
-                        .node_keypair
-                        .pubkey()
-                        .to_string(),
-                    &heaviest_fork_record,
-                )
+                .aggregate_from_record(&heaviest_fork_record,)
                 .unwrap(),
             HeaviestForkAggregateResult::Inserted(heaviest_fork_record.clone()),
         );
         // Then test that it fails if the record is invalid.
 
         // Invalid pubkey.
+        heaviest_fork_record.from = "invalid_pubkey".to_string();
         assert!(test_state
             .heaviest_fork_aggregate
-            .aggregate_from_record("invalid_pubkey", &heaviest_fork_record,)
+            .aggregate_from_record(&heaviest_fork_record,)
             .is_err());
 
         // Invalid hash.
+        heaviest_fork_record.from = from.to_string();
         heaviest_fork_record.bankhash.clear();
         assert!(test_state
             .heaviest_fork_aggregate
-            .aggregate_from_record(
-                &test_state.validator_voting_keypairs[0]
-                    .node_keypair
-                    .pubkey()
-                    .to_string(),
-                &heaviest_fork_record,
-            )
+            .aggregate_from_record(&heaviest_fork_record,)
             .is_err());
     }
 }

+ 71 - 357
wen-restart/src/wen_restart.rs

@@ -8,8 +8,8 @@ use {
             LastVotedForkSlotsEpochInfo, LastVotedForkSlotsFinalResult,
         },
         solana::wen_restart_proto::{
-            self, ConflictMessage, GenerateSnapshotRecord, HeaviestForkAggregateFinal,
-            HeaviestForkAggregateRecord, HeaviestForkRecord, LastVotedForkSlotsAggregateFinal,
+            self, ConflictMessage, GenerateSnapshotRecord, HeaviestForkAggregateRecord,
+            HeaviestForkRecord, LastVotedForkSlotsAggregateFinal,
             LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsEpochInfoRecord,
             LastVotedForkSlotsRecord, State as RestartState, WenRestartProgress,
         },
@@ -58,7 +58,7 @@ use {
             Arc, RwLock,
         },
         thread::sleep,
-        time::{Duration, Instant},
+        time::Duration,
     },
 };
 
@@ -70,10 +70,6 @@ const REPAIR_THRESHOLD: f64 = 0.42;
 // made regarding how much non-conforming/offline validators the
 // algorithm can tolerate.
 const HEAVIEST_FORK_THRESHOLD_DELTA: f64 = 0.38;
-// We allow at most 5% of the stake to disagree with us.
-const HEAVIEST_FORK_DISAGREE_THRESHOLD_PERCENT: f64 = 5.0;
-// We update HeaviestFork every 5 minutes at least.
-const HEAVIEST_REFRESH_INTERVAL_IN_SECONDS: u64 = 300;
 
 #[derive(Debug, PartialEq)]
 pub enum WenRestartError {
@@ -641,7 +637,6 @@ pub(crate) fn find_bankhash_of_heaviest_fork(
 // Aggregate the heaviest fork and send updates to the cluster.
 pub(crate) fn aggregate_restart_heaviest_fork(
     wen_restart_path: &PathBuf,
-    wait_for_supermajority_threshold_percent: u64,
     cluster_info: Arc<ClusterInfo>,
     bank_forks: Arc<RwLock<BankForks>>,
     exit: Arc<AtomicBool>,
@@ -658,22 +653,12 @@ pub(crate) fn aggregate_restart_heaviest_fork(
     let my_heaviest_fork = progress.my_heaviest_fork.clone().unwrap();
     let heaviest_fork_slot = my_heaviest_fork.slot;
     let heaviest_fork_hash = Hash::from_str(&my_heaviest_fork.bankhash)?;
-    // When checking whether to exit aggregate_restart_heaviest_fork, use the epoch_stakes
-    // associated with the heaviest fork slot we picked. This ensures that everyone agreeing
-    // with me use the same EpochStakes to calculate the supermajority threshold.
+    // Use the epoch_stakes associated with the heaviest fork slot we picked.
     let epoch_stakes = root_bank
         .epoch_stakes(root_bank.epoch_schedule().get_epoch(heaviest_fork_slot))
         .unwrap();
     let total_stake = epoch_stakes.total_stake();
-    let adjusted_threshold_percent = wait_for_supermajority_threshold_percent
-        .saturating_sub(HEAVIEST_FORK_DISAGREE_THRESHOLD_PERCENT.round() as u64);
-    // The threshold for supermajority should definitely be higher than 67%.
-    assert!(
-        adjusted_threshold_percent > 67,
-        "Majority threshold too low"
-    );
     let mut heaviest_fork_aggregate = HeaviestForkAggregate::new(
-        adjusted_threshold_percent,
         cluster_info.my_shred_version(),
         epoch_stakes,
         heaviest_fork_slot,
@@ -681,38 +666,27 @@ pub(crate) fn aggregate_restart_heaviest_fork(
         &cluster_info.id(),
     );
     if let Some(aggregate_record) = &progress.heaviest_fork_aggregate {
-        for (key_string, message) in &aggregate_record.received {
-            if let Err(e) = heaviest_fork_aggregate.aggregate_from_record(key_string, message) {
+        for message in &aggregate_record.received {
+            if let Err(e) = heaviest_fork_aggregate.aggregate_from_record(message) {
                 // Do not abort wen_restart if we got one malformed message.
                 error!("Failed to aggregate from record: {:?}", e);
             }
         }
     } else {
         progress.heaviest_fork_aggregate = Some(HeaviestForkAggregateRecord {
-            received: HashMap::new(),
-            final_result: None,
+            received: Vec::new(),
+            total_active_stake: 0,
         });
     }
 
-    let mut total_active_stake = heaviest_fork_aggregate.total_active_stake();
-    progress
-        .my_heaviest_fork
-        .as_mut()
-        .unwrap()
-        .total_active_stake = total_active_stake;
-
     let mut cursor = solana_gossip::crds::Cursor::default();
-    // Init progress_changed to true and progress_last_sent to old time so we can send out the first Gossip message.
-    let mut progress_changed = true;
-    let mut progress_last_sent = Instant::now()
-        .checked_sub(Duration::from_secs(HEAVIEST_REFRESH_INTERVAL_IN_SECONDS))
-        .unwrap();
-    let majority_stake_required =
-        (total_stake as f64 / 100.0 * adjusted_threshold_percent as f64).round() as u64;
-    let mut total_active_stake_higher_than_supermajority = false;
+    let mut total_active_stake = 0;
     loop {
         if exit.load(Ordering::Relaxed) {
-            return Err(WenRestartError::Exiting.into());
+            for ((slot, hash), stake) in heaviest_fork_aggregate.block_stake_map().iter() {
+                info!("Slot: {}, Hash: {}, Stake: {}", slot, hash, stake,);
+            }
+            return Ok(());
         }
         let start = timestamp();
         for new_heaviest_fork in cluster_info.get_restart_heaviest_fork(&mut cursor) {
@@ -726,8 +700,7 @@ pub(crate) fn aggregate_restart_heaviest_fork(
                         .as_mut()
                         .unwrap()
                         .received
-                        .insert(from, record);
-                    progress_changed = true;
+                        .push(record);
                 }
                 HeaviestForkAggregateResult::DifferentVersionExists(old_record, new_record) => {
                     warn!("Different version from {from} exists old {old_record:#?} vs new {new_record:#?}");
@@ -748,128 +721,23 @@ pub(crate) fn aggregate_restart_heaviest_fork(
         if current_total_active_stake > total_active_stake {
             total_active_stake = current_total_active_stake;
             progress
-                .my_heaviest_fork
+                .heaviest_fork_aggregate
                 .as_mut()
                 .unwrap()
                 .total_active_stake = current_total_active_stake;
-            progress_changed = true;
-        }
-        if progress_changed {
-            progress_changed = false;
-            let total_active_stake_seen_supermajority =
-                heaviest_fork_aggregate.total_active_stake_seen_supermajority();
-            info!(
-                "Total active stake seeing supermajority: {} Total active stake: {} Required to exit {} Total stake {}",
-                total_active_stake_seen_supermajority,
-                heaviest_fork_aggregate.total_active_stake(),
-                majority_stake_required,
-                total_stake
-            );
-            let can_exit = total_active_stake_seen_supermajority >= majority_stake_required;
-            let saw_supermajority_first_time = current_total_active_stake
-                >= majority_stake_required
-                && !total_active_stake_higher_than_supermajority
-                && {
-                    total_active_stake_higher_than_supermajority = true;
-                    true
-                };
-            // Only send out updates every 5 minutes or when we can exit or active stake passes supermajority
-            // the first time.
-            if progress_last_sent.elapsed().as_secs() >= HEAVIEST_REFRESH_INTERVAL_IN_SECONDS
-                || can_exit
-                || saw_supermajority_first_time
-            {
-                cluster_info.push_restart_heaviest_fork(
-                    heaviest_fork_slot,
-                    heaviest_fork_hash,
-                    current_total_active_stake,
-                );
-                write_wen_restart_records(wen_restart_path, progress)?;
-                progress_last_sent = Instant::now();
-            }
-            if can_exit {
-                break;
-            }
         }
+        info!(
+            "Total active stake: {} Total stake {}",
+            heaviest_fork_aggregate.total_active_stake(),
+            total_stake
+        );
+        write_wen_restart_records(wen_restart_path, progress)?;
         let elapsed = timestamp().saturating_sub(start);
         let time_left = GOSSIP_SLEEP_MILLIS.saturating_sub(elapsed);
         if time_left > 0 {
             sleep(Duration::from_millis(time_left));
         }
     }
-
-    // Final check to see if supermajority agrees with us.
-    let total_active_stake = heaviest_fork_aggregate.total_active_stake();
-    let total_active_stake_seen_supermajority =
-        heaviest_fork_aggregate.total_active_stake_seen_supermajority();
-    let block_stake_map = heaviest_fork_aggregate.block_stake_map();
-    let total_active_stake_agreed_with_me = *block_stake_map
-        .get(&(heaviest_fork_slot, heaviest_fork_hash))
-        .unwrap_or(&0);
-    // It doesn't matter if 5% disagrees with us.
-    let success_threshold =
-        wait_for_supermajority_threshold_percent as f64 - HEAVIEST_FORK_DISAGREE_THRESHOLD_PERCENT;
-    if total_active_stake_agreed_with_me as f64 * 100.0 / total_stake as f64 >= success_threshold {
-        info!(
-            "Heaviest fork agreed upon by supermajority: slot: {}, bankhash: {}",
-            heaviest_fork_slot, heaviest_fork_hash
-        );
-        progress
-            .heaviest_fork_aggregate
-            .as_mut()
-            .unwrap()
-            .final_result = Some(HeaviestForkAggregateFinal {
-            total_active_stake,
-            total_active_stake_seen_supermajority,
-            total_active_stake_agreed_with_me,
-        });
-        Ok(())
-    } else {
-        info!(
-            "Not enough stake agreeing with our heaviest fork: slot: {},
-            bankhash: {}, stake aggreeing with us {} out of {}",
-            heaviest_fork_slot,
-            heaviest_fork_hash,
-            total_active_stake_agreed_with_me,
-            total_active_stake
-        );
-        let mut max_slot_hash = (0, Hash::default());
-        let mut max_stake = 0;
-        for (slot, hash) in block_stake_map.keys() {
-            let stake = block_stake_map[&(*slot, *hash)];
-            if stake > max_stake {
-                max_stake = stake;
-                max_slot_hash = (*slot, *hash);
-            }
-            info!(
-                "Slot: {}, Hash: {}, Stake: {}",
-                slot,
-                hash,
-                block_stake_map[&(*slot, *hash)]
-            );
-        }
-        let max_stake_percent = max_stake as f64 * 100.0 / total_stake as f64;
-        if max_stake_percent >= success_threshold {
-            warn!(
-                "Max stake slot: {}, hash: {}, stake: {:.2}% does not agree with my
-                choice, please go to discord to download the snapshot and restart
-                the validator with --wait-for-supermajority.",
-                max_slot_hash.0, max_slot_hash.1, max_stake_percent
-            );
-        } else {
-            warn!(
-                "Cluster consensus slot: {}, hash: {}, stake: {:.2}% does not agree,
-                please go to discord for next steps.",
-                max_slot_hash.0, max_slot_hash.1, max_stake_percent
-            );
-        }
-        Err(WenRestartError::NotEnoughStakeAgreeingWithUs(
-            heaviest_fork_slot,
-            heaviest_fork_hash,
-            block_stake_map,
-        )
-        .into())
-    }
 }
 
 pub(crate) fn receive_restart_heaviest_fork(
@@ -897,6 +765,7 @@ pub(crate) fn receive_restart_heaviest_fork(
                     total_active_stake: 0,
                     wallclock: new_heaviest_fork.wallclock,
                     shred_version: new_heaviest_fork.shred_version as u32,
+                    from: new_heaviest_fork.from.to_string(),
                 });
                 return Ok((coordinator_heaviest_slot, coordinator_heaviest_hash));
             }
@@ -989,6 +858,7 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
                             total_active_stake: 0,
                             wallclock: 0,
                             shred_version: config.cluster_info.my_shred_version() as u32,
+                            from: config.cluster_info.id().to_string(),
                         }
                     }
                 };
@@ -1008,7 +878,6 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
                     // TODO(wen): remove this aggregate.
                     aggregate_restart_heaviest_fork(
                         &config.wen_restart_path,
-                        config.wait_for_supermajority_threshold_percent,
                         config.cluster_info.clone(),
                         config.bank_forks.clone(),
                         config.exit.clone(),
@@ -1063,6 +932,15 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
                     --no-snapshot-fetch",
                     slot, hash, shred_version,
                 );
+                if config.cluster_info.id() == config.wen_restart_coordinator {
+                    aggregate_restart_heaviest_fork(
+                        &config.wen_restart_path,
+                        config.cluster_info.clone(),
+                        config.bank_forks.clone(),
+                        config.exit.clone(),
+                        &mut progress,
+                    )?;
+                }
                 return Ok(());
             }
         };
@@ -1658,6 +1536,9 @@ mod tests {
         let last_vote_bankhash = Hash::new_unique();
         let expected_slots_to_repair: Vec<Slot> =
             (last_vote_slot + 1..last_vote_slot + 3).collect();
+        let my_pubkey = &test_state.validator_voting_keypairs[MY_INDEX]
+            .node_keypair
+            .pubkey();
 
         let bank_snapshots_dir = tempfile::TempDir::new().unwrap();
         let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
@@ -1768,45 +1649,20 @@ mod tests {
             }
             sleep(Duration::from_millis(100));
         }
-        // Now simulate receiving HeaviestFork messages.
-        let mut expected_received_heaviest_fork = HashMap::new();
-        let validators_to_take: usize = ((WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
-            - NON_CONFORMING_VALIDATOR_PERCENT)
-            * TOTAL_VALIDATOR_COUNT as u64
-            / 100
-            - 1)
-        .try_into()
-        .unwrap();
-        // HeaviestFork only requires 75% vs 80% required for LastVotedForkSlots. We have 5% stake, so we need 70%.
-        let total_active_stake_during_heaviest_fork = (validators_to_take + 1) as u64 * 100;
-        for keypairs in test_state
-            .validator_voting_keypairs
-            .iter()
-            .take(validators_to_take)
-        {
-            let node_pubkey = keypairs.node_keypair.pubkey();
-            let node = ContactInfo::new_rand(&mut rng, Some(node_pubkey));
-            let now = timestamp();
-            push_restart_heaviest_fork(
-                test_state.cluster_info.clone(),
-                &node,
-                expected_heaviest_fork_slot,
-                &expected_heaviest_fork_bankhash,
-                total_active_stake_during_heaviest_fork,
-                &keypairs.node_keypair,
-                now,
-            );
-            expected_received_heaviest_fork.insert(
-                node_pubkey.to_string(),
-                HeaviestForkRecord {
-                    slot: expected_heaviest_fork_slot,
-                    bankhash: expected_heaviest_fork_bankhash.to_string(),
-                    total_active_stake: total_active_stake_during_heaviest_fork,
-                    shred_version: SHRED_VERSION as u32,
-                    wallclock: now,
-                },
-            );
-        }
+        // Now simulate receiving HeaviestFork messages from coordinator.
+        let coordinator_keypair =
+            &test_state.validator_voting_keypairs[COORDINATOR_INDEX].node_keypair;
+        let node = ContactInfo::new_rand(&mut rng, Some(coordinator_keypair.pubkey()));
+        let now = timestamp();
+        push_restart_heaviest_fork(
+            test_state.cluster_info.clone(),
+            &node,
+            expected_heaviest_fork_slot,
+            &expected_heaviest_fork_bankhash,
+            0,
+            coordinator_keypair,
+            now,
+        );
 
         assert!(wen_restart_thread_handle.join().is_ok());
         exit.store(true, Ordering::Relaxed);
@@ -1826,13 +1682,13 @@ mod tests {
                 )
             })
             .collect();
+        let stake_for_new_slots = validators_to_take as u64 * 100;
         expected_slots_stake_map.extend(
             expected_slots_to_repair
                 .iter()
-                .map(|slot| (*slot, total_active_stake_during_heaviest_fork)),
+                .map(|slot| (*slot, stake_for_new_slots)),
         );
-        // We are simulating 5% joined LastVotedForkSlots but not HeaviestFork.
-        let voted_stake = total_active_stake_during_heaviest_fork + 100;
+        let voted_stake = (validators_to_take + 1) as u64 * 100;
         assert_eq!(
             progress,
             WenRestartProgress {
@@ -1874,6 +1730,7 @@ mod tests {
                     total_active_stake: 0,
                     shred_version: SHRED_VERSION as u32,
                     wallclock: 0,
+                    from: my_pubkey.to_string(),
                 }),
                 heaviest_fork_aggregate: None,
                 my_snapshot: Some(GenerateSnapshotRecord {
@@ -1892,6 +1749,7 @@ mod tests {
                         .as_ref()
                         .unwrap()
                         .wallclock,
+                    from: coordinator_keypair.pubkey().to_string(),
                 }),
                 ..Default::default()
             }
@@ -2143,6 +2001,7 @@ mod tests {
                 total_active_stake: 0,
                 shred_version: SHRED_VERSION as u32,
                 wallclock: 0,
+                from: Pubkey::new_unique().to_string(),
             }),
             ..Default::default()
         };
@@ -2272,6 +2131,7 @@ mod tests {
                 total_active_stake: 0,
                 shred_version: SHRED_VERSION as u32,
                 wallclock: 0,
+                from: Pubkey::new_unique().to_string(),
             }),
             last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
                 received: HashMap::new(),
@@ -2335,6 +2195,7 @@ mod tests {
                 total_active_stake: 0,
                 shred_version: SHRED_VERSION as u32,
                 wallclock: 0,
+                from: Pubkey::new_unique().to_string(),
             }),
             my_snapshot: Some(GenerateSnapshotRecord {
                 slot: 0,
@@ -2376,6 +2237,7 @@ mod tests {
                 total_active_stake: 0,
                 shred_version: SHRED_VERSION as u32,
                 wallclock: 0,
+                from: Pubkey::new_unique().to_string(),
             }),
             my_snapshot: Some(GenerateSnapshotRecord {
                 slot: last_vote_slot,
@@ -2662,12 +2524,14 @@ mod tests {
                 }],
             }),
         });
+        let my_pubkey = Pubkey::new_unique();
         let my_heaviest_fork = Some(HeaviestForkRecord {
             slot: 1,
             bankhash: Hash::default().to_string(),
             total_active_stake: 900,
             shred_version: SHRED_VERSION as u32,
             wallclock: 0,
+            from: my_pubkey.to_string(),
         });
         let my_bankhash = Hash::new_unique();
         let new_shred_version = SHRED_VERSION + 57;
@@ -2678,12 +2542,8 @@ mod tests {
             shred_version: new_shred_version as u32,
         });
         let heaviest_fork_aggregate = Some(HeaviestForkAggregateRecord {
-            received: HashMap::new(),
-            final_result: Some(HeaviestForkAggregateFinal {
-                total_active_stake: 900,
-                total_active_stake_seen_supermajority: 900,
-                total_active_stake_agreed_with_me: 900,
-            }),
+            received: Vec::new(),
+            total_active_stake: 0,
         });
         let expected_slots_stake_map: HashMap<Slot, u64> =
             vec![(0, 900), (1, 800)].into_iter().collect();
@@ -2763,6 +2623,7 @@ mod tests {
                         total_active_stake: 900,
                         shred_version: SHRED_VERSION as u32,
                         wallclock: 0,
+                        from: my_pubkey.to_string(),
                     }),
                 },
                 WenRestartProgressInternalState::HeaviestFork {
@@ -3073,6 +2934,7 @@ mod tests {
                     .saturating_mul(TOTAL_VALIDATOR_COUNT as u64),
                 shred_version: SHRED_VERSION as u32,
                 wallclock: 0,
+                from: test_state.cluster_info.id().to_string(),
             }),
             ..Default::default()
         };
@@ -3084,7 +2946,6 @@ mod tests {
             .spawn(move || {
                 let result = aggregate_restart_heaviest_fork(
                     &wen_restart_path,
-                    WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
                     cluster_info,
                     bank_forks,
                     exit,
@@ -3103,162 +2964,22 @@ mod tests {
     }
 
     #[test]
-    fn test_aggregate_heaviest_fork_send_gossip_early() {
+    fn test_aggregate_heaviest_fork() {
         let ledger_path = get_tmp_ledger_path_auto_delete!();
         let test_state = wen_restart_test_init(&ledger_path);
         let heaviest_fork_slot = test_state.last_voted_fork_slots[0] + 3;
         let heaviest_fork_bankhash = Hash::new_unique();
-
-        let mut cursor = solana_gossip::crds::Cursor::default();
-        // clear the heaviest fork queue so we make sure a new HeaviestFork is sent out later.
-        let _ = test_state
-            .cluster_info
-            .get_restart_heaviest_fork(&mut cursor);
-
+        let expected_active_stake = (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
+            - NON_CONFORMING_VALIDATOR_PERCENT)
+            * TOTAL_VALIDATOR_COUNT as u64;
         let exit = Arc::new(AtomicBool::new(false));
         let thread = start_aggregate_heaviest_fork_thread(
             &test_state,
             heaviest_fork_slot,
             heaviest_fork_bankhash,
             exit.clone(),
-            Some(WenRestartError::Exiting),
-        );
-        // Find the first HeaviestFork message sent out entering the loop.
-        let my_pubkey = test_state.cluster_info.id();
-        let mut found_myself = false;
-        while !found_myself {
-            sleep(Duration::from_millis(100));
-            test_state.cluster_info.flush_push_queue();
-            for gossip_record in test_state
-                .cluster_info
-                .get_restart_heaviest_fork(&mut cursor)
-            {
-                if gossip_record.from == my_pubkey && gossip_record.observed_stake > 0 {
-                    found_myself = true;
-                    break;
-                }
-            }
-        }
-        // Simulating everyone sending out the first RestartHeaviestFork message, Gossip propagation takes
-        // time, so the observed_stake is probably smaller than actual active stake. We should send out
-        // heaviest fork indicating we have active stake exceeding supermajority.
-        let validators_to_take: usize = ((WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
-            - NON_CONFORMING_VALIDATOR_PERCENT)
-            * TOTAL_VALIDATOR_COUNT as u64
-            / 100
-            - 1)
-        .try_into()
-        .unwrap();
-        for keypair in test_state
-            .validator_voting_keypairs
-            .iter()
-            .take(validators_to_take)
-        {
-            let node_pubkey = keypair.node_keypair.pubkey();
-            let node = ContactInfo::new_rand(&mut rand::thread_rng(), Some(node_pubkey));
-            let now = timestamp();
-            push_restart_heaviest_fork(
-                test_state.cluster_info.clone(),
-                &node,
-                heaviest_fork_slot,
-                &heaviest_fork_bankhash,
-                100,
-                &keypair.node_keypair,
-                now,
-            );
-        }
-        let mut found_myself = false;
-        let expected_active_stake = (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
-            - NON_CONFORMING_VALIDATOR_PERCENT)
-            * TOTAL_VALIDATOR_COUNT as u64;
-        while !found_myself {
-            sleep(Duration::from_millis(100));
-            test_state.cluster_info.flush_push_queue();
-            for gossip_record in test_state
-                .cluster_info
-                .get_restart_heaviest_fork(&mut cursor)
-            {
-                if gossip_record.from == my_pubkey
-                    && gossip_record.observed_stake == expected_active_stake
-                {
-                    found_myself = true;
-                    break;
-                }
-            }
-        }
-        exit.store(true, Ordering::Relaxed);
-        assert!(thread.join().is_ok());
-    }
-
-    #[test]
-    fn test_aggregate_heaviest_fork() {
-        let ledger_path = get_tmp_ledger_path_auto_delete!();
-        let test_state = wen_restart_test_init(&ledger_path);
-        let heaviest_fork_slot = test_state.last_voted_fork_slots[0] + 3;
-        let heaviest_fork_bankhash = Hash::new_unique();
-        let expected_active_stake = (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
-            - NON_CONFORMING_VALIDATOR_PERCENT)
-            * TOTAL_VALIDATOR_COUNT as u64;
-        let progress = wen_restart_proto::WenRestartProgress {
-            state: RestartState::HeaviestFork.into(),
-            my_heaviest_fork: Some(HeaviestForkRecord {
-                slot: heaviest_fork_slot,
-                bankhash: heaviest_fork_bankhash.to_string(),
-                total_active_stake: expected_active_stake,
-                shred_version: SHRED_VERSION as u32,
-                wallclock: 0,
-            }),
-            ..Default::default()
-        };
-
-        let different_bankhash = Hash::new_unique();
-        let validators_to_take: usize = ((WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT
-            - NON_CONFORMING_VALIDATOR_PERCENT)
-            * TOTAL_VALIDATOR_COUNT as u64
-            / 100
-            - 1)
-        .try_into()
-        .unwrap();
-        for keypair in test_state
-            .validator_voting_keypairs
-            .iter()
-            .take(validators_to_take)
-        {
-            let node_pubkey = keypair.node_keypair.pubkey();
-            let node = ContactInfo::new_rand(&mut rand::thread_rng(), Some(node_pubkey));
-            let now = timestamp();
-            push_restart_heaviest_fork(
-                test_state.cluster_info.clone(),
-                &node,
-                heaviest_fork_slot,
-                &different_bankhash,
-                expected_active_stake,
-                &keypair.node_keypair,
-                now,
-            );
-        }
-        let mut expected_block_stake_map = HashMap::new();
-        expected_block_stake_map.insert((heaviest_fork_slot, heaviest_fork_bankhash), 100);
-        expected_block_stake_map.insert((heaviest_fork_slot, different_bankhash), 1400);
-        assert_eq!(
-            aggregate_restart_heaviest_fork(
-                &test_state.wen_restart_proto_path,
-                WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
-                test_state.cluster_info.clone(),
-                test_state.bank_forks.clone(),
-                Arc::new(AtomicBool::new(false)),
-                &mut progress.clone(),
-            )
-            .unwrap_err()
-            .downcast::<WenRestartError>()
-            .unwrap(),
-            WenRestartError::NotEnoughStakeAgreeingWithUs(
-                heaviest_fork_slot,
-                heaviest_fork_bankhash,
-                expected_block_stake_map
-            ),
+            None,
         );
-        // If we have enough stake agreeing with us, we should be able to aggregate the heaviest fork.
         let validators_to_take: usize =
             (WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT * TOTAL_VALIDATOR_COUNT as u64 / 100 - 1)
                 .try_into()
@@ -3281,15 +3002,8 @@ mod tests {
                 now,
             );
         }
-        assert!(aggregate_restart_heaviest_fork(
-            &test_state.wen_restart_proto_path,
-            WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
-            test_state.cluster_info.clone(),
-            test_state.bank_forks.clone(),
-            Arc::new(AtomicBool::new(false)),
-            &mut progress.clone(),
-        )
-        .is_ok());
+        exit.store(true, Ordering::Relaxed);
+        assert!(thread.join().is_ok());
     }
 
     #[test]