|
|
@@ -1,7 +1,6 @@
|
|
|
use {
|
|
|
super::{
|
|
|
consumer::Consumer,
|
|
|
- forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
|
|
|
immutable_deserialized_packet::ImmutableDeserializedPacket,
|
|
|
latest_unprocessed_votes::{
|
|
|
LatestUnprocessedVotes, LatestValidatorVotePacket, VoteBatchInsertionMetrics,
|
|
|
@@ -13,19 +12,15 @@ use {
|
|
|
unprocessed_packet_batches::{
|
|
|
DeserializedPacket, PacketBatchInsertionMetrics, UnprocessedPacketBatches,
|
|
|
},
|
|
|
- BankingStageStats, FilterForwardingResults, ForwardOption,
|
|
|
+ BankingStageStats,
|
|
|
},
|
|
|
itertools::Itertools,
|
|
|
min_max_heap::MinMaxHeap,
|
|
|
solana_accounts_db::account_locks::validate_account_locks,
|
|
|
- solana_feature_set::FeatureSet,
|
|
|
solana_measure::measure_us,
|
|
|
solana_runtime::bank::Bank,
|
|
|
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
|
|
|
- solana_sdk::{
|
|
|
- clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, hash::Hash, saturating_add_assign,
|
|
|
- transaction::SanitizedTransaction,
|
|
|
- },
|
|
|
+ solana_sdk::{hash::Hash, transaction::SanitizedTransaction},
|
|
|
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
|
|
|
std::{
|
|
|
collections::HashMap,
|
|
|
@@ -316,7 +311,7 @@ impl UnprocessedTransactionStorage {
|
|
|
}
|
|
|
|
|
|
pub fn should_not_process(&self) -> bool {
|
|
|
- // The gossip vote thread does not need to process or forward any votes, that is
|
|
|
+ // The gossip vote thread does not need to process any votes, that is
|
|
|
// handled by the tpu vote thread
|
|
|
if let Self::VoteStorage(vote_storage) = self {
|
|
|
return matches!(vote_storage.vote_source, VoteSource::Gossip);
|
|
|
@@ -332,22 +327,6 @@ impl UnprocessedTransactionStorage {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn forward_option(&self) -> ForwardOption {
|
|
|
- match self {
|
|
|
- Self::VoteStorage(vote_storage) => vote_storage.forward_option(),
|
|
|
- Self::LocalTransactionStorage(transaction_storage) => {
|
|
|
- transaction_storage.forward_option()
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- pub fn clear_forwarded_packets(&mut self) {
|
|
|
- match self {
|
|
|
- Self::LocalTransactionStorage(transaction_storage) => transaction_storage.clear(), // Since we set everything as forwarded this is the same
|
|
|
- Self::VoteStorage(vote_storage) => vote_storage.clear_forwarded_packets(),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
pub(crate) fn insert_batch(
|
|
|
&mut self,
|
|
|
deserialized_packets: Vec<ImmutableDeserializedPacket>,
|
|
|
@@ -362,25 +341,6 @@ impl UnprocessedTransactionStorage {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn filter_forwardable_packets_and_add_batches(
|
|
|
- &mut self,
|
|
|
- bank: Arc<Bank>,
|
|
|
- forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
|
|
|
- ) -> FilterForwardingResults {
|
|
|
- match self {
|
|
|
- Self::LocalTransactionStorage(transaction_storage) => transaction_storage
|
|
|
- .filter_forwardable_packets_and_add_batches(
|
|
|
- bank,
|
|
|
- forward_packet_batches_by_accounts,
|
|
|
- ),
|
|
|
- Self::VoteStorage(vote_storage) => vote_storage
|
|
|
- .filter_forwardable_packets_and_add_batches(
|
|
|
- bank,
|
|
|
- forward_packet_batches_by_accounts,
|
|
|
- ),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/// The processing function takes a stream of packets ready to process, and returns the indices
|
|
|
/// of the unprocessed packets that are eligible for retry. A return value of None means that
|
|
|
/// all packets are unprocessed and eligible for retry.
|
|
|
@@ -415,6 +375,13 @@ impl UnprocessedTransactionStorage {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ pub(crate) fn clear(&mut self) {
|
|
|
+ match self {
|
|
|
+ Self::LocalTransactionStorage(_) => {}
|
|
|
+ Self::VoteStorage(vote_storage) => vote_storage.clear(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) {
|
|
|
match self {
|
|
|
Self::LocalTransactionStorage(_) => (),
|
|
|
@@ -436,17 +403,6 @@ impl VoteStorage {
|
|
|
MAX_NUM_VOTES_RECEIVE
|
|
|
}
|
|
|
|
|
|
- fn forward_option(&self) -> ForwardOption {
|
|
|
- match self.vote_source {
|
|
|
- VoteSource::Tpu => ForwardOption::ForwardTpuVote,
|
|
|
- VoteSource::Gossip => ForwardOption::NotForward,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- fn clear_forwarded_packets(&mut self) {
|
|
|
- self.latest_unprocessed_votes.clear_forwarded_packets();
|
|
|
- }
|
|
|
-
|
|
|
fn insert_batch(
|
|
|
&mut self,
|
|
|
deserialized_packets: Vec<ImmutableDeserializedPacket>,
|
|
|
@@ -467,23 +423,6 @@ impl VoteStorage {
|
|
|
)
|
|
|
}
|
|
|
|
|
|
- fn filter_forwardable_packets_and_add_batches(
|
|
|
- &mut self,
|
|
|
- bank: Arc<Bank>,
|
|
|
- forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
|
|
|
- ) -> FilterForwardingResults {
|
|
|
- if matches!(self.vote_source, VoteSource::Tpu) {
|
|
|
- let total_forwardable_packets = self
|
|
|
- .latest_unprocessed_votes
|
|
|
- .get_and_insert_forwardable_packets(bank, forward_packet_batches_by_accounts);
|
|
|
- return FilterForwardingResults {
|
|
|
- total_forwardable_packets,
|
|
|
- ..FilterForwardingResults::default()
|
|
|
- };
|
|
|
- }
|
|
|
- FilterForwardingResults::default()
|
|
|
- }
|
|
|
-
|
|
|
// returns `true` if the end of slot is reached
|
|
|
fn process_packets<F>(
|
|
|
&mut self,
|
|
|
@@ -560,6 +499,10 @@ impl VoteStorage {
|
|
|
scanner.finalize().payload.reached_end_of_slot
|
|
|
}
|
|
|
|
|
|
+ fn clear(&mut self) {
|
|
|
+ self.latest_unprocessed_votes.clear();
|
|
|
+ }
|
|
|
+
|
|
|
fn cache_epoch_boundary_info(&mut self, bank: &Bank) {
|
|
|
if matches!(self.vote_source, VoteSource::Gossip) {
|
|
|
panic!("Gossip vote thread should not be checking epoch boundary");
|
|
|
@@ -603,18 +546,6 @@ impl ThreadLocalUnprocessedPackets {
|
|
|
self.unprocessed_packet_batches.iter_mut()
|
|
|
}
|
|
|
|
|
|
- fn forward_option(&self) -> ForwardOption {
|
|
|
- match self.thread_type {
|
|
|
- ThreadType::Transactions => ForwardOption::ForwardTransaction,
|
|
|
- ThreadType::Voting(VoteSource::Tpu) => ForwardOption::ForwardTpuVote,
|
|
|
- ThreadType::Voting(VoteSource::Gossip) => ForwardOption::NotForward,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- fn clear(&mut self) {
|
|
|
- self.unprocessed_packet_batches.clear();
|
|
|
- }
|
|
|
-
|
|
|
fn insert_batch(
|
|
|
&mut self,
|
|
|
deserialized_packets: Vec<ImmutableDeserializedPacket>,
|
|
|
@@ -626,123 +557,6 @@ impl ThreadLocalUnprocessedPackets {
|
|
|
)
|
|
|
}
|
|
|
|
|
|
- /// Filter out packets that fail to sanitize, or are no longer valid (could be
|
|
|
- /// too old, a duplicate of something already processed). Doing this in batches to avoid
|
|
|
- /// checking bank's blockhash and status cache per transaction which could be bad for performance.
|
|
|
- /// Added valid and sanitized packets to forwarding queue.
|
|
|
- fn filter_forwardable_packets_and_add_batches(
|
|
|
- &mut self,
|
|
|
- bank: Arc<Bank>,
|
|
|
- forward_buffer: &mut ForwardPacketBatchesByAccounts,
|
|
|
- ) -> FilterForwardingResults {
|
|
|
- let mut total_forwardable_packets: usize = 0;
|
|
|
- let mut total_packet_conversion_us: u64 = 0;
|
|
|
- let mut total_filter_packets_us: u64 = 0;
|
|
|
- let mut total_dropped_packets: usize = 0;
|
|
|
-
|
|
|
- let mut original_priority_queue = self.take_priority_queue();
|
|
|
- let original_capacity = original_priority_queue.capacity();
|
|
|
- let mut new_priority_queue = MinMaxHeap::with_capacity(original_capacity);
|
|
|
-
|
|
|
- // indicates if `forward_buffer` still accept more packets, see details at
|
|
|
- // `ForwardPacketBatchesByAccounts.rs`.
|
|
|
- let mut accepting_packets = true;
|
|
|
- // batch iterate through self.unprocessed_packet_batches in desc priority order
|
|
|
- new_priority_queue.extend(
|
|
|
- original_priority_queue
|
|
|
- .drain_desc()
|
|
|
- .chunks(UNPROCESSED_BUFFER_STEP_SIZE)
|
|
|
- .into_iter()
|
|
|
- .flat_map(|packets_to_process| {
|
|
|
- // Only process packets not yet forwarded
|
|
|
- let (forwarded_packets, packets_to_forward) =
|
|
|
- self.prepare_packets_to_forward(packets_to_process);
|
|
|
-
|
|
|
- [
|
|
|
- forwarded_packets,
|
|
|
- if accepting_packets {
|
|
|
- let (
|
|
|
- (sanitized_transactions, transaction_to_packet_indexes),
|
|
|
- packet_conversion_us,
|
|
|
- ) = measure_us!(self.sanitize_unforwarded_packets(
|
|
|
- &packets_to_forward,
|
|
|
- &bank,
|
|
|
- &mut total_dropped_packets
|
|
|
- ));
|
|
|
- saturating_add_assign!(
|
|
|
- total_packet_conversion_us,
|
|
|
- packet_conversion_us
|
|
|
- );
|
|
|
-
|
|
|
- let (forwardable_transaction_indexes, filter_packets_us) =
|
|
|
- measure_us!(Self::filter_invalid_transactions(
|
|
|
- &sanitized_transactions,
|
|
|
- &bank,
|
|
|
- &mut total_dropped_packets
|
|
|
- ));
|
|
|
- saturating_add_assign!(total_filter_packets_us, filter_packets_us);
|
|
|
- saturating_add_assign!(
|
|
|
- total_forwardable_packets,
|
|
|
- forwardable_transaction_indexes.len()
|
|
|
- );
|
|
|
-
|
|
|
- let accepted_packet_indexes =
|
|
|
- Self::add_filtered_packets_to_forward_buffer(
|
|
|
- forward_buffer,
|
|
|
- &packets_to_forward,
|
|
|
- &sanitized_transactions,
|
|
|
- &transaction_to_packet_indexes,
|
|
|
- &forwardable_transaction_indexes,
|
|
|
- &mut total_dropped_packets,
|
|
|
- &bank.feature_set,
|
|
|
- );
|
|
|
- accepting_packets = accepted_packet_indexes.len()
|
|
|
- == forwardable_transaction_indexes.len();
|
|
|
-
|
|
|
- self.unprocessed_packet_batches
|
|
|
- .mark_accepted_packets_as_forwarded(
|
|
|
- &packets_to_forward,
|
|
|
- &accepted_packet_indexes,
|
|
|
- );
|
|
|
-
|
|
|
- Self::collect_retained_packets(
|
|
|
- &mut self.unprocessed_packet_batches.message_hash_to_transaction,
|
|
|
- &packets_to_forward,
|
|
|
- &Self::prepare_filtered_packet_indexes(
|
|
|
- &transaction_to_packet_indexes,
|
|
|
- &forwardable_transaction_indexes,
|
|
|
- ),
|
|
|
- )
|
|
|
- } else {
|
|
|
- // skip sanitizing and filtering if not longer able to add more packets for forwarding
|
|
|
- saturating_add_assign!(total_dropped_packets, packets_to_forward.len());
|
|
|
- packets_to_forward
|
|
|
- },
|
|
|
- ]
|
|
|
- .concat()
|
|
|
- }),
|
|
|
- );
|
|
|
-
|
|
|
- // replace packet priority queue
|
|
|
- self.unprocessed_packet_batches.packet_priority_queue = new_priority_queue;
|
|
|
- self.verify_priority_queue(original_capacity);
|
|
|
-
|
|
|
- // Assert unprocessed queue is still consistent
|
|
|
- assert_eq!(
|
|
|
- self.unprocessed_packet_batches.packet_priority_queue.len(),
|
|
|
- self.unprocessed_packet_batches
|
|
|
- .message_hash_to_transaction
|
|
|
- .len()
|
|
|
- );
|
|
|
-
|
|
|
- FilterForwardingResults {
|
|
|
- total_forwardable_packets,
|
|
|
- total_dropped_packets,
|
|
|
- total_packet_conversion_us,
|
|
|
- total_filter_packets_us,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place.
|
|
|
fn take_priority_queue(&mut self) -> MinMaxHeap<Arc<ImmutableDeserializedPacket>> {
|
|
|
std::mem::replace(
|
|
|
@@ -768,105 +582,6 @@ impl ThreadLocalUnprocessedPackets {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding.
|
|
|
- fn sanitize_unforwarded_packets(
|
|
|
- &mut self,
|
|
|
- packets_to_process: &[Arc<ImmutableDeserializedPacket>],
|
|
|
- bank: &Bank,
|
|
|
- total_dropped_packets: &mut usize,
|
|
|
- ) -> (Vec<RuntimeTransaction<SanitizedTransaction>>, Vec<usize>) {
|
|
|
- // Get ref of ImmutableDeserializedPacket
|
|
|
- let deserialized_packets = packets_to_process.iter().map(|p| &**p);
|
|
|
- let (transactions, transaction_to_packet_indexes): (Vec<_>, Vec<_>) = deserialized_packets
|
|
|
- .enumerate()
|
|
|
- .filter_map(|(packet_index, deserialized_packet)| {
|
|
|
- deserialized_packet
|
|
|
- .build_sanitized_transaction(
|
|
|
- bank.vote_only_bank(),
|
|
|
- bank,
|
|
|
- bank.get_reserved_account_keys(),
|
|
|
- )
|
|
|
- .map(|(transaction, _deactivation_slot)| (transaction, packet_index))
|
|
|
- })
|
|
|
- .unzip();
|
|
|
-
|
|
|
- let filtered_count = packets_to_process.len().saturating_sub(transactions.len());
|
|
|
- saturating_add_assign!(*total_dropped_packets, filtered_count);
|
|
|
-
|
|
|
- (transactions, transaction_to_packet_indexes)
|
|
|
- }
|
|
|
-
|
|
|
- /// Checks sanitized transactions against bank, returns valid transaction indexes
|
|
|
- fn filter_invalid_transactions(
|
|
|
- transactions: &[RuntimeTransaction<SanitizedTransaction>],
|
|
|
- bank: &Bank,
|
|
|
- total_dropped_packets: &mut usize,
|
|
|
- ) -> Vec<usize> {
|
|
|
- let filter = vec![Ok(()); transactions.len()];
|
|
|
- let results = bank.check_transactions_with_forwarding_delay(
|
|
|
- transactions,
|
|
|
- &filter,
|
|
|
- FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET,
|
|
|
- );
|
|
|
-
|
|
|
- let filtered_count = transactions.len().saturating_sub(results.len());
|
|
|
- saturating_add_assign!(*total_dropped_packets, filtered_count);
|
|
|
-
|
|
|
- results
|
|
|
- .iter()
|
|
|
- .enumerate()
|
|
|
- .filter_map(|(tx_index, result)| result.as_ref().ok().map(|_| tx_index))
|
|
|
- .collect_vec()
|
|
|
- }
|
|
|
-
|
|
|
- fn prepare_filtered_packet_indexes(
|
|
|
- transaction_to_packet_indexes: &[usize],
|
|
|
- retained_transaction_indexes: &[usize],
|
|
|
- ) -> Vec<usize> {
|
|
|
- retained_transaction_indexes
|
|
|
- .iter()
|
|
|
- .map(|tx_index| transaction_to_packet_indexes[*tx_index])
|
|
|
- .collect_vec()
|
|
|
- }
|
|
|
-
|
|
|
- /// try to add filtered forwardable and valid packets to forward buffer;
|
|
|
- /// returns vector of packet indexes that were accepted for forwarding.
|
|
|
- fn add_filtered_packets_to_forward_buffer(
|
|
|
- forward_buffer: &mut ForwardPacketBatchesByAccounts,
|
|
|
- packets_to_process: &[Arc<ImmutableDeserializedPacket>],
|
|
|
- transactions: &[RuntimeTransaction<SanitizedTransaction>],
|
|
|
- transaction_to_packet_indexes: &[usize],
|
|
|
- forwardable_transaction_indexes: &[usize],
|
|
|
- total_dropped_packets: &mut usize,
|
|
|
- feature_set: &FeatureSet,
|
|
|
- ) -> Vec<usize> {
|
|
|
- let mut added_packets_count: usize = 0;
|
|
|
- let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len());
|
|
|
- for forwardable_transaction_index in forwardable_transaction_indexes {
|
|
|
- let sanitized_transaction = &transactions[*forwardable_transaction_index];
|
|
|
- let forwardable_packet_index =
|
|
|
- transaction_to_packet_indexes[*forwardable_transaction_index];
|
|
|
- let immutable_deserialized_packet =
|
|
|
- packets_to_process[forwardable_packet_index].clone();
|
|
|
- if !forward_buffer.try_add_packet(
|
|
|
- sanitized_transaction,
|
|
|
- immutable_deserialized_packet,
|
|
|
- feature_set,
|
|
|
- ) {
|
|
|
- break;
|
|
|
- }
|
|
|
- accepted_packet_indexes.push(forwardable_packet_index);
|
|
|
- saturating_add_assign!(added_packets_count, 1);
|
|
|
- }
|
|
|
-
|
|
|
- let filtered_count = forwardable_transaction_indexes
|
|
|
- .len()
|
|
|
- .saturating_sub(added_packets_count);
|
|
|
- saturating_add_assign!(*total_dropped_packets, filtered_count);
|
|
|
-
|
|
|
- accepted_packet_indexes
|
|
|
- }
|
|
|
-
|
|
|
fn collect_retained_packets(
|
|
|
message_hash_to_transaction: &mut HashMap<Hash, DeserializedPacket>,
|
|
|
packets_to_process: &[Arc<ImmutableDeserializedPacket>],
|
|
|
@@ -959,36 +674,6 @@ impl ThreadLocalUnprocessedPackets {
|
|
|
|
|
|
reached_end_of_slot
|
|
|
}
|
|
|
-
|
|
|
- /// Prepare a chunk of packets for forwarding, filter out already forwarded packets while
|
|
|
- /// counting tracers.
|
|
|
- /// Returns Vec of unforwarded packets, and Vec<bool> of same size each indicates corresponding
|
|
|
- /// packet is tracer packet.
|
|
|
- fn prepare_packets_to_forward(
|
|
|
- &self,
|
|
|
- packets_to_forward: impl Iterator<Item = Arc<ImmutableDeserializedPacket>>,
|
|
|
- ) -> (
|
|
|
- Vec<Arc<ImmutableDeserializedPacket>>,
|
|
|
- Vec<Arc<ImmutableDeserializedPacket>>,
|
|
|
- ) {
|
|
|
- let mut forwarded_packets: Vec<Arc<ImmutableDeserializedPacket>> = vec![];
|
|
|
- let forwardable_packets = packets_to_forward
|
|
|
- .into_iter()
|
|
|
- .filter_map(|immutable_deserialized_packet| {
|
|
|
- if !self
|
|
|
- .unprocessed_packet_batches
|
|
|
- .is_forwarded(&immutable_deserialized_packet)
|
|
|
- {
|
|
|
- Some(immutable_deserialized_packet)
|
|
|
- } else {
|
|
|
- forwarded_packets.push(immutable_deserialized_packet);
|
|
|
- None
|
|
|
- }
|
|
|
- })
|
|
|
- .collect();
|
|
|
-
|
|
|
- (forwarded_packets, forwardable_packets)
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
#[cfg(test)]
|
|
|
@@ -996,14 +681,12 @@ mod tests {
|
|
|
use {
|
|
|
super::*,
|
|
|
itertools::iproduct,
|
|
|
- solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
|
|
solana_perf::packet::{Packet, PacketFlags},
|
|
|
solana_runtime::genesis_utils,
|
|
|
solana_sdk::{
|
|
|
hash::Hash,
|
|
|
signature::{Keypair, Signer},
|
|
|
system_transaction,
|
|
|
- transaction::Transaction,
|
|
|
},
|
|
|
solana_vote::vote_transaction::new_tower_sync_transaction,
|
|
|
solana_vote_program::vote_state::TowerSync,
|
|
|
@@ -1061,126 +744,6 @@ mod tests {
|
|
|
assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]);
|
|
|
}
|
|
|
|
|
|
- #[test]
|
|
|
- fn test_filter_and_forward_with_account_limits() {
|
|
|
- solana_logger::setup();
|
|
|
- let GenesisConfigInfo {
|
|
|
- genesis_config,
|
|
|
- mint_keypair,
|
|
|
- ..
|
|
|
- } = create_genesis_config(10);
|
|
|
- let (current_bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
|
|
|
-
|
|
|
- let simple_transactions: Vec<Transaction> = (0..256)
|
|
|
- .map(|_id| {
|
|
|
- // packets are deserialized upon receiving, failed packets will not be
|
|
|
- // forwarded; Therefore we need to create real packets here.
|
|
|
- let key1 = Keypair::new();
|
|
|
- system_transaction::transfer(
|
|
|
- &mint_keypair,
|
|
|
- &key1.pubkey(),
|
|
|
- genesis_config.rent.minimum_balance(0),
|
|
|
- genesis_config.hash(),
|
|
|
- )
|
|
|
- })
|
|
|
- .collect_vec();
|
|
|
-
|
|
|
- let mut packets: Vec<DeserializedPacket> = simple_transactions
|
|
|
- .iter()
|
|
|
- .enumerate()
|
|
|
- .map(|(packets_id, transaction)| {
|
|
|
- let mut p = Packet::from_data(None, transaction).unwrap();
|
|
|
- p.meta_mut().port = packets_id as u16;
|
|
|
- DeserializedPacket::new(p).unwrap()
|
|
|
- })
|
|
|
- .collect_vec();
|
|
|
-
|
|
|
- // all packets are forwarded
|
|
|
- {
|
|
|
- let buffered_packet_batches: UnprocessedPacketBatches =
|
|
|
- UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
|
|
|
- let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage(
|
|
|
- buffered_packet_batches,
|
|
|
- ThreadType::Transactions,
|
|
|
- );
|
|
|
- let mut forward_packet_batches_by_accounts =
|
|
|
- ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
|
|
|
-
|
|
|
- let FilterForwardingResults {
|
|
|
- total_forwardable_packets,
|
|
|
- ..
|
|
|
- } = transaction_storage.filter_forwardable_packets_and_add_batches(
|
|
|
- current_bank.clone(),
|
|
|
- &mut forward_packet_batches_by_accounts,
|
|
|
- );
|
|
|
- assert_eq!(total_forwardable_packets, 256);
|
|
|
-
|
|
|
- // packets in a batch are forwarded in arbitrary order; verify the ports match after
|
|
|
- // sorting
|
|
|
- let expected_ports: Vec<_> = (0..256).collect();
|
|
|
- let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts
|
|
|
- .iter_batches()
|
|
|
- .flat_map(|batch| batch.get_forwardable_packets().map(|p| p.meta().port))
|
|
|
- .collect();
|
|
|
- forwarded_ports.sort_unstable();
|
|
|
- assert_eq!(expected_ports, forwarded_ports);
|
|
|
- }
|
|
|
-
|
|
|
- // some packets are forwarded
|
|
|
- {
|
|
|
- let num_already_forwarded = 16;
|
|
|
- for packet in &mut packets[0..num_already_forwarded] {
|
|
|
- packet.forwarded = true;
|
|
|
- }
|
|
|
- let buffered_packet_batches: UnprocessedPacketBatches =
|
|
|
- UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
|
|
|
- let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage(
|
|
|
- buffered_packet_batches,
|
|
|
- ThreadType::Transactions,
|
|
|
- );
|
|
|
- let mut forward_packet_batches_by_accounts =
|
|
|
- ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
|
|
|
- let FilterForwardingResults {
|
|
|
- total_forwardable_packets,
|
|
|
- ..
|
|
|
- } = transaction_storage.filter_forwardable_packets_and_add_batches(
|
|
|
- current_bank.clone(),
|
|
|
- &mut forward_packet_batches_by_accounts,
|
|
|
- );
|
|
|
- assert_eq!(
|
|
|
- total_forwardable_packets,
|
|
|
- packets.len() - num_already_forwarded
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
- // some packets are invalid (already processed)
|
|
|
- {
|
|
|
- let num_already_processed = 16;
|
|
|
- for tx in &simple_transactions[0..num_already_processed] {
|
|
|
- assert_eq!(current_bank.process_transaction(tx), Ok(()));
|
|
|
- }
|
|
|
- let buffered_packet_batches: UnprocessedPacketBatches =
|
|
|
- UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
|
|
|
- let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage(
|
|
|
- buffered_packet_batches,
|
|
|
- ThreadType::Transactions,
|
|
|
- );
|
|
|
- let mut forward_packet_batches_by_accounts =
|
|
|
- ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
|
|
|
- let FilterForwardingResults {
|
|
|
- total_forwardable_packets,
|
|
|
- ..
|
|
|
- } = transaction_storage.filter_forwardable_packets_and_add_batches(
|
|
|
- current_bank,
|
|
|
- &mut forward_packet_batches_by_accounts,
|
|
|
- );
|
|
|
- assert_eq!(
|
|
|
- total_forwardable_packets,
|
|
|
- packets.len() - num_already_processed
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
#[test]
|
|
|
fn test_unprocessed_transaction_storage_insert() -> Result<(), Box<dyn Error>> {
|
|
|
let keypair = Keypair::new();
|
|
|
@@ -1309,96 +872,4 @@ mod tests {
|
|
|
assert_eq!(1, transaction_storage.len());
|
|
|
Ok(())
|
|
|
}
|
|
|
-
|
|
|
- #[test]
|
|
|
- fn test_prepare_packets_to_forward() {
|
|
|
- solana_logger::setup();
|
|
|
- let GenesisConfigInfo {
|
|
|
- genesis_config,
|
|
|
- mint_keypair,
|
|
|
- ..
|
|
|
- } = create_genesis_config(10);
|
|
|
-
|
|
|
- let simple_transactions: Vec<Transaction> = (0..256)
|
|
|
- .map(|_id| {
|
|
|
- // packets are deserialized upon receiving, failed packets will not be
|
|
|
- // forwarded; Therefore we need to create real packets here.
|
|
|
- let key1 = Keypair::new();
|
|
|
- system_transaction::transfer(
|
|
|
- &mint_keypair,
|
|
|
- &key1.pubkey(),
|
|
|
- genesis_config.rent.minimum_balance(0),
|
|
|
- genesis_config.hash(),
|
|
|
- )
|
|
|
- })
|
|
|
- .collect_vec();
|
|
|
-
|
|
|
- let mut packets: Vec<DeserializedPacket> = simple_transactions
|
|
|
- .iter()
|
|
|
- .enumerate()
|
|
|
- .map(|(packets_id, transaction)| {
|
|
|
- let mut p = Packet::from_data(None, transaction).unwrap();
|
|
|
- p.meta_mut().port = packets_id as u16;
|
|
|
- DeserializedPacket::new(p).unwrap()
|
|
|
- })
|
|
|
- .collect_vec();
|
|
|
-
|
|
|
- // test preparing buffered packets for forwarding
|
|
|
- let test_prepareing_buffered_packets_for_forwarding =
|
|
|
- |buffered_packet_batches: UnprocessedPacketBatches| -> usize {
|
|
|
- let mut total_packets_to_forward: usize = 0;
|
|
|
-
|
|
|
- let mut unprocessed_transactions = ThreadLocalUnprocessedPackets {
|
|
|
- unprocessed_packet_batches: buffered_packet_batches,
|
|
|
- thread_type: ThreadType::Transactions,
|
|
|
- };
|
|
|
-
|
|
|
- let mut original_priority_queue = unprocessed_transactions.take_priority_queue();
|
|
|
- let _ = original_priority_queue
|
|
|
- .drain_desc()
|
|
|
- .chunks(128usize)
|
|
|
- .into_iter()
|
|
|
- .flat_map(|packets_to_process| {
|
|
|
- let (_, packets_to_forward) =
|
|
|
- unprocessed_transactions.prepare_packets_to_forward(packets_to_process);
|
|
|
- total_packets_to_forward += packets_to_forward.len();
|
|
|
- packets_to_forward
|
|
|
- })
|
|
|
- .collect::<MinMaxHeap<Arc<ImmutableDeserializedPacket>>>();
|
|
|
- total_packets_to_forward
|
|
|
- };
|
|
|
-
|
|
|
- {
|
|
|
- let buffered_packet_batches: UnprocessedPacketBatches =
|
|
|
- UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
|
|
|
- let total_packets_to_forward =
|
|
|
- test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
|
|
|
- assert_eq!(total_packets_to_forward, 256);
|
|
|
- }
|
|
|
-
|
|
|
- // some packets are forwarded
|
|
|
- {
|
|
|
- let num_already_forwarded = 16;
|
|
|
- for packet in &mut packets[0..num_already_forwarded] {
|
|
|
- packet.forwarded = true;
|
|
|
- }
|
|
|
- let buffered_packet_batches: UnprocessedPacketBatches =
|
|
|
- UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
|
|
|
- let total_packets_to_forward =
|
|
|
- test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
|
|
|
- assert_eq!(total_packets_to_forward, 256 - num_already_forwarded);
|
|
|
- }
|
|
|
-
|
|
|
- // all packets are forwarded
|
|
|
- {
|
|
|
- for packet in &mut packets {
|
|
|
- packet.forwarded = true;
|
|
|
- }
|
|
|
- let buffered_packet_batches: UnprocessedPacketBatches =
|
|
|
- UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
|
|
|
- let total_packets_to_forward =
|
|
|
- test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
|
|
|
- assert_eq!(total_packets_to_forward, 0);
|
|
|
- }
|
|
|
- }
|
|
|
}
|