| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- use {
- agave_banking_stage_ingress_types::BankingPacketBatch,
- assert_matches::assert_matches,
- crossbeam_channel::unbounded,
- itertools::Itertools,
- log::*,
- solana_core::{
- banking_stage::{unified_scheduler::ensure_banking_stage_setup, BankingStage},
- banking_trace::BankingTracer,
- consensus::{
- heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
- progress_map::{ForkProgress, ProgressMap},
- },
- drop_bank_service::DropBankService,
- replay_stage::{ReplayStage, TowerBFTStructures},
- unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
- },
- solana_entry::entry::Entry,
- solana_hash::Hash,
- solana_ledger::{
- blockstore::Blockstore, create_new_tmp_ledger_auto_delete,
- genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache,
- },
- solana_perf::packet::to_packet_batches,
- solana_poh::poh_recorder::create_test_recorder,
- solana_pubkey::Pubkey,
- solana_runtime::{
- bank::Bank, bank_forks::BankForks, genesis_utils::GenesisConfigInfo,
- installed_scheduler_pool::SchedulingContext,
- prioritization_fee_cache::PrioritizationFeeCache,
- },
- solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
- solana_system_transaction as system_transaction,
- solana_timings::ExecuteTimings,
- solana_transaction_error::TransactionResult as Result,
- solana_unified_scheduler_logic::{SchedulingMode, Task},
- solana_unified_scheduler_pool::{
- DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool,
- TaskHandler,
- },
- std::{
- collections::HashMap,
- sync::{atomic::Ordering, Arc, Mutex},
- thread::sleep,
- time::Duration,
- },
- };
- #[test]
- fn test_scheduler_waited_by_drop_bank_service() {
- solana_logger::setup();
- static LOCK_TO_STALL: Mutex<()> = Mutex::new(());
- #[derive(Debug)]
- struct StallingHandler;
- impl TaskHandler for StallingHandler {
- fn handle(
- result: &mut Result<()>,
- timings: &mut ExecuteTimings,
- scheduling_context: &SchedulingContext,
- task: &Task,
- handler_context: &HandlerContext,
- ) {
- info!("Stalling at StallingHandler::handle()...");
- *LOCK_TO_STALL.lock().unwrap();
- // Wait a bit for the replay stage to prune banks
- std::thread::sleep(std::time::Duration::from_secs(3));
- info!("Now entering into DefaultTaskHandler::handle()...");
- DefaultTaskHandler::handle(result, timings, scheduling_context, task, handler_context);
- }
- }
- let GenesisConfigInfo {
- genesis_config,
- mint_keypair,
- ..
- } = create_genesis_config(10_000);
- // Setup bankforks with unified scheduler enabled
- let genesis_bank = Bank::new_for_tests(&genesis_config);
- let bank_forks = BankForks::new_rw_arc(genesis_bank);
- let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
- let pool_raw = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new(
- None,
- None,
- None,
- None,
- ignored_prioritization_fee_cache,
- );
- let pool = pool_raw.clone();
- bank_forks.write().unwrap().install_scheduler_pool(pool);
- let genesis = 0;
- let genesis_bank = &bank_forks.read().unwrap().get(genesis).unwrap();
- genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
- // Create bank, which is pruned later
- let pruned = 2;
- let pruned_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned);
- let pruned_bank = bank_forks.write().unwrap().insert(pruned_bank);
- // Create new root bank
- let root = 3;
- let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root);
- root_bank.freeze();
- let root_hash = root_bank.hash();
- bank_forks.write().unwrap().insert(root_bank);
- let tx = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
- &mint_keypair,
- &solana_pubkey::new_rand(),
- 2,
- genesis_config.hash(),
- ));
- // Delay transaction execution to ensure transaction execution happens after termintion has
- // been started
- let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
- pruned_bank
- .schedule_transaction_executions([(tx, 0)].into_iter())
- .unwrap();
- drop(pruned_bank);
- assert_eq!(pool_raw.pooled_scheduler_count(), 0);
- drop(lock_to_stall);
- // Create 2 channels to check actual pruned banks
- let (drop_bank_sender1, drop_bank_receiver1) = unbounded();
- let (drop_bank_sender2, drop_bank_receiver2) = unbounded();
- let drop_bank_service = DropBankService::new(drop_bank_receiver2);
- info!("calling handle_new_root()...");
- // Mostly copied from: test_handle_new_root()
- {
- let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash));
- let mut progress = ProgressMap::default();
- for i in genesis..=root {
- progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
- }
- let duplicate_slots_tracker = vec![root - 1, root, root + 1].into_iter().collect();
- let duplicate_confirmed_slots = vec![root - 1, root, root + 1]
- .into_iter()
- .map(|s| (s, Hash::default()))
- .collect();
- let unfrozen_gossip_verified_vote_hashes = UnfrozenGossipVerifiedVoteHashes {
- votes_per_slot: vec![root - 1, root, root + 1]
- .into_iter()
- .map(|s| (s, HashMap::new()))
- .collect(),
- };
- let epoch_slots_frozen_slots = vec![root - 1, root, root + 1]
- .into_iter()
- .map(|slot| (slot, Hash::default()))
- .collect();
- let mut tbft_structs = TowerBFTStructures {
- heaviest_subtree_fork_choice,
- duplicate_slots_tracker,
- duplicate_confirmed_slots,
- unfrozen_gossip_verified_vote_hashes,
- epoch_slots_frozen_slots,
- };
- ReplayStage::handle_new_root(
- root,
- &bank_forks,
- &mut progress,
- None, // snapshot_controller
- None,
- &mut true,
- &drop_bank_sender1,
- &mut tbft_structs,
- )
- .unwrap();
- }
- // Receive pruned banks from the above handle_new_root
- let pruned_banks = drop_bank_receiver1.recv().unwrap();
- assert_eq!(
- pruned_banks
- .iter()
- .map(|b| b.slot())
- .sorted()
- .collect::<Vec<_>>(),
- vec![genesis, pruned]
- );
- info!("sending pruned banks to DropBankService...");
- drop_bank_sender2.send(pruned_banks).unwrap();
- info!("joining the drop bank service...");
- drop((
- (drop_bank_sender1, drop_bank_receiver1),
- (drop_bank_sender2,),
- ));
- drop_bank_service.join().unwrap();
- info!("finally joined the drop bank service!");
- // the scheduler used by the pruned_bank have been returned now.
- assert_eq!(pool_raw.pooled_scheduler_count(), 1);
- }
- #[test]
- fn test_scheduler_producing_blocks() {
- solana_logger::setup();
- let GenesisConfigInfo {
- genesis_config,
- mint_keypair,
- ..
- } = create_genesis_config(10_000);
- let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
- let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
- // Setup bank_forks with block-producing unified scheduler enabled
- let genesis_bank = Bank::new_for_tests(&genesis_config);
- let bank_forks = BankForks::new_rw_arc(genesis_bank);
- let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
- let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
- genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
- let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank));
- let (exit, poh_recorder, transaction_recorder, poh_service, signal_receiver) =
- create_test_recorder(
- genesis_bank.clone(),
- blockstore.clone(),
- None,
- Some(leader_schedule_cache),
- );
- let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
- let channels = {
- let banking_tracer = BankingTracer::new_disabled();
- banking_tracer.create_channels(true)
- };
- ensure_banking_stage_setup(
- &pool,
- &bank_forks,
- &channels,
- &poh_recorder,
- transaction_recorder,
- BankingStage::num_threads(),
- );
- bank_forks.write().unwrap().install_scheduler_pool(pool);
- // Wait until genesis_bank reaches its tick height...
- while poh_recorder.read().unwrap().bank().is_some() {
- sleep(Duration::from_millis(100));
- }
- // Create test tx
- let tx = system_transaction::transfer(
- &mint_keypair,
- &solana_pubkey::new_rand(),
- 1,
- genesis_config.hash(),
- );
- let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&vec![tx.clone(); 1], 1));
- let tx = RuntimeTransaction::from_transaction_for_tests(tx);
- // Crate tpu_bank
- let tpu_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2);
- let tpu_bank = bank_forks
- .write()
- .unwrap()
- .insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank);
- poh_recorder
- .write()
- .unwrap()
- .set_bank(tpu_bank.clone_with_scheduler());
- tpu_bank.unpause_new_block_production_scheduler();
- let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
- assert_eq!(tpu_bank.transaction_count(), 0);
- // Now, send transaction
- channels
- .unified_sender()
- .send(banking_packet_batch)
- .unwrap();
- // Wait until tpu_bank reaches its tick height...
- while poh_recorder.read().unwrap().bank().is_some() {
- sleep(Duration::from_millis(100));
- }
- assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
- // Verify transactions are committed and poh-recorded
- assert_eq!(tpu_bank.transaction_count(), 1);
- assert_matches!(
- signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()),
- Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()]
- );
- // Stop things.
- exit.store(true, Ordering::Relaxed);
- poh_service.join().unwrap();
- }
|