receive_and_buffer_utils.rs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. use {
  2. agave_banking_stage_ingress_types::BankingPacketBatch,
  3. crossbeam_channel::{unbounded, Receiver, Sender},
  4. rand::prelude::*,
  5. solana_account::AccountSharedData,
  6. solana_compute_budget_interface::ComputeBudgetInstruction,
  7. solana_core::banking_stage::{
  8. decision_maker::BufferedPacketsDecision,
  9. packet_deserializer::PacketDeserializer,
  10. transaction_scheduler::{
  11. receive_and_buffer::{
  12. ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer,
  13. TransactionViewReceiveAndBuffer,
  14. },
  15. transaction_state_container::StateContainer,
  16. },
  17. TOTAL_BUFFERED_PACKETS,
  18. },
  19. solana_genesis_config::GenesisConfig,
  20. solana_hash::Hash,
  21. solana_instruction::{AccountMeta, Instruction},
  22. solana_keypair::Keypair,
  23. solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
  24. solana_message::{Message, VersionedMessage},
  25. solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
  26. solana_poh::poh_recorder::BankStart,
  27. solana_pubkey::Pubkey,
  28. solana_runtime::{bank::Bank, bank_forks::BankForks},
  29. solana_sdk_ids::system_program,
  30. solana_signer::Signer,
  31. solana_transaction::versioned::VersionedTransaction,
  32. std::{
  33. sync::{Arc, RwLock},
  34. time::Instant,
  35. },
  36. };
/// The maximum number of instructions of the given type that we can put into a packet.
///
/// `generate_transactions` asserts that the requested number of instructions per
/// transaction does not exceed this value.
pub const MAX_INSTRUCTIONS_PER_TRANSACTION: usize = 204;
  39. fn create_accounts(num_accounts: usize, genesis_config: &mut GenesisConfig) -> Vec<Keypair> {
  40. let owner = &system_program::id();
  41. let account_keypairs: Vec<Keypair> = (0..num_accounts).map(|_| Keypair::new()).collect();
  42. for keypair in account_keypairs.iter() {
  43. genesis_config.add_account(keypair.pubkey(), AccountSharedData::new(10000, 0, owner));
  44. }
  45. account_keypairs
  46. }
  47. /// Structure that returns correct provided blockhash or some incorrect hash
  48. /// with given probability.
  49. pub struct FaultyBlockhash {
  50. blockhash: Hash,
  51. probability_invalid_blockhash: f64,
  52. }
  53. impl FaultyBlockhash {
  54. /// Create a new faulty hash generator
  55. pub fn new(blockhash: Hash, probability_invalid_blockhash: f64) -> Self {
  56. Self {
  57. blockhash,
  58. probability_invalid_blockhash,
  59. }
  60. }
  61. pub fn get<R: Rng>(&self, rng: &mut R) -> Hash {
  62. if rng.gen::<f64>() < self.probability_invalid_blockhash {
  63. Hash::default()
  64. } else {
  65. self.blockhash
  66. }
  67. }
  68. }
  69. fn generate_transactions(
  70. num_txs: usize,
  71. bank: Arc<Bank>,
  72. fee_payers: &[Keypair],
  73. num_instructions_per_tx: usize,
  74. probability_invalid_blockhash: f64,
  75. set_rand_cu_price: bool,
  76. ) -> BankingPacketBatch {
  77. assert!(num_instructions_per_tx <= MAX_INSTRUCTIONS_PER_TRANSACTION);
  78. if set_rand_cu_price {
  79. assert!(num_instructions_per_tx > 0,
  80. "`num_instructions_per_tx` must be at least 1 when `set_rand_cu_price` flag is set to count\
  81. the set_compute_unit_price instruction.");
  82. }
  83. let blockhash = FaultyBlockhash::new(bank.last_blockhash(), probability_invalid_blockhash);
  84. let mut rng = rand::thread_rng();
  85. let mut fee_payers = fee_payers.iter().cycle();
  86. let txs: Vec<VersionedTransaction> = (0..num_txs)
  87. .map(|_| {
  88. let fee_payer = fee_payers.next().unwrap();
  89. let program_id = Pubkey::new_unique();
  90. let mut instructions = Vec::with_capacity(num_instructions_per_tx);
  91. if set_rand_cu_price {
  92. // Experiments with different distributions didn't show much of the effect on the performance.
  93. let compute_unit_price = rng.gen_range(0..1000);
  94. instructions.push(ComputeBudgetInstruction::set_compute_unit_price(
  95. compute_unit_price,
  96. ));
  97. }
  98. for _ in 0..num_instructions_per_tx.saturating_sub(1) {
  99. instructions.push(Instruction::new_with_bytes(
  100. program_id,
  101. &[0],
  102. vec![AccountMeta {
  103. pubkey: fee_payer.pubkey(),
  104. is_signer: true,
  105. is_writable: true,
  106. }],
  107. ));
  108. }
  109. VersionedTransaction::try_new(
  110. VersionedMessage::Legacy(Message::new_with_blockhash(
  111. &instructions,
  112. Some(&fee_payer.pubkey()),
  113. &blockhash.get(&mut rng),
  114. )),
  115. &[&fee_payer],
  116. )
  117. .unwrap()
  118. })
  119. .collect();
  120. BankingPacketBatch::new(to_packet_batches(&txs, NUM_PACKETS))
  121. }
