receive_and_buffer_utils.rs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. use {
  2. agave_banking_stage_ingress_types::BankingPacketBatch,
  3. crossbeam_channel::{unbounded, Receiver, Sender},
  4. rand::prelude::*,
  5. solana_account::AccountSharedData,
  6. solana_compute_budget_interface::ComputeBudgetInstruction,
  7. solana_core::banking_stage::{
  8. decision_maker::BufferedPacketsDecision,
  9. packet_deserializer::PacketDeserializer,
  10. transaction_scheduler::{
  11. receive_and_buffer::{
  12. ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer,
  13. TransactionViewReceiveAndBuffer,
  14. },
  15. transaction_state_container::StateContainer,
  16. },
  17. TOTAL_BUFFERED_PACKETS,
  18. },
  19. solana_genesis_config::GenesisConfig,
  20. solana_hash::Hash,
  21. solana_instruction::{AccountMeta, Instruction},
  22. solana_keypair::Keypair,
  23. solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
  24. solana_message::{Message, VersionedMessage},
  25. solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
  26. solana_poh::poh_recorder::BankStart,
  27. solana_pubkey::Pubkey,
  28. solana_runtime::{bank::Bank, bank_forks::BankForks},
  29. solana_sdk_ids::system_program,
  30. solana_signer::Signer,
  31. solana_transaction::versioned::VersionedTransaction,
  32. std::{
  33. sync::{atomic::AtomicBool, Arc, RwLock},
  34. time::Instant,
  35. },
  36. };
  /// Upper bound on the number of instructions (of the kind generated in this
  /// file) that can be put into a single packet.
  pub const MAX_INSTRUCTIONS_PER_TRANSACTION: usize = 204;
  39. fn create_accounts(num_accounts: usize, genesis_config: &mut GenesisConfig) -> Vec<Keypair> {
  40. let owner = &system_program::id();
  41. let account_keypairs: Vec<Keypair> = (0..num_accounts).map(|_| Keypair::new()).collect();
  42. for keypair in account_keypairs.iter() {
  43. genesis_config.add_account(keypair.pubkey(), AccountSharedData::new(10000, 0, owner));
  44. }
  45. account_keypairs
  46. }
  47. /// Structure that returns correct provided blockhash or some incorrect hash
  48. /// with given probability.
  49. pub struct FaultyBlockhash {
  50. blockhash: Hash,
  51. probability_invalid_blockhash: f64,
  52. }
  53. impl FaultyBlockhash {
  54. /// Create a new faulty hash generator
  55. pub fn new(blockhash: Hash, probability_invalid_blockhash: f64) -> Self {
  56. Self {
  57. blockhash,
  58. probability_invalid_blockhash,
  59. }
  60. }
  61. pub fn get<R: Rng>(&self, rng: &mut R) -> Hash {
  62. if rng.gen::<f64>() < self.probability_invalid_blockhash {
  63. Hash::default()
  64. } else {
  65. self.blockhash
  66. }
  67. }
  68. }
  69. fn generate_transactions(
  70. num_txs: usize,
  71. bank: Arc<Bank>,
  72. fee_payers: &[Keypair],
  73. num_instructions_per_tx: usize,
  74. probability_invalid_blockhash: f64,
  75. set_rand_cu_price: bool,
  76. ) -> BankingPacketBatch {
  77. assert!(num_instructions_per_tx <= MAX_INSTRUCTIONS_PER_TRANSACTION);
  78. if set_rand_cu_price {
  79. assert!(
  80. num_instructions_per_tx > 0,
  81. "`num_instructions_per_tx` must be at least 1 when `set_rand_cu_price` flag is set to \
  82. count the set_compute_unit_price instruction."
  83. );
  84. }
  85. let blockhash = FaultyBlockhash::new(bank.last_blockhash(), probability_invalid_blockhash);
  86. let mut rng = rand::thread_rng();
  87. let mut fee_payers = fee_payers.iter().cycle();
  88. let txs: Vec<VersionedTransaction> = (0..num_txs)
  89. .map(|_| {
  90. let fee_payer = fee_payers.next().unwrap();
  91. let program_id = Pubkey::new_unique();
  92. let mut instructions = Vec::with_capacity(num_instructions_per_tx);
  93. if set_rand_cu_price {
  94. // Experiments with different distributions didn't show much of the effect on the performance.
  95. let compute_unit_price = rng.gen_range(0..1000);
  96. instructions.push(ComputeBudgetInstruction::set_compute_unit_price(
  97. compute_unit_price,
  98. ));
  99. }
  100. for _ in 0..num_instructions_per_tx.saturating_sub(1) {
  101. instructions.push(Instruction::new_with_bytes(
  102. program_id,
  103. &[0],
  104. vec![AccountMeta {
  105. pubkey: fee_payer.pubkey(),
  106. is_signer: true,
  107. is_writable: true,
  108. }],
  109. ));
  110. }
  111. VersionedTransaction::try_new(
  112. VersionedMessage::Legacy(Message::new_with_blockhash(
  113. &instructions,
  114. Some(&fee_payer.pubkey()),
  115. &blockhash.get(&mut rng),
  116. )),
  117. &[&fee_payer],
  118. )
  119. .unwrap()
  120. })
  121. .collect();
  122. BankingPacketBatch::new(to_packet_batches(&txs, NUM_PACKETS))
  123. }
  124. pub trait ReceiveAndBufferCreator {
  125. fn create(
  126. receiver: Receiver<Arc<Vec<PacketBatch>>>,
  127. bank_forks: Arc<RwLock<BankForks>>,
  128. ) -> Self;
  129. }
  130. impl ReceiveAndBufferCreator for TransactionViewReceiveAndBuffer {
  131. fn create(
  132. receiver: Receiver<Arc<Vec<PacketBatch>>>,
  133. bank_forks: Arc<RwLock<BankForks>>,
  134. ) -> Self {
  135. TransactionViewReceiveAndBuffer {
  136. receiver,
  137. bank_forks,
  138. }
  139. }
  140. }
  141. impl ReceiveAndBufferCreator for SanitizedTransactionReceiveAndBuffer {
  142. fn create(
  143. receiver: Receiver<Arc<Vec<PacketBatch>>>,
  144. bank_forks: Arc<RwLock<BankForks>>,
  145. ) -> Self {
  146. SanitizedTransactionReceiveAndBuffer::new(PacketDeserializer::new(receiver), bank_forks)
  147. }
  148. }
  149. pub struct ReceiveAndBufferSetup<T: ReceiveAndBuffer> {
  150. // prepared transaction batches
  151. pub txs: BankingPacketBatch,
  152. // to send prepared transaction batches
  153. pub sender: Sender<Arc<Vec<PacketBatch>>>,
  154. // received transactions will be inserted into container
  155. pub container: <T as ReceiveAndBuffer>::Container,
  156. // receive_and_buffer for sdk or transaction_view
  157. pub receive_and_buffer: T,
  158. // hardcoded for bench to always Consume
  159. pub decision: BufferedPacketsDecision,
  160. }
  161. pub fn setup_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(
  162. num_txs: usize,
  163. num_instructions_per_tx: usize,
  164. probability_invalid_blockhash: f64,
  165. set_rand_cu_price: bool,
  166. use_single_payer: bool,
  167. ) -> ReceiveAndBufferSetup<T> {
  168. let GenesisConfigInfo {
  169. mut genesis_config, ..
  170. } = create_genesis_config(100_000);
  171. let num_fee_payers = if use_single_payer { 1 } else { num_txs };
  172. // fee payers will be verified, so have to create them properly
  173. let fee_payers = create_accounts(num_fee_payers, &mut genesis_config);
  174. let (bank, bank_forks) =
  175. Bank::new_for_benches(&genesis_config).wrap_with_bank_forks_for_tests();
  176. let bank_start = BankStart {
  177. working_bank: bank.clone(),
  178. bank_creation_time: Arc::new(Instant::now()),
  179. contains_valid_certificate: Arc::new(AtomicBool::new(false)),
  180. };
  181. let (sender, receiver) = unbounded();
  182. let receive_and_buffer = T::create(receiver, bank_forks);
  183. let decision = BufferedPacketsDecision::Consume(bank_start);
  184. let txs = generate_transactions(
  185. num_txs,
  186. bank.clone(),
  187. &fee_payers,
  188. num_instructions_per_tx,
  189. probability_invalid_blockhash,
  190. set_rand_cu_price,
  191. );
  192. let container = <T as ReceiveAndBuffer>::Container::with_capacity(TOTAL_BUFFERED_PACKETS);
  193. ReceiveAndBufferSetup {
  194. txs,
  195. sender,
  196. container,
  197. receive_and_buffer,
  198. decision,
  199. }
  200. }