scheduler.rs 9.1 KB


  1. #[cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
  2. use jemallocator::Jemalloc;
  3. #[path = "receive_and_buffer_utils.rs"]
  4. mod utils;
  5. use {
  6. criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput},
  7. crossbeam_channel::{unbounded, Receiver, Sender},
  8. solana_core::banking_stage::{
  9. scheduler_messages::{ConsumeWork, FinishedConsumeWork},
  10. transaction_scheduler::{
  11. greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig},
  12. prio_graph_scheduler::{PrioGraphScheduler, PrioGraphSchedulerConfig},
  13. receive_and_buffer::{
  14. ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer,
  15. TransactionViewReceiveAndBuffer,
  16. },
  17. scheduler::{PreLockFilterAction, Scheduler},
  18. scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
  19. transaction_state::TransactionState,
  20. transaction_state_container::StateContainer,
  21. },
  22. },
  23. solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
  24. std::time::{Duration, Instant},
  25. };
  26. #[cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
  27. #[global_allocator]
  28. static GLOBAL: Jemalloc = Jemalloc;
  29. // a bench consumer worker that quickly drain work channel, then send a OK back via completed-work
  30. // channel
  31. // NOTE: Avoid creating PingPong within bench iter since joining threads at its eol would
  32. // introducing variance to bench timing.
  33. #[allow(dead_code)]
  34. struct PingPong {
  35. threads: Vec<std::thread::JoinHandle<()>>,
  36. }
  37. impl PingPong {
  38. fn new<Tx: TransactionWithMeta + Send + Sync + 'static>(
  39. work_receivers: Vec<Receiver<ConsumeWork<Tx>>>,
  40. completed_work_sender: Sender<FinishedConsumeWork<Tx>>,
  41. ) -> Self {
  42. let mut threads = Vec::with_capacity(work_receivers.len());
  43. for receiver in work_receivers {
  44. let completed_work_sender_clone = completed_work_sender.clone();
  45. let handle = std::thread::spawn(move || {
  46. Self::service_loop(receiver, completed_work_sender_clone);
  47. });
  48. threads.push(handle);
  49. }
  50. Self { threads }
  51. }
  52. fn service_loop<Tx: TransactionWithMeta + Send + Sync + 'static>(
  53. work_receiver: Receiver<ConsumeWork<Tx>>,
  54. completed_work_sender: Sender<FinishedConsumeWork<Tx>>,
  55. ) {
  56. while let Ok(work) = work_receiver.recv() {
  57. if completed_work_sender
  58. .send(FinishedConsumeWork {
  59. work,
  60. retryable_indexes: vec![],
  61. })
  62. .is_err()
  63. {
  64. // kill this worker if finished_work channel is broken
  65. break;
  66. }
  67. }
  68. }
  69. }
  70. struct BenchEnv<Tx: TransactionWithMeta + Send + Sync + 'static> {
  71. #[allow(dead_code)]
  72. pingpong_worker: PingPong,
  73. filter_1: fn(&[&Tx], &mut [bool]),
  74. filter_2: fn(&TransactionState<Tx>) -> PreLockFilterAction,
  75. consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
  76. finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
  77. }
  78. impl<Tx: TransactionWithMeta + Send + Sync + 'static> BenchEnv<Tx> {
  79. fn new() -> Self {
  80. let num_workers = 4;
  81. let (consume_work_senders, consume_work_receivers) =
  82. (0..num_workers).map(|_| unbounded()).unzip();
  83. let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
  84. let pingpong_worker = PingPong::new(consume_work_receivers, finished_consume_work_sender);
  85. Self {
  86. pingpong_worker,
  87. filter_1: Self::test_pre_graph_filter,
  88. filter_2: Self::test_pre_lock_filter,
  89. consume_work_senders,
  90. finished_consume_work_receiver,
  91. }
  92. }
  93. fn test_pre_graph_filter(_txs: &[&Tx], results: &mut [bool]) {
  94. results.fill(true);
  95. }
  96. fn test_pre_lock_filter(_tx: &TransactionState<Tx>) -> PreLockFilterAction {
  97. PreLockFilterAction::AttemptToSchedule
  98. }
  99. }
  100. fn bench_scheduler_impl<T: ReceiveAndBuffer + utils::ReceiveAndBufferCreator>(
  101. c: &mut Criterion,
  102. bench_name: &str,
  103. ) where
  104. <T as ReceiveAndBuffer>::Transaction: 'static,
  105. {
  106. let mut group = c.benchmark_group("bench_scheduler");
  107. group.sample_size(10);
  108. let scheduler_types: Vec<(bool, &str)> =
  109. vec![(true, "greedy_scheduler"), (false, "prio_graph_scheduler")];
  110. //solana_core::banking_stage::TOTAL_BUFFERED_PACKETS took too long
  111. let tx_counts: Vec<(usize, &str)> = vec![(16 * 1024, "16K_txs")];
  112. let ix_counts: Vec<(usize, &str)> = vec![
  113. (1, "single_ix"),
  114. (utils::MAX_INSTRUCTIONS_PER_TRANSACTION, "max_ixs"),
  115. ];
  116. let conflict_types: Vec<(bool, &str)> = vec![(true, "single-payer"), (false, "unique_payer")];
  117. for (is_greedy_scheduler, scheduler_desc) in scheduler_types {
  118. for (ix_count, ix_count_desc) in &ix_counts {
  119. for (tx_count, tx_count_desc) in &tx_counts {
  120. for (conflict_type, conflict_type_desc) in &conflict_types {
  121. let bench_name = format!(
  122. "{bench_name}/{scheduler_desc}/{ix_count_desc}/{tx_count_desc}/\
  123. {conflict_type_desc}"
  124. );
  125. group.throughput(Throughput::Elements(*tx_count as u64));
  126. group.bench_function(&bench_name, |bencher| {
  127. bencher.iter_custom(|iters| {
  128. let setup: utils::ReceiveAndBufferSetup<T> =
  129. utils::setup_receive_and_buffer(
  130. *tx_count,
  131. *ix_count,
  132. 0.0,
  133. true,
  134. *conflict_type,
  135. );
  136. let bench_env: BenchEnv<T::Transaction> = BenchEnv::new();
  137. if is_greedy_scheduler {
  138. timing_scheduler(
  139. setup,
  140. &bench_env,
  141. GreedyScheduler::new(
  142. bench_env.consume_work_senders.clone(),
  143. bench_env.finished_consume_work_receiver.clone(),
  144. GreedySchedulerConfig::default(),
  145. ),
  146. iters,
  147. )
  148. } else {
  149. timing_scheduler(
  150. setup,
  151. &bench_env,
  152. PrioGraphScheduler::new(
  153. bench_env.consume_work_senders.clone(),
  154. bench_env.finished_consume_work_receiver.clone(),
  155. PrioGraphSchedulerConfig::default(),
  156. ),
  157. iters,
  158. )
  159. }
  160. })
  161. });
  162. }
  163. }
  164. }
  165. }
  166. }
  167. fn timing_scheduler<T: ReceiveAndBuffer, S: Scheduler<T::Transaction>>(
  168. setup: utils::ReceiveAndBufferSetup<T>,
  169. bench_env: &BenchEnv<T::Transaction>,
  170. mut scheduler: S,
  171. iters: u64,
  172. ) -> Duration {
  173. let utils::ReceiveAndBufferSetup {
  174. txs,
  175. sender,
  176. mut container,
  177. mut receive_and_buffer,
  178. decision,
  179. }: utils::ReceiveAndBufferSetup<T> = setup;
  180. let mut execute_time: Duration = std::time::Duration::ZERO;
  181. let num_txs: usize = txs.iter().map(|txs| txs.len()).sum();
  182. for _i in 0..iters {
  183. if sender.send(txs.clone()).is_err() {
  184. panic!("Unexpectedly dropped receiver!");
  185. }
  186. let mut count_metrics = SchedulerCountMetrics::default();
  187. let mut timing_metrics = SchedulerTimingMetrics::default();
  188. let res = receive_and_buffer.receive_and_buffer_packets(
  189. &mut container,
  190. &mut timing_metrics,
  191. &mut count_metrics,
  192. &decision,
  193. );
  194. assert_eq!(res.unwrap(), num_txs);
  195. assert!(!container.is_empty());
  196. let elapsed = {
  197. let start = Instant::now();
  198. {
  199. while !container.is_empty() {
  200. scheduler
  201. .receive_completed(black_box(&mut container))
  202. .unwrap();
  203. scheduler
  204. .schedule(
  205. black_box(&mut container),
  206. bench_env.filter_1,
  207. bench_env.filter_2,
  208. )
  209. .unwrap();
  210. }
  211. }
  212. start.elapsed()
  213. };
  214. execute_time = execute_time.saturating_add(elapsed);
  215. }
  216. execute_time
  217. }
  218. fn bench_scheduler(c: &mut Criterion) {
  219. bench_scheduler_impl::<SanitizedTransactionReceiveAndBuffer>(c, "sdk_transaction");
  220. bench_scheduler_impl::<TransactionViewReceiveAndBuffer>(c, "transaction_view");
  221. }
  222. criterion_group!(benches, bench_scheduler,);
  223. criterion_main!(benches);