accounts_background_service.rs

//! Service to clean up dead slots in accounts_db
//!
//! This can be expensive since we have to walk the append vecs being cleaned up.

mod pending_snapshot_packages;
mod stats;

pub use pending_snapshot_packages::PendingSnapshotPackages;
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
    crate::{
        bank::{Bank, BankSlotDelta, DropCallback},
        bank_forks::BankForks,
        snapshot_controller::SnapshotController,
        snapshot_package::SnapshotPackage,
    },
    agave_snapshots::{error::SnapshotError, SnapshotArchiveKind, SnapshotKind},
    crossbeam_channel::{Receiver, SendError, Sender},
    log::*,
    rayon::iter::{IntoParallelIterator, ParallelIterator},
    solana_clock::{BankId, Slot},
    solana_measure::{measure::Measure, measure_us},
    stats::StatsManager,
    std::{
        boxed::Box,
        cmp,
        fmt::{self, Debug, Formatter},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, LazyLock, Mutex, RwLock,
        },
        thread::{self, sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

const INTERVAL_MS: u64 = 100;
// Set the clean interval to approximately the time until the next incremental snapshot request
// is received, plus some buffer. The default incremental snapshot interval is 100 slots, which,
// at roughly 400 ms per slot, works out to about 40 seconds; the remaining 10 seconds here is
// the buffer.
const CLEAN_INTERVAL: Duration = Duration::from_secs(50);
const SHRINK_INTERVAL: Duration = Duration::from_secs(1);

pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// interval to report bank_drop queue events: 60s
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;

#[derive(Debug, Default)]
struct PrunedBankQueueLenReporter {
    last_report_time: AtomicU64,
}

impl PrunedBankQueueLenReporter {
    fn report(&self, q_len: usize) {
        let now = solana_time_utils::timestamp();
        let last_report_time = self.last_report_time.load(Ordering::Acquire);
        if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
            && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
        {
            datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
            self.last_report_time.store(now, Ordering::Release);
        }
    }
}

static BANK_DROP_QUEUE_REPORTER: LazyLock<PrunedBankQueueLenReporter> =
    LazyLock::new(PrunedBankQueueLenReporter::default);

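/// Callback installed on banks so that, when a pruned bank is dropped, its (slot, bank_id) is
/// sent to the background service for purging.
///
/// A minimal usage sketch (the real wiring lives in
/// `AccountsBackgroundService::setup_bank_drop_callback` below; `bank` is assumed to exist):
///
/// ```ignore
/// let (sender, receiver) = crossbeam_channel::unbounded();
/// bank.rc.accounts.accounts_db.enable_bank_drop_callback();
/// bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(sender))));
/// // When a descendant bank is later dropped, its (slot, bank_id) arrives on `receiver`.
/// ```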
#[derive(Clone)]
pub struct SendDroppedBankCallback {
    sender: DroppedSlotsSender,
}

impl DropCallback for SendDroppedBankCallback {
    fn callback(&self, bank: &Bank) {
        BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
        if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
            info!("bank DropCallback signal queue disconnected.");
        }
    }

    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
        Box::new(self.clone())
    }
}

impl Debug for SendDroppedBankCallback {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(f, "SendDroppedBankCallback({self:p})")
    }
}

impl SendDroppedBankCallback {
    pub fn new(sender: DroppedSlotsSender) -> Self {
        Self { sender }
    }
}

pub struct SnapshotRequest {
    pub snapshot_root_bank: Arc<Bank>,
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
    pub request_kind: SnapshotRequestKind,

    /// The instant this request was sent to the queue.
    /// Used to track how long requests wait before processing.
    pub enqueued: Instant,
}

impl Debug for SnapshotRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SnapshotRequest")
            .field("request kind", &self.request_kind)
            .field("bank slot", &self.snapshot_root_bank.slot())
            .field("block height", &self.snapshot_root_bank.block_height())
            .finish_non_exhaustive()
    }
}

/// What kind of request is this?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestKind {
    FullSnapshot,
    IncrementalSnapshot,
}

pub struct SnapshotRequestHandler {
    pub snapshot_controller: Arc<SnapshotController>,
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    pub pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
}

impl SnapshotRequestHandler {
    // Returns the latest requested snapshot slot, if a request was handled
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        non_snapshot_time_us: u128,
    ) -> Option<Result<Slot, SnapshotError>> {
        let (snapshot_request, num_outstanding_requests, num_re_enqueued_requests) =
            self.get_next_snapshot_request()?;

        datapoint_info!(
            "handle_snapshot_requests",
            ("num_outstanding_requests", num_outstanding_requests, i64),
            ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
            (
                "enqueued_time_us",
                snapshot_request.enqueued.elapsed().as_micros(),
                i64
            ),
        );

        let snapshot_kind = new_snapshot_kind(&snapshot_request)?;
        Some(self.handle_snapshot_request(non_snapshot_time_us, snapshot_request, snapshot_kind))
    }

    /// Get the next snapshot request to handle
    ///
    /// Look through the snapshot request channel to find the highest priority one to handle next.
    /// If there are no snapshot requests in the channel, return None. Otherwise return the
    /// highest priority one. Unhandled snapshot requests with slots GREATER-THAN the handled one
    /// will be re-enqueued. The remaining will be dropped.
    ///
    /// Also return the number of snapshot requests initially in the channel, and the number of
    /// ones re-enqueued.
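    ///
    /// For example, if the channel holds an incremental snapshot request for slot 90, a full
    /// snapshot request for slot 160, and an incremental snapshot request for slot 300, the full
    /// snapshot request (slot 160) is returned, the incremental request for slot 300 is
    /// re-enqueued (its slot is greater than 160), and the incremental request for slot 90 is
    /// dropped.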
    fn get_next_snapshot_request(
        &self,
    ) -> Option<(
        SnapshotRequest,
        /*num outstanding snapshot requests*/ usize,
        /*num re-enqueued snapshot requests*/ usize,
    )> {
        let mut requests: Vec<_> = self.snapshot_request_receiver.try_iter().collect();
        let requests_len = requests.len();
        debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
        match requests_len {
            0 => None,
            1 => {
                // SAFETY: We know the len is 1, so `pop` will return `Some`
                let snapshot_request = requests.pop().unwrap();
                Some((snapshot_request, 1, 0))
            }
            _ => {
                requests.select_nth_unstable_by(requests_len - 1, cmp_requests_by_priority);
                // SAFETY: We know the len is > 1, so `pop` will return `Some`
                let snapshot_request = requests.pop().unwrap();
                let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
                // re-enqueue any remaining requests for slots GREATER-THAN the one that will be handled
                let num_re_enqueued_requests = requests
                    .into_iter()
                    .filter(|snapshot_request| {
                        snapshot_request.snapshot_root_bank.slot() > handled_request_slot
                    })
                    .map(|snapshot_request| {
                        self.snapshot_controller
                            .request_sender()
                            .try_send(snapshot_request)
                            .expect("re-enqueue snapshot request");
                    })
                    .count();
                Some((snapshot_request, requests_len, num_re_enqueued_requests))
            }
        }
    }

    fn handle_snapshot_request(
        &self,
        non_snapshot_time_us: u128,
        snapshot_request: SnapshotRequest,
        snapshot_kind: SnapshotKind,
    ) -> Result<Slot, SnapshotError> {
        info!("handling snapshot request: {snapshot_request:?}, {snapshot_kind:?}");
        let mut total_time = Measure::start("snapshot_request_receiver_total_time");
        let SnapshotRequest {
            snapshot_root_bank,
            status_cache_slot_deltas,
            request_kind: _,
            enqueued: _,
        } = snapshot_request;

        if snapshot_kind.is_full_snapshot() {
            // The latest full snapshot slot is what accounts-db uses to properly handle
            // zero lamport accounts. We are handling a full snapshot request here, and
            // since taking a snapshot is not allowed to fail, we can update accounts-db now.
            snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
        }

        let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
        // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
        // That's because `snapshot_root_bank.slot()` must be a root at this point,
        // and contains relevant updates because each bank has at least 1 account update due
        // to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot.
        snapshot_root_bank.force_flush_accounts_cache();
        // Ensure all roots <= `self.slot()` have been flushed.
        // Note `max_flush_root` could be larger than self.slot() if there are
        // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
        assert!(
            snapshot_root_bank.slot()
                <= snapshot_root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .accounts_cache
                    .fetch_max_flush_root()
        );
        flush_accounts_cache_time.stop();

        let mut clean_time = Measure::start("clean_time");
        snapshot_root_bank.clean_accounts();
        clean_time.stop();

        let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());

        let mut shrink_time = Measure::start("shrink_time");
        snapshot_root_bank.shrink_candidate_slots();
        shrink_time.stop();

        // Snapshot the bank and send over a snapshot package
        let mut snapshot_time = Measure::start("snapshot_time");
        let snapshot_package = SnapshotPackage::new(
            snapshot_kind,
            &snapshot_root_bank,
            snapshot_root_bank.get_snapshot_storages(None),
            status_cache_slot_deltas,
        );
        self.pending_snapshot_packages
            .lock()
            .unwrap()
            .push(snapshot_package);
        snapshot_time.stop();

        info!(
            "Handled snapshot request. snapshot kind: {:?}, slot: {}, bank hash: {}",
            snapshot_kind,
            snapshot_root_bank.slot(),
            snapshot_root_bank.hash(),
        );

        total_time.stop();
        datapoint_info!(
            "handle_snapshot_requests-timing",
            (
                "flush_accounts_cache_time",
                flush_accounts_cache_time.as_us(),
                i64
            ),
            ("shrink_time", shrink_time.as_us(), i64),
            ("clean_time", clean_time.as_us(), i64),
            ("snapshot_time", snapshot_time.as_us(), i64),
            ("total_us", total_time.as_us(), i64),
            ("non_snapshot_time_us", non_snapshot_time_us, i64),
            ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
        );
        Ok(snapshot_root_bank.slot())
    }

    /// Returns the slot of the next snapshot request to be handled
    fn peek_next_snapshot_request_slot(&self) -> Option<Slot> {
        // We reuse `get_next_snapshot_request()` here, since it already implements all the logic
        // for getting the highest priority request, *AND* we leverage its test coverage.
        // Additionally, since `get_next_snapshot_request()` drops old requests, we might get to
        // proactively clean up old banks earlier as well!
        let (next_request, _, _) = self.get_next_snapshot_request()?;
        let next_slot = next_request.snapshot_root_bank.slot();

        // make sure to re-enqueue the request, otherwise we'd lose it!
        self.snapshot_controller
            .request_sender()
            .try_send(next_request)
            .expect("re-enqueue snapshot request");

        Some(next_slot)
    }
}

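/// Handles the queue of pruned (dropped) banks, purging their dead slots from accounts-db.
///
/// A minimal sketch of how this handler is wired up (in the validator the receiver comes from
/// `AccountsBackgroundService::setup_bank_drop_callback`, and the crate-internal
/// `handle_request` is driven via `remove_dead_slots`; `root_bank` is assumed to exist):
///
/// ```ignore
/// let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
/// let handler = PrunedBanksRequestHandler { pruned_banks_receiver };
/// // Dropped banks push their (slot, bank_id) onto `pruned_banks_sender`...
/// let num_banks_purged = handler.handle_request(&root_bank);
/// ```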
#[derive(Debug)]
pub struct PrunedBanksRequestHandler {
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}

impl PrunedBanksRequestHandler {
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn handle_request(&self, bank: &Bank) -> usize {
        let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
        // We need a stable sort to ensure we purge banks—with the same slot—in the same order
        // they were sent into the channel.
        banks_to_purge.sort_by_key(|(slot, _id)| *slot);
        let num_banks_to_purge = banks_to_purge.len();

        // Group the banks into slices with the same slot
        let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
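        // For example (illustrative values), a sorted queue of [(5, a), (5, b), (6, c)] groups
        // into [[(5, a), (5, b)], [(6, c)]]: the two slot-5 banks stay in one group and are
        // purged sequentially, while the groups themselves are purged in parallel below.
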
        // Log whenever we need to handle banks with the same slot. Purposely do this *before* we
        // call `purge_slot()` to ensure we get the datapoint (in case there's an assert/panic).
        let num_banks_with_same_slot =
            num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
        if num_banks_with_same_slot > 0 {
            datapoint_info!(
                "pruned_banks_request_handler",
                ("num_pruned_banks", num_banks_to_purge, i64),
                ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
            );
        }

        // Purge all the slots in parallel
        // Banks for the same slot are purged sequentially
        let accounts_db = bank.rc.accounts.accounts_db.as_ref();
        accounts_db.thread_pool_background.install(|| {
            grouped_banks_to_purge.into_par_iter().for_each(|group| {
                group.iter().for_each(|(slot, bank_id)| {
                    accounts_db.purge_slot(*slot, *bank_id, true);
                })
            });
        });

        num_banks_to_purge
    }

    fn remove_dead_slots(
        &self,
        bank: &Bank,
        removed_slots_count: &mut usize,
        total_remove_slots_time: &mut u64,
    ) {
        let mut remove_slots_time = Measure::start("remove_slots_time");
        *removed_slots_count += self.handle_request(bank);
        remove_slots_time.stop();
        *total_remove_slots_time += remove_slots_time.as_us();

        if *removed_slots_count >= 100 {
            datapoint_info!(
                "remove_slots_timing",
                ("remove_slots_time", *total_remove_slots_time, i64),
                ("removed_slots_count", *removed_slots_count, i64),
            );
            *total_remove_slots_time = 0;
            *removed_slots_count = 0;
        }
    }
}

pub struct AbsRequestHandlers {
    pub snapshot_request_handler: SnapshotRequestHandler,
    pub pruned_banks_request_handler: PrunedBanksRequestHandler,
}

impl AbsRequestHandlers {
    // Returns the latest requested snapshot slot, if one exists
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        non_snapshot_time_us: u128,
    ) -> Option<Result<Slot, SnapshotError>> {
        self.snapshot_request_handler
            .handle_snapshot_requests(non_snapshot_time_us)
    }
}

pub struct AccountsBackgroundService {
    t_background: JoinHandle<()>,
    status: AbsStatus,
}

impl AccountsBackgroundService {
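    /// Spawns the background thread that services pruned-bank and snapshot requests.
    ///
    /// A minimal sketch of how the service is typically driven (assumes `bank_forks`, `exit`,
    /// and `request_handlers` have already been built, e.g. as in the tests below):
    ///
    /// ```ignore
    /// let abs = AccountsBackgroundService::new(bank_forks, exit, request_handlers);
    /// // ... run the validator ...
    /// abs.status().stop(); // or set the shared `exit` flag
    /// abs.join().unwrap();
    /// ```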
    pub fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        exit: Arc<AtomicBool>,
        request_handlers: AbsRequestHandlers,
    ) -> Self {
        let is_running = Arc::new(AtomicBool::new(true));
        let stop = Arc::new(AtomicBool::new(false));
        let mut last_cleaned_slot = 0;
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        let t_background = Builder::new()
            .name("solAcctsBgSvc".to_string())
            .spawn({
                let is_running = is_running.clone();
                let stop = stop.clone();

                move || {
                    info!("AccountsBackgroundService has started");
                    let mut stats = StatsManager::new();
                    let mut last_snapshot_end_time = None;
                    let mut previous_clean_time = Instant::now();
                    let mut previous_shrink_time = Instant::now();

                    loop {
                        if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
                            break;
                        }
                        let start_time = Instant::now();

                        // Grab the current root bank
                        let bank = bank_forks.read().unwrap().root_bank();

                        // Purge accounts of any dead slots
                        request_handlers
                            .pruned_banks_request_handler
                            .remove_dead_slots(
                                &bank,
                                &mut removed_slots_count,
                                &mut total_remove_slots_time,
                            );

                        let non_snapshot_time = last_snapshot_end_time
                            .map(|last_snapshot_end_time: Instant| {
                                last_snapshot_end_time.elapsed().as_micros()
                            })
                            .unwrap_or_default();

                        // Check to see if there were any requests for snapshotting banks
                        // < the current root bank `bank` above.
                        //
                        // Claim: Any snapshot request for slot `N` found here implies that the
                        // last cleanup slot `M` satisfies `M < N`
                        //
                        // Proof: Assume for contradiction that we find a snapshot request for
                        // slot `N` here, but cleanup has already happened on some slot `M >= N`.
                        // Because the call to `bank.clean_accounts(true)` (in the code below)
                        // implies we only clean slots `<= bank - 1`, then that means in some
                        // *previous* iteration of this loop, we must have gotten a root bank for
                        // some slot `R` where `R > N`, but did not see the snapshot request for
                        // `N` in the snapshot request channel.
                        //
                        // However, this is impossible because BankForks::set_root() will always
                        // flush the snapshot request for `N` to the snapshot request channel
                        // before setting a root `R > N`, and
                        // snapshot_request_handler.handle_snapshot_requests() will always look
                        // for the latest available snapshot in the channel.
                        let snapshot_handle_result =
                            request_handlers.handle_snapshot_requests(non_snapshot_time);
                        if let Some(snapshot_handle_result) = snapshot_handle_result {
                            // Safe, see proof above
                            last_snapshot_end_time = Some(Instant::now());
                            match snapshot_handle_result {
                                Ok(snapshot_slot) => {
                                    assert!(
                                        last_cleaned_slot <= snapshot_slot,
                                        "last cleaned slot: {last_cleaned_slot}, snapshot request \
                                         slot: {snapshot_slot}, enqueued snapshot requests: {:?}",
                                        request_handlers
                                            .snapshot_request_handler
                                            .snapshot_request_receiver
                                            .try_iter()
                                            .collect::<Vec<_>>(),
                                    );
                                    last_cleaned_slot = snapshot_slot;
                                    previous_clean_time = Instant::now();
                                    previous_shrink_time = Instant::now();
                                }
                                Err(err) => {
                                    error!(
                                        "Stopping AccountsBackgroundService! Fatal error while \
                                         handling snapshot requests: {err}",
                                    );
                                    exit.store(true, Ordering::Relaxed);
                                    break;
                                }
                            }
                        } else {
                            // we didn't handle a snapshot request, so do flush/clean/shrink
                            let next_snapshot_request_slot = request_handlers
                                .snapshot_request_handler
                                .peek_next_snapshot_request_slot();

                            // We cannot clean past the next snapshot request slot because it may
                            // have zero-lamport accounts. See the comments in
                            // Bank::clean_accounts() for more information.
                            let max_clean_slot_inclusive = cmp::min(
                                next_snapshot_request_slot.unwrap_or(Slot::MAX),
                                bank.slot(),
                            )
                            .saturating_sub(1);

                            let duration_since_previous_clean = previous_clean_time.elapsed();
                            let should_clean = duration_since_previous_clean > CLEAN_INTERVAL;

                            // if we're cleaning, then force flush, otherwise be lazy
                            let force_flush = should_clean;
                            bank.rc
                                .accounts
                                .accounts_db
                                .flush_accounts_cache(force_flush, Some(max_clean_slot_inclusive));

                            if should_clean {
                                bank.rc.accounts.accounts_db.clean_accounts(
                                    Some(max_clean_slot_inclusive),
                                    false,
                                    bank.epoch_schedule(),
                                );
                                last_cleaned_slot = max_clean_slot_inclusive;
                                previous_clean_time = Instant::now();
                            }

                            let duration_since_previous_shrink = previous_shrink_time.elapsed();
                            let should_shrink = duration_since_previous_shrink > SHRINK_INTERVAL;

                            // To avoid pathological interactions between the clean and shrink
                            // timers, call shrink for either should_shrink or should_clean.
                            if should_shrink || should_clean {
                                if should_clean {
                                    // We used to only squash (aka shrink ancients) when we also
                                    // cleaned, so keep that same behavior here for now.
                                    bank.shrink_ancient_slots();
                                }
                                bank.shrink_candidate_slots();
                                previous_shrink_time = Instant::now();
                            }
                        }
                        stats.record_and_maybe_submit(start_time.elapsed());
                        sleep(Duration::from_millis(INTERVAL_MS));
                    }
                    info!("AccountsBackgroundService has stopped");
                    is_running.store(false, Ordering::Relaxed);
                }
            })
            .unwrap();

        Self {
            t_background,
            status: AbsStatus { is_running, stop },
        }
    }

    /// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
    /// should only be one bank, the root bank, in `bank_forks`.
    /// All banks added to `bank_forks` will be descendants of the root bank, and thus will
    /// inherit the bank drop callback.
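    ///
    /// A minimal wiring sketch (assumes `bank_forks` holds only the freshly loaded root bank):
    ///
    /// ```ignore
    /// let pruned_banks_receiver =
    ///     AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
    /// let pruned_banks_request_handler = PrunedBanksRequestHandler { pruned_banks_receiver };
    /// ```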
    pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
        assert_eq!(bank_forks.read().unwrap().banks().len(), 1);

        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        {
            let root_bank = bank_forks.read().unwrap().root_bank();

            root_bank
                .rc
                .accounts
                .accounts_db
                .enable_bank_drop_callback();
            root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
                pruned_banks_sender,
            ))));
        }
        pruned_banks_receiver
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_background.join()
    }

    /// Returns an object to query/manage the status of ABS
    pub fn status(&self) -> &AbsStatus {
        &self.status
    }
}

/// Query and manage the status of AccountsBackgroundService
#[derive(Debug, Clone)]
pub struct AbsStatus {
    /// Flag to query if ABS is running
    is_running: Arc<AtomicBool>,
    /// Flag to set to stop ABS
    stop: Arc<AtomicBool>,
}

impl AbsStatus {
    /// Returns if ABS is running
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Relaxed)
    }

    /// Raises the flag for ABS to stop
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Relaxed)
    }

    #[cfg(feature = "dev-context-only-utils")]
    pub fn new_for_tests() -> Self {
        Self {
            is_running: Arc::new(AtomicBool::new(false)),
            stop: Arc::new(AtomicBool::new(false)),
        }
    }
}

/// Get the SnapshotKind from a given SnapshotRequest
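///
/// For example, an IncrementalSnapshot request is turned into
/// `SnapshotKind::Archive(SnapshotArchiveKind::Incremental(latest_full_snapshot_slot))`, and is
/// dropped (returns `None`) if accounts-db does not yet know a latest full snapshot slot.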
#[must_use]
fn new_snapshot_kind(snapshot_request: &SnapshotRequest) -> Option<SnapshotKind> {
    match snapshot_request.request_kind {
        SnapshotRequestKind::FullSnapshot => Some(SnapshotKind::Archive(SnapshotArchiveKind::Full)),
        SnapshotRequestKind::IncrementalSnapshot => {
            if let Some(latest_full_snapshot_slot) = snapshot_request
                .snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .latest_full_snapshot_slot()
            {
                Some(SnapshotKind::Archive(SnapshotArchiveKind::Incremental(
                    latest_full_snapshot_slot,
                )))
            } else {
                warn!(
                    "Ignoring IncrementalSnapshot request for slot {} because there is no latest \
                     full snapshot",
                    snapshot_request.snapshot_root_bank.slot()
                );
                None
            }
        }
    }
}

/// Compare snapshot requests; used to pick the highest priority request to handle.
///
/// Priority, from highest to lowest:
/// - Full Snapshot
/// - Incremental Snapshot
///
/// If two requests of the same kind are being compared, their bank slots are the tiebreaker.
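///
/// For example, a FullSnapshot request at slot 160 outranks an IncrementalSnapshot request at
/// slot 300, and of two IncrementalSnapshot requests at slots 270 and 300, the one at slot 300
/// wins.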
#[must_use]
fn cmp_requests_by_priority(a: &SnapshotRequest, b: &SnapshotRequest) -> cmp::Ordering {
    let slot_a = a.snapshot_root_bank.slot();
    let slot_b = b.snapshot_root_bank.slot();
    cmp_snapshot_request_kinds_by_priority(&a.request_kind, &b.request_kind)
        .then(slot_a.cmp(&slot_b))
}

/// Compare snapshot request kinds by priority
///
/// Priority, from highest to lowest:
/// - Full Snapshot
/// - Incremental Snapshot
#[must_use]
fn cmp_snapshot_request_kinds_by_priority(
    a: &SnapshotRequestKind,
    b: &SnapshotRequestKind,
) -> cmp::Ordering {
    use {
        cmp::Ordering::{Equal, Greater, Less},
        SnapshotRequestKind as Kind,
    };
    match (a, b) {
        (Kind::FullSnapshot, Kind::FullSnapshot) => Equal,
        (Kind::FullSnapshot, Kind::IncrementalSnapshot) => Greater,
        (Kind::IncrementalSnapshot, Kind::FullSnapshot) => Less,
        (Kind::IncrementalSnapshot, Kind::IncrementalSnapshot) => Equal,
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        crate::genesis_utils::create_genesis_config,
        agave_snapshots::{snapshot_config::SnapshotConfig, SnapshotInterval},
        crossbeam_channel::unbounded,
        solana_account::AccountSharedData,
        solana_epoch_schedule::EpochSchedule,
        solana_pubkey::Pubkey,
        std::num::NonZeroU64,
    };

    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account in slot 0
        let account_key = Pubkey::new_unique();
        bank0.store_account(
            &account_key,
            &AccountSharedData::new(264, 0, &Pubkey::default()),
        );
        assert!(bank0.get_account(&account_key).is_some());
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);

        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }

    /// Ensure that unhandled snapshot requests are properly re-enqueued or dropped
    ///
    /// The snapshot request handler should be flexible and handle re-queueing unhandled snapshot
    /// requests, if those unhandled requests are for slots GREATER-THAN the last request handled.
    #[test]
    fn test_get_next_snapshot_request() {
        // These constants were picked to ensure the desired snapshot requests were sent to the
        // channel. Ensure there are multiple requests of each kind.
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Slots(
                NonZeroU64::new(FULL_SNAPSHOT_INTERVAL).unwrap(),
            ),
            incremental_snapshot_archive_interval: SnapshotInterval::Slots(
                NonZeroU64::new(INCREMENTAL_SNAPSHOT_INTERVAL).unwrap(),
            ),
            ..SnapshotConfig::default()
        };
        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            snapshot_config,
            0,
        ));
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller,
            snapshot_request_receiver,
            pending_snapshot_packages,
        };

        let send_snapshot_request = |snapshot_root_bank, request_kind| {
            let snapshot_request = SnapshotRequest {
                snapshot_root_bank,
                status_cache_slot_deltas: Vec::default(),
                request_kind,
                enqueued: Instant::now(),
            };
            snapshot_request_sender.send(snapshot_request).unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));

        // We need to get and set accounts-db's latest full snapshot slot to test
        // get_next_snapshot_request(). To work around potential borrowing issues
        // caused by make_banks() below, Arc::clone bank0 and add helper functions.
        let bank0 = bank.clone();
        fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
            bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
        }
        fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
            bank.rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(slot);
        }

        // Create new banks and send snapshot requests so that the following requests will be in
        // the channel before handling the requests:
        //
        // fss 80
        // iss 90
        // iss 120
        // iss 150
        // fss 160
        // iss 180
        // iss 210
        // fss 240 <-- handled 1st
        // iss 270
        // iss 300 <-- handled 2nd
        //
        // Also, incremental snapshots before slot 240 (the first full snapshot handled) will
        // actually be skipped since the latest full snapshot slot will be `None`.
        let mut make_banks = |num_banks| {
            for _ in 0..num_banks {
                let slot = bank.slot() + 1;
                bank = Arc::new(Bank::new_from_parent(
                    bank.clone(),
                    &Pubkey::new_unique(),
                    slot,
                ));

                // Since we're not using `BankForks::set_root()`, we have to handle sending the
                // correct snapshot requests ourselves.
                if bank.block_height().is_multiple_of(FULL_SNAPSHOT_INTERVAL) {
                    send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::FullSnapshot);
                } else if bank
                    .block_height()
                    .is_multiple_of(INCREMENTAL_SNAPSHOT_INTERVAL)
                {
                    send_snapshot_request(
                        Arc::clone(&bank),
                        SnapshotRequestKind::IncrementalSnapshot,
                    );
                }
            }
        };
        make_banks(303);

        // Ensure the full snapshot from slot 240 is handled 1st
        // (the older full snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), None);
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::FullSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
        set_latest_full_snapshot_slot(&bank0, 240);

        // Ensure the incremental snapshot from slot 300 is handled 2nd
        // (the older incremental snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::IncrementalSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);

        // And now ensure the snapshot request channel is empty!
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());
    }

    /// Ensure that we can prune banks with the same slot (if they were on different forks)
    #[test]
    fn test_pruned_banks_request_handler_handle_request() {
        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let genesis_config_info = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
        bank.rc.accounts.accounts_db.enable_bank_drop_callback();
        bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
            pruned_banks_sender,
        ))));

        let fork0_bank0 = Arc::new(bank);
        let fork0_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork1_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork2_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork0_bank2 = Arc::new(Bank::new_from_parent(
            fork0_bank1.clone(),
            &Pubkey::new_unique(),
            fork0_bank1.slot() + 1,
        ));
        let fork1_bank2 = Arc::new(Bank::new_from_parent(
            fork1_bank1.clone(),
            &Pubkey::new_unique(),
            fork1_bank1.slot() + 1,
        ));
        let fork0_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        let fork3_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        fork0_bank3.squash();

        drop(fork3_bank3);
        drop(fork1_bank2);
        drop(fork0_bank2);
        drop(fork1_bank1);
        drop(fork2_bank1);
        drop(fork0_bank1);
        drop(fork0_bank0);

        let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
        assert_eq!(num_banks_purged, 7);
    }

    #[test]
    fn test_cmp_snapshot_request_kinds_by_priority() {
        use cmp::Ordering::{Equal, Greater, Less};
        for (snapshot_request_kind_a, snapshot_request_kind_b, expected_result) in [
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Equal,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Less,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Equal,
            ),
        ] {
            let actual_result = cmp_snapshot_request_kinds_by_priority(
                &snapshot_request_kind_a,
                &snapshot_request_kind_b,
            );
            assert_eq!(expected_result, actual_result);
        }
    }
}