unified_scheduler.rs 10 KB


  1. use {
  2. agave_banking_stage_ingress_types::BankingPacketBatch,
  3. assert_matches::assert_matches,
  4. crossbeam_channel::unbounded,
  5. itertools::Itertools,
  6. log::*,
  7. solana_core::{
  8. banking_stage::{unified_scheduler::ensure_banking_stage_setup, BankingStage},
  9. banking_trace::BankingTracer,
  10. consensus::{
  11. heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
  12. progress_map::{ForkProgress, ProgressMap},
  13. },
  14. drop_bank_service::DropBankService,
  15. replay_stage::{ReplayStage, TowerBFTStructures},
  16. unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
  17. },
  18. solana_entry::entry::Entry,
  19. solana_hash::Hash,
  20. solana_ledger::{
  21. blockstore::Blockstore, create_new_tmp_ledger_auto_delete,
  22. genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache,
  23. },
  24. solana_perf::packet::to_packet_batches,
  25. solana_poh::poh_recorder::create_test_recorder,
  26. solana_pubkey::Pubkey,
  27. solana_runtime::{
  28. bank::Bank, bank_forks::BankForks, genesis_utils::GenesisConfigInfo,
  29. installed_scheduler_pool::SchedulingContext,
  30. prioritization_fee_cache::PrioritizationFeeCache,
  31. },
  32. solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
  33. solana_system_transaction as system_transaction,
  34. solana_timings::ExecuteTimings,
  35. solana_transaction_error::TransactionResult as Result,
  36. solana_unified_scheduler_logic::{SchedulingMode, Task},
  37. solana_unified_scheduler_pool::{
  38. DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool,
  39. TaskHandler,
  40. },
  41. std::{
  42. collections::HashMap,
  43. sync::{atomic::Ordering, Arc, Mutex},
  44. thread::sleep,
  45. time::Duration,
  46. },
  47. };
  48. #[test]
  49. fn test_scheduler_waited_by_drop_bank_service() {
  50. solana_logger::setup();
  51. static LOCK_TO_STALL: Mutex<()> = Mutex::new(());
  52. #[derive(Debug)]
  53. struct StallingHandler;
  54. impl TaskHandler for StallingHandler {
  55. fn handle(
  56. result: &mut Result<()>,
  57. timings: &mut ExecuteTimings,
  58. scheduling_context: &SchedulingContext,
  59. task: &Task,
  60. handler_context: &HandlerContext,
  61. ) {
  62. info!("Stalling at StallingHandler::handle()...");
  63. *LOCK_TO_STALL.lock().unwrap();
  64. // Wait a bit for the replay stage to prune banks
  65. std::thread::sleep(std::time::Duration::from_secs(3));
  66. info!("Now entering into DefaultTaskHandler::handle()...");
  67. DefaultTaskHandler::handle(result, timings, scheduling_context, task, handler_context);
  68. }
  69. }
  70. let GenesisConfigInfo {
  71. genesis_config,
  72. mint_keypair,
  73. ..
  74. } = create_genesis_config(10_000);
  75. // Setup bankforks with unified scheduler enabled
  76. let genesis_bank = Bank::new_for_tests(&genesis_config);
  77. let bank_forks = BankForks::new_rw_arc(genesis_bank);
  78. let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
  79. let pool_raw = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new(
  80. None,
  81. None,
  82. None,
  83. None,
  84. ignored_prioritization_fee_cache,
  85. );
  86. let pool = pool_raw.clone();
  87. bank_forks.write().unwrap().install_scheduler_pool(pool);
  88. let genesis = 0;
  89. let genesis_bank = &bank_forks.read().unwrap().get(genesis).unwrap();
  90. genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
  91. // Create bank, which is pruned later
  92. let pruned = 2;
  93. let pruned_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned);
  94. let pruned_bank = bank_forks.write().unwrap().insert(pruned_bank);
  95. // Create new root bank
  96. let root = 3;
  97. let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root);
  98. root_bank.freeze();
  99. let root_hash = root_bank.hash();
  100. bank_forks.write().unwrap().insert(root_bank);
  101. let tx = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
  102. &mint_keypair,
  103. &solana_pubkey::new_rand(),
  104. 2,
  105. genesis_config.hash(),
  106. ));
  107. // Delay transaction execution to ensure transaction execution happens after termintion has
  108. // been started
  109. let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
  110. pruned_bank
  111. .schedule_transaction_executions([(tx, 0)].into_iter())
  112. .unwrap();
  113. drop(pruned_bank);
  114. assert_eq!(pool_raw.pooled_scheduler_count(), 0);
  115. drop(lock_to_stall);
  116. // Create 2 channels to check actual pruned banks
  117. let (drop_bank_sender1, drop_bank_receiver1) = unbounded();
  118. let (drop_bank_sender2, drop_bank_receiver2) = unbounded();
  119. let drop_bank_service = DropBankService::new(drop_bank_receiver2);
  120. info!("calling handle_new_root()...");
  121. // Mostly copied from: test_handle_new_root()
  122. {
  123. let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash));
  124. let mut progress = ProgressMap::default();
  125. for i in genesis..=root {
  126. progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
  127. }
  128. let duplicate_slots_tracker = vec![root - 1, root, root + 1].into_iter().collect();
  129. let duplicate_confirmed_slots = vec![root - 1, root, root + 1]
  130. .into_iter()
  131. .map(|s| (s, Hash::default()))
  132. .collect();
  133. let unfrozen_gossip_verified_vote_hashes = UnfrozenGossipVerifiedVoteHashes {
  134. votes_per_slot: vec![root - 1, root, root + 1]
  135. .into_iter()
  136. .map(|s| (s, HashMap::new()))
  137. .collect(),
  138. };
  139. let epoch_slots_frozen_slots = vec![root - 1, root, root + 1]
  140. .into_iter()
  141. .map(|slot| (slot, Hash::default()))
  142. .collect();
  143. let mut tbft_structs = TowerBFTStructures {
  144. heaviest_subtree_fork_choice,
  145. duplicate_slots_tracker,
  146. duplicate_confirmed_slots,
  147. unfrozen_gossip_verified_vote_hashes,
  148. epoch_slots_frozen_slots,
  149. };
  150. ReplayStage::handle_new_root(
  151. root,
  152. &bank_forks,
  153. &mut progress,
  154. None, // snapshot_controller
  155. None,
  156. &mut true,
  157. &drop_bank_sender1,
  158. &mut tbft_structs,
  159. )
  160. .unwrap();
  161. }
  162. // Receive pruned banks from the above handle_new_root
  163. let pruned_banks = drop_bank_receiver1.recv().unwrap();
  164. assert_eq!(
  165. pruned_banks
  166. .iter()
  167. .map(|b| b.slot())
  168. .sorted()
  169. .collect::<Vec<_>>(),
  170. vec![genesis, pruned]
  171. );
  172. info!("sending pruned banks to DropBankService...");
  173. drop_bank_sender2.send(pruned_banks).unwrap();
  174. info!("joining the drop bank service...");
  175. drop((
  176. (drop_bank_sender1, drop_bank_receiver1),
  177. (drop_bank_sender2,),
  178. ));
  179. drop_bank_service.join().unwrap();
  180. info!("finally joined the drop bank service!");
  181. // the scheduler used by the pruned_bank have been returned now.
  182. assert_eq!(pool_raw.pooled_scheduler_count(), 1);
  183. }
  184. #[test]
  185. fn test_scheduler_producing_blocks() {
  186. solana_logger::setup();
  187. let GenesisConfigInfo {
  188. genesis_config,
  189. mint_keypair,
  190. ..
  191. } = create_genesis_config(10_000);
  192. let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
  193. let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
  194. // Setup bank_forks with block-producing unified scheduler enabled
  195. let genesis_bank = Bank::new_for_tests(&genesis_config);
  196. let bank_forks = BankForks::new_rw_arc(genesis_bank);
  197. let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
  198. let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
  199. genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
  200. let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank));
  201. let (exit, poh_recorder, transaction_recorder, poh_service, signal_receiver) =
  202. create_test_recorder(
  203. genesis_bank.clone(),
  204. blockstore.clone(),
  205. None,
  206. Some(leader_schedule_cache),
  207. );
  208. let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
  209. let channels = {
  210. let banking_tracer = BankingTracer::new_disabled();
  211. banking_tracer.create_channels(true)
  212. };
  213. ensure_banking_stage_setup(
  214. &pool,
  215. &bank_forks,
  216. &channels,
  217. &poh_recorder,
  218. transaction_recorder,
  219. BankingStage::num_threads(),
  220. );
  221. bank_forks.write().unwrap().install_scheduler_pool(pool);
  222. // Wait until genesis_bank reaches its tick height...
  223. while poh_recorder.read().unwrap().bank().is_some() {
  224. sleep(Duration::from_millis(100));
  225. }
  226. // Create test tx
  227. let tx = system_transaction::transfer(
  228. &mint_keypair,
  229. &solana_pubkey::new_rand(),
  230. 1,
  231. genesis_config.hash(),
  232. );
  233. let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&vec![tx.clone(); 1], 1));
  234. let tx = RuntimeTransaction::from_transaction_for_tests(tx);
  235. // Crate tpu_bank
  236. let tpu_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2);
  237. let tpu_bank = bank_forks
  238. .write()
  239. .unwrap()
  240. .insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank);
  241. poh_recorder
  242. .write()
  243. .unwrap()
  244. .set_bank(tpu_bank.clone_with_scheduler());
  245. tpu_bank.unpause_new_block_production_scheduler();
  246. let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
  247. assert_eq!(tpu_bank.transaction_count(), 0);
  248. // Now, send transaction
  249. channels
  250. .unified_sender()
  251. .send(banking_packet_batch)
  252. .unwrap();
  253. // Wait until tpu_bank reaches its tick height...
  254. while poh_recorder.read().unwrap().bank().is_some() {
  255. sleep(Duration::from_millis(100));
  256. }
  257. assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
  258. // Verify transactions are committed and poh-recorded
  259. assert_eq!(tpu_bank.transaction_count(), 1);
  260. assert_matches!(
  261. signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()),
  262. Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()]
  263. );
  264. // Stop things.
  265. exit.store(true, Ordering::Relaxed);
  266. poh_service.join().unwrap();
  267. }