unified_scheduler.rs 11 KB


  1. use {
  2. agave_banking_stage_ingress_types::BankingPacketBatch,
  3. assert_matches::assert_matches,
  4. crossbeam_channel::unbounded,
  5. itertools::Itertools,
  6. log::*,
  7. solana_core::{
  8. banking_stage::{unified_scheduler::ensure_banking_stage_setup, BankingStage},
  9. banking_trace::BankingTracer,
  10. consensus::{
  11. heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
  12. progress_map::{ForkProgress, ProgressMap},
  13. },
  14. drop_bank_service::DropBankService,
  15. repair::cluster_slot_state_verifier::{
  16. DuplicateConfirmedSlots, DuplicateSlotsTracker, EpochSlotsFrozenSlots,
  17. },
  18. replay_stage::{ReplayStage, TowerBFTStructures},
  19. unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
  20. },
  21. solana_entry::entry::Entry,
  22. solana_hash::Hash,
  23. solana_ledger::{
  24. blockstore::Blockstore, create_new_tmp_ledger_auto_delete,
  25. genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache,
  26. },
  27. solana_perf::packet::to_packet_batches,
  28. solana_poh::poh_recorder::create_test_recorder,
  29. solana_pubkey::Pubkey,
  30. solana_runtime::{
  31. bank::Bank, bank_forks::BankForks, genesis_utils::GenesisConfigInfo,
  32. installed_scheduler_pool::SchedulingContext,
  33. prioritization_fee_cache::PrioritizationFeeCache,
  34. },
  35. solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
  36. solana_svm_timings::ExecuteTimings,
  37. solana_system_transaction as system_transaction,
  38. solana_transaction_error::TransactionResult as Result,
  39. solana_unified_scheduler_logic::{SchedulingMode, Task},
  40. solana_unified_scheduler_pool::{
  41. DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool,
  42. TaskHandler,
  43. },
  44. std::{
  45. collections::HashMap,
  46. sync::{atomic::Ordering, Arc, Mutex},
  47. thread::sleep,
  48. time::Duration,
  49. },
  50. };
  /// Checks that the scheduler taken by a pruned bank, whose task is deliberately stalled, has
  /// been returned to the pool once `DropBankService` has been joined.
  #[test]
  fn test_scheduler_waited_by_drop_bank_service() {
      agave_logger::setup();

      // Gate held by the test body to keep `StallingHandler` from making progress.
      static LOCK_TO_STALL: Mutex<()> = Mutex::new(());

      /// Task handler that blocks on `LOCK_TO_STALL`, sleeps for a while and then delegates to
      /// `DefaultTaskHandler`.
      #[derive(Debug)]
      struct StallingHandler;

      impl TaskHandler for StallingHandler {
          fn handle(
              result: &mut Result<()>,
              timings: &mut ExecuteTimings,
              scheduling_context: &SchedulingContext,
              task: &Task,
              handler_context: &HandlerContext,
          ) {
              info!("Stalling at StallingHandler::handle()...");
              // Only wait for the gate to open; the guard is released right away.
              drop(LOCK_TO_STALL.lock().unwrap());
              // Give the replay stage some time to prune banks.
              sleep(Duration::from_secs(3));
              info!("Now entering into DefaultTaskHandler::handle()...");

              DefaultTaskHandler::handle(result, timings, scheduling_context, task, handler_context);
          }
      }

      let GenesisConfigInfo {
          genesis_config,
          mint_keypair,
          ..
      } = create_genesis_config(10_000);

      // Set up bank forks with a unified scheduler pool driven by `StallingHandler`.
      let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
      let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
      let pool_raw = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new(
          None,
          None,
          None,
          None,
          ignored_prioritization_fee_cache,
      );
      let installed_pool = pool_raw.clone();
      bank_forks
          .write()
          .unwrap()
          .install_scheduler_pool(installed_pool);

      let genesis = 0;
      let genesis_bank = bank_forks.read().unwrap().get(genesis).unwrap();
      genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));

      // Child bank of genesis that is expected to be pruned later.
      let pruned = 2;
      let bank_to_prune = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned);
      let pruned_bank = bank_forks.write().unwrap().insert(bank_to_prune);

      // Sibling bank that becomes the new root.
      let root = 3;
      let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root);
      root_bank.freeze();
      let root_hash = root_bank.hash();
      bank_forks.write().unwrap().insert(root_bank);

      let recipient = solana_pubkey::new_rand();
      let transfer = system_transaction::transfer(&mint_keypair, &recipient, 2, genesis_config.hash());
      let tx = RuntimeTransaction::from_transaction_for_tests(transfer);

      // Hold the gate while scheduling so that the transaction is executed only after
      // termination of the scheduler has been started.
      let stall_guard = LOCK_TO_STALL.lock().unwrap();
      pruned_bank
          .schedule_transaction_executions(std::iter::once((tx, 0)))
          .unwrap();
      drop(pruned_bank);
      assert_eq!(pool_raw.pooled_scheduler_count(), 0);
      drop(stall_guard);

      // Two channels: the first one lets the test inspect the pruned banks, the second one feeds
      // them into the actual DropBankService.
      let (inspect_sender, inspect_receiver) = unbounded();
      let (service_sender, service_receiver) = unbounded();
      let drop_bank_service = DropBankService::new(service_receiver);

      info!("calling handle_new_root()...");
      // Mostly copied from: test_handle_new_root()
      {
          let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash));

          let mut progress = ProgressMap::default();
          for slot in genesis..=root {
              progress.insert(slot, ForkProgress::new(Hash::default(), None, None, 0, 0));
          }

          // Every tracking structure is seeded with the slots right around the new root.
          let slots_around_root = [root - 1, root, root + 1];
          let with_default_hash = |slot| (slot, Hash::default());

          let duplicate_slots_tracker: DuplicateSlotsTracker =
              slots_around_root.into_iter().collect();
          let duplicate_confirmed_slots: DuplicateConfirmedSlots = slots_around_root
              .into_iter()
              .map(with_default_hash)
              .collect();
          let unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes =
              UnfrozenGossipVerifiedVoteHashes {
                  votes_per_slot: slots_around_root
                      .into_iter()
                      .map(|slot| (slot, HashMap::new()))
                      .collect(),
              };
          let epoch_slots_frozen_slots: EpochSlotsFrozenSlots = slots_around_root
              .into_iter()
              .map(with_default_hash)
              .collect();

          let mut tbft_structs = TowerBFTStructures {
              heaviest_subtree_fork_choice,
              duplicate_slots_tracker,
              duplicate_confirmed_slots,
              unfrozen_gossip_verified_vote_hashes,
              epoch_slots_frozen_slots,
          };

          ReplayStage::handle_new_root(
              root,
              &bank_forks,
              &mut progress,
              None, // snapshot_controller
              None,
              &mut true,
              &mut Vec::new(),
              &inspect_sender,
              &mut tbft_structs,
          );
      }

      // Receive the banks pruned by the handle_new_root() call above.
      let pruned_banks = inspect_receiver.recv().unwrap();
      let pruned_slots = pruned_banks
          .iter()
          .map(|bank| bank.slot())
          .sorted()
          .collect::<Vec<_>>();
      assert_eq!(pruned_slots, vec![genesis, pruned]);

      info!("sending pruned banks to DropBankService...");
      service_sender.send(pruned_banks).unwrap();

      info!("joining the drop bank service...");
      // Release every channel endpoint still owned by the test before joining the service.
      drop(inspect_sender);
      drop(inspect_receiver);
      drop(service_sender);
      drop_bank_service.join().unwrap();
      info!("finally joined the drop bank service!");

      // The scheduler used by the pruned bank has been returned to the pool by now.
      assert_eq!(pool_raw.pooled_scheduler_count(), 1);
  }
  /// Sends a single transfer through the banking channels to a block-production bank and checks
  /// that it is committed to the bank and shows up in a non-tick PoH entry.
  #[test]
  fn test_scheduler_producing_blocks() {
      agave_logger::setup();

      let GenesisConfigInfo {
          genesis_config,
          mint_keypair,
          ..
      } = create_genesis_config(10_000);
      let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
      let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());

      // Set up bank_forks; the block-producing unified scheduler pool is installed further down.
      let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
      let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
      let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
      genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
      let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank));

      let (
          exit,
          poh_recorder,
          mut poh_controller,
          transaction_recorder,
          poh_service,
          signal_receiver,
      ) = create_test_recorder(
          genesis_bank.clone(),
          blockstore.clone(),
          None,
          Some(leader_schedule_cache),
      );

      let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
      let channels = BankingTracer::new_disabled().create_channels(true);
      ensure_banking_stage_setup(
          &pool,
          &bank_forks,
          &channels,
          &poh_recorder,
          transaction_recorder,
          BankingStage::default_num_workers(),
      );
      bank_forks.write().unwrap().install_scheduler_pool(pool);

      // Polls every 100ms until the PoH recorder no longer reports a bank.
      let wait_until_poh_has_no_bank = || {
          while poh_recorder.read().unwrap().bank().is_some() {
              sleep(Duration::from_millis(100));
          }
      };

      // Wait until genesis_bank reaches its tick height...
      wait_until_poh_has_no_bank();

      // Create the test transaction, both in packet form and as a runtime transaction.
      let transfer = system_transaction::transfer(
          &mint_keypair,
          &solana_pubkey::new_rand(),
          1,
          genesis_config.hash(),
      );
      let packet_transactions = vec![transfer.clone()];
      let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&packet_transactions, 1));
      let tx = RuntimeTransaction::from_transaction_for_tests(transfer);

      // Create tpu_bank
      let new_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2);
      let inserted_tpu_bank = bank_forks
          .write()
          .unwrap()
          .insert_with_scheduling_mode(SchedulingMode::BlockProduction, new_bank);
      poh_controller
          .set_bank_sync(inserted_tpu_bank.clone_with_scheduler())
          .unwrap();
      inserted_tpu_bank.unpause_new_block_production_scheduler();
      let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
      assert_eq!(tpu_bank.transaction_count(), 0);

      // Now, send the transaction.
      let unified_sender = channels.unified_sender();
      unified_sender.send(banking_packet_batch).unwrap();

      // Wait until tpu_bank reaches its tick height...
      wait_until_poh_has_no_bank();
      assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _)));

      // Verify that the transaction is committed and poh-recorded.
      assert_eq!(tpu_bank.transaction_count(), 1);
      assert_matches!(
          signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()),
          Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()]
      );

      // Stop things.
      exit.store(true, Ordering::Relaxed);
      poh_service.join().unwrap();
  }