Explorar o código

v3.0: runtime: Avoid locking during stake vote rewards calculation (backport of #7742) (#8012)

`calculate_stake_vote_rewards` was storing accumulated rewards per vote
account in a `DashMap`, which then was used in a parallel iterator over
all stake delegations.

There are over 1,000,000 stake delegations and around 1,000 validators.
Each thread processes one of the stake delegations and tries to acquire
the lock on a `DashMap` shard corresponding to a validator. Given that
the number of validators is disproportionally small and they have
thousands of delegations, such solution results in high contention,
with some threads spending the most of their time on waiting for lock.

The time spent on these calculations was ~208.47ms:

```
redeem_rewards_us=208475i
```

Fix that by:

* Removing the `DashMap` and instead using `fold` and `reduce`
  operations to build a regular `HashMap`.
* Pre-allocating the `stake_rewards` vector and passing
  `&mut [MaybeUninit<PartitionedStakeReward>]` to the thread pool.
* Pulling the optimization of `StakeHistory::get` in
  `solana-stake-interface`. solana-program/stake#81

```
redeem_rewards_us=48781i
```

(cherry picked from commit 8aa41ea)

Co-authored-by: Michal R <vad.sol@proton.me>
mergify[bot] hai 1 mes
pai
achega
6001c5d60e

+ 3 - 3
runtime/src/bank.rs

@@ -63,7 +63,7 @@ use {
     agave_syscalls::{
         create_program_runtime_environment_v1, create_program_runtime_environment_v2,
     },
-    ahash::{AHashSet, RandomState},
+    ahash::AHashSet,
     dashmap::DashMap,
     log::*,
     partitioned_epoch_rewards::PartitionedRewardsCalculation,
@@ -113,7 +113,7 @@ use {
     solana_program_runtime::{
         invoke_context::BuiltinFunctionWithContext, loaded_programs::ProgramCacheEntry,
     },
-    solana_pubkey::Pubkey,
+    solana_pubkey::{Pubkey, PubkeyHasherBuilder},
     solana_reward_info::RewardInfo,
     solana_runtime_transaction::{
         runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta,
@@ -920,7 +920,7 @@ struct VoteReward {
     vote_rewards: u64,
 }
 
-type VoteRewards = DashMap<Pubkey, VoteReward, RandomState>;
+type VoteRewards = HashMap<Pubkey, VoteReward, PubkeyHasherBuilder>;
 
 #[derive(Debug, Default)]
 pub struct NewBankOptions {

+ 323 - 110
runtime/src/bank/partitioned_epoch_rewards/calculation.rs

@@ -3,8 +3,8 @@ use {
         epoch_rewards_hasher::hash_rewards_into_partitions, Bank,
         CalculateRewardsAndDistributeVoteRewardsResult, CalculateValidatorRewardsResult,
         EpochRewardCalculateParamInfo, PartitionedRewardsCalculation, PartitionedStakeReward,
-        StakeRewardCalculation, VoteRewardsAccounts, VoteRewardsAccountsStorable,
-        REWARD_CALCULATION_NUM_BLOCKS,
+        PartitionedStakeRewards, StakeRewardCalculation, VoteRewardsAccounts,
+        VoteRewardsAccountsStorable, REWARD_CALCULATION_NUM_BLOCKS,
     },
     crate::{
         bank::{
@@ -18,27 +18,84 @@ use {
         stake_account::StakeAccount,
         stakes::Stakes,
     },
-    ahash::random_state::RandomState as AHashRandomState,
-    dashmap::DashMap,
     log::{debug, info},
     rayon::{
-        iter::{IntoParallelRefIterator, ParallelIterator},
+        iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator},
         ThreadPool,
     },
-    solana_account::ReadableAccount,
     solana_clock::{Epoch, Slot},
-    solana_measure::measure_us,
+    solana_measure::{measure::Measure, measure_us},
     solana_pubkey::Pubkey,
-    solana_stake_interface::state::Delegation,
+    solana_stake_interface::{stake_history::StakeHistory, state::Delegation},
     solana_sysvar::epoch_rewards::EpochRewards,
-    solana_vote::vote_account::VoteAccount,
-    solana_vote_program::vote_state::VoteStateVersions,
-    std::sync::{
-        atomic::{AtomicU64, Ordering::Relaxed},
-        Arc,
+    solana_vote::vote_account::VoteAccounts,
+    std::{
+        ops::Add,
+        sync::{atomic::Ordering::Relaxed, Arc},
     },
 };
 
+#[derive(Debug)]
+struct DelegationRewards {
+    stake_reward: PartitionedStakeReward,
+    vote_pubkey: Pubkey,
+    vote_reward: VoteReward,
+}
+
+#[derive(Default)]
+struct RewardsAccumulator {
+    vote_rewards: VoteRewards,
+    num_stake_rewards: usize,
+    total_stake_rewards_lamports: u64,
+}
+
+impl RewardsAccumulator {
+    fn add_reward(&mut self, vote_pubkey: Pubkey, vote_reward: VoteReward, stakers_reward: u64) {
+        self.vote_rewards
+            .entry(vote_pubkey)
+            .and_modify(|dst_vote_reward| {
+                dst_vote_reward.vote_rewards = dst_vote_reward
+                    .vote_rewards
+                    .saturating_add(vote_reward.vote_rewards)
+            })
+            .or_insert(vote_reward);
+        self.num_stake_rewards = self.num_stake_rewards.saturating_add(1);
+        self.total_stake_rewards_lamports = self
+            .total_stake_rewards_lamports
+            .saturating_add(stakers_reward);
+    }
+}
+
+impl Add for RewardsAccumulator {
+    type Output = Self;
+
+    fn add(self, rhs: Self) -> Self::Output {
+        // Check which instance has more vote rewards. Treat the bigger one
+        // as a destination, which is going to be extended. This way we make
+        // the reallocation as small as possible.
+        let (mut dst, src) = if self.vote_rewards.len() >= rhs.vote_rewards.len() {
+            (self, rhs)
+        } else {
+            (rhs, self)
+        };
+        for (vote_pubkey, vote_reward) in src.vote_rewards {
+            dst.vote_rewards
+                .entry(vote_pubkey)
+                .and_modify(|dst_vote_reward: &mut VoteReward| {
+                    dst_vote_reward.vote_rewards = dst_vote_reward
+                        .vote_rewards
+                        .saturating_add(vote_reward.vote_rewards)
+                })
+                .or_insert(vote_reward);
+        }
+        dst.num_stake_rewards = dst.num_stake_rewards.saturating_add(src.num_stake_rewards);
+        dst.total_stake_rewards_lamports = dst
+            .total_stake_rewards_lamports
+            .saturating_add(src.total_stake_rewards_lamports);
+        dst
+    }
+}
+
 impl Bank {
     /// Begin the process of calculating and distributing rewards.
     /// This process can take multiple slots.
@@ -325,6 +382,70 @@ impl Bank {
         }
     }
 
+    fn redeem_delegation_rewards(
+        &self,
+        rewarded_epoch: Epoch,
+        stake_pubkey: &Pubkey,
+        stake_account: &StakeAccount<Delegation>,
+        point_value: &PointValue,
+        stake_history: &StakeHistory,
+        cached_vote_accounts: &VoteAccounts,
+        reward_calc_tracer: Option<impl RewardCalcTracer>,
+        new_rate_activation_epoch: Option<Epoch>,
+    ) -> Option<DelegationRewards> {
+        // curry closure to add the contextual stake_pubkey
+        let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| {
+            // inner
+            move |inner_event: &_| {
+                outer(&RewardCalculationEvent::Staking(stake_pubkey, inner_event))
+            }
+        });
+
+        let stake_pubkey = *stake_pubkey;
+        let vote_pubkey = stake_account.delegation().voter_pubkey;
+        let Some(vote_account) = cached_vote_accounts.get(&vote_pubkey) else {
+            debug!("could not find vote account {vote_pubkey} in cache");
+            return None;
+        };
+        let vote_state = vote_account.vote_state_view();
+        let stake_state = stake_account.stake_state();
+
+        match redeem_rewards(
+            rewarded_epoch,
+            stake_state,
+            vote_state,
+            point_value,
+            stake_history,
+            reward_calc_tracer,
+            new_rate_activation_epoch,
+        ) {
+            Ok((stake_reward, vote_rewards, stake)) => {
+                let commission = vote_state.commission();
+                let stake_reward = PartitionedStakeReward {
+                    stake_pubkey,
+                    stake,
+                    stake_reward,
+                    commission,
+                };
+                let vote_account = vote_account.into();
+                let vote_reward = VoteReward {
+                    commission,
+                    vote_account,
+                    vote_rewards,
+                };
+                Some(DelegationRewards {
+                    stake_reward,
+                    vote_pubkey,
+                    vote_reward,
+                })
+            }
+            Err(e) => {
+                debug!("redeem_rewards() failed for {stake_pubkey}: {e:?}");
+                None
+            }
+        }
+    }
+
     /// Calculates epoch rewards for stake/vote accounts
     /// Returns vote rewards, stake rewards, and the sum of all stake rewards in lamports
     fn calculate_stake_vote_rewards(
@@ -343,103 +464,90 @@ impl Bank {
         } = reward_calculate_params;
 
         let new_warmup_cooldown_rate_epoch = self.new_warmup_cooldown_rate_epoch();
-        let estimated_num_vote_accounts = cached_vote_accounts.len();
-        let vote_account_rewards: VoteRewards = DashMap::with_capacity_and_hasher_and_shard_amount(
-            estimated_num_vote_accounts,
-            AHashRandomState::default(),
-            1024, // shard amount
-        );
 
-        let total_stake_rewards = AtomicU64::default();
-        const ASSERT_STAKE_CACHE: bool = false; // Turn this on to assert that all vote accounts are in the cache
-        let (stake_rewards, measure_stake_rewards_us) = measure_us!(thread_pool.install(|| {
+        let mut measure_redeem_rewards = Measure::start("redeem-rewards");
+        // For N stake delegations, where N is >1,000,000, we produce:
+        // * N stake rewards,
+        // * M vote rewards, where M is a number of stake nodes. Currently, way
+        //   smaller number than 1,000,000. And we can expect it to always be
+        //   significantly smaller than number of delegations.
+        //
+        // Producing the stake reward with rayon triggers a lot of
+        // (re)allocations. To avoid that, we allocate it at the start and
+        // pass `stake_rewards.spare_capacity_mut()` as one of iterators.
+        let mut stake_rewards = PartitionedStakeRewards::with_capacity(stake_delegations.len());
+        let rewards_accumulator: RewardsAccumulator = thread_pool.install(|| {
             stake_delegations
                 .par_iter()
-                .filter_map(|(stake_pubkey, stake_account)| {
-                    // curry closure to add the contextual stake_pubkey
-                    let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| {
-                        // inner
-                        move |inner_event: &_| {
-                            outer(&RewardCalculationEvent::Staking(stake_pubkey, inner_event))
-                        }
-                    });
-
-                    let stake_pubkey = **stake_pubkey;
-                    let vote_pubkey = stake_account.delegation().voter_pubkey;
-                    let vote_account_from_cache = cached_vote_accounts.get(&vote_pubkey);
-                    if ASSERT_STAKE_CACHE && vote_account_from_cache.is_none() {
-                        let account_from_db = self.get_account_with_fixed_root(&vote_pubkey);
-                        if let Some(account_from_db) = account_from_db {
-                            if VoteStateVersions::is_correct_size_and_initialized(
-                                account_from_db.data(),
-                            ) && VoteAccount::try_from(account_from_db.clone()).is_ok()
-                            {
-                                panic!(
-                                    "Vote account {vote_pubkey} not found in cache, but found in \
-                                     db: {account_from_db:?}"
-                                );
-                            }
-                        }
-                    }
-                    let vote_account = vote_account_from_cache?;
-                    let vote_state_view = vote_account.vote_state_view();
-                    let mut stake_state = *stake_account.stake_state();
-
-                    let redeemed = redeem_rewards(
+                .zip_eq(stake_rewards.spare_capacity_mut())
+                .with_min_len(500)
+                .filter_map(|((stake_pubkey, stake_account), stake_reward_ref)| {
+                    let maybe_reward_record = self.redeem_delegation_rewards(
                         rewarded_epoch,
-                        &mut stake_state,
-                        vote_state_view,
+                        stake_pubkey,
+                        stake_account,
                         &point_value,
                         stake_history,
+                        cached_vote_accounts,
                         reward_calc_tracer.as_ref(),
                         new_warmup_cooldown_rate_epoch,
                     );
-
-                    if let Ok((stakers_reward, voters_reward)) = redeemed {
-                        let commission = vote_state_view.commission();
-
-                        // track voter rewards
-                        let mut voters_reward_entry = vote_account_rewards
-                            .entry(vote_pubkey)
-                            .or_insert(VoteReward {
-                                commission,
-                                vote_account: vote_account.into(),
-                                vote_rewards: 0,
-                            });
-
-                        voters_reward_entry.vote_rewards = voters_reward_entry
-                            .vote_rewards
-                            .saturating_add(voters_reward);
-
-                        total_stake_rewards.fetch_add(stakers_reward, Relaxed);
-
-                        // Safe to unwrap because all stake_delegations are type
-                        // StakeAccount<Delegation>, which will always only wrap
-                        // a `StakeStateV2::Stake` variant.
-                        let stake = stake_state.stake().unwrap();
-                        return Some(PartitionedStakeReward {
-                            stake_pubkey,
-                            stake_reward: stakers_reward,
-                            stake,
-                            commission,
-                        });
-                    } else {
-                        debug!("redeem_rewards() failed for {stake_pubkey}: {redeemed:?}");
-                    }
-                    None
+                    let (stake_reward, maybe_reward_record) = match maybe_reward_record {
+                        Some(res) => {
+                            let DelegationRewards {
+                                stake_reward,
+                                vote_pubkey,
+                                vote_reward,
+                            } = res;
+                            let stakers_reward = stake_reward.stake_reward;
+                            (
+                                Some(stake_reward),
+                                Some((stakers_reward, vote_pubkey, vote_reward)),
+                            )
+                        }
+                        None => (None, None),
+                    };
+                    // It's important that for every stake delegation, we write
+                    // a value to the cell of the stake rewards vector,
+                    // regardless of whether it's `Some` or `None` variant.
+                    // This allows us to pre-allocate the vector with the known
+                    // size and avoid re-allocations, which were the bottleneck
+                    // in this path.
+                    stake_reward_ref.write(stake_reward);
+                    maybe_reward_record
                 })
-                .collect()
-        }));
-        let (vote_rewards, measure_vote_rewards_us) =
-            measure_us!(Self::calc_vote_accounts_to_store(vote_account_rewards));
-
-        metrics.redeem_rewards_us += measure_stake_rewards_us + measure_vote_rewards_us;
+                .fold(
+                    RewardsAccumulator::default,
+                    |mut rewards_accumulator, (stake_reward, vote_pubkey, vote_reward)| {
+                        rewards_accumulator.add_reward(vote_pubkey, vote_reward, stake_reward);
+                        rewards_accumulator
+                    },
+                )
+                .reduce(
+                    RewardsAccumulator::default,
+                    |rewards_accumulator_a, rewards_accumulator_b| {
+                        rewards_accumulator_a + rewards_accumulator_b
+                    },
+                )
+        });
+        let RewardsAccumulator {
+            vote_rewards,
+            num_stake_rewards,
+            total_stake_rewards_lamports,
+        } = rewards_accumulator;
+        // SAFETY: We initialized all the `stake_rewards` elements up to the capacity.
+        unsafe {
+            stake_rewards.assume_init(num_stake_rewards);
+        }
+        let vote_rewards = Self::calc_vote_accounts_to_store(vote_rewards);
+        measure_redeem_rewards.stop();
+        metrics.redeem_rewards_us = measure_redeem_rewards.as_us();
 
         (
             vote_rewards,
             StakeRewardCalculation {
                 stake_rewards: Arc::new(stake_rewards),
-                total_stake_rewards_lamports: total_stake_rewards.load(Relaxed),
+                total_stake_rewards_lamports,
             },
         )
     }
@@ -521,7 +629,7 @@ impl Bank {
         epoch_rewards_sysvar: &EpochRewards,
         reward_calc_tracer: Option<impl RewardCalcTracer>,
         thread_pool: &ThreadPool,
-    ) -> (Arc<Vec<PartitionedStakeReward>>, Vec<Vec<usize>>) {
+    ) -> (Arc<PartitionedStakeRewards>, Vec<Vec<usize>>) {
         assert!(epoch_rewards_sysvar.active);
         // If rewards are active, the rewarded epoch is always the immediately
         // preceding epoch.
@@ -583,11 +691,13 @@ mod tests {
         agave_feature_set::FeatureSet,
         rayon::ThreadPoolBuilder,
         solana_account::{accounts_equal, state_traits::StateMut, ReadableAccount},
+        solana_accounts_db::partitioned_rewards::PartitionedEpochRewardsConfig,
         solana_genesis_config::GenesisConfig,
         solana_native_token::LAMPORTS_PER_SOL,
         solana_reward_info::RewardType,
         solana_stake_interface::state::{Delegation, StakeStateV2},
         solana_vote_interface::state::VoteStateV3,
+        solana_vote_program::vote_state,
         std::sync::{Arc, RwLockReadGuard},
     };
 
@@ -670,10 +780,21 @@ mod tests {
     fn test_rewards_computation() {
         solana_logger::setup();
 
-        let expected_num_delegations = 100;
-        let bank = create_default_reward_bank(expected_num_delegations, SLOTS_PER_EPOCH)
-            .0
-            .bank;
+        // Delegations with sufficient stake to get rewards (2 SOL).
+        let delegations_with_rewards = 100;
+        // Delegations with insufficient stake (0.5 SOL).
+        let delegations_without_rewards = 10;
+        let stakes = (0..delegations_with_rewards)
+            .map(|_| 2_000_000_000)
+            .chain((0..delegations_without_rewards).map(|_| 500_000_000))
+            .collect::<Vec<_>>();
+        let bank = create_reward_bank_with_specific_stakes(
+            stakes,
+            PartitionedEpochRewardsConfig::default().stake_account_stores_per_block,
+            SLOTS_PER_EPOCH,
+        )
+        .0
+        .bank;
 
         // Calculate rewards
         let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -707,7 +828,10 @@ mod tests {
         );
 
         // assert that number of stake rewards matches
-        assert_eq!(stake_rewards.stake_rewards.len(), expected_num_delegations);
+        assert_eq!(
+            stake_rewards.stake_rewards.num_rewards(),
+            delegations_with_rewards
+        );
     }
 
     #[test]
@@ -828,7 +952,7 @@ mod tests {
         );
         assert_eq!(vote_pubkey_from_result, vote_pubkey);
 
-        assert_eq!(stake_reward_calculation.stake_rewards.len(), 1);
+        assert_eq!(stake_reward_calculation.stake_rewards.num_rewards(), 1);
         let expected_reward = {
             let stake_reward = 8_400_000_000_000;
             let stake_state: StakeStateV2 = stake_account.state().unwrap();
@@ -842,7 +966,15 @@ mod tests {
                 commission,
             }
         };
-        assert_eq!(stake_reward_calculation.stake_rewards[0], expected_reward);
+        assert_eq!(
+            stake_reward_calculation
+                .stake_rewards
+                .get(0)
+                .unwrap()
+                .as_ref()
+                .unwrap(),
+            &expected_reward
+        );
     }
 
     fn compare_stake_rewards(
@@ -851,10 +983,7 @@ mod tests {
     ) {
         for (i, partition) in received_stake_rewards.iter().enumerate() {
             let expected_partition = &expected_stake_rewards[i];
-            assert_eq!(partition.len(), expected_partition.len());
-            for reward in partition {
-                assert!(expected_partition.iter().any(|x| x == reward));
-            }
+            assert_eq!(partition, expected_partition);
         }
     }
 
@@ -934,7 +1063,7 @@ mod tests {
         );
         // First partition has already been distributed, so recalculation
         // returns 0 rewards
-        assert_eq!(recalculated_rewards[0].len(), 0);
+        assert_eq!(recalculated_rewards[0].num_rewards(), 0);
         let starting_index = (bank.block_height() + 1
             - epoch_rewards_sysvar.distribution_starting_block_height)
             as usize;
@@ -1114,7 +1243,7 @@ mod tests {
         assert_eq!(expected_stake_rewards.len(), recalculated_rewards.len());
         // First partition has already been distributed, so recalculation
         // returns 0 rewards
-        assert_eq!(recalculated_rewards[0].len(), 0);
+        assert_eq!(recalculated_rewards[0].num_rewards(), 0);
         let epoch_rewards_sysvar = bank.get_epoch_rewards_sysvar();
         let starting_index = (bank.block_height() + 1
             - epoch_rewards_sysvar.distribution_starting_block_height)
@@ -1184,8 +1313,92 @@ mod tests {
             distribution_status.distribution_starting_block_height
         );
         assert_eq!(
-            calculation_status.all_stake_rewards.len(),
+            calculation_status.all_stake_rewards.num_rewards(),
             expected_num_stake_rewards
         );
     }
+
+    #[test]
+    fn test_reward_accumulator() {
+        let mut accumulator1 = RewardsAccumulator::default();
+        let mut accumulator2 = RewardsAccumulator::default();
+
+        let vote_pubkey_a = Pubkey::new_unique();
+        let vote_account_a =
+            vote_state::create_account(&vote_pubkey_a, &Pubkey::new_unique(), 20, 100);
+        let vote_pubkey_b = Pubkey::new_unique();
+        let vote_account_b =
+            vote_state::create_account(&vote_pubkey_b, &Pubkey::new_unique(), 20, 100);
+        let vote_pubkey_c = Pubkey::new_unique();
+        let vote_account_c =
+            vote_state::create_account(&vote_pubkey_c, &Pubkey::new_unique(), 20, 100);
+
+        accumulator1.add_reward(
+            vote_pubkey_a,
+            VoteReward {
+                vote_account: vote_account_a.clone(),
+                commission: 10,
+                vote_rewards: 50,
+            },
+            50,
+        );
+        accumulator1.add_reward(
+            vote_pubkey_b,
+            VoteReward {
+                vote_account: vote_account_b.clone(),
+                commission: 10,
+                vote_rewards: 50,
+            },
+            50,
+        );
+        accumulator2.add_reward(
+            vote_pubkey_b,
+            VoteReward {
+                vote_account: vote_account_b,
+                commission: 10,
+                vote_rewards: 30,
+            },
+            30,
+        );
+        accumulator2.add_reward(
+            vote_pubkey_c,
+            VoteReward {
+                vote_account: vote_account_c,
+                commission: 10,
+                vote_rewards: 50,
+            },
+            50,
+        );
+
+        assert_eq!(accumulator1.num_stake_rewards, 2);
+        assert_eq!(accumulator1.total_stake_rewards_lamports, 100);
+        let vote_reward_a_1 = accumulator1.vote_rewards.get(&vote_pubkey_a).unwrap();
+        assert_eq!(vote_reward_a_1.commission, 10);
+        assert_eq!(vote_reward_a_1.vote_rewards, 50);
+        let vote_reward_b_1 = accumulator1.vote_rewards.get(&vote_pubkey_b).unwrap();
+        assert_eq!(vote_reward_b_1.commission, 10);
+        assert_eq!(vote_reward_b_1.vote_rewards, 50);
+
+        let vote_reward_b_2 = accumulator2.vote_rewards.get(&vote_pubkey_b).unwrap();
+        assert_eq!(vote_reward_b_2.commission, 10);
+        assert_eq!(vote_reward_b_2.vote_rewards, 30);
+        let vote_reward_c_2 = accumulator2.vote_rewards.get(&vote_pubkey_c).unwrap();
+        assert_eq!(vote_reward_c_2.commission, 10);
+        assert_eq!(vote_reward_c_2.vote_rewards, 50);
+
+        let accumulator = accumulator1 + accumulator2;
+
+        assert_eq!(accumulator.num_stake_rewards, 4);
+        assert_eq!(accumulator.total_stake_rewards_lamports, 180);
+        let vote_reward_a = accumulator.vote_rewards.get(&vote_pubkey_a).unwrap();
+        assert_eq!(vote_reward_a.commission, 10);
+        assert_eq!(vote_reward_a.vote_rewards, 50);
+        let vote_reward_b = accumulator.vote_rewards.get(&vote_pubkey_b).unwrap();
+        assert_eq!(vote_reward_b.commission, 10);
+        // sum of the vote rewards from both accumulators
+        assert_eq!(vote_reward_b.vote_rewards, 80);
+        let vote_reward_c = accumulator.vote_rewards.get(&vote_pubkey_c).unwrap();
+        assert_eq!(vote_reward_c.commission, 10);
+        assert_eq!(vote_reward_c.vote_rewards, 50);
+    }
 }

+ 16 - 12
runtime/src/bank/partitioned_epoch_rewards/distribution.rs

@@ -163,7 +163,7 @@ impl Bank {
         let metrics = RewardsStoreMetrics {
             pre_capitalization,
             post_capitalization: self.capitalization(),
-            total_stake_accounts_count: partition_rewards.all_stake_rewards.len(),
+            total_stake_accounts_count: partition_rewards.all_stake_rewards.num_rewards(),
             total_num_partitions: partition_rewards.partition_indices.len(),
             partition_index,
             store_stake_accounts_us,
@@ -269,8 +269,12 @@ impl Bank {
                 .unwrap_or_else(|| {
                     panic!(
                         "partition reward out of bound: {index} >= {}",
-                        partition_rewards.all_stake_rewards.len()
+                        partition_rewards.all_stake_rewards.total_len()
                     )
+                })
+                .as_ref()
+                .unwrap_or_else(|| {
+                    panic!("partition reward {index} is empty");
                 });
             let stake_pubkey = partitioned_stake_reward.stake_pubkey;
             let reward_amount = partitioned_stake_reward.stake_reward;
@@ -307,7 +311,7 @@ mod tests {
             bank::{
                 partitioned_epoch_rewards::{
                     epoch_rewards_hasher::hash_rewards_into_partitions, tests::convert_rewards,
-                    REWARD_CALCULATION_NUM_BLOCKS,
+                    PartitionedStakeRewards, REWARD_CALCULATION_NUM_BLOCKS,
                 },
                 tests::create_genesis_config,
             },
@@ -339,8 +343,8 @@ mod tests {
         let expected_num = 100;
 
         let stake_rewards = (0..expected_num)
-            .map(|_| PartitionedStakeReward::new_random())
-            .collect::<Vec<_>>();
+            .map(|_| Some(PartitionedStakeReward::new_random()))
+            .collect::<PartitionedStakeRewards>();
 
         let partition_indices =
             hash_rewards_into_partitions(&stake_rewards, &Hash::new_from_array([1; 32]), 2);
@@ -363,8 +367,8 @@ mod tests {
         let expected_num = 1;
 
         let stake_rewards = (0..expected_num)
-            .map(|_| PartitionedStakeReward::new_random())
-            .collect::<Vec<_>>();
+            .map(|_| Some(PartitionedStakeReward::new_random()))
+            .collect::<PartitionedStakeRewards>();
 
         let partition_indices = hash_rewards_into_partitions(
             &stake_rewards,
@@ -388,7 +392,7 @@ mod tests {
 
         bank.set_epoch_reward_status_distribution(
             bank.block_height() + REWARD_CALCULATION_NUM_BLOCKS,
-            Arc::new(vec![]),
+            Arc::new(PartitionedStakeRewards::default()),
             vec![],
         );
 
@@ -745,11 +749,11 @@ mod tests {
             .map(|_| StakeReward::new_random())
             .collect::<Vec<_>>();
         populate_starting_stake_accounts_from_stake_rewards(&bank, &stake_rewards);
-        let converted_rewards: Vec<_> = convert_rewards(stake_rewards);
+        let converted_rewards = convert_rewards(stake_rewards);
 
         let expected_total = converted_rewards
-            .iter()
-            .map(|stake_reward| stake_reward.stake_reward)
+            .enumerated_rewards_iter()
+            .map(|(_, stake_reward)| stake_reward.stake_reward)
             .sum::<u64>();
 
         let partitioned_rewards = StartBlockHeightAndPartitionedRewards {
@@ -772,7 +776,7 @@ mod tests {
 
         let partitioned_rewards = StartBlockHeightAndPartitionedRewards {
             distribution_starting_block_height: bank.block_height() + REWARD_CALCULATION_NUM_BLOCKS,
-            all_stake_rewards: Arc::new(vec![]),
+            all_stake_rewards: Arc::new(PartitionedStakeRewards::default()),
             partition_indices: vec![vec![]],
         };
 

+ 8 - 8
runtime/src/bank/partitioned_epoch_rewards/epoch_rewards_hasher.rs

@@ -1,17 +1,17 @@
 use {
-    crate::bank::partitioned_epoch_rewards::PartitionedStakeReward, itertools::enumerate,
+    crate::bank::partitioned_epoch_rewards::PartitionedStakeRewards,
     solana_epoch_rewards_hasher::EpochRewardsHasher, solana_hash::Hash,
 };
 
 pub(in crate::bank::partitioned_epoch_rewards) fn hash_rewards_into_partitions(
-    stake_rewards: &[PartitionedStakeReward],
+    stake_rewards: &PartitionedStakeRewards,
     parent_blockhash: &Hash,
     num_partitions: usize,
 ) -> Vec<Vec<usize>> {
     let hasher = EpochRewardsHasher::new(num_partitions, parent_blockhash);
     let mut indices = vec![vec![]; num_partitions];
 
-    for (i, reward) in enumerate(stake_rewards) {
+    for (i, reward) in stake_rewards.enumerated_rewards_iter() {
         // clone here so the hasher's state is re-used on each call to `hash_address_to_partition`.
         // This prevents us from re-hashing the seed each time.
         // The clone is explicit (as opposed to an implicit copy) so it is clear this is intended.
@@ -43,8 +43,8 @@ mod tests {
         let expected_num = 12345;
 
         let stake_rewards = (0..expected_num)
-            .map(|_| PartitionedStakeReward::new_random())
-            .collect::<Vec<_>>();
+            .map(|_| Some(PartitionedStakeReward::new_random()))
+            .collect::<PartitionedStakeRewards>();
 
         let partition_indices = hash_rewards_into_partitions(&stake_rewards, &Hash::default(), 5);
         let total_num_after_hash_partition: usize = partition_indices.iter().map(|x| x.len()).sum();
@@ -55,7 +55,7 @@ mod tests {
 
     #[test]
     fn test_hash_rewards_into_partitions_empty() {
-        let stake_rewards = vec![];
+        let stake_rewards = PartitionedStakeRewards::default();
 
         let num_partitions = 5;
         let partition_indices =
@@ -79,8 +79,8 @@ mod tests {
         // simulate 40K - 1 rewards, the expected num of credit blocks should be 10.
         let expected_num = 40959;
         let stake_rewards = (0..expected_num)
-            .map(|_| PartitionedStakeReward::new_random())
-            .collect::<Vec<_>>();
+            .map(|_| Some(PartitionedStakeReward::new_random()))
+            .collect::<PartitionedStakeRewards>();
 
         let partition_indices =
             hash_rewards_into_partitions(&stake_rewards, &Hash::new_from_array([1; 32]), 10);

+ 111 - 20
runtime/src/bank/partitioned_epoch_rewards/mod.rs

@@ -20,7 +20,7 @@ use {
     solana_reward_info::RewardInfo,
     solana_stake_interface::state::{Delegation, Stake},
     solana_vote::vote_account::VoteAccounts,
-    std::sync::Arc,
+    std::{mem::MaybeUninit, sync::Arc},
 };
 
 /// Number of blocks for reward calculation and storing vote accounts.
@@ -39,14 +39,79 @@ pub(crate) struct PartitionedStakeReward {
     pub commission: u8,
 }
 
-type PartitionedStakeRewards = Vec<PartitionedStakeReward>;
+/// A vector of stake rewards.
+#[derive(Debug, Default, PartialEq)]
+pub(crate) struct PartitionedStakeRewards {
+    /// Inner vector.
+    rewards: Vec<Option<PartitionedStakeReward>>,
+    /// Number of stake rewards.
+    num_rewards: usize,
+}
+
+impl PartitionedStakeRewards {
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        let rewards = Vec::with_capacity(capacity);
+        Self {
+            rewards,
+            num_rewards: 0,
+        }
+    }
+
+    /// Number of stake rewards.
+    pub(crate) fn num_rewards(&self) -> usize {
+        self.num_rewards
+    }
+
+    /// Total length, including both `Some` and `None` elements.
+    pub(crate) fn total_len(&self) -> usize {
+        self.rewards.len()
+    }
+
+    pub(crate) fn get(&self, index: usize) -> Option<&Option<PartitionedStakeReward>> {
+        self.rewards.get(index)
+    }
+
+    pub(crate) fn enumerated_rewards_iter(
+        &self,
+    ) -> impl Iterator<Item = (usize, &PartitionedStakeReward)> {
+        self.rewards
+            .iter()
+            .enumerate()
+            .filter_map(|(index, reward)| Some((index, reward.as_ref()?)))
+    }
+
+    fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<Option<PartitionedStakeReward>>] {
+        self.rewards.spare_capacity_mut()
+    }
+
+    unsafe fn assume_init(&mut self, num_stake_rewards: usize) {
+        self.rewards.set_len(self.rewards.capacity());
+        self.num_rewards = num_stake_rewards;
+    }
+}
+
+#[cfg(test)]
+impl FromIterator<Option<PartitionedStakeReward>> for PartitionedStakeRewards {
+    fn from_iter<T: IntoIterator<Item = Option<PartitionedStakeReward>>>(iter: T) -> Self {
+        let mut len_some: usize = 0;
+        let rewards = Vec::from_iter(iter.into_iter().inspect(|reward| {
+            if reward.is_some() {
+                len_some = len_some.saturating_add(1);
+            }
+        }));
+        Self {
+            rewards,
+            num_rewards: len_some,
+        }
+    }
+}
 
 #[derive(Debug, Clone, PartialEq)]
 pub(crate) struct StartBlockHeightAndRewards {
     /// the block height of the slot at which rewards distribution began
     pub(crate) distribution_starting_block_height: u64,
     /// calculated epoch rewards before partitioning
-    pub(crate) all_stake_rewards: Arc<Vec<PartitionedStakeReward>>,
+    pub(crate) all_stake_rewards: Arc<PartitionedStakeRewards>,
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -55,7 +120,7 @@ pub(crate) struct StartBlockHeightAndPartitionedRewards {
     pub(crate) distribution_starting_block_height: u64,
 
     /// calculated epoch rewards pending distribution
-    pub(crate) all_stake_rewards: Arc<Vec<PartitionedStakeReward>>,
+    pub(crate) all_stake_rewards: Arc<PartitionedStakeRewards>,
 
     /// indices of calculated epoch rewards per partition, outer Vec is by
     /// partition (one partition per block), inner Vec is the indices for one
@@ -196,7 +261,7 @@ pub(super) struct CalculateRewardsAndDistributeVoteRewardsResult {
     /// vote accounts
     pub(super) point_value: PointValue,
     /// stake rewards that still need to be distributed
-    pub(super) stake_rewards: Arc<Vec<PartitionedStakeReward>>,
+    pub(super) stake_rewards: Arc<PartitionedStakeRewards>,
 }
 
 pub(crate) type StakeRewards = Vec<StakeReward>;
@@ -234,7 +299,7 @@ impl Bank {
     pub(crate) fn set_epoch_reward_status_calculation(
         &mut self,
         distribution_starting_block_height: u64,
-        stake_rewards: Arc<Vec<PartitionedStakeReward>>,
+        stake_rewards: Arc<PartitionedStakeRewards>,
     ) {
         self.epoch_reward_status =
             EpochRewardStatus::Active(EpochRewardPhase::Calculation(StartBlockHeightAndRewards {
@@ -246,7 +311,7 @@ impl Bank {
     pub(crate) fn set_epoch_reward_status_distribution(
         &mut self,
         distribution_starting_block_height: u64,
-        all_stake_rewards: Arc<Vec<PartitionedStakeReward>>,
+        all_stake_rewards: Arc<PartitionedStakeRewards>,
         partition_indices: Vec<Vec<usize>>,
     ) {
         self.epoch_reward_status = EpochRewardStatus::Active(EpochRewardPhase::Distribution(
@@ -277,7 +342,7 @@ impl Bank {
         &self,
         rewards: &PartitionedStakeRewards,
     ) -> u64 {
-        let total_stake_accounts = rewards.len();
+        let total_stake_accounts = rewards.num_rewards();
         if self.epoch_schedule.warmup && self.epoch < self.first_normal_epoch() {
             1
         } else {
@@ -352,9 +417,9 @@ mod tests {
     }
 
     pub fn build_partitioned_stake_rewards(
-        stake_rewards: &[PartitionedStakeReward],
+        stake_rewards: &PartitionedStakeRewards,
         partition_indices: &[Vec<usize>],
-    ) -> Vec<Vec<PartitionedStakeReward>> {
+    ) -> Vec<PartitionedStakeRewards> {
         partition_indices
             .iter()
             .map(|partition_index| {
@@ -362,8 +427,8 @@ mod tests {
                 // that belong to this partition
                 partition_index
                     .iter()
-                    .map(|&index| stake_rewards[index].clone())
-                    .collect::<Vec<_>>()
+                    .map(|&index| stake_rewards.get(index).unwrap().clone())
+                    .collect::<PartitionedStakeRewards>()
             })
             .collect::<Vec<_>>()
     }
@@ -373,7 +438,7 @@ mod tests {
     ) -> PartitionedStakeRewards {
         stake_rewards
             .into_iter()
-            .map(|stake_reward| PartitionedStakeReward::maybe_from(&stake_reward).unwrap())
+            .map(|stake_reward| Some(PartitionedStakeReward::maybe_from(&stake_reward).unwrap()))
             .collect()
     }
 
@@ -547,8 +612,8 @@ mod tests {
         let expected_num = 100;
 
         let stake_rewards = (0..expected_num)
-            .map(|_| PartitionedStakeReward::new_random())
-            .collect::<Vec<_>>();
+            .map(|_| Some(PartitionedStakeReward::new_random()))
+            .collect::<PartitionedStakeRewards>();
 
         let partition_indices = vec![(0..expected_num).collect()];
 
@@ -599,8 +664,8 @@ mod tests {
             |num_stakes: u64, expected_num_reward_distribution_blocks: u64| {
                 // Given the short epoch, i.e. 32 slots, we should cap the number of reward distribution blocks to 32/10 = 3.
                 let stake_rewards = (0..num_stakes)
-                    .map(|_| PartitionedStakeReward::new_random())
-                    .collect::<Vec<_>>();
+                    .map(|_| Some(PartitionedStakeReward::new_random()))
+                    .collect::<PartitionedStakeRewards>();
 
                 assert_eq!(
                     bank.get_reward_distribution_num_blocks(&stake_rewards),
@@ -637,8 +702,8 @@ mod tests {
         // Given 8k rewards, it will take 2 blocks to credit all the rewards
         let expected_num = 8192;
         let stake_rewards = (0..expected_num)
-            .map(|_| PartitionedStakeReward::new_random())
-            .collect::<Vec<_>>();
+            .map(|_| Some(PartitionedStakeReward::new_random()))
+            .collect::<PartitionedStakeRewards>();
 
         assert_eq!(bank.get_reward_distribution_num_blocks(&stake_rewards), 2);
     }
@@ -650,7 +715,33 @@ mod tests {
         let (genesis_config, _mint_keypair) = create_genesis_config(1_000_000 * LAMPORTS_PER_SOL);
 
         let bank = Bank::new_for_tests(&genesis_config);
-        let rewards = vec![];
+        let rewards = PartitionedStakeRewards::default();
+        assert_eq!(bank.get_reward_distribution_num_blocks(&rewards), 1);
+    }
+
+    /// Test get_reward_distribution_num_blocks with `None` elements in the
+    /// partitioned stake rewards. `None` elements can occur if for any stake
+    /// delegation:
+    /// * there is no payout or if any deserved payout is < 1 lamport
+    /// * corresponding vote account was not found in cache and accounts-db
+    #[test]
+    fn test_get_reward_distribution_num_blocks_none() {
+        let rewards_all = 8192;
+        let expected_rewards_some = 6144;
+        let rewards = (0..rewards_all)
+            .map(|i| {
+                if i % 4 == 0 {
+                    None
+                } else {
+                    Some(PartitionedStakeReward::new_random())
+                }
+            })
+            .collect::<PartitionedStakeRewards>();
+        assert_eq!(rewards.rewards.len(), rewards_all);
+        assert_eq!(rewards.num_rewards(), expected_rewards_some);
+
+        let (genesis_config, _mint_keypair) = create_genesis_config(1_000_000 * LAMPORTS_PER_SOL);
+        let bank = Bank::new_for_tests(&genesis_config);
         assert_eq!(bank.get_reward_distribution_num_blocks(&rewards), 1);
     }
 

+ 3 - 3
runtime/src/bank/tests.rs

@@ -11248,7 +11248,7 @@ fn test_system_instruction_unsigned_transaction() {
 
 #[test]
 fn test_calc_vote_accounts_to_store_empty() {
-    let vote_account_rewards = DashMap::default();
+    let vote_account_rewards = HashMap::default();
     let result = Bank::calc_vote_accounts_to_store(vote_account_rewards);
     assert_eq!(
         result.accounts_with_rewards.len(),
@@ -11259,7 +11259,7 @@ fn test_calc_vote_accounts_to_store_empty() {
 
 #[test]
 fn test_calc_vote_accounts_to_store_overflow() {
-    let vote_account_rewards = DashMap::default();
+    let mut vote_account_rewards = HashMap::default();
     let pubkey = solana_pubkey::new_rand();
     let mut vote_account = AccountSharedData::default();
     vote_account.set_lamports(u64::MAX);
@@ -11284,7 +11284,7 @@ fn test_calc_vote_accounts_to_store_normal() {
     let pubkey = solana_pubkey::new_rand();
     for commission in 0..2 {
         for vote_rewards in 0..2 {
-            let vote_account_rewards = DashMap::default();
+            let mut vote_account_rewards = HashMap::default();
             let mut vote_account = AccountSharedData::default();
             vote_account.set_lamports(1);
             vote_account_rewards.insert(

+ 10 - 6
runtime/src/inflation_rewards/mod.rs

@@ -21,17 +21,20 @@ struct CalculatedStakeRewards {
     new_credits_observed: u64,
 }
 
-// utility function
-// returns a tuple of (stakers_reward,voters_reward)
+/// Redeems rewards for the given epoch, stake state and vote state.
+/// Returns a tuple of:
+/// * Stakers reward
+/// * Voters reward
+/// * Updated stake information
 pub fn redeem_rewards(
     rewarded_epoch: Epoch,
-    stake_state: &mut StakeStateV2,
+    stake_state: &StakeStateV2,
     vote_state: &VoteStateView,
     point_value: &PointValue,
     stake_history: &StakeHistory,
     inflation_point_calc_tracer: Option<impl Fn(&InflationPointCalculationEvent)>,
     new_rate_activation_epoch: Option<Epoch>,
-) -> Result<(u64, u64), InstructionError> {
+) -> Result<(u64, u64, Stake), InstructionError> {
     if let StakeStateV2::Stake(meta, stake, _stake_flags) = stake_state {
         if let Some(inflation_point_calc_tracer) = inflation_point_calc_tracer.as_ref() {
             inflation_point_calc_tracer(
@@ -49,16 +52,17 @@ pub fn redeem_rewards(
             ));
         }
 
+        let mut stake = *stake;
         if let Some((stakers_reward, voters_reward)) = redeem_stake_rewards(
             rewarded_epoch,
-            stake,
+            &mut stake,
             point_value,
             vote_state,
             stake_history,
             inflation_point_calc_tracer,
             new_rate_activation_epoch,
         ) {
-            Ok((stakers_reward, voters_reward))
+            Ok((stakers_reward, voters_reward, stake))
         } else {
             Err(StakeError::NoCreditsToRedeem.into())
         }