poh_recorder.rs 84 KB


  1. //! The `poh_recorder` module provides an object for synchronizing with Proof of History.
  2. //! It synchronizes PoH, bank's register_tick and the ledger
  3. //!
  4. //! PohRecorder will send ticks or entries to a WorkingBank, if the current range of ticks is
  5. //! within the specified WorkingBank range.
  6. //!
  7. //! For Ticks:
  8. //! * new tick_height must be > WorkingBank::min_tick_height && new tick_height must be <= WorkingBank::max_tick_height
  9. //!
  10. //! For Entries:
  11. //! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height
  12. //!
  13. #[cfg(feature = "dev-context-only-utils")]
  14. use qualifier_attr::qualifiers;
  15. use {
  16. crate::{
  17. poh_controller::PohController, poh_service::PohService, record_channels::record_channels,
  18. transaction_recorder::TransactionRecorder,
  19. },
  20. arc_swap::ArcSwap,
  21. crossbeam_channel::{unbounded, Receiver, SendError, Sender, TrySendError},
  22. log::*,
  23. solana_clock::{BankId, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
  24. solana_entry::{
  25. entry::Entry,
  26. poh::{Poh, PohEntry},
  27. },
  28. solana_hash::Hash,
  29. solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
  30. solana_measure::measure_us,
  31. solana_poh_config::PohConfig,
  32. solana_pubkey::Pubkey,
  33. solana_runtime::{bank::Bank, installed_scheduler_pool::BankWithScheduler},
  34. solana_transaction::versioned::VersionedTransaction,
  35. std::{
  36. cmp,
  37. sync::{
  38. atomic::{AtomicBool, AtomicU64, Ordering},
  39. Arc, Mutex, RwLock,
  40. },
  41. time::Instant,
  42. },
  43. thiserror::Error,
  44. };
  /// Divisor applied to the tick length of the upcoming leader window when
  /// `PohRecorder::compute_leader_slot_tick_heights` derives the grace ticks.
  pub const GRACE_TICKS_FACTOR: u64 = 2;

  /// Cap, expressed in slots, on the grace ticks derived by
  /// `PohRecorder::compute_leader_slot_tick_heights`.
  pub const MAX_GRACE_SLOTS: u64 = 2;
  47. #[derive(Error, Debug, Clone)]
  48. pub enum PohRecorderError {
  49. #[error("max height reached")]
  50. MaxHeightReached,
  51. #[error("min height not reached")]
  52. MinHeightNotReached,
  53. #[error("send WorkingBankEntry error")]
  54. SendError(#[from] SendError<WorkingBankEntry>),
  55. #[error("channel full")]
  56. ChannelFull,
  57. #[error("channel disconnected")]
  58. ChannelDisconnected,
  59. }
  60. pub(crate) type Result<T> = std::result::Result<T, PohRecorderError>;
  61. pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
  /// Outcome of a successful `PohRecorder::record` call.
  #[derive(Debug)]
  pub struct RecordSummary {
      /// Hashes left in the current slot, as reported by PoH right after the
      /// mixins were recorded.
      pub remaining_hashes_in_slot: u64,
  }
  66. pub struct Record {
  67. pub mixins: Vec<Hash>,
  68. pub transaction_batches: Vec<Vec<VersionedTransaction>>,
  69. pub bank_id: BankId,
  70. }
  71. impl Record {
  72. pub fn new(
  73. mixins: Vec<Hash>,
  74. transaction_batches: Vec<Vec<VersionedTransaction>>,
  75. bank_id: BankId,
  76. ) -> Self {
  77. Self {
  78. mixins,
  79. transaction_batches,
  80. bank_id,
  81. }
  82. }
  83. }
  84. pub struct WorkingBank {
  85. pub bank: BankWithScheduler,
  86. pub start: Arc<Instant>,
  87. pub min_tick_height: u64,
  88. pub max_tick_height: u64,
  89. }
  90. #[derive(Debug, PartialEq, Eq)]
  91. pub enum PohLeaderStatus {
  92. NotReached,
  93. Reached { poh_slot: Slot, parent_slot: Slot },
  94. }
  /// Accumulated timing counters for the recorder; flushed and reset by
  /// `PohRecorderMetrics::report`.
  struct PohRecorderMetrics {
      /// Time spent in `flush_cache` when invoked from a tick.
      flush_cache_tick_us: u64,
      /// Time spent in `flush_cache` when invoked from `record`.
      flush_cache_no_tick_us: u64,
      /// Time spent mixing batches into PoH.
      record_us: u64,
      /// Time spent acquiring the PoH lock in `record`.
      record_lock_contention_us: u64,
      /// Time spent reporting these metrics.
      report_metrics_us: u64,
      /// Time spent sending entries over the working-bank channel.
      send_entry_us: u64,
      /// Time spent locking PoH and producing a tick.
      tick_lock_contention_us: u64,
      /// Number of ticks that `record` had to trigger itself.
      ticks_from_record: u64,
      // NOTE(review): never written in the visible part of this file;
      // presumably updated by the PoH service -- confirm.
      total_sleep_us: u64,
      /// When the counters were last reset.
      last_metric: Instant,
  }
  107. impl Default for PohRecorderMetrics {
  108. fn default() -> Self {
  109. Self {
  110. flush_cache_tick_us: 0,
  111. flush_cache_no_tick_us: 0,
  112. record_us: 0,
  113. record_lock_contention_us: 0,
  114. report_metrics_us: 0,
  115. send_entry_us: 0,
  116. tick_lock_contention_us: 0,
  117. ticks_from_record: 0,
  118. total_sleep_us: 0,
  119. last_metric: Instant::now(),
  120. }
  121. }
  122. }
  123. impl PohRecorderMetrics {
  124. fn report(&mut self, bank_slot: Slot) {
  125. if self.last_metric.elapsed().as_millis() > 1000 {
  126. datapoint_info!(
  127. "poh_recorder",
  128. ("slot", bank_slot, i64),
  129. ("flush_cache_tick_us", self.flush_cache_tick_us, i64),
  130. ("flush_cache_no_tick_us", self.flush_cache_no_tick_us, i64),
  131. ("record_us", self.record_us, i64),
  132. (
  133. "record_lock_contention_us",
  134. self.record_lock_contention_us,
  135. i64
  136. ),
  137. ("report_metrics_us", self.report_metrics_us, i64),
  138. ("send_entry_us", self.send_entry_us, i64),
  139. ("tick_lock_contention", self.tick_lock_contention_us, i64),
  140. ("ticks_from_record", self.ticks_from_record, i64),
  141. ("total_sleep_us", self.total_sleep_us, i64),
  142. );
  143. *self = Self::default();
  144. }
  145. }
  146. }
  147. pub struct PohRecorder {
  148. pub(crate) poh: Arc<Mutex<Poh>>,
  149. clear_bank_signal: Option<Sender<bool>>,
  150. start_bank: Arc<Bank>, // parent slot
  151. start_bank_active_descendants: Vec<Slot>,
  152. start_tick_height: u64, // first tick_height this recorder will observe
  153. tick_cache: Vec<(Entry, u64)>, // cache of entry and its tick_height
  154. /// This stores the current working bank + scheduler and other metadata,
  155. /// if they exist.
  156. /// This field MUST be kept consistent with the `shared_leader_state` field.
  157. working_bank: Option<WorkingBank>,
  158. shared_leader_state: SharedLeaderState,
  159. working_bank_sender: Sender<WorkingBankEntry>,
  160. leader_last_tick_height: u64, // zero if none
  161. grace_ticks: u64,
  162. blockstore: Arc<Blockstore>,
  163. leader_schedule_cache: Arc<LeaderScheduleCache>,
  164. ticks_per_slot: u64,
  165. metrics: PohRecorderMetrics,
  166. delay_leader_block_for_pending_fork: bool,
  167. last_reported_slot_for_pending_fork: Arc<Mutex<Slot>>,
  168. pub is_exited: Arc<AtomicBool>,
  169. // Allocation to hold PohEntrys recorded into PoHStream.
  170. entries: Vec<PohEntry>,
  171. // Alpenglow related migration things
  172. pub is_alpenglow_enabled: bool,
  173. /// When alpenglow is enabled there will be no ticks apart from a final one
  174. /// to complete the block. This tick will not be verified, and we use this
  175. /// flag to unset hashes_per_tick
  176. alpenglow_enabled: bool,
  177. }
  178. impl PohRecorder {
  179. /// A recorder to synchronize PoH with the following data structures
  180. /// * bank - the LastId's queue is updated on `tick` and `record` events
  181. #[allow(clippy::too_many_arguments)]
  182. pub fn new(
  183. tick_height: u64,
  184. last_entry_hash: Hash,
  185. start_bank: Arc<Bank>,
  186. next_leader_slot: Option<(Slot, Slot)>,
  187. ticks_per_slot: u64,
  188. blockstore: Arc<Blockstore>,
  189. leader_schedule_cache: &Arc<LeaderScheduleCache>,
  190. poh_config: &PohConfig,
  191. is_exited: Arc<AtomicBool>,
  192. ) -> (Self, Receiver<WorkingBankEntry>) {
  193. let delay_leader_block_for_pending_fork = false;
  194. Self::new_with_clear_signal(
  195. tick_height,
  196. last_entry_hash,
  197. start_bank,
  198. next_leader_slot,
  199. ticks_per_slot,
  200. delay_leader_block_for_pending_fork,
  201. blockstore,
  202. None,
  203. leader_schedule_cache,
  204. poh_config,
  205. is_exited,
  206. )
  207. }
  208. #[allow(clippy::too_many_arguments)]
  209. pub fn new_with_clear_signal(
  210. tick_height: u64,
  211. last_entry_hash: Hash,
  212. start_bank: Arc<Bank>,
  213. next_leader_slot: Option<(Slot, Slot)>,
  214. ticks_per_slot: u64,
  215. delay_leader_block_for_pending_fork: bool,
  216. blockstore: Arc<Blockstore>,
  217. clear_bank_signal: Option<Sender<bool>>,
  218. leader_schedule_cache: &Arc<LeaderScheduleCache>,
  219. poh_config: &PohConfig,
  220. is_exited: Arc<AtomicBool>,
  221. ) -> (Self, Receiver<WorkingBankEntry>) {
  222. let tick_number = 0;
  223. let poh = Arc::new(Mutex::new(Poh::new_with_slot_info(
  224. last_entry_hash,
  225. poh_config.hashes_per_tick,
  226. tick_number,
  227. )));
  228. let (working_bank_sender, working_bank_receiver) = unbounded();
  229. let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
  230. Self::compute_leader_slot_tick_heights(next_leader_slot, ticks_per_slot);
  231. (
  232. Self {
  233. poh,
  234. tick_cache: vec![],
  235. working_bank: None,
  236. shared_leader_state: SharedLeaderState::new(
  237. tick_height,
  238. leader_first_tick_height,
  239. next_leader_slot,
  240. ),
  241. working_bank_sender,
  242. clear_bank_signal,
  243. start_bank,
  244. start_bank_active_descendants: vec![],
  245. start_tick_height: tick_height + 1,
  246. leader_last_tick_height,
  247. grace_ticks,
  248. blockstore,
  249. leader_schedule_cache: leader_schedule_cache.clone(),
  250. ticks_per_slot,
  251. metrics: PohRecorderMetrics::default(),
  252. delay_leader_block_for_pending_fork,
  253. last_reported_slot_for_pending_fork: Arc::default(),
  254. is_exited,
  255. entries: Vec::with_capacity(64),
  256. is_alpenglow_enabled: false,
  257. alpenglow_enabled: false,
  258. },
  259. working_bank_receiver,
  260. )
  261. }
  262. // synchronize PoH with a bank
  263. pub fn reset(&mut self, reset_bank: Arc<Bank>, next_leader_slot: Option<(Slot, Slot)>) {
  264. self.clear_bank(false);
  265. let tick_height = self.reset_poh(reset_bank, true);
  266. let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
  267. Self::compute_leader_slot_tick_heights(next_leader_slot, self.ticks_per_slot);
  268. self.grace_ticks = grace_ticks;
  269. // Above call to `clear_bank` did not set the shared state,
  270. // nor did `reset_poh` update the tick_height.
  271. // Do the atomic swap of state here to reflect the reset.
  272. self.shared_leader_state.store(Arc::new(LeaderState::new(
  273. None,
  274. tick_height,
  275. leader_first_tick_height,
  276. next_leader_slot,
  277. )));
  278. self.leader_last_tick_height = leader_last_tick_height;
  279. }
  280. // Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank
  281. pub fn record(
  282. &mut self,
  283. bank_id: BankId,
  284. mixins: Vec<Hash>,
  285. transaction_batches: Vec<Vec<VersionedTransaction>>,
  286. ) -> Result<RecordSummary> {
  287. // Entries without transactions are used to track real-time passing in the ledger and
  288. // cannot be generated by `record()`
  289. assert!(
  290. mixins.len() == transaction_batches.len(),
  291. "mismatched mixin and transaction batch lengths"
  292. );
  293. assert!(
  294. !transaction_batches.iter().any(|batch| batch.is_empty()),
  295. "No transactions provided"
  296. );
  297. if let Some(working_bank) = self.working_bank.as_ref() {
  298. let ((), report_metrics_us) =
  299. measure_us!(self.metrics.report(working_bank.bank.slot()));
  300. self.metrics.report_metrics_us += report_metrics_us;
  301. }
  302. loop {
  303. let (flush_cache_res, flush_cache_us) = measure_us!(self.flush_cache(false));
  304. self.metrics.flush_cache_no_tick_us += flush_cache_us;
  305. flush_cache_res?;
  306. let tick_height = self.tick_height(); // cannot change until next loop iteration.
  307. let working_bank = self
  308. .working_bank
  309. .as_mut()
  310. .ok_or(PohRecorderError::MaxHeightReached)?;
  311. if bank_id != working_bank.bank.bank_id() {
  312. return Err(PohRecorderError::MaxHeightReached);
  313. }
  314. let (mut poh_lock, poh_lock_us) = measure_us!(self.poh.lock().unwrap());
  315. self.metrics.record_lock_contention_us += poh_lock_us;
  316. let (mixed_in, record_mixin_us) =
  317. measure_us!(poh_lock.record_batches(&mixins, &mut self.entries));
  318. self.metrics.record_us += record_mixin_us;
  319. let remaining_hashes_in_slot =
  320. poh_lock.remaining_hashes_in_slot(working_bank.bank.ticks_per_slot());
  321. drop(poh_lock);
  322. if mixed_in {
  323. debug_assert_eq!(self.entries.len(), mixins.len());
  324. for (entry, transactions) in self.entries.drain(..).zip(transaction_batches) {
  325. let (send_entry_res, send_batches_us) =
  326. measure_us!(self.working_bank_sender.send((
  327. working_bank.bank.clone(),
  328. (
  329. Entry {
  330. num_hashes: entry.num_hashes,
  331. hash: entry.hash,
  332. transactions,
  333. },
  334. tick_height, // `record_batches` guarantees that mixins are **not** split across ticks.
  335. ),
  336. )));
  337. self.metrics.send_entry_us += send_batches_us;
  338. send_entry_res?;
  339. }
  340. return Ok(RecordSummary {
  341. remaining_hashes_in_slot,
  342. });
  343. }
  344. // record() might fail if the next PoH hash needs to be a tick. But that's ok, tick()
  345. // and re-record()
  346. self.metrics.ticks_from_record += 1;
  347. self.tick();
  348. }
  349. }
  350. #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
  351. pub(crate) fn tick(&mut self) {
  352. let (poh_entry, tick_lock_contention_us) = measure_us!({
  353. let mut poh_l = self.poh.lock().unwrap();
  354. poh_l.tick()
  355. });
  356. self.metrics.tick_lock_contention_us += tick_lock_contention_us;
  357. if let Some(poh_entry) = poh_entry {
  358. self.shared_leader_state.increment_tick_height();
  359. trace!("tick_height {}", self.tick_height());
  360. if self
  361. .shared_leader_state
  362. .load()
  363. .leader_first_tick_height
  364. .is_none()
  365. {
  366. return;
  367. }
  368. self.tick_cache.push((
  369. Entry {
  370. num_hashes: poh_entry.num_hashes,
  371. hash: poh_entry.hash,
  372. transactions: vec![],
  373. },
  374. self.tick_height(),
  375. ));
  376. let (_flush_res, flush_cache_and_tick_us) = measure_us!(self.flush_cache(true));
  377. self.metrics.flush_cache_tick_us += flush_cache_and_tick_us;
  378. }
  379. }
  380. pub fn set_bank(&mut self, bank: BankWithScheduler) {
  381. assert!(self.working_bank.is_none());
  382. let working_bank = WorkingBank {
  383. min_tick_height: bank.tick_height(),
  384. max_tick_height: bank.max_tick_height(),
  385. bank,
  386. start: Arc::new(Instant::now()),
  387. };
  388. trace!("new working bank");
  389. assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot());
  390. let mut tick_height = self.tick_height();
  391. if let Some(hashes_per_tick) = *working_bank.bank.hashes_per_tick() {
  392. if self.poh.lock().unwrap().hashes_per_tick() != hashes_per_tick {
  393. // We must clear/reset poh when changing hashes per tick because it's
  394. // possible there are ticks in the cache created with the old hashes per
  395. // tick value that would get flushed later. This would corrupt the leader's
  396. // block and it would be disregarded by the network.
  397. info!(
  398. "resetting poh due to hashes per tick change detected at {}",
  399. working_bank.bank.slot()
  400. );
  401. tick_height = self.reset_poh(working_bank.bank.clone(), false);
  402. }
  403. }
  404. let leader_state = self.shared_leader_state.load();
  405. let leader_first_tick_height = leader_state.leader_first_tick_height();
  406. let next_leader_slot = leader_state.next_leader_slot_range();
  407. drop(leader_state);
  408. self.shared_leader_state.store(Arc::new(LeaderState::new(
  409. Some(working_bank.bank.clone_without_scheduler()),
  410. tick_height,
  411. leader_first_tick_height,
  412. next_leader_slot,
  413. )));
  414. self.working_bank = Some(working_bank);
  415. // TODO: adjust the working_bank.start time based on number of ticks
  416. // that have already elapsed based on current tick height.
  417. let _ = self.flush_cache(false);
  418. }
  419. /// Clears the working bank.
  420. /// Updates [`Self::shared_leader_state`] if `set_shared_state` is true.
  421. /// Otherwise the caller is responsible for setting the state before
  422. /// releasing the lock.
  423. fn clear_bank(&mut self, set_shared_state: bool) {
  424. if let Some(WorkingBank { bank, start, .. }) = self.working_bank.take() {
  425. let next_leader_slot = self.leader_schedule_cache.next_leader_slot(
  426. bank.collector_id(),
  427. bank.slot(),
  428. &bank,
  429. Some(&self.blockstore),
  430. GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS,
  431. );
  432. assert_eq!(self.ticks_per_slot, bank.ticks_per_slot());
  433. let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
  434. Self::compute_leader_slot_tick_heights(next_leader_slot, self.ticks_per_slot);
  435. self.grace_ticks = grace_ticks;
  436. self.leader_last_tick_height = leader_last_tick_height;
  437. // Only update if `set_shared_state` is true.
  438. // If `false` it is the caller's responsibility to set the shared state.
  439. if set_shared_state {
  440. self.shared_leader_state.store(Arc::new(LeaderState::new(
  441. None,
  442. self.tick_height(),
  443. leader_first_tick_height,
  444. next_leader_slot,
  445. )));
  446. }
  447. datapoint_info!(
  448. "leader-slot-start-to-cleared-elapsed-ms",
  449. ("slot", bank.slot(), i64),
  450. ("elapsed", start.elapsed().as_millis(), i64),
  451. );
  452. }
  453. if let Some(ref signal) = self.clear_bank_signal {
  454. match signal.try_send(true) {
  455. Ok(_) => {}
  456. Err(TrySendError::Full(_)) => {
  457. trace!("replay wake up signal channel is full.")
  458. }
  459. Err(TrySendError::Disconnected(_)) => {
  460. trace!("replay wake up signal channel is disconnected.")
  461. }
  462. }
  463. }
  464. }
  465. /// Returns tick_height - does not update the internal state for tick_height.
  466. #[must_use]
  467. fn reset_poh(&mut self, reset_bank: Arc<Bank>, reset_start_bank: bool) -> u64 {
  468. let blockhash = reset_bank.last_blockhash();
  469. let hashes_per_tick = if self.alpenglow_enabled {
  470. None
  471. } else {
  472. *reset_bank.hashes_per_tick()
  473. };
  474. let poh_hash = {
  475. let mut poh = self.poh.lock().unwrap();
  476. poh.reset(blockhash, hashes_per_tick);
  477. poh.hash
  478. };
  479. info!(
  480. "reset poh from: {},{},{} to: {},{}",
  481. poh_hash,
  482. self.tick_height(),
  483. self.start_slot(),
  484. blockhash,
  485. reset_bank.slot()
  486. );
  487. self.tick_cache = vec![];
  488. if reset_start_bank {
  489. self.start_bank = reset_bank;
  490. self.start_bank_active_descendants = vec![];
  491. }
  492. let tick_height = (self.start_slot() + 1) * self.ticks_per_slot;
  493. self.start_tick_height = tick_height + 1;
  494. tick_height
  495. }
  496. // Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height
  497. // On a record flush will flush the cache at the WorkingBank::min_tick_height, since a record
  498. // occurs after the min_tick_height was generated
  499. fn flush_cache(&mut self, tick: bool) -> Result<()> {
  500. // check_tick_height is called before flush cache, so it cannot overrun the bank
  501. // so a bank that is so late that it's slot fully generated before it starts recording
  502. // will fail instead of broadcasting any ticks
  503. let working_bank = self
  504. .working_bank
  505. .as_ref()
  506. .ok_or(PohRecorderError::MaxHeightReached)?;
  507. if self.tick_height() < working_bank.min_tick_height {
  508. return Err(PohRecorderError::MinHeightNotReached);
  509. }
  510. if tick && self.tick_height() == working_bank.min_tick_height {
  511. return Err(PohRecorderError::MinHeightNotReached);
  512. }
  513. let entry_count = self
  514. .tick_cache
  515. .iter()
  516. .take_while(|x| x.1 <= working_bank.max_tick_height)
  517. .count();
  518. let mut send_result: std::result::Result<(), SendError<WorkingBankEntry>> = Ok(());
  519. if entry_count > 0 {
  520. trace!(
  521. "flush_cache: bank_slot: {} tick_height: {} max: {} sending: {}",
  522. working_bank.bank.slot(),
  523. working_bank.bank.tick_height(),
  524. working_bank.max_tick_height,
  525. entry_count,
  526. );
  527. for tick in &self.tick_cache[..entry_count] {
  528. working_bank.bank.register_tick(&tick.0.hash);
  529. send_result = self
  530. .working_bank_sender
  531. .send((working_bank.bank.clone(), tick.clone()));
  532. if send_result.is_err() {
  533. break;
  534. }
  535. }
  536. }
  537. if self.tick_height() >= working_bank.max_tick_height {
  538. info!(
  539. "poh_record: max_tick_height {} reached, clearing working_bank {}",
  540. working_bank.max_tick_height,
  541. working_bank.bank.slot()
  542. );
  543. self.start_bank = working_bank.bank.clone();
  544. let working_slot = self.start_slot();
  545. self.start_tick_height = working_slot * self.ticks_per_slot + 1;
  546. self.clear_bank(true);
  547. }
  548. if send_result.is_err() {
  549. info!("WorkingBank::sender disconnected {send_result:?}");
  550. // revert the cache, but clear the working bank
  551. self.clear_bank(true);
  552. } else {
  553. // commit the flush
  554. let _ = self.tick_cache.drain(..entry_count);
  555. }
  556. Ok(())
  557. }
  558. pub fn would_be_leader(&self, within_next_n_ticks: u64) -> bool {
  559. self.has_bank()
  560. || self
  561. .leader_first_tick_height()
  562. .is_some_and(|leader_first_tick_height| {
  563. let tick_height = self.tick_height();
  564. tick_height + within_next_n_ticks >= leader_first_tick_height
  565. && tick_height <= self.leader_last_tick_height
  566. })
  567. }
  568. // Return the slot for a given tick height
  569. fn slot_for_tick_height(&self, tick_height: u64) -> Slot {
  570. // We need to subtract by one here because, assuming ticks per slot is 64,
  571. // tick heights [1..64] correspond to slot 0. The last tick height of a slot
  572. // is always a multiple of 64.
  573. tick_height.saturating_sub(1) / self.ticks_per_slot
  574. }
  575. /// Return the slot that PoH is currently ticking through.
  576. fn current_poh_slot(&self) -> Slot {
  577. // The tick_height field is initialized to the last tick of the start
  578. // bank and generally indicates what tick height has already been
  579. // reached so use the next tick height to determine which slot poh is
  580. // ticking through.
  581. let next_tick_height = self.tick_height().saturating_add(1);
  582. self.slot_for_tick_height(next_tick_height)
  583. }
  584. pub fn leader_after_n_slots(&self, slots: u64) -> Option<Pubkey> {
  585. self.leader_schedule_cache
  586. .slot_leader_at(self.current_poh_slot() + slots, None)
  587. }
  588. /// Return the leader and slot pair after `slots_in_the_future` slots.
  589. pub fn leader_and_slot_after_n_slots(
  590. &self,
  591. slots_in_the_future: u64,
  592. ) -> Option<(Pubkey, Slot)> {
  593. let target_slot = self.current_poh_slot().checked_add(slots_in_the_future)?;
  594. self.leader_schedule_cache
  595. .slot_leader_at(target_slot, None)
  596. .map(|leader| (leader, target_slot))
  597. }
  598. pub fn shared_leader_state(&self) -> SharedLeaderState {
  599. self.shared_leader_state.clone()
  600. }
  601. pub fn bank(&self) -> Option<Arc<Bank>> {
  602. self.working_bank.as_ref().map(|w| w.bank.clone())
  603. }
  604. pub fn has_bank(&self) -> bool {
  605. self.working_bank.is_some()
  606. }
  607. pub fn tick_height(&self) -> u64 {
  608. self.shared_leader_state.load().tick_height()
  609. }
  610. fn leader_first_tick_height(&self) -> Option<u64> {
  611. self.shared_leader_state.load().leader_first_tick_height()
  612. }
  613. pub fn ticks_per_slot(&self) -> u64 {
  614. self.ticks_per_slot
  615. }
  616. pub fn start_slot(&self) -> Slot {
  617. self.start_bank.slot()
  618. }
  619. /// Returns if the leader slot has been reached along with the current poh
  620. /// slot and the parent slot (could be a few slots ago if any previous
  621. /// leaders needed to be skipped).
  622. pub fn reached_leader_slot(&self, my_pubkey: &Pubkey) -> PohLeaderStatus {
  623. trace!(
  624. "tick_height {}, start_tick_height {}, leader_first_tick_height {:?}, grace_ticks {}, \
  625. has_bank {}",
  626. self.tick_height(),
  627. self.start_tick_height,
  628. self.leader_first_tick_height(),
  629. self.grace_ticks,
  630. self.has_bank()
  631. );
  632. let current_poh_slot = self.current_poh_slot();
  633. let Some(leader_first_tick_height) = self.leader_first_tick_height() else {
  634. // No next leader slot, so no leader slot has been reached.
  635. return PohLeaderStatus::NotReached;
  636. };
  637. if !self.reached_leader_tick(my_pubkey, leader_first_tick_height) {
  638. // PoH hasn't ticked far enough yet.
  639. return PohLeaderStatus::NotReached;
  640. }
  641. if self
  642. .blockstore
  643. .has_existing_shreds_for_slot(current_poh_slot)
  644. {
  645. // We already have existing shreds for this slot. This can happen when this block was previously
  646. // created and added to BankForks, however a recent PoH reset caused this bank to be removed
  647. // as it was not part of the rooted fork. If this slot is not the first slot for this leader,
  648. // and the first slot was previously ticked over, the check in `leader_schedule_cache::next_leader_slot`
  649. // will not suffice, as it only checks if there are shreds for the first slot.
  650. return PohLeaderStatus::NotReached;
  651. }
  652. let poh_slot = current_poh_slot;
  653. let parent_slot = self.start_slot();
  654. PohLeaderStatus::Reached {
  655. poh_slot,
  656. parent_slot,
  657. }
  658. }
  659. fn reached_leader_tick(&self, my_pubkey: &Pubkey, leader_first_tick_height: u64) -> bool {
  660. if self.start_tick_height == leader_first_tick_height {
  661. // PoH was reset to run immediately.
  662. return true;
  663. }
  664. let ideal_target_tick_height = leader_first_tick_height.saturating_sub(1);
  665. if self.tick_height() < ideal_target_tick_height {
  666. // We haven't ticked to our leader slot yet.
  667. return false;
  668. }
  669. if self.tick_height() >= ideal_target_tick_height.saturating_add(self.grace_ticks) {
  670. // We have finished waiting for grace ticks.
  671. return true;
  672. }
  673. // We're in the grace tick zone. Check if we can skip grace ticks.
  674. let next_leader_slot = self.current_poh_slot();
  675. self.can_skip_grace_ticks(my_pubkey, next_leader_slot)
  676. }
  677. fn can_skip_grace_ticks(&self, my_pubkey: &Pubkey, next_leader_slot: Slot) -> bool {
  678. if self.start_slot_was_mine(my_pubkey) {
  679. // Building off my own block. No need to wait.
  680. return true;
  681. }
  682. if self.start_slot_was_mine_or_previous_leader(next_leader_slot) {
  683. // Planning to build off block produced by the leader previous to
  684. // me. Check if they've completed all of their slots.
  685. return self.building_off_previous_leader_last_block(my_pubkey, next_leader_slot);
  686. }
  687. if !self.is_new_reset_bank_pending(next_leader_slot) {
  688. // No pending blocks from previous leader have been observed. No
  689. // need to wait.
  690. return true;
  691. }
  692. self.report_pending_fork_was_detected(next_leader_slot);
  693. if !self.delay_leader_block_for_pending_fork {
  694. // Not configured to wait for pending blocks from previous leader.
  695. return true;
  696. }
  697. // Wait for grace ticks
  698. false
  699. }
  700. fn start_slot_was_mine_or_previous_leader(&self, next_leader_slot: Slot) -> bool {
  701. (next_leader_slot.saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS)..next_leader_slot).any(
  702. |slot| {
  703. // Check if the last slot PoH reset to was any of the
  704. // previous leader's slots.
  705. // If so, PoH is currently building on the previous leader's blocks
  706. // If not, PoH is building on a different fork
  707. slot == self.start_slot()
  708. },
  709. )
  710. }
  711. // Check if the last slot PoH reset onto was the previous leader's last slot.
  712. fn building_off_previous_leader_last_block(
  713. &self,
  714. my_pubkey: &Pubkey,
  715. next_leader_slot: Slot,
  716. ) -> bool {
  717. // Walk backwards from the slot before our next leader slot.
  718. for slot in
  719. (next_leader_slot.saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS)..next_leader_slot).rev()
  720. {
  721. // Identify which leader is responsible for building this slot.
  722. let leader_for_slot = self.leader_schedule_cache.slot_leader_at(slot, None);
  723. let Some(leader_for_slot) = leader_for_slot else {
  724. // No leader for this slot, skip
  725. continue;
  726. };
  727. // If the leader for this slot is not me, then it's the previous
  728. // leader's last slot.
  729. if leader_for_slot != *my_pubkey {
  730. // Check if the last slot PoH reset onto was the previous leader's last slot.
  731. return slot == self.start_slot();
  732. }
  733. }
  734. false
  735. }
  736. fn start_slot_was_mine(&self, my_pubkey: &Pubkey) -> bool {
  737. self.start_bank.collector_id() == my_pubkey
  738. }
  739. // Active descendants of the last reset bank that are smaller than the
  740. // next leader slot could soon become the new reset bank.
  741. fn is_new_reset_bank_pending(&self, next_leader_slot: Slot) -> bool {
  742. self.start_bank_active_descendants
  743. .iter()
  744. .any(|pending_slot| *pending_slot < next_leader_slot)
  745. }
  746. // Report metrics when poh recorder detects a pending fork that could
  747. // soon lead to poh reset.
  748. fn report_pending_fork_was_detected(&self, next_leader_slot: Slot) {
  749. // Only report once per next leader slot to avoid spamming metrics. It's
  750. // enough to know that a leader decided to delay or not once per slot
  751. let mut last_slot = self.last_reported_slot_for_pending_fork.lock().unwrap();
  752. if *last_slot == next_leader_slot {
  753. return;
  754. }
  755. *last_slot = next_leader_slot;
  756. datapoint_info!(
  757. "poh_recorder-detected_pending_fork",
  758. ("next_leader_slot", next_leader_slot, i64),
  759. (
  760. "did_delay_leader_slot",
  761. self.delay_leader_block_for_pending_fork,
  762. bool
  763. ),
  764. );
  765. }
  766. // returns (leader_first_tick_height, leader_last_tick_height, grace_ticks) given the next
  767. // slot this recorder will lead
  768. fn compute_leader_slot_tick_heights(
  769. next_leader_slot: Option<(Slot, Slot)>,
  770. ticks_per_slot: u64,
  771. ) -> (Option<u64>, u64, u64) {
  772. next_leader_slot
  773. .map(|(first_slot, last_slot)| {
  774. let leader_first_tick_height = first_slot * ticks_per_slot + 1;
  775. let last_tick_height = (last_slot + 1) * ticks_per_slot;
  776. let num_slots = last_slot - first_slot + 1;
  777. let grace_ticks = cmp::min(
  778. ticks_per_slot * MAX_GRACE_SLOTS,
  779. ticks_per_slot * num_slots / GRACE_TICKS_FACTOR,
  780. );
  781. (
  782. Some(leader_first_tick_height),
  783. last_tick_height,
  784. grace_ticks,
  785. )
  786. })
  787. .unwrap_or((
  788. None,
  789. 0,
  790. cmp::min(
  791. ticks_per_slot * MAX_GRACE_SLOTS,
  792. ticks_per_slot * NUM_CONSECUTIVE_LEADER_SLOTS / GRACE_TICKS_FACTOR,
  793. ),
  794. ))
  795. }
  796. // update the list of active descendants of the start bank to make a better
  797. // decision about whether to use grace ticks
  798. pub fn update_start_bank_active_descendants(&mut self, active_descendants: &[Slot]) {
  799. self.start_bank_active_descendants = active_descendants.to_vec();
  800. }
  /// Test helper: installs `bank` as the working bank without a scheduler.
  #[cfg(feature = "dev-context-only-utils")]
  pub fn set_bank_for_test(&mut self, bank: Arc<Bank>) {
      let bank_without_scheduler = BankWithScheduler::new_without_scheduler(bank);
      self.set_bank(bank_without_scheduler)
  }
  /// Test helper: clears the working bank and publishes the shared state.
  #[cfg(feature = "dev-context-only-utils")]
  pub fn clear_bank_for_test(&mut self) {
      let set_shared_state = true;
      self.clear_bank(set_shared_state);
  }
  809. pub fn tick_alpenglow(&mut self, slot_max_tick_height: u64) {
  810. let (poh_entry, tick_lock_contention_us) = measure_us!({
  811. let mut poh_l = self.poh.lock().unwrap();
  812. poh_l.tick()
  813. });
  814. self.metrics.tick_lock_contention_us += tick_lock_contention_us;
  815. if let Some(poh_entry) = poh_entry {
  816. self.shared_leader_state
  817. .0
  818. .load()
  819. .tick_height
  820. .store(slot_max_tick_height, Ordering::Release);
  821. // Should be empty in most cases, but reset just to be safe
  822. self.tick_cache = vec![];
  823. self.tick_cache.push((
  824. Entry {
  825. num_hashes: poh_entry.num_hashes,
  826. hash: poh_entry.hash,
  827. transactions: vec![],
  828. },
  829. self.shared_leader_state
  830. .0
  831. .load()
  832. .tick_height
  833. .load(Ordering::Acquire),
  834. ));
  835. let (_flush_res, flush_cache_and_tick_us) = measure_us!(self.flush_cache(true));
  836. self.metrics.flush_cache_tick_us += flush_cache_and_tick_us;
  837. }
  838. }
  839. pub fn enable_alpenglow(&mut self) {
  840. info!("Enabling Alpenglow, migrating poh to low power mode");
  841. self.alpenglow_enabled = true;
  842. self.tick_cache = vec![];
  843. {
  844. let mut poh = self.poh.lock().unwrap();
  845. let hashes_per_tick = None;
  846. let current_hash = poh.hash;
  847. poh.reset(current_hash, hashes_per_tick);
  848. }
  849. }
  850. }
  851. #[allow(clippy::type_complexity)]
  852. fn do_create_test_recorder(
  853. bank: Arc<Bank>,
  854. blockstore: Arc<Blockstore>,
  855. poh_config: Option<PohConfig>,
  856. leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
  857. track_transaction_indexes: bool,
  858. ) -> (
  859. Arc<AtomicBool>,
  860. Arc<RwLock<PohRecorder>>,
  861. PohController,
  862. TransactionRecorder,
  863. PohService,
  864. Receiver<WorkingBankEntry>,
  865. ) {
  866. let leader_schedule_cache = match leader_schedule_cache {
  867. Some(provided_cache) => provided_cache,
  868. None => Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
  869. };
  870. let exit = Arc::new(AtomicBool::new(false));
  871. let poh_config = poh_config.unwrap_or_default();
  872. let (poh_recorder, entry_receiver) = PohRecorder::new(
  873. bank.tick_height(),
  874. bank.last_blockhash(),
  875. bank.clone(),
  876. Some((4, 4)),
  877. bank.ticks_per_slot(),
  878. blockstore,
  879. &leader_schedule_cache,
  880. &poh_config,
  881. exit.clone(),
  882. );
  883. let ticks_per_slot = bank.ticks_per_slot();
  884. let (record_sender, record_receiver) = record_channels(track_transaction_indexes);
  885. let transaction_recorder = TransactionRecorder::new(record_sender);
  886. let poh_recorder = Arc::new(RwLock::new(poh_recorder));
  887. let (mut poh_controller, poh_service_message_receiver) = PohController::new();
  888. let poh_service = PohService::new(
  889. poh_recorder.clone(),
  890. &poh_config,
  891. exit.clone(),
  892. ticks_per_slot,
  893. crate::poh_service::DEFAULT_PINNED_CPU_CORE,
  894. crate::poh_service::DEFAULT_HASHES_PER_BATCH,
  895. record_receiver,
  896. poh_service_message_receiver,
  897. );
  898. poh_controller
  899. .set_bank_sync(BankWithScheduler::new_without_scheduler(bank))
  900. .unwrap();
  901. (
  902. exit,
  903. poh_recorder,
  904. poh_controller,
  905. transaction_recorder,
  906. poh_service,
  907. entry_receiver,
  908. )
  909. }
  910. #[allow(clippy::type_complexity)]
  911. pub fn create_test_recorder(
  912. bank: Arc<Bank>,
  913. blockstore: Arc<Blockstore>,
  914. poh_config: Option<PohConfig>,
  915. leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
  916. ) -> (
  917. Arc<AtomicBool>,
  918. Arc<RwLock<PohRecorder>>,
  919. PohController,
  920. TransactionRecorder,
  921. PohService,
  922. Receiver<WorkingBankEntry>,
  923. ) {
  924. do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, false)
  925. }
  926. /// A shareable leader status that can be used to
  927. /// determine the current leader status of the
  928. /// `PohRecorder`.
  929. #[derive(Clone)]
  930. pub struct SharedLeaderState(Arc<ArcSwap<LeaderState>>);
  931. impl SharedLeaderState {
  932. #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
  933. fn new(
  934. tick_height: u64,
  935. leader_first_tick_height: Option<u64>,
  936. next_leader_slot_range: Option<(Slot, Slot)>,
  937. ) -> Self {
  938. let inner = LeaderState {
  939. working_bank: None,
  940. tick_height: AtomicU64::new(tick_height),
  941. leader_first_tick_height,
  942. next_leader_slot_range,
  943. };
  944. Self(Arc::new(ArcSwap::from_pointee(inner)))
  945. }
  946. pub fn load(&self) -> arc_swap::Guard<Arc<LeaderState>> {
  947. self.0.load()
  948. }
  949. #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
  950. fn store(&mut self, state: Arc<LeaderState>) {
  951. self.0.store(state)
  952. }
  953. #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
  954. fn increment_tick_height(&self) {
  955. let inner = self.0.load();
  956. inner.tick_height.fetch_add(1, Ordering::Release);
  957. }
  958. }
  959. pub struct LeaderState {
  960. working_bank: Option<Arc<Bank>>,
  961. tick_height: AtomicU64,
  962. leader_first_tick_height: Option<u64>,
  963. next_leader_slot_range: Option<(Slot, Slot)>,
  964. }
  965. impl LeaderState {
  966. #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
  967. fn new(
  968. working_bank: Option<Arc<Bank>>,
  969. tick_height: u64,
  970. leader_first_tick_height: Option<u64>,
  971. next_leader_slot_range: Option<(u64, u64)>,
  972. ) -> Self {
  973. Self {
  974. working_bank,
  975. tick_height: AtomicU64::new(tick_height),
  976. leader_first_tick_height,
  977. next_leader_slot_range,
  978. }
  979. }
  980. pub fn working_bank(&self) -> Option<&Arc<Bank>> {
  981. self.working_bank.as_ref()
  982. }
  983. pub fn tick_height(&self) -> u64 {
  984. self.tick_height.load(Ordering::Acquire)
  985. }
  986. pub fn leader_first_tick_height(&self) -> Option<u64> {
  987. self.leader_first_tick_height
  988. }
  989. /// Returns [first_slot, last_slot] inclusive range for the next
  990. /// leader slots.
  991. pub fn next_leader_slot_range(&self) -> Option<(Slot, Slot)> {
  992. self.next_leader_slot_range
  993. }
  994. }
  995. #[cfg(test)]
  996. mod tests {
  997. use {
  998. super::*,
  999. crossbeam_channel::bounded,
  1000. solana_clock::DEFAULT_TICKS_PER_SLOT,
  1001. solana_ledger::{
  1002. blockstore::Blockstore,
  1003. blockstore_meta::SlotMeta,
  1004. genesis_utils::{create_genesis_config, GenesisConfigInfo},
  1005. get_tmp_ledger_path_auto_delete,
  1006. },
  1007. solana_perf::test_tx::test_tx,
  1008. solana_sha256_hasher::hash,
  1009. };
  1010. #[test]
  1011. fn test_poh_recorder_no_zero_tick() {
  1012. let prev_hash = Hash::default();
  1013. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1014. let blockstore = Blockstore::open(ledger_path.path())
  1015. .expect("Expected to be able to open database ledger");
  1016. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1017. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1018. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1019. 0,
  1020. prev_hash,
  1021. bank,
  1022. Some((4, 4)),
  1023. DEFAULT_TICKS_PER_SLOT,
  1024. Arc::new(blockstore),
  1025. &Arc::new(LeaderScheduleCache::default()),
  1026. &PohConfig::default(),
  1027. Arc::new(AtomicBool::default()),
  1028. );
  1029. poh_recorder.tick();
  1030. assert_eq!(poh_recorder.tick_cache.len(), 1);
  1031. assert_eq!(poh_recorder.tick_cache[0].1, 1);
  1032. assert_eq!(poh_recorder.tick_height(), 1);
  1033. }
  1034. #[test]
  1035. fn test_poh_recorder_tick_height_is_last_tick() {
  1036. let prev_hash = Hash::default();
  1037. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1038. let blockstore = Blockstore::open(ledger_path.path())
  1039. .expect("Expected to be able to open database ledger");
  1040. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1041. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1042. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1043. 0,
  1044. prev_hash,
  1045. bank,
  1046. Some((4, 4)),
  1047. DEFAULT_TICKS_PER_SLOT,
  1048. Arc::new(blockstore),
  1049. &Arc::new(LeaderScheduleCache::default()),
  1050. &PohConfig::default(),
  1051. Arc::new(AtomicBool::default()),
  1052. );
  1053. poh_recorder.tick();
  1054. poh_recorder.tick();
  1055. assert_eq!(poh_recorder.tick_cache.len(), 2);
  1056. assert_eq!(poh_recorder.tick_cache[1].1, 2);
  1057. assert_eq!(poh_recorder.tick_height(), 2);
  1058. }
  1059. #[test]
  1060. fn test_poh_recorder_reset_clears_cache() {
  1061. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1062. let blockstore = Blockstore::open(ledger_path.path())
  1063. .expect("Expected to be able to open database ledger");
  1064. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1065. let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
  1066. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1067. 0,
  1068. Hash::default(),
  1069. bank0.clone(),
  1070. Some((4, 4)),
  1071. DEFAULT_TICKS_PER_SLOT,
  1072. Arc::new(blockstore),
  1073. &Arc::new(LeaderScheduleCache::default()),
  1074. &PohConfig::default(),
  1075. Arc::new(AtomicBool::default()),
  1076. );
  1077. poh_recorder.tick();
  1078. assert_eq!(poh_recorder.tick_cache.len(), 1);
  1079. poh_recorder.reset(bank0, Some((4, 4)));
  1080. assert_eq!(poh_recorder.tick_cache.len(), 0);
  1081. }
  1082. #[test]
  1083. fn test_poh_recorder_clear() {
  1084. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1085. let blockstore = Blockstore::open(ledger_path.path())
  1086. .expect("Expected to be able to open database ledger");
  1087. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1088. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1089. let prev_hash = bank.last_blockhash();
  1090. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1091. 0,
  1092. prev_hash,
  1093. bank.clone(),
  1094. Some((4, 4)),
  1095. bank.ticks_per_slot(),
  1096. Arc::new(blockstore),
  1097. &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
  1098. &PohConfig::default(),
  1099. Arc::new(AtomicBool::default()),
  1100. );
  1101. poh_recorder.set_bank_for_test(bank);
  1102. assert!(poh_recorder.working_bank.is_some());
  1103. poh_recorder.clear_bank(true);
  1104. assert!(poh_recorder.working_bank.is_none());
  1105. }
  1106. #[test]
  1107. fn test_poh_recorder_tick_sent_after_min() {
  1108. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1109. let blockstore = Blockstore::open(ledger_path.path())
  1110. .expect("Expected to be able to open database ledger");
  1111. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1112. let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
  1113. let prev_hash = bank0.last_blockhash();
  1114. let (mut poh_recorder, entry_receiver) = PohRecorder::new(
  1115. 0,
  1116. prev_hash,
  1117. bank0.clone(),
  1118. Some((4, 4)),
  1119. bank0.ticks_per_slot(),
  1120. Arc::new(blockstore),
  1121. &Arc::new(LeaderScheduleCache::new_from_bank(&bank0)),
  1122. &PohConfig::default(),
  1123. Arc::new(AtomicBool::default()),
  1124. );
  1125. bank0.fill_bank_with_ticks_for_tests();
  1126. let bank1 = Arc::new(Bank::new_from_parent(bank0, &Pubkey::default(), 1));
  1127. // Set a working bank
  1128. poh_recorder.set_bank_for_test(bank1.clone());
  1129. // Tick until poh_recorder.tick_height == working bank's min_tick_height
  1130. let num_new_ticks = bank1.tick_height() - poh_recorder.tick_height();
  1131. println!("{} {}", bank1.tick_height(), poh_recorder.tick_height());
  1132. assert!(num_new_ticks > 0);
  1133. for _ in 0..num_new_ticks {
  1134. poh_recorder.tick();
  1135. }
  1136. // Check that poh_recorder.tick_height == working bank's min_tick_height
  1137. let min_tick_height = poh_recorder.working_bank.as_ref().unwrap().min_tick_height;
  1138. assert_eq!(min_tick_height, bank1.tick_height());
  1139. assert_eq!(poh_recorder.tick_height(), min_tick_height);
  1140. //poh_recorder.tick height == working bank's min_tick_height,
  1141. // so no ticks should have been flushed yet
  1142. assert_eq!(poh_recorder.tick_cache.last().unwrap().1, num_new_ticks);
  1143. assert!(entry_receiver.try_recv().is_err());
  1144. // all ticks are sent after height > min
  1145. let tick_height_before = poh_recorder.tick_height();
  1146. poh_recorder.tick();
  1147. assert_eq!(poh_recorder.tick_height(), tick_height_before + 1);
  1148. assert_eq!(poh_recorder.tick_cache.len(), 0);
  1149. let mut num_entries = 0;
  1150. while let Ok((wbank, (_entry, _tick_height))) = entry_receiver.try_recv() {
  1151. assert_eq!(wbank.slot(), bank1.slot());
  1152. num_entries += 1;
  1153. }
  1154. // All the cached ticks, plus the new tick above should have been flushed
  1155. assert_eq!(num_entries, num_new_ticks + 1);
  1156. }
  1157. #[test]
  1158. fn test_poh_recorder_tick_sent_upto_and_including_max() {
  1159. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1160. let blockstore = Blockstore::open(ledger_path.path())
  1161. .expect("Expected to be able to open database ledger");
  1162. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1163. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1164. let prev_hash = bank.last_blockhash();
  1165. let (mut poh_recorder, entry_receiver) = PohRecorder::new(
  1166. 0,
  1167. prev_hash,
  1168. bank.clone(),
  1169. Some((4, 4)),
  1170. bank.ticks_per_slot(),
  1171. Arc::new(blockstore),
  1172. &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
  1173. &PohConfig::default(),
  1174. Arc::new(AtomicBool::default()),
  1175. );
  1176. // Tick further than the bank's max height
  1177. for _ in 0..bank.max_tick_height() + 1 {
  1178. poh_recorder.tick();
  1179. }
  1180. assert_eq!(
  1181. poh_recorder.tick_cache.last().unwrap().1,
  1182. bank.max_tick_height() + 1
  1183. );
  1184. assert_eq!(poh_recorder.tick_height(), bank.max_tick_height() + 1);
  1185. poh_recorder.set_bank_for_test(bank.clone());
  1186. poh_recorder.tick();
  1187. assert_eq!(poh_recorder.tick_height(), bank.max_tick_height() + 2);
  1188. assert!(poh_recorder.working_bank.is_none());
  1189. let mut num_entries = 0;
  1190. while entry_receiver.try_recv().is_ok() {
  1191. num_entries += 1;
  1192. }
  1193. // Should only flush up to bank's max tick height, despite the tick cache
  1194. // having many more entries
  1195. assert_eq!(num_entries, bank.max_tick_height());
  1196. }
  1197. #[test]
  1198. fn test_poh_recorder_record_to_early() {
  1199. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1200. let blockstore = Blockstore::open(ledger_path.path())
  1201. .expect("Expected to be able to open database ledger");
  1202. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1203. let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
  1204. let prev_hash = bank0.last_blockhash();
  1205. let (mut poh_recorder, entry_receiver) = PohRecorder::new(
  1206. 0,
  1207. prev_hash,
  1208. bank0.clone(),
  1209. Some((4, 4)),
  1210. bank0.ticks_per_slot(),
  1211. Arc::new(blockstore),
  1212. &Arc::new(LeaderScheduleCache::new_from_bank(&bank0)),
  1213. &PohConfig::default(),
  1214. Arc::new(AtomicBool::default()),
  1215. );
  1216. bank0.fill_bank_with_ticks_for_tests();
  1217. let bank1 = Arc::new(Bank::new_from_parent(bank0, &Pubkey::default(), 1));
  1218. poh_recorder.set_bank_for_test(bank1.clone());
  1219. // Let poh_recorder tick up to bank1.tick_height() - 1
  1220. for _ in 0..bank1.tick_height() - 1 {
  1221. poh_recorder.tick()
  1222. }
  1223. let tx = test_tx();
  1224. let h1 = hash(b"hello world!");
  1225. // We haven't yet reached the minimum tick height for the working bank,
  1226. // so record should fail
  1227. assert_matches!(
  1228. poh_recorder.record(bank1.slot(), vec![h1], vec![vec![tx.into()]]),
  1229. Err(PohRecorderError::MinHeightNotReached)
  1230. );
  1231. assert!(entry_receiver.try_recv().is_err());
  1232. }
  1233. #[test]
  1234. fn test_poh_recorder_record_bad_slot() {
  1235. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1236. let blockstore = Blockstore::open(ledger_path.path())
  1237. .expect("Expected to be able to open database ledger");
  1238. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1239. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1240. let prev_hash = bank.last_blockhash();
  1241. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1242. 0,
  1243. prev_hash,
  1244. bank.clone(),
  1245. Some((4, 4)),
  1246. bank.ticks_per_slot(),
  1247. Arc::new(blockstore),
  1248. &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
  1249. &PohConfig::default(),
  1250. Arc::new(AtomicBool::default()),
  1251. );
  1252. poh_recorder.set_bank_for_test(bank.clone());
  1253. let tx = test_tx();
  1254. let h1 = hash(b"hello world!");
  1255. // Fulfills min height criteria for a successful record
  1256. assert_eq!(
  1257. poh_recorder.tick_height(),
  1258. poh_recorder.working_bank.as_ref().unwrap().min_tick_height
  1259. );
  1260. // However we hand over a bad slot so record fails
  1261. let bad_slot = bank.slot() + 1;
  1262. assert_matches!(
  1263. poh_recorder.record(bad_slot, vec![h1], vec![vec![tx.into()]]),
  1264. Err(PohRecorderError::MaxHeightReached)
  1265. );
  1266. }
  1267. #[test]
  1268. fn test_poh_recorder_record_at_min_passes() {
  1269. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1270. let blockstore = Blockstore::open(ledger_path.path())
  1271. .expect("Expected to be able to open database ledger");
  1272. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1273. let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
  1274. let prev_hash = bank0.last_blockhash();
  1275. let (mut poh_recorder, entry_receiver) = PohRecorder::new(
  1276. 0,
  1277. prev_hash,
  1278. bank0.clone(),
  1279. Some((4, 4)),
  1280. bank0.ticks_per_slot(),
  1281. Arc::new(blockstore),
  1282. &Arc::new(LeaderScheduleCache::new_from_bank(&bank0)),
  1283. &PohConfig::default(),
  1284. Arc::new(AtomicBool::default()),
  1285. );
  1286. bank0.fill_bank_with_ticks_for_tests();
  1287. let bank1 = Arc::new(Bank::new_from_parent(bank0, &Pubkey::default(), 1));
  1288. poh_recorder.set_bank_for_test(bank1.clone());
  1289. // Record up to exactly min tick height
  1290. let min_tick_height = poh_recorder.working_bank.as_ref().unwrap().min_tick_height;
  1291. while poh_recorder.tick_height() < min_tick_height {
  1292. poh_recorder.tick();
  1293. }
  1294. assert_eq!(poh_recorder.tick_cache.len() as u64, min_tick_height);
  1295. // Check record succeeds on boundary condition where
  1296. // poh_recorder.tick height == poh_recorder.working_bank.min_tick_height
  1297. assert_eq!(poh_recorder.tick_height(), min_tick_height);
  1298. let tx = test_tx();
  1299. let h1 = hash(b"hello world!");
  1300. assert!(poh_recorder
  1301. .record(bank1.slot(), vec![h1], vec![vec![tx.into()]])
  1302. .is_ok());
  1303. assert_eq!(poh_recorder.tick_cache.len(), 0);
  1304. //tick in the cache + entry
  1305. for _ in 0..min_tick_height {
  1306. let (_bank, (e, _tick_height)) = entry_receiver.recv().unwrap();
  1307. assert!(e.is_tick());
  1308. }
  1309. let (_bank, (e, _tick_height)) = entry_receiver.recv().unwrap();
  1310. assert!(!e.is_tick());
  1311. }
  1312. #[test]
  1313. fn test_poh_recorder_record_at_max_fails() {
  1314. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1315. let blockstore = Blockstore::open(ledger_path.path())
  1316. .expect("Expected to be able to open database ledger");
  1317. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1318. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1319. let prev_hash = bank.last_blockhash();
  1320. let (mut poh_recorder, entry_receiver) = PohRecorder::new(
  1321. 0,
  1322. prev_hash,
  1323. bank.clone(),
  1324. Some((4, 4)),
  1325. bank.ticks_per_slot(),
  1326. Arc::new(blockstore),
  1327. &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
  1328. &PohConfig::default(),
  1329. Arc::new(AtomicBool::default()),
  1330. );
  1331. poh_recorder.set_bank_for_test(bank.clone());
  1332. let num_ticks_to_max = bank.max_tick_height() - poh_recorder.tick_height();
  1333. for _ in 0..num_ticks_to_max {
  1334. poh_recorder.tick();
  1335. }
  1336. let tx = test_tx();
  1337. let h1 = hash(b"hello world!");
  1338. assert!(poh_recorder
  1339. .record(bank.slot(), vec![h1], vec![vec![tx.into()]])
  1340. .is_err());
  1341. for _ in 0..num_ticks_to_max {
  1342. let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
  1343. assert!(entry.is_tick());
  1344. }
  1345. }
  1346. #[test]
  1347. fn test_poh_cache_on_disconnect() {
  1348. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1349. let blockstore = Blockstore::open(ledger_path.path())
  1350. .expect("Expected to be able to open database ledger");
  1351. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1352. let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
  1353. let prev_hash = bank0.last_blockhash();
  1354. let (mut poh_recorder, entry_receiver) = PohRecorder::new(
  1355. 0,
  1356. prev_hash,
  1357. bank0.clone(),
  1358. Some((4, 4)),
  1359. bank0.ticks_per_slot(),
  1360. Arc::new(blockstore),
  1361. &Arc::new(LeaderScheduleCache::new_from_bank(&bank0)),
  1362. &PohConfig::default(),
  1363. Arc::new(AtomicBool::default()),
  1364. );
  1365. bank0.fill_bank_with_ticks_for_tests();
  1366. let bank1 = Arc::new(Bank::new_from_parent(bank0, &Pubkey::default(), 1));
  1367. poh_recorder.set_bank_for_test(bank1);
  1368. // Check we can make two ticks without hitting min_tick_height
  1369. let remaining_ticks_to_min = poh_recorder.working_bank.as_ref().unwrap().min_tick_height
  1370. - poh_recorder.tick_height();
  1371. for _ in 0..remaining_ticks_to_min {
  1372. poh_recorder.tick();
  1373. }
  1374. assert_eq!(poh_recorder.tick_height(), remaining_ticks_to_min);
  1375. assert_eq!(
  1376. poh_recorder.tick_cache.len(),
  1377. remaining_ticks_to_min as usize
  1378. );
  1379. assert!(poh_recorder.working_bank.is_some());
  1380. // Drop entry receiver, and try to tick again. Because
  1381. // the receiver is closed, the ticks will not be drained from the cache,
  1382. // and the working bank will be cleared
  1383. drop(entry_receiver);
  1384. poh_recorder.tick();
  1385. // Check everything is cleared
  1386. assert!(poh_recorder.working_bank.is_none());
  1387. // Extra +1 for the tick that happened after the drop of the entry receiver.
  1388. assert_eq!(
  1389. poh_recorder.tick_cache.len(),
  1390. remaining_ticks_to_min as usize + 1
  1391. );
  1392. }
  1393. #[test]
  1394. fn test_reset_current() {
  1395. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1396. let blockstore = Blockstore::open(ledger_path.path())
  1397. .expect("Expected to be able to open database ledger");
  1398. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1399. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1400. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1401. 0,
  1402. Hash::default(),
  1403. bank.clone(),
  1404. Some((4, 4)),
  1405. DEFAULT_TICKS_PER_SLOT,
  1406. Arc::new(blockstore),
  1407. &Arc::new(LeaderScheduleCache::default()),
  1408. &PohConfig::default(),
  1409. Arc::new(AtomicBool::default()),
  1410. );
  1411. poh_recorder.tick();
  1412. poh_recorder.tick();
  1413. assert_eq!(poh_recorder.tick_cache.len(), 2);
  1414. poh_recorder.reset(bank, Some((4, 4)));
  1415. assert_eq!(poh_recorder.tick_cache.len(), 0);
  1416. }
  1417. #[test]
  1418. fn test_reset_with_cached() {
  1419. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1420. let blockstore = Blockstore::open(ledger_path.path())
  1421. .expect("Expected to be able to open database ledger");
  1422. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1423. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1424. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1425. 0,
  1426. Hash::default(),
  1427. bank.clone(),
  1428. Some((4, 4)),
  1429. DEFAULT_TICKS_PER_SLOT,
  1430. Arc::new(blockstore),
  1431. &Arc::new(LeaderScheduleCache::default()),
  1432. &PohConfig::default(),
  1433. Arc::new(AtomicBool::default()),
  1434. );
  1435. poh_recorder.tick();
  1436. poh_recorder.tick();
  1437. assert_eq!(poh_recorder.tick_cache.len(), 2);
  1438. poh_recorder.reset(bank, Some((4, 4)));
  1439. assert_eq!(poh_recorder.tick_cache.len(), 0);
  1440. }
  1441. #[test]
  1442. fn test_reset_to_new_value() {
  1443. agave_logger::setup();
  1444. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1445. let blockstore = Blockstore::open(ledger_path.path())
  1446. .expect("Expected to be able to open database ledger");
  1447. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1448. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1449. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1450. 0,
  1451. Hash::default(),
  1452. bank.clone(),
  1453. Some((4, 4)),
  1454. DEFAULT_TICKS_PER_SLOT,
  1455. Arc::new(blockstore),
  1456. &Arc::new(LeaderScheduleCache::default()),
  1457. &PohConfig::default(),
  1458. Arc::new(AtomicBool::default()),
  1459. );
  1460. poh_recorder.tick();
  1461. poh_recorder.tick();
  1462. poh_recorder.tick();
  1463. poh_recorder.tick();
  1464. assert_eq!(poh_recorder.tick_cache.len(), 4);
  1465. assert_eq!(poh_recorder.tick_height(), 4);
  1466. poh_recorder.reset(bank, Some((4, 4))); // parent slot 0 implies tick_height of 3
  1467. assert_eq!(poh_recorder.tick_cache.len(), 0);
  1468. poh_recorder.tick();
  1469. assert_eq!(poh_recorder.tick_height(), DEFAULT_TICKS_PER_SLOT + 1);
  1470. }
  1471. #[test]
  1472. fn test_reset_clear_bank() {
  1473. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1474. let blockstore = Blockstore::open(ledger_path.path())
  1475. .expect("Expected to be able to open database ledger");
  1476. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1477. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1478. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1479. 0,
  1480. Hash::default(),
  1481. bank.clone(),
  1482. Some((4, 4)),
  1483. bank.ticks_per_slot(),
  1484. Arc::new(blockstore),
  1485. &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
  1486. &PohConfig::default(),
  1487. Arc::new(AtomicBool::default()),
  1488. );
  1489. poh_recorder.set_bank_for_test(bank.clone());
  1490. assert_eq!(bank.slot(), 0);
  1491. poh_recorder.reset(bank, Some((4, 4)));
  1492. assert!(poh_recorder.working_bank.is_none());
  1493. }
  1494. #[test]
  1495. pub fn test_clear_signal() {
  1496. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1497. let blockstore = Blockstore::open(ledger_path.path())
  1498. .expect("Expected to be able to open database ledger");
  1499. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
  1500. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1501. let (sender, receiver) = bounded(1);
  1502. let (mut poh_recorder, _entry_receiver) = PohRecorder::new_with_clear_signal(
  1503. 0,
  1504. Hash::default(),
  1505. bank.clone(),
  1506. None,
  1507. bank.ticks_per_slot(),
  1508. false,
  1509. Arc::new(blockstore),
  1510. Some(sender),
  1511. &Arc::new(LeaderScheduleCache::default()),
  1512. &PohConfig::default(),
  1513. Arc::new(AtomicBool::default()),
  1514. );
  1515. poh_recorder.set_bank_for_test(bank);
  1516. poh_recorder.clear_bank(true);
  1517. assert!(receiver.try_recv().is_ok());
  1518. }
  1519. #[test]
  1520. fn test_poh_recorder_record_sets_start_slot() {
  1521. agave_logger::setup();
  1522. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1523. let blockstore = Blockstore::open(ledger_path.path())
  1524. .expect("Expected to be able to open database ledger");
  1525. let ticks_per_slot = 5;
  1526. let GenesisConfigInfo {
  1527. mut genesis_config, ..
  1528. } = create_genesis_config(2);
  1529. genesis_config.ticks_per_slot = ticks_per_slot;
  1530. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1531. let prev_hash = bank.last_blockhash();
  1532. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1533. 0,
  1534. prev_hash,
  1535. bank.clone(),
  1536. Some((4, 4)),
  1537. bank.ticks_per_slot(),
  1538. Arc::new(blockstore),
  1539. &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
  1540. &PohConfig::default(),
  1541. Arc::new(AtomicBool::default()),
  1542. );
  1543. poh_recorder.set_bank_for_test(bank.clone());
  1544. // Simulate ticking much further than working_bank.max_tick_height
  1545. let max_tick_height = poh_recorder.working_bank.as_ref().unwrap().max_tick_height;
  1546. for _ in 0..3 * max_tick_height {
  1547. poh_recorder.tick();
  1548. }
  1549. let tx = test_tx();
  1550. let h1 = hash(b"hello world!");
  1551. assert!(poh_recorder
  1552. .record(bank.slot(), vec![h1], vec![vec![tx.into()]])
  1553. .is_err());
  1554. assert!(poh_recorder.working_bank.is_none());
  1555. // Even thought we ticked much further than working_bank.max_tick_height,
  1556. // the `start_slot` is still the slot of the last working bank set by
  1557. // the earlier call to `poh_recorder.set_bank()`
  1558. assert_eq!(poh_recorder.start_slot(), bank.slot());
  1559. }
  1560. #[test]
  1561. fn test_current_poh_slot() {
  1562. let genesis_config = create_genesis_config(2).genesis_config;
  1563. let bank = Arc::new(Bank::new_for_tests(&genesis_config));
  1564. let last_entry_hash = bank.last_blockhash();
  1565. let ledger_path = get_tmp_ledger_path_auto_delete!();
  1566. let blockstore = Blockstore::open(ledger_path.path())
  1567. .expect("Expected to be able to open database ledger");
  1568. let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
  1569. let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
  1570. 0,
  1571. last_entry_hash,
  1572. bank.clone(),
  1573. None,
  1574. bank.ticks_per_slot(),
  1575. Arc::new(blockstore),
  1576. &Arc::new(leader_schedule_cache),
  1577. &PohConfig::default(),
  1578. Arc::new(AtomicBool::default()),
  1579. );
  1580. // Tick height is initialized as 0
  1581. assert_eq!(0, poh_recorder.current_poh_slot());
  1582. // Tick height will be reset to the last tick of the reset bank
  1583. poh_recorder.reset(bank.clone(), None);
  1584. assert_eq!(bank.slot() + 1, poh_recorder.current_poh_slot());
  1585. // Check that any ticks before the last tick of the current poh slot will
  1586. // not cause the current poh slot to advance
  1587. for _ in 0..bank.ticks_per_slot() - 1 {
  1588. poh_recorder.tick();
  1589. assert_eq!(bank.slot() + 1, poh_recorder.current_poh_slot());
  1590. }
  1591. // Check that the current poh slot is advanced once the last tick of the
  1592. // slot is reached
  1593. poh_recorder.tick();
  1594. assert_eq!(bank.slot() + 2, poh_recorder.current_poh_slot());
  1595. }
  /// Checks `PohRecorder::reached_leader_tick` against a fixed leader schedule
  /// in which leaders A, B, C and D each own `NUM_CONSECUTIVE_LEADER_SLOTS`
  /// consecutive slots, in that order.
  ///
  /// Scenarios covered, in sequence:
  /// * a leader whose previous slot was its own,
  /// * a leader following another leader's block, before and after the PoH is
  ///   reset onto that leader's last slot,
  /// * a leader whose immediate predecessor's slots were all skipped,
  /// * the `delay_leader_block_for_pending_fork` switch when the start bank
  ///   has active descendants.
  #[test]
  fn test_reached_leader_tick() {
      agave_logger::setup();

      // Setup genesis.
      let GenesisConfigInfo {
          genesis_config,
          validator_pubkey,
          ..
      } = create_genesis_config(2);

      // Setup start bank.
      let mut bank = Arc::new(Bank::new_for_tests(&genesis_config));
      let prev_hash = bank.last_blockhash();

      // Setup leader schedule.
      let leader_a_pubkey = validator_pubkey;
      let leader_b_pubkey = Pubkey::new_unique();
      let leader_c_pubkey = Pubkey::new_unique();
      let leader_d_pubkey = Pubkey::new_unique();
      let leader_pubkeys = [
          leader_a_pubkey,
          leader_b_pubkey,
          leader_c_pubkey,
          leader_d_pubkey,
      ];
      let consecutive_leader_slots = NUM_CONSECUTIVE_LEADER_SLOTS as usize;
      // Size the schedule from the leader list so the reservation always
      // matches the number of leaders that are actually appended.
      let mut slot_leaders = Vec::with_capacity(consecutive_leader_slots * leader_pubkeys.len());
      for leader_pubkey in leader_pubkeys {
          slot_leaders.extend(std::iter::repeat_n(
              leader_pubkey,
              consecutive_leader_slots,
          ));
      }
      let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
      let fixed_schedule = solana_ledger::leader_schedule::FixedSchedule {
          leader_schedule: Arc::new(Box::new(
              solana_ledger::leader_schedule::IdentityKeyedLeaderSchedule::new_from_schedule(
                  slot_leaders,
              ),
          )),
      };
      leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));

      // Setup PoH recorder.
      let ledger_path = get_tmp_ledger_path_auto_delete!();
      let blockstore = Blockstore::open(ledger_path.path())
          .expect("Expected to be able to open database ledger");
      let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
          0,
          prev_hash,
          bank.clone(),
          None,
          bank.ticks_per_slot(),
          Arc::new(blockstore),
          &Arc::new(leader_schedule_cache),
          &PohConfig::default(),
          Arc::new(AtomicBool::default()),
      );
      let ticks_per_slot = bank.ticks_per_slot();
      let grace_ticks = ticks_per_slot * MAX_GRACE_SLOTS;
      poh_recorder.grace_ticks = grace_ticks;

      // Setup leader slot ranges.
      let leader_a_start_slot = 0;
      let leader_a_end_slot = leader_a_start_slot + NUM_CONSECUTIVE_LEADER_SLOTS - 1;
      let leader_b_start_slot = leader_a_end_slot + 1;
      let leader_b_end_slot = leader_b_start_slot + NUM_CONSECUTIVE_LEADER_SLOTS - 1;
      let leader_c_start_slot = leader_b_end_slot + 1;
      let leader_c_end_slot = leader_c_start_slot + NUM_CONSECUTIVE_LEADER_SLOTS - 1;
      let leader_d_start_slot = leader_c_end_slot + 1;
      let leader_d_end_slot = leader_d_start_slot + NUM_CONSECUTIVE_LEADER_SLOTS - 1;

      // Reset onto Leader A's first slot 0.
      poh_recorder.reset(
          bank.clone(),
          Some((leader_a_start_slot + 1, leader_a_end_slot)),
      );

      // Setup leader start ticks.
      let ticks_in_leader_slot_set = ticks_per_slot * NUM_CONSECUTIVE_LEADER_SLOTS;
      let leader_a_start_tick = 1;
      let leader_b_start_tick = leader_a_start_tick + ticks_in_leader_slot_set;
      let leader_c_start_tick = leader_b_start_tick + ticks_in_leader_slot_set;
      let leader_d_start_tick = leader_c_start_tick + ticks_in_leader_slot_set;

      // True, because from Leader A's perspective, the previous slot was also
      // its own slot, and validators don't give grace periods if previous
      // slot was also their own.
      assert!(poh_recorder.reached_leader_tick(&leader_a_pubkey, leader_a_start_tick));

      // Tick through grace ticks.
      for _ in 0..grace_ticks {
          poh_recorder.tick();
      }

      // True, because we have ticked through all the grace ticks.
      assert!(poh_recorder.reached_leader_tick(&leader_a_pubkey, leader_a_start_tick));

      // Reset PoH on Leader A's first slot 0, ticking towards Leader B's leader slots.
      poh_recorder.reset(bank.clone(), Some((leader_b_start_slot, leader_b_end_slot)));

      // False, because Leader B hasn't ticked to its starting slot yet.
      assert!(!poh_recorder.reached_leader_tick(&leader_b_pubkey, leader_b_start_tick));

      // Tick through Leader A's remaining slots.
      for _ in poh_recorder.tick_height()..ticks_in_leader_slot_set {
          poh_recorder.tick();
      }

      // False, because the PoH was reset on slot 0, which is a block produced
      // by previous leader A, so a grace period must be given.
      assert!(!poh_recorder.reached_leader_tick(&leader_b_pubkey, leader_b_start_tick));

      // Reset onto Leader A's last slot.
      for _ in leader_a_start_slot + 1..leader_b_start_slot {
          let child_slot = bank.slot() + 1;
          bank = Arc::new(Bank::new_from_parent(bank, &leader_a_pubkey, child_slot));
      }
      poh_recorder.reset(bank.clone(), Some((leader_b_start_slot, leader_b_end_slot)));

      // True, because the PoH was reset onto the last slot produced by the
      // previous leader, so we can run immediately.
      assert!(poh_recorder.reached_leader_tick(&leader_b_pubkey, leader_b_start_tick));

      // Simulate skipping Leader B's first slot.
      poh_recorder.reset(
          bank.clone(),
          Some((leader_b_start_slot + 1, leader_b_end_slot)),
      );
      for _ in 0..ticks_per_slot {
          poh_recorder.tick();
      }

      // True, because we're building off the previous leader A's last block.
      assert!(poh_recorder.reached_leader_tick(&leader_b_pubkey, leader_b_start_tick));

      // Simulate generating Leader B's second slot.
      let child_slot = bank.slot() + 1;
      bank = Arc::new(Bank::new_from_parent(bank, &leader_b_pubkey, child_slot));

      // Reset PoH targeting Leader D's slots.
      poh_recorder.reset(bank, Some((leader_d_start_slot, leader_d_end_slot)));

      // Tick through Leader B's remaining slots.
      for _ in ticks_per_slot..ticks_in_leader_slot_set {
          poh_recorder.tick();
      }

      // Tick through Leader C's slots.
      for _ in 0..ticks_in_leader_slot_set {
          poh_recorder.tick();
      }

      // True, because Leader D is not building on any of Leader C's slots.
      // The PoH was last reset onto Leader B's second slot.
      assert!(poh_recorder.reached_leader_tick(&leader_d_pubkey, leader_d_start_tick));

      // Add some active (partially received) blocks to the active fork.
      let active_descendants = [NUM_CONSECUTIVE_LEADER_SLOTS];
      poh_recorder.update_start_bank_active_descendants(&active_descendants);

      // True, because Leader D observes pending blocks on the active fork,
      // but the config to delay for these is not set.
      assert!(poh_recorder.reached_leader_tick(&leader_d_pubkey, leader_d_start_tick));

      // Flip the config to delay for pending blocks.
      poh_recorder.delay_leader_block_for_pending_fork = true;

      // False, because Leader D observes pending blocks on the active fork,
      // and the config to delay for these is set.
      assert!(!poh_recorder.reached_leader_tick(&leader_d_pubkey, leader_d_start_tick));
  }
  /// Walks `PohRecorder::reached_leader_slot` through a series of resets and
  /// tick sequences, checking when the recorder reports
  /// `PohLeaderStatus::Reached` versus `PohLeaderStatus::NotReached`.
  #[test]
  fn test_reached_leader_slot() {
      // Feeds `count` consecutive ticks into the recorder.
      fn advance_ticks(recorder: &mut PohRecorder, count: u64) {
          for _ in 0..count {
              recorder.tick();
          }
      }

      agave_logger::setup();
      let ledger_path = get_tmp_ledger_path_auto_delete!();
      let blockstore = Blockstore::open(ledger_path.path())
          .expect("Expected to be able to open database ledger");
      let GenesisConfigInfo {
          genesis_config,
          validator_pubkey,
          ..
      } = create_genesis_config(2);
      let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
      let prev_hash = bank0.last_blockhash();
      let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
          0,
          prev_hash,
          bank0.clone(),
          None,
          bank0.ticks_per_slot(),
          Arc::new(blockstore),
          &Arc::new(LeaderScheduleCache::new_from_bank(&bank0)),
          &PohConfig::default(),
          Arc::new(AtomicBool::default()),
      );

      // No next leader slot was supplied at construction: never reached.
      let status = poh_recorder.reached_leader_slot(&validator_pubkey);
      assert_eq!(status, PohLeaderStatus::NotReached);

      // Resetting without a next leader slot does not change that.
      assert_eq!(bank0.slot(), 0);
      poh_recorder.reset(bank0.clone(), None);
      let status = poh_recorder.reached_leader_slot(&validator_pubkey);
      assert_eq!(status, PohLeaderStatus::NotReached);

      // Schedule slot 2 as the leader slot, then tick through one full slot.
      poh_recorder.reset(bank0.clone(), Some((2, 2)));
      let init_ticks = poh_recorder.tick_height();
      advance_ticks(&mut poh_recorder, bank0.ticks_per_slot());

      // Every tick must have been counted.
      let expected_tick_height = init_ticks + bank0.ticks_per_slot();
      assert_eq!(poh_recorder.tick_height(), expected_tick_height);

      let parent_meta = SlotMeta {
          received: 1,
          ..SlotMeta::default()
      };
      poh_recorder.blockstore.put_meta(0, &parent_meta).unwrap();

      // A key that differs from the previous leader, so grace ticks apply.
      let test_validator_pubkey = Pubkey::new_unique();

      // Grace ticks keep this validator from reaching its leader slot.
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(status, PohLeaderStatus::NotReached);

      // Reset onto slot 1: slot 2 is reached immediately.
      let bank1 = Arc::new(Bank::new_from_parent(bank0, &Pubkey::default(), 1));
      assert_eq!(bank1.slot(), 1);
      poh_recorder.reset(bank1.clone(), Some((2, 2)));
      let status = poh_recorder.reached_leader_slot(&validator_pubkey);
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 2,
              parent_slot: 1,
          }
      );

      // Target slot 3 and tick one slot's worth, which "skips" slot 2.
      poh_recorder.reset(bank1.clone(), Some((3, 3)));
      advance_ticks(&mut poh_recorder, bank1.ticks_per_slot());

      // Still inside the grace period for an unrelated validator.
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(status, PohLeaderStatus::NotReached);

      // Grace ticks are ignored when the previous slot was produced by the
      // same key.
      let status = poh_recorder.reached_leader_slot(bank1.collector_id());
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 3,
              parent_slot: 1
          }
      );

      // Send the grace ticks; afterwards the unrelated validator is leader.
      advance_ticks(
          &mut poh_recorder,
          bank1.ticks_per_slot() / GRACE_TICKS_FACTOR,
      );
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 3,
              parent_slot: 1,
          }
      );

      // Reset onto slot 2 with slot 4 as the target and tick one slot.
      let bank2 = Arc::new(Bank::new_from_parent(bank1.clone(), &Pubkey::default(), 2));
      poh_recorder.reset(bank2.clone(), Some((4, 4)));
      advance_ticks(&mut poh_recorder, bank1.ticks_per_slot());

      // Not the leader yet.
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(status, PohLeaderStatus::NotReached);

      // Resetting onto slot 3 makes slot 4 reachable with no further ticks.
      let bank3 = Arc::new(Bank::new_from_parent(bank2, &Pubkey::default(), 3));
      assert_eq!(bank3.slot(), 3);
      poh_recorder.reset(bank3.clone(), Some((4, 4)));
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 4,
              parent_slot: 3,
          }
      );

      // A node that overshoots the ticks for its target leader slot is
      // overdue, so the slot is still reported as reached.
      let bank4 = Arc::new(Bank::new_from_parent(bank3, &Pubkey::default(), 4));
      poh_recorder.reset(bank4.clone(), Some((5, 5)));
      let overshoot_factor = 4;
      advance_ticks(&mut poh_recorder, overshoot_factor * bank4.ticks_per_slot());
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 9,
              parent_slot: 4,
          }
      );

      // Grace ticks are not required if the previous leader's 4 slots got
      // skipped.
      poh_recorder.reset(bank4.clone(), Some((9, 9)));
      // Tick until the leader slot.
      advance_ticks(&mut poh_recorder, 4 * bank4.ticks_per_slot());
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 9,
              parent_slot: 4,
          }
      );

      // Register an active descendant, which counts as a pending new reset
      // bank.
      poh_recorder.update_start_bank_active_descendants(&[5]);
      assert!(poh_recorder.is_new_reset_bank_pending(8));

      // With delay_leader_block_for_pending_fork unset, grace ticks are
      // skipped.
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 9,
              parent_slot: 4,
          }
      );

      // With delay_leader_block_for_pending_fork set, grace ticks are
      // required.
      poh_recorder.delay_leader_block_for_pending_fork = true;
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(status, PohLeaderStatus::NotReached);

      // Tick through the grace ticks; the count is read first so the
      // recorder is not borrowed twice in one call.
      let grace_ticks = poh_recorder.grace_ticks;
      advance_ticks(&mut poh_recorder, grace_ticks);

      // Once the grace ticks have elapsed we are due to lead.
      let status = poh_recorder.reached_leader_slot(&test_validator_pubkey);
      assert_eq!(
          status,
          PohLeaderStatus::Reached {
              poh_slot: 9,
              parent_slot: 4,
          }
      );
  }
  /// Checks `PohRecorder::would_be_leader` for different look-ahead windows,
  /// both with and without a scheduled leader slot and a working bank.
  #[test]
  fn test_would_be_leader_soon() {
      let ledger_path = get_tmp_ledger_path_auto_delete!();
      let blockstore = Blockstore::open(ledger_path.path())
          .expect("Expected to be able to open database ledger");
      let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
      let bank = Arc::new(Bank::new_for_tests(&genesis_config));
      let prev_hash = bank.last_blockhash();
      let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
          0,
          prev_hash,
          bank.clone(),
          None,
          bank.ticks_per_slot(),
          Arc::new(blockstore),
          &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
          &PohConfig::default(),
          Arc::new(AtomicBool::default()),
      );

      // Look-ahead windows, expressed in ticks.
      let two_slots_of_ticks = 2 * bank.ticks_per_slot();
      let three_slots_of_ticks = 3 * bank.ticks_per_slot();

      // Without any leader slot the node is never about to lead, both
      // before and after a reset that supplies none.
      assert!(!poh_recorder.would_be_leader(two_slots_of_ticks));
      assert_eq!(bank.slot(), 0);
      poh_recorder.reset(bank.clone(), None);
      assert!(!poh_recorder.would_be_leader(two_slots_of_ticks));

      // Schedule the leader slot three slots past the reset bank.
      let leader_slot = bank.slot() + 3;
      poh_recorder.reset(bank.clone(), Some((leader_slot, leader_slot)));

      // Two slots of look-ahead is too short, three is enough, and the
      // two-slot answer stays the same when asked again.
      assert!(!poh_recorder.would_be_leader(two_slots_of_ticks));
      assert!(poh_recorder.would_be_leader(three_slots_of_ticks));
      assert!(!poh_recorder.would_be_leader(two_slots_of_ticks));

      // Move the bank up a slot (so that max_tick_height > slot 0's tick_height).
      let bank = Arc::new(Bank::new_from_parent(bank, &Pubkey::default(), 1));

      // With a working bank set, the node counts as leader within two slots.
      poh_recorder.set_bank_for_test(bank.clone());
      let child_two_slots_of_ticks = 2 * bank.ticks_per_slot();
      assert!(poh_recorder.would_be_leader(child_two_slots_of_ticks));
  }
  /// Ticks the recorder before any working bank is set, then sets a bank and
  /// checks the age of the genesis blockhash as seen by that bank.
  #[test]
  fn test_flush_virtual_ticks() {
      let ledger_path = get_tmp_ledger_path_auto_delete!();
      // Virtual ticks should be flushed into a newly set bank asap.
      let blockstore = Blockstore::open(ledger_path.path())
          .expect("Expected to be able to open database ledger");
      let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
      let genesis_bank = Arc::new(Bank::new_for_tests(&genesis_config));
      let genesis_hash = genesis_bank.last_blockhash();
      let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
          0,
          genesis_bank.last_blockhash(),
          genesis_bank.clone(),
          Some((2, 2)),
          genesis_bank.ticks_per_slot(),
          Arc::new(blockstore),
          &Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank)),
          &PohConfig::default(),
          Arc::new(AtomicBool::default()),
      );

      // Create the bank for slot 2 on top of genesis.
      let child_bank = Arc::new(Bank::new_from_parent(genesis_bank, &Pubkey::default(), 2));

      // Add virtual ticks into poh for slots 0, 1, and 2.
      let virtual_tick_count = child_bank.ticks_per_slot() * 3;
      for _ in 0..virtual_tick_count {
          poh_recorder.tick();
      }

      // After the bank is set, the genesis hash is valid for age 1 but no
      // longer for age 0.
      poh_recorder.set_bank_for_test(child_bank.clone());
      assert!(!child_bank.is_hash_valid_for_age(&genesis_hash, 0));
      assert!(child_bank.is_hash_valid_for_age(&genesis_hash, 1));
  }
  /// Table-driven check of `PohRecorder::compute_leader_slot_tick_heights`.
  ///
  /// Each row is `(next_leader_slot, ticks_per_slot, expected_result)`.
  #[test]
  fn test_compute_leader_slot_tick_heights() {
      let cases = [
          (None, 0, (None, 0, 0)),
          (Some((4, 4)), 8, (Some(33), 40, 4)),
          (Some((4, 7)), 8, (Some(33), 64, 2 * 8)),
          (Some((6, 7)), 8, (Some(49), 64, 8)),
          (Some((6, 7)), 4, (Some(25), 32, 4)),
      ];
      for (next_leader_slot, ticks_per_slot, expected) in cases {
          let computed =
              PohRecorder::compute_leader_slot_tick_heights(next_leader_slot, ticks_per_slot);
          assert_eq!(computed, expected);
      }
  }
  2032. }