Browse Source

v2.0: banking_stage: do not insert legacy vote ixs, refactor & unstaked (backport of #2888) (#2901)

* banking_stage: do not insert legacy vote ixs, refactor & unstaked (#2888)

* banking_stage: do not insert legacy vote ixs, refactor & unstaked

* pr feedback: use matches instead of separate fn

(cherry picked from commit 1334fb5248390dfdd193feeec5fa8f86763668fc)

# Conflicts:
#	core/src/banking_stage/latest_unprocessed_votes.rs

* fix conflicts

* rekey feature to indicate it must not be activated

---------

Co-authored-by: Ashwin Sekar <ashwin@anza.xyz>
Co-authored-by: Ashwin Sekar <ashwin@solana.com>
mergify[bot] 1 year ago
parent
commit
573db9c5d8

+ 8 - 2
core/src/banking_stage.rs

@@ -434,7 +434,10 @@ impl BankingStage {
         let batch_limit =
             TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize);
         // Keeps track of extraneous vote transactions for the vote threads
-        let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
+        let latest_unprocessed_votes = {
+            let bank = bank_forks.read().unwrap().working_bank();
+            Arc::new(LatestUnprocessedVotes::new(&bank))
+        };
 
         let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
         let committer = Committer::new(
@@ -517,7 +520,10 @@ impl BankingStage {
         // Once an entry has been recorded, its blockhash is registered with the bank.
         let data_budget = Arc::new(DataBudget::default());
         // Keeps track of extraneous vote transactions for the vote threads
-        let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
+        let latest_unprocessed_votes = {
+            let bank = bank_forks.read().unwrap().working_bank();
+            Arc::new(LatestUnprocessedVotes::new(&bank))
+        };
 
         let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
         let committer = Committer::new(

+ 3 - 0
core/src/banking_stage/forwarder.rs

@@ -103,6 +103,9 @@ impl Forwarder {
         // load all accounts from address loader;
         let current_bank = self.bank_forks.read().unwrap().working_bank();
 
+        // if we have crossed an epoch boundary, recache any state
+        unprocessed_transaction_storage.cache_epoch_boundary_info(&current_bank);
+
         // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something
         // already processed), then add to forwarding buffer.
         let filter_forwarding_result = unprocessed_transaction_storage

+ 245 - 55
core/src/banking_stage/latest_unprocessed_votes.rs

@@ -9,6 +9,7 @@ use {
     solana_runtime::bank::Bank,
     solana_sdk::{
         clock::{Slot, UnixTimestamp},
+        feature_set::{self},
         program_utils::limited_deserialize,
         pubkey::Pubkey,
     },
@@ -18,7 +19,7 @@ use {
         collections::HashMap,
         ops::DerefMut,
         sync::{
-            atomic::{AtomicUsize, Ordering},
+            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             Arc, RwLock,
         },
     },
@@ -42,18 +43,23 @@ pub struct LatestValidatorVotePacket {
 }
 
 impl LatestValidatorVotePacket {
-    pub fn new(packet: Packet, vote_source: VoteSource) -> Result<Self, DeserializedPacketError> {
+    pub fn new(
+        packet: Packet,
+        vote_source: VoteSource,
+        deprecate_legacy_vote_ixs: bool,
+    ) -> Result<Self, DeserializedPacketError> {
         if !packet.meta().is_simple_vote_tx() {
             return Err(DeserializedPacketError::VoteTransactionError);
         }
 
         let vote = Arc::new(ImmutableDeserializedPacket::new(packet)?);
-        Self::new_from_immutable(vote, vote_source)
+        Self::new_from_immutable(vote, vote_source, deprecate_legacy_vote_ixs)
     }
 
     pub fn new_from_immutable(
         vote: Arc<ImmutableDeserializedPacket>,
         vote_source: VoteSource,
+        deprecate_legacy_vote_ixs: bool,
     ) -> Result<Self, DeserializedPacketError> {
         let message = vote.transaction().get_message();
         let (_, instruction) = message
@@ -61,9 +67,20 @@ impl LatestValidatorVotePacket {
             .next()
             .ok_or(DeserializedPacketError::VoteTransactionError)?;
 
+        let instruction_filter = |ix: &VoteInstruction| {
+            if deprecate_legacy_vote_ixs {
+                matches!(
+                    ix,
+                    VoteInstruction::TowerSync(_) | VoteInstruction::TowerSyncSwitch(_, _),
+                )
+            } else {
+                ix.is_single_vote_state_update()
+            }
+        };
+
         match limited_deserialize::<VoteInstruction>(&instruction.data) {
             Ok(vote_state_update_instruction)
-                if vote_state_update_instruction.is_single_vote_state_update() =>
+                if instruction_filter(&vote_state_update_instruction) =>
             {
                 let &pubkey = message
                     .message
@@ -116,29 +133,6 @@ impl LatestValidatorVotePacket {
     }
 }
 
-// TODO: replace this with rand::seq::index::sample_weighted once we can update rand to 0.8+
-// This requires updating dependencies of ed25519-dalek as rand_core is not compatible cross
-// version https://github.com/dalek-cryptography/ed25519-dalek/pull/214
-pub(crate) fn weighted_random_order_by_stake<'a>(
-    bank: &Bank,
-    pubkeys: impl Iterator<Item = &'a Pubkey>,
-) -> impl Iterator<Item = Pubkey> {
-    // Efraimidis and Spirakis algo for weighted random sample without replacement
-    let staked_nodes = bank.staked_nodes();
-    let mut pubkey_with_weight: Vec<(f64, Pubkey)> = pubkeys
-        .filter_map(|&pubkey| {
-            let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0);
-            if stake == 0 {
-                None // Ignore votes from unstaked validators
-            } else {
-                Some((thread_rng().gen::<f64>().powf(1.0 / (stake as f64)), pubkey))
-            }
-        })
-        .collect::<Vec<_>>();
-    pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap());
-    pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey)
-}
-
 #[derive(Default, Debug)]
 pub(crate) struct VoteBatchInsertionMetrics {
     pub(crate) num_dropped_gossip: usize,
@@ -149,11 +143,23 @@ pub(crate) struct VoteBatchInsertionMetrics {
 pub struct LatestUnprocessedVotes {
     latest_votes_per_pubkey: RwLock<HashMap<Pubkey, Arc<RwLock<LatestValidatorVotePacket>>>>,
     num_unprocessed_votes: AtomicUsize,
+    // These are only ever written to by the tpu vote thread
+    cached_staked_nodes: RwLock<Arc<HashMap<Pubkey, u64>>>,
+    deprecate_legacy_vote_ixs: AtomicBool,
+    current_epoch: AtomicU64,
 }
 
 impl LatestUnprocessedVotes {
-    pub fn new() -> Self {
-        Self::default()
+    pub fn new(bank: &Bank) -> Self {
+        let deprecate_legacy_vote_ixs = bank
+            .feature_set
+            .is_active(&feature_set::deprecate_legacy_vote_ixs::id());
+        Self {
+            cached_staked_nodes: RwLock::new(bank.current_epoch_staked_nodes().clone()),
+            current_epoch: AtomicU64::new(bank.epoch()),
+            deprecate_legacy_vote_ixs: AtomicBool::new(deprecate_legacy_vote_ixs),
+            ..Self::default()
+        }
     }
 
     pub fn len(&self) -> usize {
@@ -164,6 +170,17 @@ impl LatestUnprocessedVotes {
         self.len() == 0
     }
 
+    fn filter_unstaked_votes<'a>(
+        &'a self,
+        votes: impl Iterator<Item = LatestValidatorVotePacket> + 'a,
+    ) -> impl Iterator<Item = LatestValidatorVotePacket> + 'a {
+        let staked_nodes = self.cached_staked_nodes.read().unwrap();
+        votes.filter(move |vote| {
+            let stake = staked_nodes.get(&vote.pubkey()).copied().unwrap_or(0);
+            stake > 0
+        })
+    }
+
     pub(crate) fn insert_batch(
         &self,
         votes: impl Iterator<Item = LatestValidatorVotePacket>,
@@ -172,7 +189,7 @@ impl LatestUnprocessedVotes {
         let mut num_dropped_gossip = 0;
         let mut num_dropped_tpu = 0;
 
-        for vote in votes {
+        for vote in self.filter_unstaked_votes(votes) {
             if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) {
                 match vote.vote_source {
                     VoteSource::Gossip => num_dropped_gossip += 1,
@@ -283,6 +300,48 @@ impl LatestUnprocessedVotes {
             .and_then(|l| l.read().unwrap().timestamp())
     }
 
+    #[cfg(test)]
+    pub(crate) fn set_staked_nodes(&self, staked_nodes: &[Pubkey]) {
+        let staked_nodes: HashMap<Pubkey, u64> =
+            staked_nodes.iter().map(|pk| (*pk, 1u64)).collect();
+        *self.cached_staked_nodes.write().unwrap() = Arc::new(staked_nodes);
+    }
+
+    fn weighted_random_order_by_stake(&self) -> impl Iterator<Item = Pubkey> {
+        // Efraimidis and Spirakis algo for weighted random sample without replacement
+        let staked_nodes = self.cached_staked_nodes.read().unwrap();
+        let latest_votes_per_pubkey = self.latest_votes_per_pubkey.read().unwrap();
+        let mut pubkey_with_weight: Vec<(f64, Pubkey)> = latest_votes_per_pubkey
+            .keys()
+            .filter_map(|&pubkey| {
+                let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0);
+                if stake == 0 {
+                    None // Ignore votes from unstaked validators
+                } else {
+                    Some((thread_rng().gen::<f64>().powf(1.0 / (stake as f64)), pubkey))
+                }
+            })
+            .collect::<Vec<_>>();
+        pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap());
+        pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey)
+    }
+
+    /// Recache the staked nodes based on a bank from the new epoch.
+    /// This should only be run by the TPU vote thread
+    pub(super) fn cache_epoch_boundary_info(&self, bank: &Bank) {
+        if bank.epoch() <= self.current_epoch.load(Ordering::Relaxed) {
+            return;
+        }
+        let mut staked_nodes = self.cached_staked_nodes.write().unwrap();
+        *staked_nodes = bank.current_epoch_staked_nodes().clone();
+        self.current_epoch.store(bank.epoch(), Ordering::Relaxed);
+        self.deprecate_legacy_vote_ixs.store(
+            bank.feature_set
+                .is_active(&feature_set::deprecate_legacy_vote_ixs::id()),
+            Ordering::Relaxed,
+        );
+    }
+
     /// Returns how many packets were forwardable
     /// Performs a weighted random order based on stake and stops forwarding at the first error
     /// Votes from validators with 0 stakes are ignored
@@ -292,11 +351,7 @@ impl LatestUnprocessedVotes {
         forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
     ) -> usize {
         let mut continue_forwarding = true;
-        let pubkeys_by_stake = weighted_random_order_by_stake(
-            &bank,
-            self.latest_votes_per_pubkey.read().unwrap().keys(),
-        )
-        .collect_vec();
+        let pubkeys_by_stake = self.weighted_random_order_by_stake();
         pubkeys_by_stake
             .into_iter()
             .filter(|&pubkey| {
@@ -337,14 +392,8 @@ impl LatestUnprocessedVotes {
     }
 
     /// Drains all votes yet to be processed sorted by a weighted random ordering by stake
-    pub fn drain_unprocessed(&self, bank: Arc<Bank>) -> Vec<Arc<ImmutableDeserializedPacket>> {
-        let pubkeys_by_stake = weighted_random_order_by_stake(
-            &bank,
-            self.latest_votes_per_pubkey.read().unwrap().keys(),
-        )
-        .collect_vec();
-        pubkeys_by_stake
-            .into_iter()
+    pub fn drain_unprocessed(&self, _bank: Arc<Bank>) -> Vec<Arc<ImmutableDeserializedPacket>> {
+        self.weighted_random_order_by_stake()
             .filter_map(|pubkey| {
                 self.get_entry(pubkey).and_then(|lock| {
                     let mut latest_vote = lock.write().unwrap();
@@ -372,6 +421,10 @@ impl LatestUnprocessedVotes {
                 }
             });
     }
+
+    pub(super) fn should_deprecate_legacy_vote_ixs(&self) -> bool {
+        self.deprecate_legacy_vote_ixs.load(Ordering::Relaxed)
+    }
 }
 
 #[cfg(test)]
@@ -385,7 +438,10 @@ mod tests {
             bank::Bank,
             genesis_utils::{self, ValidatorVoteKeypairs},
         },
-        solana_sdk::{hash::Hash, signature::Signer, system_transaction::transfer},
+        solana_sdk::{
+            epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::GenesisConfig, hash::Hash,
+            signature::Signer, system_transaction::transfer,
+        },
         solana_vote_program::{
             vote_state::TowerSync,
             vote_transaction::{new_tower_sync_transaction, new_vote_transaction},
@@ -414,7 +470,7 @@ mod tests {
             .meta_mut()
             .flags
             .set(PacketFlags::SIMPLE_VOTE_TX, true);
-        LatestValidatorVotePacket::new(packet, vote_source).unwrap()
+        LatestValidatorVotePacket::new(packet, vote_source, true).unwrap()
     }
 
     fn deserialize_packets<'a>(
@@ -423,7 +479,8 @@ mod tests {
         vote_source: VoteSource,
     ) -> impl Iterator<Item = LatestValidatorVotePacket> + 'a {
         packet_indexes.iter().filter_map(move |packet_index| {
-            LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok()
+            LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source, true)
+                .ok()
         })
     }
 
@@ -540,9 +597,13 @@ mod tests {
 
     #[test]
     fn test_update_latest_vote() {
-        let latest_unprocessed_votes = LatestUnprocessedVotes::new();
+        let latest_unprocessed_votes = LatestUnprocessedVotes::default();
         let keypair_a = ValidatorVoteKeypairs::new_rand();
         let keypair_b = ValidatorVoteKeypairs::new_rand();
+        latest_unprocessed_votes.set_staked_nodes(&[
+            keypair_a.node_keypair.pubkey(),
+            keypair_b.node_keypair.pubkey(),
+        ]);
 
         let vote_a = from_slots(vec![(0, 2), (1, 1)], VoteSource::Gossip, &keypair_a, None);
         let vote_b = from_slots(
@@ -743,7 +804,7 @@ mod tests {
     fn test_update_latest_vote_race() {
         // There was a race condition in updating the same pubkey in the hashmap
         // when the entry does not initially exist.
-        let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
+        let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default());
 
         const NUM_VOTES: usize = 100;
         let keypairs = Arc::new(
@@ -751,6 +812,11 @@ mod tests {
                 .map(|_| ValidatorVoteKeypairs::new_rand())
                 .collect_vec(),
         );
+        let staked_nodes = keypairs
+            .iter()
+            .map(|kp| kp.node_keypair.pubkey())
+            .collect_vec();
+        latest_unprocessed_votes.set_staked_nodes(&staked_nodes);
 
         // Insert votes in parallel
         let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes,
@@ -782,7 +848,7 @@ mod tests {
 
     #[test]
     fn test_simulate_threads() {
-        let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
+        let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default());
         let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone();
         let keypairs = Arc::new(
             (0..10)
@@ -790,6 +856,11 @@ mod tests {
                 .collect_vec(),
         );
         let keypairs_tpu = keypairs.clone();
+        let staked_nodes = keypairs
+            .iter()
+            .map(|kp| kp.node_keypair.pubkey())
+            .collect_vec();
+        latest_unprocessed_votes.set_staked_nodes(&staked_nodes);
         let vote_limit = 1000;
 
         let gossip = Builder::new()
@@ -845,8 +916,15 @@ mod tests {
 
     #[test]
     fn test_forwardable_packets() {
-        let latest_unprocessed_votes = LatestUnprocessedVotes::new();
-        let bank = Arc::new(Bank::default_for_tests());
+        let latest_unprocessed_votes = LatestUnprocessedVotes::default();
+        let bank_0 = Bank::new_for_tests(&GenesisConfig::default());
+        let bank = Bank::new_from_parent(
+            Arc::new(bank_0),
+            &Pubkey::new_unique(),
+            MINIMUM_SLOTS_PER_EPOCH,
+        );
+        assert_eq!(bank.epoch(), 1);
+        let bank = Arc::new(bank);
         let mut forward_packet_batches_by_accounts =
             ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
 
@@ -858,7 +936,8 @@ mod tests {
         latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */);
         latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */);
 
-        // Don't forward 0 stake accounts
+        // Recache on epoch boundary and don't forward 0 stake accounts
+        latest_unprocessed_votes.cache_epoch_boundary_info(&bank);
         let forwarded = latest_unprocessed_votes
             .get_and_insert_forwardable_packets(bank, &mut forward_packet_batches_by_accounts);
         assert_eq!(0, forwarded);
@@ -876,11 +955,17 @@ mod tests {
             200,
         )
         .genesis_config;
-        let bank = Bank::new_for_tests(&config);
+        let bank_0 = Bank::new_for_tests(&config);
+        let bank = Bank::new_from_parent(
+            Arc::new(bank_0),
+            &Pubkey::new_unique(),
+            2 * MINIMUM_SLOTS_PER_EPOCH,
+        );
         let mut forward_packet_batches_by_accounts =
             ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
 
         // Don't forward votes from gossip
+        latest_unprocessed_votes.cache_epoch_boundary_info(&bank);
         let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets(
             Arc::new(bank),
             &mut forward_packet_batches_by_accounts,
@@ -901,11 +986,17 @@ mod tests {
             200,
         )
         .genesis_config;
-        let bank = Arc::new(Bank::new_for_tests(&config));
+        let bank_0 = Bank::new_for_tests(&config);
+        let bank = Arc::new(Bank::new_from_parent(
+            Arc::new(bank_0),
+            &Pubkey::new_unique(),
+            3 * MINIMUM_SLOTS_PER_EPOCH,
+        ));
         let mut forward_packet_batches_by_accounts =
             ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
 
         // Forward from TPU
+        latest_unprocessed_votes.cache_epoch_boundary_info(&bank);
         let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets(
             bank.clone(),
             &mut forward_packet_batches_by_accounts,
@@ -938,11 +1029,17 @@ mod tests {
 
     #[test]
     fn test_clear_forwarded_packets() {
-        let latest_unprocessed_votes = LatestUnprocessedVotes::new();
+        let latest_unprocessed_votes = LatestUnprocessedVotes::default();
         let keypair_a = ValidatorVoteKeypairs::new_rand();
         let keypair_b = ValidatorVoteKeypairs::new_rand();
         let keypair_c = ValidatorVoteKeypairs::new_rand();
         let keypair_d = ValidatorVoteKeypairs::new_rand();
+        latest_unprocessed_votes.set_staked_nodes(&[
+            keypair_a.node_keypair.pubkey(),
+            keypair_b.node_keypair.pubkey(),
+            keypair_c.node_keypair.pubkey(),
+            keypair_d.node_keypair.pubkey(),
+        ]);
 
         let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None);
         let mut vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None);
@@ -976,4 +1073,97 @@ mod tests {
             latest_unprocessed_votes.get_latest_vote_slot(keypair_d.node_keypair.pubkey())
         );
     }
+
+    #[test]
+    fn test_insert_batch_unstaked() {
+        let keypair_a = ValidatorVoteKeypairs::new_rand();
+        let keypair_b = ValidatorVoteKeypairs::new_rand();
+        let keypair_c = ValidatorVoteKeypairs::new_rand();
+        let keypair_d = ValidatorVoteKeypairs::new_rand();
+
+        let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None);
+        let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None);
+        let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None);
+        let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None);
+        let votes = [
+            vote_a.clone(),
+            vote_b.clone(),
+            vote_c.clone(),
+            vote_d.clone(),
+        ]
+        .into_iter();
+
+        let bank_0 = Bank::new_for_tests(&GenesisConfig::default());
+        let latest_unprocessed_votes = LatestUnprocessedVotes::new(&bank_0);
+
+        // Insert batch should filter out all votes as they are unstaked
+        latest_unprocessed_votes.insert_batch(votes.clone(), true);
+        assert!(latest_unprocessed_votes.is_empty());
+
+        // Bank in same epoch should not update stakes
+        let config = genesis_utils::create_genesis_config_with_leader(
+            100,
+            &keypair_a.node_keypair.pubkey(),
+            200,
+        )
+        .genesis_config;
+        let bank_0 = Bank::new_for_tests(&config);
+        let bank = Bank::new_from_parent(
+            Arc::new(bank_0),
+            &Pubkey::new_unique(),
+            MINIMUM_SLOTS_PER_EPOCH - 1,
+        );
+        assert_eq!(bank.epoch(), 0);
+        latest_unprocessed_votes.cache_epoch_boundary_info(&bank);
+        latest_unprocessed_votes.insert_batch(votes.clone(), true);
+        assert!(latest_unprocessed_votes.is_empty());
+
+        // Bank in next epoch should update stakes
+        let config = genesis_utils::create_genesis_config_with_leader(
+            100,
+            &keypair_b.node_keypair.pubkey(),
+            200,
+        )
+        .genesis_config;
+        let bank_0 = Bank::new_for_tests(&config);
+        let bank = Bank::new_from_parent(
+            Arc::new(bank_0),
+            &Pubkey::new_unique(),
+            MINIMUM_SLOTS_PER_EPOCH,
+        );
+        assert_eq!(bank.epoch(), 1);
+        latest_unprocessed_votes.cache_epoch_boundary_info(&bank);
+        latest_unprocessed_votes.insert_batch(votes.clone(), true);
+        assert_eq!(latest_unprocessed_votes.len(), 1);
+        assert_eq!(
+            latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()),
+            Some(vote_b.slot())
+        );
+
+        // Previously unstaked votes are not (yet) removed
+        let config = genesis_utils::create_genesis_config_with_leader(
+            100,
+            &keypair_c.node_keypair.pubkey(),
+            200,
+        )
+        .genesis_config;
+        let bank_0 = Bank::new_for_tests(&config);
+        let bank = Bank::new_from_parent(
+            Arc::new(bank_0),
+            &Pubkey::new_unique(),
+            3 * MINIMUM_SLOTS_PER_EPOCH,
+        );
+        assert_eq!(bank.epoch(), 2);
+        latest_unprocessed_votes.cache_epoch_boundary_info(&bank);
+        latest_unprocessed_votes.insert_batch(votes.clone(), true);
+        assert_eq!(latest_unprocessed_votes.len(), 2);
+        assert_eq!(
+            latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()),
+            Some(vote_b.slot())
+        );
+        assert_eq!(
+            latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()),
+            Some(vote_c.slot())
+        );
+    }
 }

+ 41 - 5
core/src/banking_stage/unprocessed_transaction_storage.rs

@@ -412,6 +412,13 @@ impl UnprocessedTransactionStorage {
             ),
         }
     }
+
+    pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) {
+        match self {
+            Self::LocalTransactionStorage(_) => (),
+            Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank),
+        }
+    }
 }
 
 impl VoteStorage {
@@ -449,6 +456,8 @@ impl VoteStorage {
                     LatestValidatorVotePacket::new_from_immutable(
                         Arc::new(deserialized_packet),
                         self.vote_source,
+                        self.latest_unprocessed_votes
+                            .should_deprecate_legacy_vote_ixs(),
                     )
                     .ok()
                 }),
@@ -512,6 +521,10 @@ impl VoteStorage {
             should_process_packet,
         );
 
+        let deprecate_legacy_vote_ixs = self
+            .latest_unprocessed_votes
+            .should_deprecate_legacy_vote_ixs();
+
         while let Some((packets, payload)) = scanner.iterate() {
             let vote_packets = packets.iter().map(|p| (*p).clone()).collect_vec();
 
@@ -521,6 +534,7 @@ impl VoteStorage {
                         LatestValidatorVotePacket::new_from_immutable(
                             vote_packets[*i].clone(),
                             self.vote_source,
+                            deprecate_legacy_vote_ixs,
                         )
                         .ok()
                     }),
@@ -529,7 +543,12 @@ impl VoteStorage {
             } else {
                 self.latest_unprocessed_votes.insert_batch(
                     vote_packets.into_iter().filter_map(|packet| {
-                        LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok()
+                        LatestValidatorVotePacket::new_from_immutable(
+                            packet,
+                            self.vote_source,
+                            deprecate_legacy_vote_ixs,
+                        )
+                        .ok()
                     }),
                     true, // should_replenish_taken_votes
                 );
@@ -538,6 +557,14 @@ impl VoteStorage {
 
         scanner.finalize().payload.reached_end_of_slot
     }
+
+    fn cache_epoch_boundary_info(&mut self, bank: &Bank) {
+        if matches!(self.vote_source, VoteSource::Gossip) {
+            panic!("Gossip vote thread should not be checking epoch boundary");
+        }
+        self.latest_unprocessed_votes
+            .cache_epoch_boundary_info(bank);
+    }
 }
 
 impl ThreadLocalUnprocessedPackets {
@@ -1255,9 +1282,16 @@ mod tests {
             assert!(deserialized_packets.contains(&big_transfer));
         }
 
-        for vote_source in [VoteSource::Gossip, VoteSource::Tpu] {
+        for (vote_source, staked) in [VoteSource::Gossip, VoteSource::Tpu]
+            .into_iter()
+            .flat_map(|vs| [(vs, true), (vs, false)])
+        {
+            let latest_unprocessed_votes = LatestUnprocessedVotes::default();
+            if staked {
+                latest_unprocessed_votes.set_staked_nodes(&[keypair.pubkey()]);
+            }
             let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
-                Arc::new(LatestUnprocessedVotes::new()),
+                Arc::new(latest_unprocessed_votes),
                 vote_source,
             );
             transaction_storage.insert_batch(vec![
@@ -1265,7 +1299,7 @@ mod tests {
                 ImmutableDeserializedPacket::new(vote.clone())?,
                 ImmutableDeserializedPacket::new(big_transfer.clone())?,
             ]);
-            assert_eq!(1, transaction_storage.len());
+            assert_eq!(if staked { 1 } else { 0 }, transaction_storage.len());
         }
         Ok(())
     }
@@ -1291,8 +1325,10 @@ mod tests {
         )?;
         vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true);
 
+        let latest_unprocessed_votes = LatestUnprocessedVotes::default();
+        latest_unprocessed_votes.set_staked_nodes(&[node_keypair.pubkey()]);
         let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage(
-            Arc::new(LatestUnprocessedVotes::new()),
+            Arc::new(latest_unprocessed_votes),
             VoteSource::Tpu,
         );
 

+ 14 - 0
runtime/src/bank.rs

@@ -6108,6 +6108,15 @@ impl Bank {
         Some(vote_account.clone())
     }
 
+    /// Get the EpochStakes for the current Bank::epoch
+    pub fn current_epoch_stakes(&self) -> &EpochStakes {
+        // The stakes for a given epoch (E) in self.epoch_stakes are keyed by leader schedule epoch
+        // (E + 1) so the stakes for the current epoch are stored at self.epoch_stakes[E + 1]
+        self.epoch_stakes
+            .get(&self.epoch.saturating_add(1))
+            .expect("Current epoch stakes must exist")
+    }
+
     /// Get the EpochStakes for a given epoch
     pub fn epoch_stakes(&self, epoch: Epoch) -> Option<&EpochStakes> {
         self.epoch_stakes.get(&epoch)
@@ -6117,6 +6126,11 @@ impl Bank {
         &self.epoch_stakes
     }
 
+    /// Get the staked nodes map for the current Bank::epoch
+    pub fn current_epoch_staked_nodes(&self) -> Arc<HashMap<Pubkey, u64>> {
+        self.current_epoch_stakes().stakes().staked_nodes()
+    }
+
     pub fn epoch_staked_nodes(&self, epoch: Epoch) -> Option<Arc<HashMap<Pubkey, u64>>> {
         Some(self.epoch_stakes.get(&epoch)?.stakes().staked_nodes())
     }

+ 5 - 0
sdk/src/feature_set.rs

@@ -853,6 +853,10 @@ pub mod enable_turbine_extended_fanout_experiments {
     solana_sdk::declare_id!("BZn14Liea52wtBwrXUxTv6vojuTTmfc7XGEDTXrvMD7b");
 }
 
+pub mod deprecate_legacy_vote_ixs {
+    solana_program::declare_id!("mustrekeyysGrhwdiwU42tCadZL8GcBb1i2GYhMopQv");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -1061,6 +1065,7 @@ lazy_static! {
         (vote_only_retransmitter_signed_fec_sets::id(), "vote only on retransmitter signed fec sets"),
         (partitioned_epoch_rewards_superfeature::id(), "replaces enable_partitioned_epoch_reward to enable partitioned rewards at epoch boundary SIMD-0118"),
         (enable_turbine_extended_fanout_experiments::id(), "enable turbine extended fanout experiments #2373"),
+        (deprecate_legacy_vote_ixs::id(), "Deprecate legacy vote instructions"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()