
replay: refactor set-root to enable alpenglow to take over (#7452)

* replay: refactor set-root to enable alpenglow to take over

* pr feedback: inline warn! messages instead of positional

* solana-votor -> agave-votor

* pr feedback: inline log

* remove outdated comment

* pr feedback: mark agave-votor as unstable api

* cargo-for-all-lock-files.sh check
Ashwin Sekar, 3 months ago
parent
commit
b0ffc10a7f
8 files changed, 382 additions and 94 deletions
  1. Cargo.lock (+44 -0)
  2. Cargo.toml (+2 -0)
  3. core/Cargo.toml (+1 -0)
  4. core/src/replay_stage.rs (+78 -94)
  5. programs/sbf/Cargo.lock (+41 -0)
  6. votor/Cargo.toml (+72 -0)
  7. votor/src/lib.rs (+3 -0)
  8. votor/src/root_utils.rs (+141 -0)

+ 44 - 0
Cargo.lock

@@ -535,6 +535,49 @@ dependencies = [
  "solana-streamer",
 ]
 
+[[package]]
+name = "agave-votor"
+version = "3.0.0"
+dependencies = [
+ "anyhow",
+ "bincode",
+ "bs58",
+ "crossbeam-channel",
+ "dashmap",
+ "etcd-client",
+ "itertools 0.12.1",
+ "log",
+ "parking_lot 0.12.3",
+ "qualifier_attr",
+ "rayon",
+ "serde",
+ "serde_bytes",
+ "serde_derive",
+ "solana-accounts-db",
+ "solana-bloom",
+ "solana-clock",
+ "solana-entry",
+ "solana-epoch-schedule",
+ "solana-frozen-abi",
+ "solana-frozen-abi-macro 3.0.0",
+ "solana-gossip",
+ "solana-hash",
+ "solana-keypair",
+ "solana-ledger",
+ "solana-logger 3.0.0",
+ "solana-measure",
+ "solana-metrics",
+ "solana-pubkey",
+ "solana-rpc",
+ "solana-runtime",
+ "solana-signature",
+ "solana-signer",
+ "solana-time-utils",
+ "solana-transaction",
+ "test-case",
+ "thiserror 2.0.14",
+]
+
 [[package]]
 name = "agave-watchtower"
 version = "3.0.0"
@@ -7903,6 +7946,7 @@ dependencies = [
  "agave-reserved-account-keys",
  "agave-transaction-view",
  "agave-verified-packet-receiver",
+ "agave-votor",
  "ahash 0.8.11",
  "anyhow",
  "arrayvec",

+ 2 - 0
Cargo.toml

@@ -134,6 +134,7 @@ members = [
     "version",
     "vortexor",
     "vote",
+    "votor",
     "watchtower",
     "wen-restart",
     "xdp",
@@ -184,6 +185,7 @@ agave-syscalls = { path = "syscalls", version = "=3.0.0" }
 agave-thread-manager = { path = "thread-manager", version = "=3.0.0" }
 agave-transaction-view = { path = "transaction-view", version = "=3.0.0" }
 agave-verified-packet-receiver = { path = "verified-packet-receiver", version = "=3.0.0" }
+agave-votor = { path = "votor", version = "=3.0.0" }
 agave-xdp = { path = "xdp", version = "=3.0.0" }
 ahash = "0.8.11"
 anyhow = "1.0.99"

+ 1 - 0
core/Cargo.toml

@@ -45,6 +45,7 @@ agave-banking-stage-ingress-types = { workspace = true }
 agave-feature-set = { workspace = true }
 agave-transaction-view = { workspace = true }
 agave-verified-packet-receiver = { workspace = true }
+agave-votor = { workspace = true, features = ["agave-unstable-api"] }
 ahash = { workspace = true }
 anyhow = { workspace = true }
 arrayvec = { workspace = true }

+ 78 - 94
core/src/replay_stage.rs

@@ -32,6 +32,7 @@ use {
         voting_service::VoteOp,
         window_service::DuplicateSlotReceiver,
     },
+    agave_votor::root_utils,
     crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
     rayon::{prelude::*, ThreadPool},
     solana_accounts_db::contains::Contains,
@@ -2400,8 +2401,15 @@ impl ReplayStage {
         let new_root = tower.record_bank_vote(bank);
 
         if let Some(new_root) = new_root {
+            let highest_super_majority_root = Some(
+                block_commitment_cache
+                    .read()
+                    .unwrap()
+                    .highest_super_majority_root(),
+            );
             Self::check_and_handle_new_root(
-                bank,
+                &identity_keypair.pubkey(),
+                bank.parent_slot(),
                 new_root,
                 bank_forks,
                 progress,
@@ -2409,7 +2417,7 @@ impl ReplayStage {
                 leader_schedule_cache,
                 snapshot_controller,
                 rpc_subscriptions,
-                block_commitment_cache,
+                highest_super_majority_root,
                 bank_notification_sender,
                 has_new_vote_been_rooted,
                 tracked_vote_transactions,
@@ -3967,8 +3975,12 @@ impl ReplayStage {
     }
 
     #[allow(clippy::too_many_arguments)]
+    /// A wrapper around `root_utils::check_and_handle_new_root` which:
+    /// - calls into `root_utils::set_bank_forks_root`
+    /// - executes `set_progress_and_tower_bft_root` to clean up the tower BFT structs and the progress map
     fn check_and_handle_new_root(
-        vote_bank: &Bank,
+        my_pubkey: &Pubkey,
+        parent_slot: Slot,
         new_root: Slot,
         bank_forks: &RwLock<BankForks>,
         progress: &mut ProgressMap,
@@ -3976,110 +3988,49 @@ impl ReplayStage {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         snapshot_controller: Option<&SnapshotController>,
         rpc_subscriptions: Option<&RpcSubscriptions>,
-        block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
+        highest_super_majority_root: Option<Slot>,
         bank_notification_sender: &Option<BankNotificationSenderConfig>,
         has_new_vote_been_rooted: &mut bool,
         tracked_vote_transactions: &mut Vec<TrackedVoteTransaction>,
         drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
         tbft_structs: &mut TowerBFTStructures,
     ) -> Result<(), SetRootError> {
-        // get the root bank before squash
-        let root_bank = bank_forks
-            .read()
-            .unwrap()
-            .get(new_root)
-            .expect("Root bank doesn't exist");
-        let mut rooted_banks = root_bank.parents();
-        let oldest_parent = rooted_banks.last().map(|last| last.parent_slot());
-        rooted_banks.push(root_bank.clone());
-        let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
-        // The following differs from rooted_slots by including the parent slot of the oldest parent bank.
-        let rooted_slots_with_parents = bank_notification_sender
-            .as_ref()
-            .is_some_and(|sender| sender.should_send_parents)
-            .then(|| {
-                let mut new_chain = rooted_slots.clone();
-                new_chain.push(oldest_parent.unwrap_or_else(|| vote_bank.parent_slot()));
-                new_chain
-            });
-
-        // Call leader schedule_cache.set_root() before blockstore.set_root() because
-        // bank_forks.root is consumed by repair_service to update gossip, so we don't want to
-        // get shreds for repair on gossip before we update leader schedule, otherwise they may
-        // get dropped.
-        leader_schedule_cache.set_root(rooted_banks.last().unwrap());
-        blockstore
-            .set_roots(rooted_slots.iter())
-            .expect("Ledger set roots failed");
-        let highest_super_majority_root = Some(
-            block_commitment_cache
-                .read()
-                .unwrap()
-                .highest_super_majority_root(),
-        );
-        Self::handle_new_root(
+        root_utils::check_and_handle_new_root(
+            parent_slot,
             new_root,
-            bank_forks,
-            progress,
             snapshot_controller,
             highest_super_majority_root,
-            has_new_vote_been_rooted,
-            tracked_vote_transactions,
+            bank_notification_sender,
             drop_bank_sender,
-            tbft_structs,
-        )?;
-        blockstore.slots_stats.mark_rooted(new_root);
-        if let Some(rpc_subscriptions) = rpc_subscriptions {
-            rpc_subscriptions.notify_roots(rooted_slots);
-        }
-        if let Some(sender) = bank_notification_sender {
-            let dependency_work = sender
-                .dependency_tracker
-                .as_ref()
-                .map(|s| s.get_current_declared_work());
-            sender
-                .sender
-                .send((BankNotification::NewRootBank(root_bank), dependency_work))
-                .unwrap_or_else(|err| warn!("bank_notification_sender failed: {err:?}"));
-
-            if let Some(new_chain) = rooted_slots_with_parents {
-                sender
-                    .sender
-                    .send((BankNotification::NewRootedChain(new_chain), dependency_work))
-                    .unwrap_or_else(|err| warn!("bank_notification_sender failed: {err:?}"));
-            }
-        }
-        info!("new root {new_root}");
-        Ok(())
+            blockstore,
+            leader_schedule_cache,
+            bank_forks,
+            rpc_subscriptions,
+            my_pubkey,
+            move |bank_forks| {
+                Self::set_progress_and_tower_bft_root(
+                    new_root,
+                    bank_forks,
+                    progress,
+                    has_new_vote_been_rooted,
+                    tracked_vote_transactions,
+                    tbft_structs,
+                )
+            },
+        )
     }
 
-    #[allow(clippy::too_many_arguments)]
-    pub fn handle_new_root(
+    // To avoid code duplication and keep compatibility with alpenglow, we add this
+    // extra callback in the rooting path. It runs immediately after the bank forks root is set.
+    fn set_progress_and_tower_bft_root(
         new_root: Slot,
-        bank_forks: &RwLock<BankForks>,
+        bank_forks: &BankForks,
         progress: &mut ProgressMap,
-        snapshot_controller: Option<&SnapshotController>,
-        highest_super_majority_root: Option<Slot>,
         has_new_vote_been_rooted: &mut bool,
         tracked_vote_transactions: &mut Vec<TrackedVoteTransaction>,
-        drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
         tbft_structs: &mut TowerBFTStructures,
-    ) -> Result<(), SetRootError> {
-        bank_forks.read().unwrap().prune_program_cache(new_root);
-        let removed_banks = bank_forks.write().unwrap().set_root(
-            new_root,
-            snapshot_controller,
-            highest_super_majority_root,
-        )?;
-
-        drop_bank_sender
-            .send(removed_banks)
-            .unwrap_or_else(|err| warn!("bank drop failed: {err:?}"));
-
-        // Dropping the bank_forks write lock and reacquiring as a read lock is
-        // safe because updates to bank_forks are only made by a single thread.
-        let r_bank_forks = bank_forks.read().unwrap();
-        let new_root_bank = &r_bank_forks[new_root];
+    ) {
+        let new_root_bank = &bank_forks[new_root];
         if !*has_new_vote_been_rooted {
             for TrackedVoteTransaction {
                 message_hash,
@@ -4098,15 +4049,17 @@ impl ReplayStage {
                 std::mem::take(tracked_vote_transactions);
             }
         }
-        progress.handle_new_root(&r_bank_forks);
+
+        progress.handle_new_root(bank_forks);
         let TowerBFTStructures {
             heaviest_subtree_fork_choice,
             duplicate_slots_tracker,
             duplicate_confirmed_slots,
-            epoch_slots_frozen_slots,
             unfrozen_gossip_verified_vote_hashes,
+            epoch_slots_frozen_slots,
+            ..
         } = tbft_structs;
-        heaviest_subtree_fork_choice.set_tree_root((new_root, r_bank_forks.root_bank().hash()));
+        heaviest_subtree_fork_choice.set_tree_root((new_root, bank_forks.root_bank().hash()));
         *duplicate_slots_tracker = duplicate_slots_tracker.split_off(&new_root);
         // duplicate_slots_tracker now only contains entries >= `new_root`
 
@@ -4115,10 +4068,41 @@ impl ReplayStage {
 
         unfrozen_gossip_verified_vote_hashes.set_root(new_root);
         *epoch_slots_frozen_slots = epoch_slots_frozen_slots.split_off(&new_root);
-        Ok(())
         // epoch_slots_frozen_slots now only contains entries >= `new_root`
     }
 
+    #[allow(clippy::too_many_arguments)]
+    pub fn handle_new_root(
+        new_root: Slot,
+        bank_forks: &RwLock<BankForks>,
+        progress: &mut ProgressMap,
+        snapshot_controller: Option<&SnapshotController>,
+        highest_super_majority_root: Option<Slot>,
+        has_new_vote_been_rooted: &mut bool,
+        tracked_vote_transactions: &mut Vec<TrackedVoteTransaction>,
+        drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
+        tbft_structs: &mut TowerBFTStructures,
+    ) -> Result<(), SetRootError> {
+        root_utils::set_bank_forks_root(
+            new_root,
+            bank_forks,
+            snapshot_controller,
+            highest_super_majority_root,
+            drop_bank_sender,
+            move |bank_forks| {
+                Self::set_progress_and_tower_bft_root(
+                    new_root,
+                    bank_forks,
+                    progress,
+                    has_new_vote_been_rooted,
+                    tracked_vote_transactions,
+                    tbft_structs,
+                )
+            },
+        )?;
+        Ok(())
+    }
+
     fn generate_new_bank_forks(
         blockstore: &Blockstore,
         bank_forks: &RwLock<BankForks>,
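
For orientation only (not part of this commit): a toy, self-contained sketch of the callback pattern introduced above. `Forks`, `set_root_with`, and the progress map below are hypothetical stand-ins; in the real code the closure is `Self::set_progress_and_tower_bft_root` and it runs under the `bank_forks` read lock inside `root_utils::set_bank_forks_root`.

    use std::{collections::BTreeMap, sync::RwLock};

    struct Forks {
        root: u64,
    }

    // Mirrors the shape of root_utils::set_bank_forks_root: mutate under the
    // write lock, then reacquire a read lock and hand the post-root state to
    // the caller-supplied cleanup closure.
    fn set_root_with<CB: FnOnce(&Forks)>(forks: &RwLock<Forks>, new_root: u64, callback: CB) {
        forks.write().unwrap().root = new_root; // set the root under the write lock
        let r = forks.read().unwrap();          // reacquire as a read lock
        callback(&r);                            // cleanup sees the new root, no re-locking
    }

    fn main() {
        let forks = RwLock::new(Forks { root: 0 });
        let mut progress: BTreeMap<u64, &str> = BTreeMap::from([(0, "a"), (5, "b")]);
        set_root_with(&forks, 5, |f| {
            // like set_progress_and_tower_bft_root: drop entries below the new root
            progress = progress.split_off(&f.root);
        });
        assert_eq!(progress.len(), 1); // only the rooted slot remains
    }

The closure captures the caller's mutable state (here, `progress`), which is why replay stage can keep owning the progress map and tower BFT structures while the shared rooting logic lives in agave-votor.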

+ 41 - 0
programs/sbf/Cargo.lock

@@ -278,6 +278,46 @@ dependencies = [
  "solana-streamer",
 ]
 
+[[package]]
+name = "agave-votor"
+version = "3.0.0"
+dependencies = [
+ "anyhow",
+ "bincode",
+ "bs58",
+ "crossbeam-channel",
+ "dashmap",
+ "etcd-client",
+ "itertools 0.12.1",
+ "log",
+ "parking_lot 0.12.2",
+ "qualifier_attr",
+ "rayon",
+ "serde",
+ "serde_bytes",
+ "serde_derive",
+ "solana-accounts-db",
+ "solana-bloom",
+ "solana-clock",
+ "solana-entry",
+ "solana-epoch-schedule",
+ "solana-gossip",
+ "solana-hash",
+ "solana-keypair",
+ "solana-ledger",
+ "solana-logger 3.0.0",
+ "solana-measure",
+ "solana-metrics",
+ "solana-pubkey",
+ "solana-rpc",
+ "solana-runtime",
+ "solana-signature",
+ "solana-signer",
+ "solana-time-utils",
+ "solana-transaction",
+ "thiserror 2.0.14",
+]
+
 [[package]]
 name = "agave-xdp"
 version = "3.0.0"
@@ -6189,6 +6229,7 @@ dependencies = [
  "agave-feature-set",
  "agave-transaction-view",
  "agave-verified-packet-receiver",
+ "agave-votor",
  "ahash 0.8.11",
  "anyhow",
  "arrayvec",

+ 72 - 0
votor/Cargo.toml

@@ -0,0 +1,72 @@
+[package]
+name = "agave-votor"
+description = "Blockchain, Rebuilt for Scale"
+documentation = "https://docs.rs/agave-votor"
+readme = "../README.md"
+version = { workspace = true }
+authors = { workspace = true }
+repository = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+edition = { workspace = true }
+
+[features]
+agave-unstable-api = []
+dev-context-only-utils = ["solana-runtime/dev-context-only-utils"]
+frozen-abi = [
+    "dep:solana-frozen-abi",
+    "dep:solana-frozen-abi-macro",
+    "solana-accounts-db/frozen-abi",
+    "solana-bloom/frozen-abi",
+    "solana-ledger/frozen-abi",
+    "solana-runtime/frozen-abi",
+]
+
+[dependencies]
+anyhow = { workspace = true }
+bincode = { workspace = true }
+bs58 = { workspace = true }
+crossbeam-channel = { workspace = true }
+dashmap = { workspace = true, features = ["rayon", "raw-api"] }
+etcd-client = { workspace = true, features = ["tls"] }
+itertools = { workspace = true }
+log = { workspace = true }
+parking_lot = { workspace = true }
+qualifier_attr = { workspace = true }
+rayon = { workspace = true }
+serde = { workspace = true }
+serde_bytes = { workspace = true }
+serde_derive = { workspace = true }
+solana-accounts-db = { workspace = true }
+solana-bloom = { workspace = true }
+solana-clock = { workspace = true }
+solana-entry = { workspace = true }
+solana-epoch-schedule = { workspace = true }
+solana-frozen-abi = { workspace = true, optional = true, features = [
+    "frozen-abi",
+] }
+solana-frozen-abi-macro = { workspace = true, optional = true, features = [
+    "frozen-abi",
+] }
+solana-gossip = { workspace = true }
+solana-hash = { workspace = true }
+solana-keypair = { workspace = true }
+solana-ledger = { workspace = true }
+solana-logger = { workspace = true }
+solana-measure = { workspace = true }
+solana-metrics = { workspace = true }
+solana-pubkey = { workspace = true }
+solana-rpc = { workspace = true }
+solana-runtime = { workspace = true }
+solana-signature = { workspace = true }
+solana-signer = { workspace = true }
+solana-time-utils = { workspace = true }
+solana-transaction = { workspace = true }
+thiserror = { workspace = true }
+
+[dev-dependencies]
+solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
+test-case = { workspace = true }
+
+[lints]
+workspace = true

+ 3 - 0
votor/src/lib.rs

@@ -0,0 +1,3 @@
+#![cfg_attr(feature = "frozen-abi", feature(min_specialization))]
+#[cfg(feature = "agave-unstable-api")]
+pub mod root_utils;

+ 141 - 0
votor/src/root_utils.rs

@@ -0,0 +1,141 @@
+use {
+    crossbeam_channel::Sender,
+    log::{info, warn},
+    solana_clock::Slot,
+    solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
+    solana_pubkey::Pubkey,
+    solana_rpc::{
+        optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
+        rpc_subscriptions::RpcSubscriptions,
+    },
+    solana_runtime::{
+        bank_forks::{BankForks, SetRootError},
+        installed_scheduler_pool::BankWithScheduler,
+        snapshot_controller::SnapshotController,
+    },
+    std::sync::{Arc, RwLock},
+};
+
+/// Sets the new root and additionally performs the callback after setting the bank forks root.
+/// During this transition period, where either replay stage or votor can root depending on the feature flag,
+/// we use a callback that cleans up the progress map and other tower BFT structures. The call graph is:
+///
+/// ReplayStage::check_and_handle_new_root -> root_utils::check_and_handle_new_root(callback)
+///                                                             |
+///                                                             v
+/// ReplayStage::handle_new_root           -> root_utils::set_bank_forks_root(callback) -> callback()
+///
+/// Votor does not need the progress map or other tower bft structures, so it will not use the callback.
+#[allow(clippy::too_many_arguments)]
+pub fn check_and_handle_new_root<CB>(
+    parent_slot: Slot,
+    new_root: Slot,
+    snapshot_controller: Option<&SnapshotController>,
+    highest_super_majority_root: Option<Slot>,
+    bank_notification_sender: &Option<BankNotificationSenderConfig>,
+    drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
+    blockstore: &Blockstore,
+    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    bank_forks: &RwLock<BankForks>,
+    rpc_subscriptions: Option<&RpcSubscriptions>,
+    my_pubkey: &Pubkey,
+    callback: CB,
+) -> Result<(), SetRootError>
+where
+    CB: FnOnce(&BankForks),
+{
+    // get the root bank before squash
+    let root_bank = bank_forks
+        .read()
+        .unwrap()
+        .get(new_root)
+        .expect("Root bank doesn't exist");
+    let mut rooted_banks = root_bank.parents();
+    let oldest_parent = rooted_banks.last().map(|last| last.parent_slot());
+    rooted_banks.push(root_bank.clone());
+    let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
+    // The following differs from rooted_slots by including the parent slot of the oldest parent bank.
+    let rooted_slots_with_parents = bank_notification_sender
+        .as_ref()
+        .is_some_and(|sender| sender.should_send_parents)
+        .then(|| {
+            let mut new_chain = rooted_slots.clone();
+            new_chain.push(oldest_parent.unwrap_or(parent_slot));
+            new_chain
+        });
+
+    // Call leader schedule_cache.set_root() before blockstore.set_root() because
+    // bank_forks.root is consumed by repair_service to update gossip, so we don't want to
+    // get shreds for repair on gossip before we update leader schedule, otherwise they may
+    // get dropped.
+    leader_schedule_cache.set_root(rooted_banks.last().unwrap());
+    blockstore
+        .set_roots(rooted_slots.iter())
+        .expect("Ledger set roots failed");
+    set_bank_forks_root(
+        new_root,
+        bank_forks,
+        snapshot_controller,
+        highest_super_majority_root,
+        drop_bank_sender,
+        callback,
+    )?;
+    blockstore.slots_stats.mark_rooted(new_root);
+    if let Some(rpc_subscriptions) = rpc_subscriptions {
+        rpc_subscriptions.notify_roots(rooted_slots);
+    }
+    if let Some(sender) = bank_notification_sender {
+        let dependency_work = sender
+            .dependency_tracker
+            .as_ref()
+            .map(|s| s.get_current_declared_work());
+        sender
+            .sender
+            .send((BankNotification::NewRootBank(root_bank), dependency_work))
+            .unwrap_or_else(|err| warn!("bank_notification_sender failed: {err:?}"));
+
+        if let Some(new_chain) = rooted_slots_with_parents {
+            let dependency_work = sender
+                .dependency_tracker
+                .as_ref()
+                .map(|s| s.get_current_declared_work());
+            sender
+                .sender
+                .send((BankNotification::NewRootedChain(new_chain), dependency_work))
+                .unwrap_or_else(|err| warn!("bank_notification_sender failed: {err:?}"));
+        }
+    }
+    info!("{my_pubkey}: new root {new_root}");
+    Ok(())
+}
+
+/// Sets the bank forks root:
+/// - Prunes the program cache
+/// - Prunes bank forks and drops the removed banks
+/// - Calls the callback for use in replay stage and tests
+pub fn set_bank_forks_root<CB>(
+    new_root: Slot,
+    bank_forks: &RwLock<BankForks>,
+    snapshot_controller: Option<&SnapshotController>,
+    highest_super_majority_root: Option<Slot>,
+    drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
+    callback: CB,
+) -> Result<(), SetRootError>
+where
+    CB: FnOnce(&BankForks),
+{
+    bank_forks.read().unwrap().prune_program_cache(new_root);
+    let removed_banks = bank_forks.write().unwrap().set_root(
+        new_root,
+        snapshot_controller,
+        highest_super_majority_root,
+    )?;
+
+    drop_bank_sender
+        .send(removed_banks)
+        .unwrap_or_else(|err| warn!("bank drop failed: {err:?}"));
+
+    let r_bank_forks = bank_forks.read().unwrap();
+    callback(&r_bank_forks);
+    Ok(())
+}
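
For illustration only (not part of this commit): a minimal sketch of how a votor/alpenglow caller might invoke the new helper once it takes over rooting. Per the doc comment above, votor keeps no progress map or tower BFT state, so the cleanup callback is a no-op. Every binding below is assumed to already be in scope with the type from the signature above.

    // Hypothetical votor-side call (sketch only, assumed bindings).
    agave_votor::root_utils::check_and_handle_new_root(
        parent_slot,                 // Slot
        new_root,                    // Slot
        snapshot_controller,         // Option<&SnapshotController>
        highest_super_majority_root, // Option<Slot>
        &bank_notification_sender,   // &Option<BankNotificationSenderConfig>
        &drop_bank_sender,           // &Sender<Vec<BankWithScheduler>>
        &blockstore,                 // &Blockstore
        &leader_schedule_cache,      // &Arc<LeaderScheduleCache>
        &bank_forks,                 // &RwLock<BankForks>
        rpc_subscriptions,           // Option<&RpcSubscriptions>
        &my_pubkey,                  // &Pubkey
        |_bank_forks| {
            // No tower BFT / progress-map cleanup is needed under alpenglow.
        },
    )?;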