/// Uniform constructor for the receive-and-buffer implementations used by this module, so
/// that generic setup code can build any of them through `T::create`.
pub trait ReceiveAndBufferCreator {
    /// Builds the implementor from the receiving end of the packet-batch channel and the
    /// shared bank forks.
    fn create(
        receiver: Receiver<Arc<Vec<PacketBatch>>>,
        bank_forks: Arc<RwLock<BankForks>>,
    ) -> Self;
}
  128. impl ReceiveAndBufferCreator for TransactionViewReceiveAndBuffer {
  129. fn create(
  130. receiver: Receiver<Arc<Vec<PacketBatch>>>,
  131. bank_forks: Arc<RwLock<BankForks>>,
  132. ) -> Self {
  133. TransactionViewReceiveAndBuffer {
  134. receiver,
  135. bank_forks,
  136. }
  137. }
  138. }
  139. impl ReceiveAndBufferCreator for SanitizedTransactionReceiveAndBuffer {
  140. fn create(
  141. receiver: Receiver<Arc<Vec<PacketBatch>>>,
  142. bank_forks: Arc<RwLock<BankForks>>,
  143. ) -> Self {
  144. SanitizedTransactionReceiveAndBuffer::new(PacketDeserializer::new(receiver), bank_forks)
  145. }
  146. }
  147. pub struct ReceiveAndBufferSetup<T: ReceiveAndBuffer> {
  148. // prepared transaction batches
  149. pub txs: BankingPacketBatch,
  150. // to send prepared transaction batches
  151. pub sender: Sender<Arc<Vec<PacketBatch>>>,
  152. // received transactions will be inserted into container
  153. pub container: <T as ReceiveAndBuffer>::Container,
  154. // receive_and_buffer for sdk or transaction_view
  155. pub receive_and_buffer: T,
  156. // hardcoded for bench to always Consume
  157. pub decision: BufferedPacketsDecision,
  158. }
  159. pub fn setup_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(
  160. num_txs: usize,
  161. num_instructions_per_tx: usize,
  162. probability_invalid_blockhash: f64,
  163. set_rand_cu_price: bool,
  164. use_single_payer: bool,
  165. ) -> ReceiveAndBufferSetup<T> {
  166. let GenesisConfigInfo {
  167. mut genesis_config, ..
  168. } = create_genesis_config(100_000);
  169. let num_fee_payers = if use_single_payer { 1 } else { num_txs };
  170. // fee payers will be verified, so have to create them properly
  171. let fee_payers = create_accounts(num_fee_payers, &mut genesis_config);
  172. let (bank, bank_forks) =
  173. Bank::new_for_benches(&genesis_config).wrap_with_bank_forks_for_tests();
  174. let bank_start = BankStart {
  175. working_bank: bank.clone(),
  176. bank_creation_time: Arc::new(Instant::now()),
  177. };
  178. let (sender, receiver) = unbounded();
  179. let receive_and_buffer = T::create(receiver, bank_forks);
  180. let decision = BufferedPacketsDecision::Consume(bank_start);
  181. let txs = generate_transactions(
  182. num_txs,
  183. bank.clone(),
  184. &fee_payers,
  185. num_instructions_per_tx,
  186. probability_invalid_blockhash,
  187. set_rand_cu_price,
  188. );
  189. let container = <T as ReceiveAndBuffer>::Container::with_capacity(TOTAL_BUFFERED_PACKETS);
  190. ReceiveAndBufferSetup {
  191. txs,
  192. sender,
  193. container,
  194. receive_and_buffer,
  195. decision,
  196. }
  197. }