receive_and_buffer_utils.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. use {
  2. agave_banking_stage_ingress_types::BankingPacketBatch,
  3. crossbeam_channel::{unbounded, Receiver, Sender},
  4. rand::prelude::*,
  5. solana_account::AccountSharedData,
  6. solana_compute_budget_interface::ComputeBudgetInstruction,
  7. solana_core::banking_stage::{
  8. decision_maker::BufferedPacketsDecision,
  9. transaction_scheduler::{
  10. receive_and_buffer::{ReceiveAndBuffer, TransactionViewReceiveAndBuffer},
  11. transaction_state_container::StateContainer,
  12. },
  13. TOTAL_BUFFERED_PACKETS,
  14. },
  15. solana_genesis_config::GenesisConfig,
  16. solana_hash::Hash,
  17. solana_instruction::{AccountMeta, Instruction},
  18. solana_keypair::Keypair,
  19. solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
  20. solana_message::{Message, VersionedMessage},
  21. solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
  22. solana_pubkey::Pubkey,
  23. solana_runtime::{bank::Bank, bank_forks::BankForks},
  24. solana_sdk_ids::system_program,
  25. solana_signer::Signer,
  26. solana_transaction::versioned::VersionedTransaction,
  27. std::sync::{Arc, RwLock},
  28. };
  29. // the max number of instructions of given type that we can put into packet.
  30. pub const MAX_INSTRUCTIONS_PER_TRANSACTION: usize = 204;
  31. fn create_accounts(num_accounts: usize, genesis_config: &mut GenesisConfig) -> Vec<Keypair> {
  32. let owner = &system_program::id();
  33. let account_keypairs: Vec<Keypair> = (0..num_accounts).map(|_| Keypair::new()).collect();
  34. for keypair in account_keypairs.iter() {
  35. genesis_config.add_account(keypair.pubkey(), AccountSharedData::new(10000, 0, owner));
  36. }
  37. account_keypairs
  38. }
  39. /// Structure that returns correct provided blockhash or some incorrect hash
  40. /// with given probability.
  41. pub struct FaultyBlockhash {
  42. blockhash: Hash,
  43. probability_invalid_blockhash: f64,
  44. }
  45. impl FaultyBlockhash {
  46. /// Create a new faulty hash generator
  47. pub fn new(blockhash: Hash, probability_invalid_blockhash: f64) -> Self {
  48. Self {
  49. blockhash,
  50. probability_invalid_blockhash,
  51. }
  52. }
  53. pub fn get<R: Rng>(&self, rng: &mut R) -> Hash {
  54. if rng.gen::<f64>() < self.probability_invalid_blockhash {
  55. Hash::default()
  56. } else {
  57. self.blockhash
  58. }
  59. }
  60. }
  61. fn generate_transactions(
  62. num_txs: usize,
  63. bank: Arc<Bank>,
  64. fee_payers: &[Keypair],
  65. num_instructions_per_tx: usize,
  66. probability_invalid_blockhash: f64,
  67. set_rand_cu_price: bool,
  68. ) -> BankingPacketBatch {
  69. assert!(num_instructions_per_tx <= MAX_INSTRUCTIONS_PER_TRANSACTION);
  70. if set_rand_cu_price {
  71. assert!(
  72. num_instructions_per_tx > 0,
  73. "`num_instructions_per_tx` must be at least 1 when `set_rand_cu_price` flag is set to \
  74. count the set_compute_unit_price instruction."
  75. );
  76. }
  77. let blockhash = FaultyBlockhash::new(bank.last_blockhash(), probability_invalid_blockhash);
  78. let mut rng = rand::thread_rng();
  79. let mut fee_payers = fee_payers.iter().cycle();
  80. let txs: Vec<VersionedTransaction> = (0..num_txs)
  81. .map(|_| {
  82. let fee_payer = fee_payers.next().unwrap();
  83. let program_id = Pubkey::new_unique();
  84. let mut instructions = Vec::with_capacity(num_instructions_per_tx);
  85. if set_rand_cu_price {
  86. // Experiments with different distributions didn't show much of the effect on the performance.
  87. let compute_unit_price = rng.gen_range(0..1000);
  88. instructions.push(ComputeBudgetInstruction::set_compute_unit_price(
  89. compute_unit_price,
  90. ));
  91. }
  92. for _ in 0..num_instructions_per_tx.saturating_sub(1) {
  93. instructions.push(Instruction::new_with_bytes(
  94. program_id,
  95. &[0],
  96. vec![AccountMeta {
  97. pubkey: fee_payer.pubkey(),
  98. is_signer: true,
  99. is_writable: true,
  100. }],
  101. ));
  102. }
  103. VersionedTransaction::try_new(
  104. VersionedMessage::Legacy(Message::new_with_blockhash(
  105. &instructions,
  106. Some(&fee_payer.pubkey()),
  107. &blockhash.get(&mut rng),
  108. )),
  109. &[&fee_payer],
  110. )
  111. .unwrap()
  112. })
  113. .collect();
  114. BankingPacketBatch::new(to_packet_batches(&txs, NUM_PACKETS))
  115. }
  116. pub trait ReceiveAndBufferCreator {
  117. fn create(
  118. receiver: Receiver<Arc<Vec<PacketBatch>>>,
  119. bank_forks: Arc<RwLock<BankForks>>,
  120. ) -> Self;
  121. }
  122. impl ReceiveAndBufferCreator for TransactionViewReceiveAndBuffer {
  123. fn create(
  124. receiver: Receiver<Arc<Vec<PacketBatch>>>,
  125. bank_forks: Arc<RwLock<BankForks>>,
  126. ) -> Self {
  127. TransactionViewReceiveAndBuffer {
  128. receiver,
  129. bank_forks,
  130. }
  131. }
  132. }
  133. pub struct ReceiveAndBufferSetup<T: ReceiveAndBuffer> {
  134. // prepared transaction batches
  135. pub txs: BankingPacketBatch,
  136. // to send prepared transaction batches
  137. pub sender: Sender<Arc<Vec<PacketBatch>>>,
  138. // received transactions will be inserted into container
  139. pub container: <T as ReceiveAndBuffer>::Container,
  140. // receive_and_buffer for sdk or transaction_view
  141. pub receive_and_buffer: T,
  142. // hardcoded for bench to always Consume
  143. pub decision: BufferedPacketsDecision,
  144. }
  145. pub fn setup_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(
  146. num_txs: usize,
  147. num_instructions_per_tx: usize,
  148. probability_invalid_blockhash: f64,
  149. set_rand_cu_price: bool,
  150. use_single_payer: bool,
  151. ) -> ReceiveAndBufferSetup<T> {
  152. let GenesisConfigInfo {
  153. mut genesis_config, ..
  154. } = create_genesis_config(100_000);
  155. let num_fee_payers = if use_single_payer { 1 } else { num_txs };
  156. // fee payers will be verified, so have to create them properly
  157. let fee_payers = create_accounts(num_fee_payers, &mut genesis_config);
  158. let (bank, bank_forks) =
  159. Bank::new_for_benches(&genesis_config).wrap_with_bank_forks_for_tests();
  160. let (sender, receiver) = unbounded();
  161. let receive_and_buffer = T::create(receiver, bank_forks);
  162. let decision = BufferedPacketsDecision::Consume(bank.clone());
  163. let txs = generate_transactions(
  164. num_txs,
  165. bank.clone(),
  166. &fee_payers,
  167. num_instructions_per_tx,
  168. probability_invalid_blockhash,
  169. set_rand_cu_price,
  170. );
  171. let container = <T as ReceiveAndBuffer>::Container::with_capacity(TOTAL_BUFFERED_PACKETS);
  172. ReceiveAndBufferSetup {
  173. txs,
  174. sender,
  175. container,
  176. receive_and_buffer,
  177. decision,
  178. }
  179. }