Ver Fonte

wen_restart: Find the bank hash of the heaviest fork, replay if necessary. (#420)

* Find the bank hash of the heaviest fork, replay if necessary.

* Make it more explicit how heaviest fork slot is selected.

* Use process_single_slot instead of process_blockstore_from_root, because the
latter may re-insert banks that are already frozen.

* Put BlockstoreProcessError into the error message.

* Check that all existing blocks link to correct parent before replay.

* Use the default number of threads instead.

* Check whether block is full and other small fixes.

* Fix root_bank and move comments to function level.

* Remove the extra parent link check.
Wen há 1 ano atrás
pai
commit
312f725f1e

+ 2 - 0
Cargo.lock

@@ -7651,6 +7651,7 @@ dependencies = [
  "prost-types",
  "protobuf-src",
  "rand 0.8.5",
+ "rayon",
  "rustc_version 0.4.0",
  "serial_test",
  "solana-accounts-db",
@@ -7659,6 +7660,7 @@ dependencies = [
  "solana-ledger",
  "solana-logger",
  "solana-program",
+ "solana-program-runtime",
  "solana-runtime",
  "solana-sdk",
  "solana-streamer",

+ 1 - 1
ledger/src/blockstore_processor.rs

@@ -1794,7 +1794,7 @@ fn supermajority_root_from_vote_accounts(
 // Processes and replays the contents of a single slot, returns Error
 // if failed to play the slot
 #[allow(clippy::too_many_arguments)]
-fn process_single_slot(
+pub fn process_single_slot(
     blockstore: &Blockstore,
     bank: &BankWithScheduler,
     replay_tx_thread_pool: &ThreadPool,

+ 3 - 0
programs/sbf/Cargo.lock

@@ -6610,11 +6610,14 @@ dependencies = [
  "prost-build",
  "prost-types",
  "protobuf-src",
+ "rayon",
  "rustc_version",
+ "solana-entry",
  "solana-gossip",
  "solana-ledger",
  "solana-logger",
  "solana-program",
+ "solana-program-runtime",
  "solana-runtime",
  "solana-sdk",
  "solana-vote-program",

+ 3 - 0
wen-restart/Cargo.toml

@@ -15,10 +15,13 @@ anyhow = { workspace = true }
 log = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
+rayon = { workspace = true }
+solana-entry = { workspace = true }
 solana-gossip = { workspace = true }
 solana-ledger = { workspace = true }
 solana-logger = { workspace = true }
 solana-program = { workspace = true }
+solana-program-runtime = { workspace = true }
 solana-runtime = { workspace = true }
 solana-sdk = { workspace = true }
 solana-vote-program = { workspace = true }

+ 284 - 46
wen-restart/src/wen_restart.rs

@@ -14,13 +14,20 @@ use {
     anyhow::Result,
     log::*,
     prost::Message,
+    solana_entry::entry::VerifyRecyclers,
     solana_gossip::{
         cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
         restart_crds_values::RestartLastVotedForkSlots,
     },
-    solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore},
+    solana_ledger::{
+        ancestor_iterator::AncestorIterator,
+        blockstore::Blockstore,
+        blockstore_processor::{process_single_slot, ConfirmationProgress, ProcessOptions},
+        leader_schedule_cache::LeaderScheduleCache,
+    },
     solana_program::{clock::Slot, hash::Hash},
-    solana_runtime::bank_forks::BankForks,
+    solana_program_runtime::timings::ExecuteTimings,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::timing::timestamp,
     solana_vote_program::vote_state::VoteTransaction,
     std::{
@@ -42,11 +49,16 @@ use {
 const REPAIR_THRESHOLD: f64 = 0.42;
 // When counting Heaviest Fork, only count those with no less than
 // 67% - 5% - (100% - active_stake) = active_stake - 38% stake.
+// 67% is the supermajority threshold (2/3); 5% is our assumption
+// about the share of stake held by non-conforming/offline validators
+// that the algorithm can tolerate.
 const HEAVIEST_FORK_THRESHOLD_DELTA: f64 = 0.38;
 
 #[derive(Debug, PartialEq)]
 pub enum WenRestartError {
     BlockNotFound(Slot),
+    BlockNotFull(Slot),
+    BlockNotFrozenAfterReplay(Slot, Option<String>),
     BlockNotLinkedToExpectedParent(Slot, Option<Slot>, Slot),
     ChildStakeLargerThanParent(Slot, u64, Slot, u64),
     Exiting,
@@ -62,6 +74,12 @@ impl std::fmt::Display for WenRestartError {
             WenRestartError::BlockNotFound(slot) => {
                 write!(f, "Block not found: {}", slot)
             }
+            WenRestartError::BlockNotFull(slot) => {
+                write!(f, "Block not full: {}", slot)
+            }
+            WenRestartError::BlockNotFrozenAfterReplay(slot, err) => {
+                write!(f, "Block not frozen after replay: {} {:?}", slot, err)
+            }
             WenRestartError::BlockNotLinkedToExpectedParent(slot, parent, expected_parent) => {
                 write!(
                     f,
@@ -145,10 +163,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
     exit: Arc<AtomicBool>,
     progress: &mut WenRestartProgress,
 ) -> Result<LastVotedForkSlotsFinalResult> {
-    let root_bank;
-    {
-        root_bank = bank_forks.read().unwrap().root_bank().clone();
-    }
+    let root_bank = bank_forks.read().unwrap().root_bank();
     let root_slot = root_bank.slot();
     let mut last_voted_fork_slots_aggregate = LastVotedForkSlotsAggregate::new(
         root_slot,
@@ -247,9 +262,7 @@ pub(crate) fn find_heaviest_fork(
     blockstore: Arc<Blockstore>,
     exit: Arc<AtomicBool>,
 ) -> Result<(Slot, Hash)> {
-    // Because everything else is stopped, it's okay to grab a big lock on bank_forks.
-    let my_bank_forks = bank_forks.read().unwrap();
-    let root_bank = my_bank_forks.root_bank().clone();
+    let root_bank = bank_forks.read().unwrap().root_bank();
     let root_slot = root_bank.slot();
     // TODO: Should use better epoch_stakes later.
     let epoch_stake = root_bank.epoch_stakes(root_bank.epoch()).unwrap();
@@ -264,12 +277,16 @@ pub(crate) fn find_heaviest_fork(
         .map(|(slot, _)| *slot)
         .collect::<Vec<Slot>>();
     slots.sort();
+
+    // The heaviest slot we selected will always be the last of the slots list, or root if the list is empty.
+    let heaviest_fork_slot = slots.last().map_or(root_slot, |x| *x);
+
     let mut expected_parent = root_slot;
-    for slot in slots {
+    for slot in &slots {
         if exit.load(Ordering::Relaxed) {
             return Err(WenRestartError::Exiting.into());
         }
-        if let Ok(Some(block_meta)) = blockstore.meta(slot) {
+        if let Ok(Some(block_meta)) = blockstore.meta(*slot) {
             if block_meta.parent_slot != Some(expected_parent) {
                 if expected_parent == root_slot {
                     error!("First block {} in repair list not linked to local root {}, this could mean our root is too old",
@@ -281,18 +298,111 @@ pub(crate) fn find_heaviest_fork(
                     );
                 }
                 return Err(WenRestartError::BlockNotLinkedToExpectedParent(
-                    slot,
+                    *slot,
                     block_meta.parent_slot,
                     expected_parent,
                 )
                 .into());
             }
-            expected_parent = slot;
+            if !block_meta.is_full() {
+                return Err(WenRestartError::BlockNotFull(*slot).into());
+            }
+            expected_parent = *slot;
         } else {
-            return Err(WenRestartError::BlockNotFound(slot).into());
+            return Err(WenRestartError::BlockNotFound(*slot).into());
         }
     }
-    Ok((expected_parent, Hash::default()))
+    let heaviest_fork_bankhash = find_bankhash_of_heaviest_fork(
+        heaviest_fork_slot,
+        slots,
+        blockstore.clone(),
+        bank_forks.clone(),
+        root_bank,
+        &exit,
+    )?;
+    info!(
+        "Heaviest fork found: slot: {}, bankhash: {:?}",
+        heaviest_fork_slot, heaviest_fork_bankhash
+    );
+    Ok((heaviest_fork_slot, heaviest_fork_bankhash))
+}
+
+// Find the bank hash of the heaviest fork; if a block hasn't been replayed yet, replay it to get the hash.
+fn find_bankhash_of_heaviest_fork(
+    heaviest_fork_slot: Slot,
+    slots: Vec<Slot>,
+    blockstore: Arc<Blockstore>,
+    bank_forks: Arc<RwLock<BankForks>>,
+    root_bank: Arc<Bank>,
+    exit: &Arc<AtomicBool>,
+) -> Result<Hash> {
+    let heaviest_fork_bankhash = bank_forks
+        .read()
+        .unwrap()
+        .get(heaviest_fork_slot)
+        .map(|bank| bank.hash());
+    if let Some(hash) = heaviest_fork_bankhash {
+        return Ok(hash);
+    }
+
+    let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank);
+    let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
+        .thread_name(|i| format!("solReplayTx{i:02}"))
+        .build()
+        .expect("new rayon threadpool");
+    let recyclers = VerifyRecyclers::default();
+    let mut timing = ExecuteTimings::default();
+    let opts = ProcessOptions::default();
+    // Hold a single write lock until the end of the function; we are the only one touching bank_forks now.
+    let mut my_bankforks = bank_forks.write().unwrap();
+    // Now replay all the missing blocks.
+    let mut parent_bank = root_bank;
+    for slot in slots {
+        if exit.load(Ordering::Relaxed) {
+            return Err(WenRestartError::Exiting.into());
+        }
+        let bank = match my_bankforks.get(slot) {
+            Some(cur_bank) => {
+                if !cur_bank.is_frozen() {
+                    return Err(WenRestartError::BlockNotFrozenAfterReplay(slot, None).into());
+                }
+                cur_bank
+            }
+            None => {
+                let new_bank = Bank::new_from_parent(
+                    parent_bank.clone(),
+                    &leader_schedule_cache
+                        .slot_leader_at(slot, Some(&parent_bank))
+                        .unwrap(),
+                    slot,
+                );
+                let bank_with_scheduler = my_bankforks.insert_from_ledger(new_bank);
+                let mut progress = ConfirmationProgress::new(parent_bank.last_blockhash());
+                if let Err(e) = process_single_slot(
+                    &blockstore,
+                    &bank_with_scheduler,
+                    &replay_tx_thread_pool,
+                    &opts,
+                    &recyclers,
+                    &mut progress,
+                    None,
+                    None,
+                    None,
+                    None,
+                    &mut timing,
+                ) {
+                    return Err(WenRestartError::BlockNotFrozenAfterReplay(
+                        slot,
+                        Some(e.to_string()),
+                    )
+                    .into());
+                }
+                my_bankforks.get(slot).unwrap()
+            }
+        };
+        parent_bank = bank;
+    }
+    Ok(parent_bank.hash())
 }
 
 pub fn wait_for_wen_restart(
@@ -577,6 +687,8 @@ mod tests {
     use {
         crate::wen_restart::{tests::wen_restart_proto::LastVotedForkSlotsAggregateFinal, *},
         assert_matches::assert_matches,
+        solana_accounts_db::hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
+        solana_entry::entry::create_ticks,
         solana_gossip::{
             cluster_info::ClusterInfo,
             contact_info::ContactInfo,
@@ -586,18 +698,17 @@ mod tests {
             restart_crds_values::RestartLastVotedForkSlots,
         },
         solana_ledger::{
-            blockstore::{make_chaining_slot_entries, Blockstore},
+            blockstore::{create_new_ledger, entries_to_test_shreds, Blockstore},
+            blockstore_options::LedgerColumnOptions,
+            blockstore_processor::{fill_blockstore_slot_with_ticks, test_process_blockstore},
             get_tmp_ledger_path_auto_delete,
         },
         solana_program::{
             hash::Hash,
             vote::state::{Vote, VoteStateUpdate},
         },
-        solana_runtime::{
-            bank::Bank,
-            genesis_utils::{
-                create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
-            },
+        solana_runtime::genesis_utils::{
+            create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
         },
         solana_sdk::{
             signature::{Keypair, Signer},
@@ -609,7 +720,8 @@ mod tests {
     };
 
     const SHRED_VERSION: u16 = 2;
-    const EXPECTED_SLOTS: usize = 400;
+    const EXPECTED_SLOTS: Slot = 90;
+    const TICKS_PER_SLOT: u64 = 2;
 
     fn push_restart_last_voted_fork_slots(
         cluster_info: Arc<ClusterInfo>,
@@ -648,16 +760,29 @@ mod tests {
         pub bank_forks: Arc<RwLock<BankForks>>,
         pub last_voted_fork_slots: Vec<Slot>,
         pub wen_restart_proto_path: PathBuf,
+        pub last_blockhash: Hash,
     }
 
     fn insert_slots_into_blockstore(
         blockstore: Arc<Blockstore>,
         first_parent: Slot,
         slots_to_insert: &[Slot],
-    ) {
-        for (shreds, _) in make_chaining_slot_entries(slots_to_insert, 2, first_parent) {
-            blockstore.insert_shreds(shreds, None, false).unwrap();
+        entries_per_slot: u64,
+        start_blockhash: Hash,
+    ) -> Hash {
+        let mut last_hash = start_blockhash;
+        let mut last_parent = first_parent;
+        for i in slots_to_insert {
+            last_hash = fill_blockstore_slot_with_ticks(
+                &blockstore,
+                entries_per_slot,
+                *i,
+                last_parent,
+                last_hash,
+            );
+            last_parent = *i;
         }
+        last_hash
     }
 
     fn wen_restart_test_init(ledger_path: &TempDir) -> WenRestartTestInitResult {
@@ -674,26 +799,45 @@ mod tests {
             node_keypair.clone(),
             SocketAddrSpace::Unspecified,
         ));
-        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
-        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
+        let GenesisConfigInfo {
+            mut genesis_config, ..
+        } = create_genesis_config_with_vote_accounts(
             10_000,
             &validator_voting_keypairs,
             vec![100; validator_voting_keypairs.len()],
         );
-        let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
-        let last_parent = (RestartLastVotedForkSlots::MAX_SLOTS >> 1)
-            .try_into()
-            .unwrap();
-        let mut last_voted_fork_slots = Vec::new();
-        last_voted_fork_slots.extend([1, last_parent]);
-        for i in 0..EXPECTED_SLOTS {
-            last_voted_fork_slots.push(
-                (RestartLastVotedForkSlots::MAX_SLOTS
-                    .saturating_add(i)
-                    .saturating_add(1)) as Slot,
-            );
-        }
-        insert_slots_into_blockstore(blockstore.clone(), 0, &last_voted_fork_slots);
+        genesis_config.ticks_per_slot = TICKS_PER_SLOT;
+        let start_blockhash = create_new_ledger(
+            ledger_path.path(),
+            &genesis_config,
+            MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
+            LedgerColumnOptions::default(),
+        )
+        .unwrap();
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let (bank_forks, ..) = test_process_blockstore(
+            &genesis_config,
+            &blockstore,
+            &ProcessOptions {
+                run_verification: true,
+                accounts_db_test_hash_calculation: true,
+                ..ProcessOptions::default()
+            },
+            Arc::default(),
+        );
+        let mut last_blockhash = start_blockhash;
+        // Skip slot 1: slot 2 links directly to slot 0.
+        let last_parent: Slot = 2;
+        let mut last_voted_fork_slots: Vec<Slot> = Vec::new();
+        last_voted_fork_slots
+            .extend(last_parent..last_parent.saturating_add(EXPECTED_SLOTS).saturating_add(1));
+        last_blockhash = insert_slots_into_blockstore(
+            blockstore.clone(),
+            0,
+            &last_voted_fork_slots,
+            genesis_config.ticks_per_slot,
+            last_blockhash,
+        );
         last_voted_fork_slots.insert(0, 0);
         last_voted_fork_slots.reverse();
         let mut wen_restart_proto_path = ledger_path.path().to_path_buf();
@@ -706,6 +850,7 @@ mod tests {
             bank_forks,
             last_voted_fork_slots,
             wen_restart_proto_path,
+            last_blockhash,
         }
     }
 
@@ -847,10 +992,12 @@ mod tests {
         }
 
         // Simulating successful repair of missing blocks.
-        insert_slots_into_blockstore(
+        let _ = insert_slots_into_blockstore(
             test_state.blockstore.clone(),
             last_vote_slot,
             &expected_slots_to_repair,
+            TICKS_PER_SLOT,
+            test_state.last_blockhash,
         );
 
         let _ = wen_restart_thread_handle.join();
@@ -867,7 +1014,13 @@ mod tests {
             .collect();
         expected_slots_stake_map.extend(expected_slots_to_repair.iter().map(|slot| (*slot, 800)));
         let expected_heaviest_fork_slot = last_vote_slot + 2;
-        let expected_heaviest_fork_bankhash = Hash::default();
+        let expected_heaviest_fork_bankhash = test_state
+            .bank_forks
+            .read()
+            .unwrap()
+            .get(expected_heaviest_fork_slot)
+            .unwrap()
+            .hash();
         assert_eq!(
             progress,
             WenRestartProgress {
@@ -1197,10 +1350,12 @@ mod tests {
         }
 
         // Simulating successful repair of missing blocks.
-        insert_slots_into_blockstore(
+        let _ = insert_slots_into_blockstore(
             test_state.blockstore.clone(),
             last_vote_slot,
             &expected_slots_to_repair,
+            TICKS_PER_SLOT,
+            test_state.last_blockhash,
         );
 
         let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
@@ -1407,7 +1562,7 @@ mod tests {
         assert_eq!(
             find_heaviest_fork(
                 LastVotedForkSlotsFinalResult {
-                    slots_stake_map: vec![(1, 900), (last_vote_slot, 900)].into_iter().collect(),
+                    slots_stake_map: vec![(2, 900), (last_vote_slot, 900)].into_iter().collect(),
                     total_active_stake: 900,
                 },
                 test_state.bank_forks.clone(),
@@ -1420,7 +1575,90 @@ mod tests {
             WenRestartError::BlockNotLinkedToExpectedParent(
                 last_vote_slot,
                 Some(last_vote_slot - 1),
-                1
+                2
+            ),
+        );
+        // The following fails because the new slot is not full.
+        let not_full_slot = last_vote_slot + 5;
+        let parent_slot = last_vote_slot;
+        let num_slots = (not_full_slot - parent_slot).max(1);
+        let mut entries = create_ticks(num_slots * TICKS_PER_SLOT, 0, test_state.last_blockhash);
+        assert!(entries.len() > 1);
+        entries.pop();
+        let shreds = entries_to_test_shreds(
+            &entries,
+            not_full_slot,
+            parent_slot,
+            false,
+            0,
+            true, // merkle_variant
+        );
+        test_state
+            .blockstore
+            .insert_shreds(shreds, None, false)
+            .unwrap();
+        let mut slots_stake_map: HashMap<Slot, u64> = test_state
+            .last_voted_fork_slots
+            .iter()
+            .map(|slot| (*slot, 900))
+            .collect();
+        slots_stake_map.insert(not_full_slot, 800);
+        assert_eq!(
+            find_heaviest_fork(
+                LastVotedForkSlotsFinalResult {
+                    slots_stake_map,
+                    total_active_stake: 900,
+                },
+                test_state.bank_forks.clone(),
+                test_state.blockstore.clone(),
+                exit.clone(),
+            )
+            .unwrap_err()
+            .downcast::<WenRestartError>()
+            .unwrap(),
+            WenRestartError::BlockNotFull(not_full_slot)
+        );
+        // The following fails because we added two blocks at the end of the chain, they are full in blockstore
+        // but the parent of the first one is missing.
+        let missing_parent = last_vote_slot.saturating_add(1);
+        let new_slot = last_vote_slot.saturating_add(2);
+        let new_hash = insert_slots_into_blockstore(
+            test_state.blockstore.clone(),
+            last_vote_slot,
+            &[missing_parent],
+            1,
+            test_state.last_blockhash,
+        );
+        let _ = insert_slots_into_blockstore(
+            test_state.blockstore.clone(),
+            missing_parent,
+            &[new_slot],
+            TICKS_PER_SLOT,
+            new_hash,
+        );
+        let mut slots_stake_map: HashMap<Slot, u64> = test_state
+            .last_voted_fork_slots
+            .iter()
+            .map(|slot| (*slot, 900))
+            .collect();
+        slots_stake_map.insert(missing_parent, 800);
+        slots_stake_map.insert(new_slot, 800);
+        assert_eq!(
+            find_heaviest_fork(
+                LastVotedForkSlotsFinalResult {
+                    slots_stake_map,
+                    total_active_stake: 900,
+                },
+                test_state.bank_forks.clone(),
+                test_state.blockstore.clone(),
+                exit.clone(),
+            )
+            .unwrap_err()
+            .downcast::<WenRestartError>()
+            .unwrap(),
+            WenRestartError::BlockNotFrozenAfterReplay(
+                missing_parent,
+                Some("invalid block error: incomplete block".to_string())
             ),
         );
     }