Browse Source

Add scheduler benches (#5857)

* Add scheduler benches

* use iter_custom to exclude dropping the container and scheduler from bench timing.

* print custom stats

* Update core/benches/scheduler.rs

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>

* use dummy signature to tag Tracer transaction

* remove BenchContainer, no longer needed

* support receive_completed

* fix dropping in benching

* remove BenchStats, which collected some scheduler behavior stats during benching; it can live outside of benches

* reorganize receive_and_buffer to be shared with Scheduler bench

* rework scheduler bench to prep container with shared functions

* support both schedulers

* support single or unique fee payers to bench lock-conflict conditions

* fix windows nightly clippy

* Apply patch from Andrew to remove macro and move Scheduler creation outside bench loop

---------

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
Tao Zhu 6 tháng trước cách đây
mục cha
commit
62375a6781

+ 1 - 0
Cargo.lock

@@ -7676,6 +7676,7 @@ dependencies = [
  "tempfile",
  "test-case",
  "thiserror 2.0.12",
+ "tikv-jemallocator",
  "tokio",
  "tokio-util 0.7.15",
  "trees",

+ 7 - 0
core/Cargo.toml

@@ -145,6 +145,9 @@ test-case = { workspace = true }
 [target."cfg(unix)".dependencies]
 sysctl = { workspace = true }
 
+[target.'cfg(not(any(target_env = "msvc", target_os = "freebsd")))'.dependencies]
+jemallocator = { workspace = true }
+
 [features]
 dev-context-only-utils = [
     "solana-perf/dev-context-only-utils",
@@ -183,6 +186,10 @@ name = "sigverify_stage"
 name = "receive_and_buffer"
 harness = false
 
+[[bench]]
+name = "scheduler"
+harness = false
+
 [package.metadata.docs.rs]
 targets = ["x86_64-unknown-linux-gnu"]
 

+ 22 - 191
core/benches/receive_and_buffer.rs

@@ -1,209 +1,38 @@
+#[path = "receive_and_buffer_utils.rs"]
+mod utils;
 use {
-    agave_banking_stage_ingress_types::BankingPacketBatch,
     criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput},
-    crossbeam_channel::{unbounded, Receiver},
-    rand::prelude::*,
-    solana_core::banking_stage::{
-        decision_maker::BufferedPacketsDecision,
-        packet_deserializer::PacketDeserializer,
-        transaction_scheduler::{
-            receive_and_buffer::{
-                ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer,
-                TransactionViewReceiveAndBuffer,
-            },
-            scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
-            transaction_state_container::StateContainer,
+    solana_core::banking_stage::transaction_scheduler::{
+        receive_and_buffer::{
+            ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer, TransactionViewReceiveAndBuffer,
         },
+        scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
+        transaction_state_container::StateContainer,
     },
-    solana_keypair::Keypair,
-    solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
-    solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
-    solana_poh::poh_recorder::BankStart,
-    solana_pubkey::Pubkey,
-    solana_runtime::{bank::Bank, bank_forks::BankForks},
-    solana_sdk::{
-        account::AccountSharedData,
-        compute_budget::ComputeBudgetInstruction,
-        genesis_config::GenesisConfig,
-        hash::Hash,
-        instruction::{AccountMeta, Instruction},
-        message::{Message, VersionedMessage},
-        signer::Signer,
-        transaction::VersionedTransaction,
-    },
-    solana_sdk_ids::system_program,
-    std::{
-        sync::{Arc, RwLock},
-        time::{Duration, Instant},
-    },
+    std::time::{Duration, Instant},
 };
 
-// the max number of instructions of given type that we can put into packet.
-const MAX_INSTRUCTIONS_PER_TRANSACTION: usize = 204;
-
-fn create_accounts(num_accounts: usize, genesis_config: &mut GenesisConfig) -> Vec<Keypair> {
-    let owner = &system_program::id();
-
-    let account_keypairs: Vec<Keypair> = (0..num_accounts).map(|_| Keypair::new()).collect();
-    for keypair in account_keypairs.iter() {
-        genesis_config.add_account(keypair.pubkey(), AccountSharedData::new(10000, 0, owner));
-    }
-    account_keypairs
-}
-
-/// Structure that returns correct provided blockhash or some incorrect hash
-/// with given probability.
-pub struct FaultyBlockhash {
-    blockhash: Hash,
-    probability_invalid_blockhash: f64,
-}
-
-impl FaultyBlockhash {
-    /// Create a new faulty hash generator
-    pub fn new(blockhash: Hash, probability_invalid_blockhash: f64) -> Self {
-        Self {
-            blockhash,
-            probability_invalid_blockhash,
-        }
-    }
-
-    pub fn get<R: Rng>(&self, rng: &mut R) -> Hash {
-        if rng.gen::<f64>() < self.probability_invalid_blockhash {
-            Hash::default()
-        } else {
-            self.blockhash
-        }
-    }
-}
-
-fn generate_transactions(
-    num_txs: usize,
-    bank: Arc<Bank>,
-    fee_payers: &[Keypair],
-    num_instructions_per_tx: usize,
-    probability_invalid_blockhash: f64,
-    set_rand_cu_price: bool,
-) -> BankingPacketBatch {
-    assert!(num_instructions_per_tx <= MAX_INSTRUCTIONS_PER_TRANSACTION);
-    if set_rand_cu_price {
-        assert!(num_instructions_per_tx > 0,
-            "`num_instructions_per_tx` must be at least 1 when `set_rand_cu_price` flag is set to count\
-             the set_compute_unit_price instruction.");
-    }
-    let blockhash = FaultyBlockhash::new(bank.last_blockhash(), probability_invalid_blockhash);
-
-    let mut rng = rand::thread_rng();
-
-    let mut fee_payers = fee_payers.iter().cycle();
-
-    let txs: Vec<VersionedTransaction> = (0..num_txs)
-        .map(|_| {
-            let fee_payer = fee_payers.next().unwrap();
-            let program_id = Pubkey::new_unique();
-
-            let mut instructions = Vec::with_capacity(num_instructions_per_tx);
-            if set_rand_cu_price {
-                // Experiments with different distributions didn't show much of the effect on the performance.
-                let compute_unit_price = rng.gen_range(0..1000);
-                instructions.push(ComputeBudgetInstruction::set_compute_unit_price(
-                    compute_unit_price,
-                ));
-            }
-            for _ in 0..num_instructions_per_tx.saturating_sub(1) {
-                instructions.push(Instruction::new_with_bytes(
-                    program_id,
-                    &[0],
-                    vec![AccountMeta {
-                        pubkey: fee_payer.pubkey(),
-                        is_signer: true,
-                        is_writable: true,
-                    }],
-                ));
-            }
-            VersionedTransaction::try_new(
-                VersionedMessage::Legacy(Message::new_with_blockhash(
-                    &instructions,
-                    Some(&fee_payer.pubkey()),
-                    &blockhash.get(&mut rng),
-                )),
-                &[&fee_payer],
-            )
-            .unwrap()
-        })
-        .collect();
-
-    BankingPacketBatch::new(to_packet_batches(&txs, NUM_PACKETS))
-}
-
-trait ReceiveAndBufferCreator {
-    fn create(
-        receiver: Receiver<Arc<Vec<PacketBatch>>>,
-        bank_forks: Arc<RwLock<BankForks>>,
-    ) -> Self;
-}
-
-impl ReceiveAndBufferCreator for TransactionViewReceiveAndBuffer {
-    fn create(
-        receiver: Receiver<Arc<Vec<PacketBatch>>>,
-        bank_forks: Arc<RwLock<BankForks>>,
-    ) -> Self {
-        TransactionViewReceiveAndBuffer {
-            receiver,
-            bank_forks,
-        }
-    }
-}
-
-impl ReceiveAndBufferCreator for SanitizedTransactionReceiveAndBuffer {
-    fn create(
-        receiver: Receiver<Arc<Vec<PacketBatch>>>,
-        bank_forks: Arc<RwLock<BankForks>>,
-    ) -> Self {
-        SanitizedTransactionReceiveAndBuffer::new(PacketDeserializer::new(receiver), bank_forks)
-    }
-}
-
-fn bench_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(
+fn bench_receive_and_buffer<T: ReceiveAndBuffer + utils::ReceiveAndBufferCreator>(
     c: &mut Criterion,
     bench_name: &str,
     num_instructions_per_tx: usize,
     probability_invalid_blockhash: f64,
     set_rand_cu_price: bool,
 ) {
-    let GenesisConfigInfo {
-        mut genesis_config, ..
-    } = create_genesis_config(100_000);
     let num_txs = 16 * 1024;
-    // This doesn't change the time to execute
-    let num_fee_payers = 1;
-    // fee payers will be verified, so have to create them properly
-    let fee_payers = create_accounts(num_fee_payers, &mut genesis_config);
-
-    let (bank, bank_forks) =
-        Bank::new_for_benches(&genesis_config).wrap_with_bank_forks_for_tests();
-    let bank_start = BankStart {
-        working_bank: bank.clone(),
-        bank_creation_time: Arc::new(Instant::now()),
-    };
-
-    let (sender, receiver) = unbounded();
-
-    let mut rb = T::create(receiver, bank_forks);
-
-    const TOTAL_BUFFERED_PACKETS: usize = 100_000;
-    let mut count_metrics = SchedulerCountMetrics::default();
-    let mut timing_metrics = SchedulerTimingMetrics::default();
-    let decision = BufferedPacketsDecision::Consume(bank_start);
-
-    let txs = generate_transactions(
+    let utils::ReceiveAndBufferSetup {
+        txs,
+        sender,
+        mut container,
+        mut receive_and_buffer,
+        decision,
+    }: utils::ReceiveAndBufferSetup<T> = utils::setup_receive_and_buffer(
         num_txs,
-        bank.clone(),
-        &fee_payers,
         num_instructions_per_tx,
         probability_invalid_blockhash,
         set_rand_cu_price,
+        true, // single fee payer for all transactions
     );
-    let mut container = <T as ReceiveAndBuffer>::Container::with_capacity(TOTAL_BUFFERED_PACKETS);
 
     let mut group = c.benchmark_group("receive_and_buffer");
     group.throughput(Throughput::Elements(num_txs as u64));
@@ -212,6 +41,8 @@ fn bench_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(
             let mut total: Duration = std::time::Duration::ZERO;
             for _ in 0..iters {
                 // Setup
+                let mut count_metrics = SchedulerCountMetrics::default();
+                let mut timing_metrics = SchedulerTimingMetrics::default();
                 {
                     if sender.send(txs.clone()).is_err() {
                         panic!("Unexpectedly dropped receiver!");
@@ -223,7 +54,7 @@ fn bench_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(
 
                 let start = Instant::now();
                 {
-                    let res = rb.receive_and_buffer_packets(
+                    let res = receive_and_buffer.receive_and_buffer_packets(
                         &mut container,
                         &mut timing_metrics,
                         &mut count_metrics,
@@ -244,7 +75,7 @@ fn bench_sanitized_transaction_receive_and_buffer(c: &mut Criterion) {
     bench_receive_and_buffer::<SanitizedTransactionReceiveAndBuffer>(
         c,
         "sanitized_transaction_max_instructions",
-        MAX_INSTRUCTIONS_PER_TRANSACTION,
+        utils::MAX_INSTRUCTIONS_PER_TRANSACTION,
         0.0,
         true,
     );
@@ -261,7 +92,7 @@ fn bench_transaction_view_receive_and_buffer(c: &mut Criterion) {
     bench_receive_and_buffer::<TransactionViewReceiveAndBuffer>(
         c,
         "transaction_view_max_instructions",
-        MAX_INSTRUCTIONS_PER_TRANSACTION,
+        utils::MAX_INSTRUCTIONS_PER_TRANSACTION,
         0.0,
         true,
     );

+ 222 - 0
core/benches/receive_and_buffer_utils.rs

@@ -0,0 +1,222 @@
+use {
+    agave_banking_stage_ingress_types::BankingPacketBatch,
+    crossbeam_channel::{unbounded, Receiver, Sender},
+    rand::prelude::*,
+    solana_core::banking_stage::{
+        decision_maker::BufferedPacketsDecision,
+        packet_deserializer::PacketDeserializer,
+        transaction_scheduler::{
+            receive_and_buffer::{
+                ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer,
+                TransactionViewReceiveAndBuffer,
+            },
+            transaction_state_container::StateContainer,
+        },
+        TOTAL_BUFFERED_PACKETS,
+    },
+    solana_keypair::Keypair,
+    solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
+    solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
+    solana_poh::poh_recorder::BankStart,
+    solana_pubkey::Pubkey,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{
+        account::AccountSharedData,
+        compute_budget::ComputeBudgetInstruction,
+        genesis_config::GenesisConfig,
+        hash::Hash,
+        instruction::{AccountMeta, Instruction},
+        message::{Message, VersionedMessage},
+        signer::Signer,
+        transaction::VersionedTransaction,
+    },
+    solana_sdk_ids::system_program,
+    std::{
+        sync::{Arc, RwLock},
+        time::Instant,
+    },
+};
+
+// The maximum number of instructions of the given type that we can put into a packet.
+pub const MAX_INSTRUCTIONS_PER_TRANSACTION: usize = 204;
+
+/// Generates `num_accounts` fresh keypairs and adds one account per keypair to
+/// `genesis_config`, each built with `AccountSharedData::new(10000, 0, ..)` and the system
+/// program id as the last argument.
+///
+/// Returns the keypairs in the order they were created.
+fn create_accounts(num_accounts: usize, genesis_config: &mut GenesisConfig) -> Vec<Keypair> {
+    let system_owner = system_program::id();
+
+    (0..num_accounts)
+        .map(|_| {
+            let keypair = Keypair::new();
+            genesis_config.add_account(
+                keypair.pubkey(),
+                AccountSharedData::new(10000, 0, &system_owner),
+            );
+            keypair
+        })
+        .collect()
+}
+
+/// Blockhash source that hands out the configured blockhash, or `Hash::default()` instead
+/// with the configured probability.
+pub struct FaultyBlockhash {
+    blockhash: Hash,
+    probability_invalid_blockhash: f64,
+}
+
+impl FaultyBlockhash {
+    /// Creates a generator that returns `blockhash`, except that `Hash::default()` is returned
+    /// with probability `probability_invalid_blockhash`.
+    pub fn new(blockhash: Hash, probability_invalid_blockhash: f64) -> Self {
+        Self {
+            blockhash,
+            probability_invalid_blockhash,
+        }
+    }
+
+    /// Draws one `f64` from `rng` and returns `Hash::default()` when the draw is strictly
+    /// below the configured probability; otherwise returns the stored blockhash.
+    pub fn get<R: Rng>(&self, rng: &mut R) -> Hash {
+        let use_invalid = rng.gen::<f64>() < self.probability_invalid_blockhash;
+        if use_invalid {
+            return Hash::default();
+        }
+        self.blockhash
+    }
+}
+
+/// Builds `num_txs` signed legacy transactions and packs them into a `BankingPacketBatch`.
+///
+/// Fee payers are taken from `fee_payers` round-robin. When `set_rand_cu_price` is set, each
+/// transaction starts with a `set_compute_unit_price` instruction carrying a random price in
+/// `0..1000`. It is followed by `num_instructions_per_tx - 1` (saturating) instructions that
+/// target a fresh unique program id and reference the fee payer as a writable signer.
+/// NOTE: that count of `num_instructions_per_tx - 1` is used whether or not
+/// `set_rand_cu_price` is set.
+///
+/// Each transaction uses the bank's last blockhash, or `Hash::default()` with probability
+/// `probability_invalid_blockhash`.
+///
+/// # Panics
+///
+/// * if `num_instructions_per_tx` exceeds `MAX_INSTRUCTIONS_PER_TRANSACTION`;
+/// * if `set_rand_cu_price` is set and `num_instructions_per_tx` is 0;
+/// * if `fee_payers` is empty while `num_txs > 0`;
+/// * if building a transaction fails.
+fn generate_transactions(
+    num_txs: usize,
+    bank: Arc<Bank>,
+    fee_payers: &[Keypair],
+    num_instructions_per_tx: usize,
+    probability_invalid_blockhash: f64,
+    set_rand_cu_price: bool,
+) -> BankingPacketBatch {
+    assert!(num_instructions_per_tx <= MAX_INSTRUCTIONS_PER_TRANSACTION);
+    if set_rand_cu_price {
+        // The trailing space before `\` matters: a string continuation drops the newline and
+        // all leading whitespace on the next line, which would otherwise glue two words together.
+        assert!(num_instructions_per_tx > 0,
+            "`num_instructions_per_tx` must be at least 1 when `set_rand_cu_price` flag is set to count \
+             the set_compute_unit_price instruction.");
+    }
+    let blockhash = FaultyBlockhash::new(bank.last_blockhash(), probability_invalid_blockhash);
+
+    let mut rng = rand::thread_rng();
+
+    // Cycle so that fewer fee payers than transactions are reused round-robin.
+    let mut fee_payers = fee_payers.iter().cycle();
+
+    let txs: Vec<VersionedTransaction> = (0..num_txs)
+        .map(|_| {
+            let fee_payer = fee_payers.next().unwrap();
+            let program_id = Pubkey::new_unique();
+
+            let mut instructions = Vec::with_capacity(num_instructions_per_tx);
+            if set_rand_cu_price {
+                // Experiments with different distributions didn't show much of the effect on the performance.
+                let compute_unit_price = rng.gen_range(0..1000);
+                instructions.push(ComputeBudgetInstruction::set_compute_unit_price(
+                    compute_unit_price,
+                ));
+            }
+            for _ in 0..num_instructions_per_tx.saturating_sub(1) {
+                instructions.push(Instruction::new_with_bytes(
+                    program_id,
+                    &[0],
+                    vec![AccountMeta {
+                        pubkey: fee_payer.pubkey(),
+                        is_signer: true,
+                        is_writable: true,
+                    }],
+                ));
+            }
+            VersionedTransaction::try_new(
+                VersionedMessage::Legacy(Message::new_with_blockhash(
+                    &instructions,
+                    Some(&fee_payer.pubkey()),
+                    &blockhash.get(&mut rng),
+                )),
+                &[&fee_payer],
+            )
+            .unwrap()
+        })
+        .collect();
+
+    BankingPacketBatch::new(to_packet_batches(&txs, NUM_PACKETS))
+}
+
+/// Uniform constructor for receive-and-buffer implementations, so that generic bench code
+/// can build any implementor from the same two inputs.
+pub trait ReceiveAndBufferCreator {
+    /// Builds `Self` from the receiving end of the packet-batch channel and the shared
+    /// bank forks.
+    fn create(
+        receiver: Receiver<Arc<Vec<PacketBatch>>>,
+        bank_forks: Arc<RwLock<BankForks>>,
+    ) -> Self;
+}
+
+impl ReceiveAndBufferCreator for TransactionViewReceiveAndBuffer {
+    /// Builds the struct directly from its `receiver` and `bank_forks` fields.
+    fn create(
+        receiver: Receiver<Arc<Vec<PacketBatch>>>,
+        bank_forks: Arc<RwLock<BankForks>>,
+    ) -> Self {
+        TransactionViewReceiveAndBuffer {
+            receiver,
+            bank_forks,
+        }
+    }
+}
+
+impl ReceiveAndBufferCreator for SanitizedTransactionReceiveAndBuffer {
+    /// Wraps `receiver` in a `PacketDeserializer` and passes it, together with `bank_forks`,
+    /// to `SanitizedTransactionReceiveAndBuffer::new`.
+    fn create(
+        receiver: Receiver<Arc<Vec<PacketBatch>>>,
+        bank_forks: Arc<RwLock<BankForks>>,
+    ) -> Self {
+        SanitizedTransactionReceiveAndBuffer::new(PacketDeserializer::new(receiver), bank_forks)
+    }
+}
+
+/// Bundle of everything `setup_receive_and_buffer` prepares for a bench: the pre-built
+/// transactions, the channel sender used to feed them, the container they are buffered into,
+/// the receive-and-buffer implementation under test, and the decision passed to it.
+pub struct ReceiveAndBufferSetup<T: ReceiveAndBuffer> {
+    // prepared transaction batches
+    pub txs: BankingPacketBatch,
+    // to send prepared transaction batches
+    pub sender: Sender<Arc<Vec<PacketBatch>>>,
+    // received transactions will be inserted into container
+    pub container: <T as ReceiveAndBuffer>::Container,
+    // receive_and_buffer for sdk or transaction_view
+    pub receive_and_buffer: T,
+    // hardcoded for bench to always Consume
+    pub decision: BufferedPacketsDecision,
+}
+
+/// Prepares the shared state for receive-and-buffer based benches.
+///
+/// Creates a genesis config with either one fee payer (`use_single_payer`) or one fee payer
+/// per transaction, builds a bench bank from it, generates `num_txs` transactions against that
+/// bank, and returns them together with the channel sender, an empty container sized to
+/// `TOTAL_BUFFERED_PACKETS`, the receive-and-buffer instance and a `Consume` decision.
+///
+/// `num_instructions_per_tx`, `probability_invalid_blockhash` and `set_rand_cu_price` are
+/// forwarded unchanged to `generate_transactions`.
+pub fn setup_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(
+    num_txs: usize,
+    num_instructions_per_tx: usize,
+    probability_invalid_blockhash: f64,
+    set_rand_cu_price: bool,
+    use_single_payer: bool,
+) -> ReceiveAndBufferSetup<T> {
+    let mut genesis_config = create_genesis_config(100_000).genesis_config;
+
+    // fee payers will be verified, so have to create them properly
+    let payer_count = match use_single_payer {
+        true => 1,
+        false => num_txs,
+    };
+    let fee_payers = create_accounts(payer_count, &mut genesis_config);
+
+    let (bank, bank_forks) =
+        Bank::new_for_benches(&genesis_config).wrap_with_bank_forks_for_tests();
+    let working_bank_start = BankStart {
+        working_bank: bank.clone(),
+        bank_creation_time: Arc::new(Instant::now()),
+    };
+
+    let (packet_sender, packet_receiver) = unbounded();
+    let receive_and_buffer = T::create(packet_receiver, bank_forks);
+
+    // The benches always take the `Consume` path.
+    let decision = BufferedPacketsDecision::Consume(working_bank_start);
+
+    let prepared_txs = generate_transactions(
+        num_txs,
+        bank.clone(),
+        &fee_payers,
+        num_instructions_per_tx,
+        probability_invalid_blockhash,
+        set_rand_cu_price,
+    );
+    let container = <T as ReceiveAndBuffer>::Container::with_capacity(TOTAL_BUFFERED_PACKETS);
+
+    ReceiveAndBufferSetup {
+        txs: prepared_txs,
+        sender: packet_sender,
+        container,
+        receive_and_buffer,
+        decision,
+    }
+}

+ 244 - 0
core/benches/scheduler.rs

@@ -0,0 +1,244 @@
+#[cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
+use jemallocator::Jemalloc;
+#[path = "receive_and_buffer_utils.rs"]
+mod utils;
+use {
+    criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput},
+    crossbeam_channel::{unbounded, Receiver, Sender},
+    solana_core::banking_stage::{
+        scheduler_messages::{ConsumeWork, FinishedConsumeWork},
+        transaction_scheduler::{
+            greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig},
+            prio_graph_scheduler::{PrioGraphScheduler, PrioGraphSchedulerConfig},
+            receive_and_buffer::{
+                ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer,
+                TransactionViewReceiveAndBuffer,
+            },
+            scheduler::{PreLockFilterAction, Scheduler},
+            scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
+            transaction_state::TransactionState,
+            transaction_state_container::StateContainer,
+        },
+    },
+    solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
+    std::time::{Duration, Instant},
+};
+
+// Use jemalloc as the global allocator for this bench binary, except on MSVC and FreeBSD
+// targets where it is compiled out.
+#[cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+// A bench consumer worker that quickly drains the work channels, then sends an OK back via the
+// completed-work channel.
+// NOTE: Avoid creating PingPong within the timed bench iteration, since tearing down its
+// threads at end of life would introduce variance into bench timing.
+// NOTE(review): the handles below are only stored; no explicit join is visible in this file.
+#[allow(dead_code)]
+struct PingPong {
+    // join handles of the spawned worker threads, one per work receiver
+    threads: Vec<std::thread::JoinHandle<()>>,
+}
+
+impl PingPong {
+    /// Spawns one worker thread per entry in `work_receivers`. Every worker gets its own clone
+    /// of `completed_work_sender` and runs `service_loop` until that loop returns.
+    fn new<Tx: TransactionWithMeta + Send + Sync + 'static>(
+        work_receivers: Vec<Receiver<ConsumeWork<Tx>>>,
+        completed_work_sender: Sender<FinishedConsumeWork<Tx>>,
+    ) -> Self {
+        let threads = work_receivers
+            .into_iter()
+            .map(|work_receiver| {
+                let worker_sender = completed_work_sender.clone();
+                std::thread::spawn(move || {
+                    Self::service_loop(work_receiver, worker_sender);
+                })
+            })
+            .collect();
+
+        Self { threads }
+    }
+
+    /// Echoes every received `ConsumeWork` back as a `FinishedConsumeWork` with no retryable
+    /// indexes. Returns once receiving fails or the completed-work channel rejects a send.
+    fn service_loop<Tx: TransactionWithMeta + Send + Sync + 'static>(
+        work_receiver: Receiver<ConsumeWork<Tx>>,
+        completed_work_sender: Sender<FinishedConsumeWork<Tx>>,
+    ) {
+        loop {
+            let work = match work_receiver.recv() {
+                Ok(work) => work,
+                Err(_) => return,
+            };
+
+            let finished = FinishedConsumeWork {
+                work,
+                retryable_indexes: vec![],
+            };
+            if completed_work_sender.send(finished).is_err() {
+                // kill this worker if finished_work channel is broken
+                return;
+            }
+        }
+    }
+}
+
+/// Per-bench environment: the worker threads that answer scheduled work, the two filter
+/// functions handed to `Scheduler::schedule`, and the channel ends a scheduler is built from.
+struct BenchEnv<Tx: TransactionWithMeta + Send + Sync + 'static> {
+    // held only to keep the worker alongside the channels; never read
+    #[allow(dead_code)]
+    pingpong_worker: PingPong,
+    // pre-graph filter passed to `schedule`
+    filter_1: fn(&[&Tx], &mut [bool]),
+    // pre-lock filter passed to `schedule`
+    filter_2: fn(&TransactionState<Tx>) -> PreLockFilterAction,
+    // sending ends of the consume-work channels, cloned into each scheduler
+    consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
+    // receiving end of the finished-work channel, cloned into each scheduler
+    finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
+}
+
+impl<Tx: TransactionWithMeta + Send + Sync + 'static> BenchEnv<Tx> {
+    /// Creates four unbounded consume-work channels and one unbounded finished-work channel,
+    /// then starts a `PingPong` worker on the receiving ends of the consume-work channels.
+    fn new() -> Self {
+        const NUM_WORKERS: usize = 4;
+
+        let mut consume_work_senders = Vec::with_capacity(NUM_WORKERS);
+        let mut consume_work_receivers = Vec::with_capacity(NUM_WORKERS);
+        for _ in 0..NUM_WORKERS {
+            let (work_sender, work_receiver) = unbounded();
+            consume_work_senders.push(work_sender);
+            consume_work_receivers.push(work_receiver);
+        }
+
+        let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
+        let pingpong_worker = PingPong::new(consume_work_receivers, finished_consume_work_sender);
+
+        Self {
+            pingpong_worker,
+            filter_1: Self::test_pre_graph_filter,
+            filter_2: Self::test_pre_lock_filter,
+            consume_work_senders,
+            finished_consume_work_receiver,
+        }
+    }
+
+    /// Pre-graph filter that accepts every transaction by setting all results to `true`.
+    fn test_pre_graph_filter(_txs: &[&Tx], results: &mut [bool]) {
+        for result in results.iter_mut() {
+            *result = true;
+        }
+    }
+
+    /// Pre-lock filter that always asks the scheduler to attempt scheduling.
+    fn test_pre_lock_filter(_tx: &TransactionState<Tx>) -> PreLockFilterAction {
+        PreLockFilterAction::AttemptToSchedule
+    }
+}
+
+/// Registers the scheduler benchmarks for one receive-and-buffer implementation `T`.
+///
+/// One benchmark is registered per combination of scheduler (greedy / prio-graph),
+/// instructions per transaction (1 / `MAX_INSTRUCTIONS_PER_TRANSACTION`), transaction count
+/// (16K) and fee-payer layout (single payer / unique payer per transaction). The benchmark
+/// ID is `{bench_name}/{scheduler}/{ix_count}/{tx_count}/{payer_layout}`, with every segment
+/// written in snake_case.
+///
+/// Inside each `iter_custom` call, the transactions, bench environment and scheduler are
+/// built before `timing_scheduler` runs; `timing_scheduler` returns the duration reported
+/// to criterion.
+fn bench_scheduler_impl<T: ReceiveAndBuffer + utils::ReceiveAndBufferCreator>(
+    c: &mut Criterion,
+    bench_name: &str,
+) where
+    <T as ReceiveAndBuffer>::Transaction: 'static,
+{
+    let mut group = c.benchmark_group("bench_scheduler");
+    group.sample_size(10);
+
+    let scheduler_types: Vec<(bool, &str)> =
+        vec![(true, "greedy_scheduler"), (false, "prio_graph_scheduler")];
+    // Benching with solana_core::banking_stage::TOTAL_BUFFERED_PACKETS transactions took too long.
+    let tx_counts: Vec<(usize, &str)> = vec![(16 * 1024, "16K_txs")];
+    let ix_counts: Vec<(usize, &str)> = vec![
+        (1, "single_ix"),
+        (utils::MAX_INSTRUCTIONS_PER_TRANSACTION, "max_ixs"),
+    ];
+    // `true` means all transactions share one fee payer, so they all conflict on its lock.
+    let conflict_types: Vec<(bool, &str)> = vec![(true, "single_payer"), (false, "unique_payer")];
+
+    for (is_greedy_scheduler, scheduler_desc) in scheduler_types {
+        for (ix_count, ix_count_desc) in &ix_counts {
+            for (tx_count, tx_count_desc) in &tx_counts {
+                for (conflict_type, conflict_type_desc) in &conflict_types {
+                    let bench_name =
+                    format!("{bench_name}/{scheduler_desc}/{ix_count_desc}/{tx_count_desc}/{conflict_type_desc}");
+                    group.throughput(Throughput::Elements(*tx_count as u64));
+                    group.bench_function(&bench_name, |bencher| {
+                        bencher.iter_custom(|iters| {
+                            let setup: utils::ReceiveAndBufferSetup<T> =
+                                utils::setup_receive_and_buffer(
+                                    *tx_count,
+                                    *ix_count,
+                                    0.0,
+                                    true,
+                                    *conflict_type,
+                                );
+                            let bench_env: BenchEnv<T::Transaction> = BenchEnv::new();
+
+                            if is_greedy_scheduler {
+                                timing_scheduler(
+                                    setup,
+                                    &bench_env,
+                                    GreedyScheduler::new(
+                                        bench_env.consume_work_senders.clone(),
+                                        bench_env.finished_consume_work_receiver.clone(),
+                                        GreedySchedulerConfig::default(),
+                                    ),
+                                    iters,
+                                )
+                            } else {
+                                timing_scheduler(
+                                    setup,
+                                    &bench_env,
+                                    PrioGraphScheduler::new(
+                                        bench_env.consume_work_senders.clone(),
+                                        bench_env.finished_consume_work_receiver.clone(),
+                                        PrioGraphSchedulerConfig::default(),
+                                    ),
+                                    iters,
+                                )
+                            }
+                        })
+                    });
+                }
+            }
+        }
+    }
+}
+
+/// Runs `iters` bench iterations and returns the summed time spent scheduling.
+///
+/// Each iteration first refills the container, untimed: the prepared transactions are sent
+/// through the channel and buffered via `receive_and_buffer_packets`. The timed part then
+/// alternates `receive_completed` and `schedule` on `scheduler` until the container is empty.
+///
+/// # Panics
+///
+/// Panics if the channel receiver was dropped, if buffering fails or buffers a different
+/// number of transactions than were sent, or if either scheduler call returns an error.
+fn timing_scheduler<T: ReceiveAndBuffer, S: Scheduler<T::Transaction>>(
+    setup: utils::ReceiveAndBufferSetup<T>,
+    bench_env: &BenchEnv<T::Transaction>,
+    mut scheduler: S,
+    iters: u64,
+) -> Duration {
+    let utils::ReceiveAndBufferSetup {
+        txs,
+        sender,
+        mut container,
+        mut receive_and_buffer,
+        decision,
+    } = setup;
+
+    let expected_tx_count: usize = txs.iter().map(|batch| batch.len()).sum();
+    let mut total_elapsed = Duration::ZERO;
+
+    for _ in 0..iters {
+        // Untimed: feed the prepared transactions into the container.
+        if sender.send(txs.clone()).is_err() {
+            panic!("Unexpectedly dropped receiver!");
+        }
+        let mut count_metrics = SchedulerCountMetrics::default();
+        let mut timing_metrics = SchedulerTimingMetrics::default();
+        let buffered = receive_and_buffer
+            .receive_and_buffer_packets(
+                &mut container,
+                &mut timing_metrics,
+                &mut count_metrics,
+                &decision,
+            )
+            .unwrap();
+        assert_eq!(buffered, expected_tx_count);
+        assert!(!container.is_empty());
+
+        // Timed: drain the container through the scheduler.
+        let timer = Instant::now();
+        while !container.is_empty() {
+            scheduler
+                .receive_completed(black_box(&mut container))
+                .unwrap();
+
+            scheduler
+                .schedule(
+                    black_box(&mut container),
+                    bench_env.filter_1,
+                    bench_env.filter_2,
+                )
+                .unwrap();
+        }
+        let elapsed = timer.elapsed();
+
+        total_elapsed = total_elapsed.saturating_add(elapsed);
+    }
+
+    total_elapsed
+}
+
+/// Criterion entry point: registers the scheduler benchmarks for both receive-and-buffer
+/// implementations, labelled `sdk_transaction` and `transaction_view`.
+fn bench_scheduler(c: &mut Criterion) {
+    bench_scheduler_impl::<SanitizedTransactionReceiveAndBuffer>(c, "sdk_transaction");
+    bench_scheduler_impl::<TransactionViewReceiveAndBuffer>(c, "transaction_view");
+}
+
+criterion_group!(benches, bench_scheduler,);
+criterion_main!(benches);

+ 1 - 0
core/src/banking_stage.rs

@@ -82,6 +82,7 @@ conditional_vis_mod!(unified_scheduler, feature = "dev-context-only-utils", pub,
 // Fixed thread size seems to be fastest on GCP setup
 pub const NUM_THREADS: u32 = 6;
 
+#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 const TOTAL_BUFFERED_PACKETS: usize = 100_000;
 
 const NUM_VOTE_PROCESSING_THREADS: u32 = 2;

+ 4 - 0
core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs

@@ -1,3 +1,5 @@
+#[cfg(feature = "dev-context-only-utils")]
+use qualifier_attr::qualifiers;
 use {
     super::{
         scheduler::{PreLockFilterAction, Scheduler, SchedulingSummary},
@@ -22,6 +24,7 @@ use {
     solana_sdk::saturating_add_assign,
 };
 
+#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 pub(crate) struct GreedySchedulerConfig {
     pub target_scheduled_cus: u64,
     pub max_scanned_transactions_per_scheduling_pass: usize,
@@ -49,6 +52,7 @@ pub struct GreedyScheduler<Tx: TransactionWithMeta> {
 }
 
 impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {
+    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
     pub(crate) fn new(
         consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
         finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,

+ 4 - 4
core/src/banking_stage/transaction_scheduler/mod.rs

@@ -1,16 +1,16 @@
 use conditional_mod::conditional_vis_mod;
 
 mod batch_id_generator;
-pub(crate) mod greedy_scheduler;
+conditional_vis_mod!(greedy_scheduler, feature = "dev-context-only-utils", pub, pub(crate));
 mod in_flight_tracker;
-pub(crate) mod prio_graph_scheduler;
+conditional_vis_mod!(prio_graph_scheduler, feature = "dev-context-only-utils", pub, pub(crate));
 conditional_vis_mod!(receive_and_buffer, feature = "dev-context-only-utils", pub, pub(crate));
-pub(crate) mod scheduler;
+conditional_vis_mod!(scheduler, feature = "dev-context-only-utils", pub, pub(crate));
 pub(crate) mod scheduler_common;
 pub(crate) mod scheduler_controller;
 pub(crate) mod scheduler_error;
 conditional_vis_mod!(scheduler_metrics, feature = "dev-context-only-utils", pub);
 mod thread_aware_account_locks;
 mod transaction_priority_id;
-mod transaction_state;
+conditional_vis_mod!(transaction_state, feature = "dev-context-only-utils", pub, pub(crate));
 conditional_vis_mod!(transaction_state_container, feature = "dev-context-only-utils", pub, pub(crate));

+ 5 - 0
core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs

@@ -1,3 +1,5 @@
+#[cfg(feature = "dev-context-only-utils")]
+use qualifier_attr::qualifiers;
 use {
     super::{
         scheduler::{PreLockFilterAction, Scheduler, SchedulingSummary},
@@ -42,6 +44,7 @@ type SchedulerPrioGraph = PrioGraph<
     fn(&TransactionPriorityId, &GraphNode<TransactionPriorityId>) -> TransactionPriorityId,
 >;
 
+#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 pub(crate) struct PrioGraphSchedulerConfig {
     pub max_scheduled_cus: u64,
     pub max_scanned_transactions_per_scheduling_pass: usize,
@@ -60,6 +63,7 @@ impl Default for PrioGraphSchedulerConfig {
     }
 }
 
+#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 pub(crate) struct PrioGraphScheduler<Tx> {
     common: SchedulingCommon<Tx>,
     prio_graph: SchedulerPrioGraph,
@@ -67,6 +71,7 @@ pub(crate) struct PrioGraphScheduler<Tx> {
 }
 
 impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
+    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
     pub(crate) fn new(
         consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
         finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,

+ 2 - 0
core/src/banking_stage/transaction_scheduler/scheduler.rs

@@ -9,6 +9,7 @@ use {
     solana_sdk::saturating_add_assign,
 };
 
+#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
     /// Schedule transactions from `container`.
     /// pre-graph and pre-lock filters may be passed to be applied
@@ -47,6 +48,7 @@ pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
 }
 
 /// Action to be taken by pre-lock filter.
+#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 pub(crate) enum PreLockFilterAction {
     /// Attempt to schedule the transaction.
     AttemptToSchedule,

+ 3 - 0
core/src/banking_stage/transaction_scheduler/scheduler_common.rs

@@ -1,3 +1,5 @@
+#[cfg(feature = "dev-context-only-utils")]
+use qualifier_attr::qualifiers;
 use {
     super::{
         in_flight_tracker::InFlightTracker,
@@ -129,6 +131,7 @@ pub fn select_thread<Tx>(
 }
 
 /// Common scheduler communication structure.
+#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
 pub(crate) struct SchedulingCommon<Tx> {
     pub(crate) consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
     pub(crate) finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,

+ 1 - 0
core/src/banking_stage/transaction_scheduler/transaction_state_container.rs

@@ -191,6 +191,7 @@ impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<T
 impl<Tx: TransactionWithMeta> TransactionStateContainer<Tx> {
     /// Insert a new transaction into the container's queues and maps.
     /// Returns `true` if a packet was dropped due to capacity limits.
+    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
     pub(crate) fn insert_new_transaction(
         &mut self,
         transaction: Tx,

+ 1 - 0
programs/sbf/Cargo.lock

@@ -6000,6 +6000,7 @@ dependencies = [
  "sysctl",
  "tempfile",
  "thiserror 2.0.12",
+ "tikv-jemallocator",
  "tokio",
  "tokio-util 0.7.15",
  "trees",

+ 21 - 0
svm/examples/Cargo.lock

@@ -5837,6 +5837,7 @@ dependencies = [
  "sysctl",
  "tempfile",
  "thiserror 2.0.12",
+ "tikv-jemallocator",
  "tokio",
  "tokio-util 0.7.15",
  "trees",
@@ -9471,6 +9472,26 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
+dependencies = [
+ "cc",
+ "libc",
+]
+
+[[package]]
+name = "tikv-jemallocator"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
+dependencies = [
+ "libc",
+ "tikv-jemalloc-sys",
+]
+
 [[package]]
 name = "time"
 version = "0.3.37"