snapshot_utils.rs 104 KB


  1. #[cfg(feature = "dev-context-only-utils")]
  2. use solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs;
  3. use {
  4. crate::{
  5. bank::{BankFieldsToDeserialize, BankFieldsToSerialize, BankHashStats, BankSlotDelta},
  6. serde_snapshot::{
  7. self, AccountsDbFields, ExtraFieldsToSerialize, SerdeObsoleteAccountsMap,
  8. SerializableAccountStorageEntry, SnapshotAccountsDbFields, SnapshotBankFields,
  9. SnapshotStreams,
  10. },
  11. snapshot_package::SnapshotPackage,
  12. snapshot_utils::snapshot_storage_rebuilder::{
  13. get_slot_and_append_vec_id, SnapshotStorageRebuilder,
  14. },
  15. },
  16. agave_snapshots::{
  17. archive_snapshot,
  18. error::{
  19. AddBankSnapshotError, GetSnapshotAccountsHardLinkDirError,
  20. HardLinkStoragesToSnapshotError, SnapshotError, SnapshotFastbootError,
  21. SnapshotNewFromDirError,
  22. },
  23. paths::{self as snapshot_paths, get_incremental_snapshot_archives},
  24. snapshot_archive_info::{
  25. FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfo,
  26. SnapshotArchiveInfoGetter,
  27. },
  28. snapshot_config::SnapshotConfig,
  29. streaming_unarchive_snapshot, ArchiveFormat, Result, SnapshotArchiveKind, SnapshotKind,
  30. SnapshotVersion,
  31. },
  32. crossbeam_channel::{Receiver, Sender},
  33. log::*,
  34. regex::Regex,
  35. semver::Version,
  36. solana_accounts_db::{
  37. account_storage::AccountStorageMap,
  38. accounts_db::{AccountStorageEntry, AccountsDbConfig, AtomicAccountsFileId},
  39. accounts_file::{AccountsFile, StorageAccess},
  40. utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
  41. },
  42. solana_clock::Slot,
  43. solana_measure::{measure::Measure, measure_time, measure_us},
  44. std::{
  45. cmp::Ordering,
  46. collections::{HashMap, HashSet},
  47. fs,
  48. io::{self, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
  49. mem,
  50. num::NonZeroUsize,
  51. path::{Path, PathBuf},
  52. str::FromStr,
  53. sync::{Arc, LazyLock},
  54. },
  55. tempfile::TempDir,
  56. };
  57. pub mod snapshot_storage_rebuilder;
  /// Upper bound on the size of the obsolete accounts file.
  ///
  /// Exceeding it is treated as an error; the intent is that the file is then discarded, which
  /// forces a restore from snapshot archives. Sizing assumes 24 bytes per entry and 5% of
  /// 10 billion accounts: 500 million entries * 24 bytes = 12 GB, rounded up to 12 GiB.
  pub const MAX_OBSOLETE_ACCOUNTS_FILE_SIZE: u64 = 12 << 30; // 12 GiB
  /// Upper bound on the size of a serialized snapshot data file (e.g. the bank snapshot file).
  pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 << 30; // 32 GiB
  /// Upper bound, in bytes, on the size of the snapshot version file.
  const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8;
  // Snapshot Fastboot Version History
  // Legacy - No fastboot version file; the presence of the storages-flushed file determines
  //          whether the snapshot is loadable.
  // 1.0.0  - Initial version file. Backwards and forwards compatible with Legacy.
  // 2.0.0  - Obsolete accounts file added; the storages-flushed file is no longer written.
  //
  // Compatibility:
  // - Snapshots created with version 2.0.0 cannot be fastbooted by older software versions.
  // - Snapshots created with versions < 2.0.0 can be fastbooted by version 2.0.0.
  //
  // This is the version written into the fastboot version file by
  // `mark_bank_snapshot_as_loadable`, and the major-version ceiling accepted by
  // `is_snapshot_fastboot_compatible`.
  const SNAPSHOT_FASTBOOT_VERSION: Version = Version::new(2, 0, 0);
  /// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot
  /// directory, and the versions recorded for the snapshot.
  ///
  /// See `BankSnapshotInfo::new_from_dir` for how these values are read back from disk.
  #[derive(PartialEq, Eq, Debug)]
  pub struct BankSnapshotInfo {
      /// Slot of the bank
      pub slot: Slot,
      /// Path to the bank snapshot directory
      pub snapshot_dir: PathBuf,
      /// Snapshot version (parsed from the snapshot version file when read from disk)
      pub snapshot_version: SnapshotVersion,
      /// Fastboot version, if one is known; `None` when no fastboot version file could be read
      pub fastboot_version: Option<Version>,
  }
  85. impl PartialOrd for BankSnapshotInfo {
  86. fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
  87. Some(self.cmp(other))
  88. }
  89. }
  90. // Order BankSnapshotInfo by slot (ascending), which practically is sorting chronologically
  91. impl Ord for BankSnapshotInfo {
  92. fn cmp(&self, other: &Self) -> Ordering {
  93. self.slot.cmp(&other.slot)
  94. }
  95. }
  96. impl BankSnapshotInfo {
  97. pub fn new_from_dir(
  98. bank_snapshots_dir: impl AsRef<Path>,
  99. slot: Slot,
  100. ) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
  101. // check this directory to see if there is a BankSnapshotPre and/or
  102. // BankSnapshotPost file
  103. let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(&bank_snapshots_dir, slot);
  104. if !bank_snapshot_dir.is_dir() {
  105. return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
  106. bank_snapshot_dir,
  107. ));
  108. }
  109. // Among the files checks, the completion flag file check should be done first to avoid the later
  110. // I/O errors.
  111. // There is a time window from the slot directory being created, and the content being completely
  112. // filled. Check the version file as it is the last file written to avoid using a highest
  113. // found slot directory with missing content
  114. let version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
  115. let version_str = snapshot_version_from_file(&version_path).map_err(|err| {
  116. SnapshotNewFromDirError::IncompleteDir(err, bank_snapshot_dir.clone())
  117. })?;
  118. let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
  119. .or(Err(SnapshotNewFromDirError::InvalidVersion(version_str)))?;
  120. let status_cache_file =
  121. bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
  122. if !status_cache_file.is_file() {
  123. return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
  124. status_cache_file,
  125. ));
  126. }
  127. let bank_snapshot_path =
  128. bank_snapshot_dir.join(snapshot_paths::get_snapshot_file_name(slot));
  129. if !bank_snapshot_path.is_file() {
  130. return Err(SnapshotNewFromDirError::MissingSnapshotFile(
  131. bank_snapshot_dir,
  132. ));
  133. };
  134. let snapshot_fastboot_version_path =
  135. bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_FASTBOOT_VERSION_FILENAME);
  136. // If the version file is absent, fastboot_version will be None. This allows versions 3.1+
  137. // to load snapshots created by versions <3.1. In version 3.2, the version file will become
  138. // mandatory, and its absence can be treated as an error.
  139. let fastboot_version = fs::read_to_string(&snapshot_fastboot_version_path)
  140. .ok()
  141. .map(|version_string| {
  142. Version::from_str(version_string.trim())
  143. .map_err(|_| SnapshotNewFromDirError::InvalidFastbootVersion(version_string))
  144. })
  145. .transpose()?;
  146. Ok(BankSnapshotInfo {
  147. slot,
  148. snapshot_dir: bank_snapshot_dir,
  149. snapshot_version,
  150. fastboot_version,
  151. })
  152. }
  153. pub fn snapshot_path(&self) -> PathBuf {
  154. self.snapshot_dir
  155. .join(snapshot_paths::get_snapshot_file_name(self.slot))
  156. }
  157. }
  /// When constructing a bank from a snapshot, traditionally the snapshot came from a snapshot
  /// archive. Now the snapshot can come either from a bank snapshot directory or from a snapshot
  /// archive; this flag indicates which.
  #[derive(Clone, Copy, Debug, Eq, PartialEq)]
  pub enum SnapshotFrom {
      /// Build from the snapshot archive
      Archive,
      /// Build directly from the bank snapshot directory
      Dir,
  }
  /// Helper type used when rebuilding from snapshots. It handles rebuilding from just a full
  /// snapshot, or from both a full snapshot and an incremental snapshot.
  #[derive(Debug)]
  pub struct SnapshotRootPaths {
      /// Data file of the full snapshot; always opened and deserialized
      pub full_snapshot_root_file_path: PathBuf,
      /// Data file of the incremental snapshot, if rebuilding with one
      pub incremental_snapshot_root_file_path: Option<PathBuf>,
  }
  /// Helper type to bundle up the results from `unarchive_snapshot()`
  #[derive(Debug)]
  pub struct UnarchivedSnapshot {
      // Only held (hence `allow(dead_code)`) so the unpack directory lives as long as this value;
      // a `TempDir` removes its directory when dropped.
      #[allow(dead_code)]
      unpack_dir: TempDir,
      /// Account storages rebuilt from the unpacked snapshot
      pub storage: AccountStorageMap,
      /// Deserialized bank fields
      pub bank_fields: BankFieldsToDeserialize,
      /// Deserialized accounts-db fields
      pub accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
      /// Where the snapshot was unpacked, and its snapshot version
      pub unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
      /// Timing measurement, presumably of the unpack/untar step (verify against the producer)
      pub measure_untar: Measure,
  }
  /// Helper type to bundle up the results from `verify_and_unarchive_snapshots()`.
  ///
  /// The `incremental_*` fields are `Some` only when an incremental snapshot was unarchived
  /// alongside the full snapshot.
  #[derive(Debug)]
  pub struct UnarchivedSnapshots {
      /// Account storages from the full snapshot
      pub full_storage: AccountStorageMap,
      /// Account storages from the incremental snapshot, if any
      pub incremental_storage: Option<AccountStorageMap>,
      /// Bank fields from the full (and, if present, incremental) snapshot
      pub bank_fields: SnapshotBankFields,
      /// Accounts-db fields from the full (and, if present, incremental) snapshot
      pub accounts_db_fields: SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
      pub full_unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
      pub incremental_unpacked_snapshots_dir_and_version: Option<UnpackedSnapshotsDirAndVersion>,
      pub full_measure_untar: Measure,
      pub incremental_measure_untar: Option<Measure>,
      /// Next accounts-file id to hand out after the storages above were rebuilt
      pub next_append_vec_id: AtomicAccountsFileId,
  }
  /// Guard type that keeps the unpack directories of snapshots alive.
  /// Once dropped, the unpack directories are removed.
  // The fields are never read; they exist only to be dropped, hence `allow(dead_code)`.
  #[allow(dead_code)]
  #[derive(Debug)]
  pub struct UnarchivedSnapshotsGuard {
      /// Unpack directory of the full snapshot
      full_unpack_dir: TempDir,
      /// Unpack directory of the incremental snapshot, if one was unpacked
      incremental_unpack_dir: Option<TempDir>,
  }
  /// Helper type for passing around the unpacked snapshots dir and the snapshot version together
  #[derive(Debug)]
  pub struct UnpackedSnapshotsDirAndVersion {
      /// Directory the snapshot was unpacked into
      pub unpacked_snapshots_dir: PathBuf,
      /// Version of the unpacked snapshot
      pub snapshot_version: SnapshotVersion,
  }
  /// Helper type for passing around the account storage map and the next append vec id
  /// when reconstructing accounts from a snapshot
  pub(crate) struct StorageAndNextAccountsFileId {
      /// Rebuilt account storages
      pub storage: AccountStorageMap,
      /// Next accounts-file id to allocate after the storages above
      pub next_append_vec_id: AtomicAccountsFileId,
  }
  219. /// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
  220. /// from <account_path>/run taken at snapshot <slot> time. They are referenced by the symlinks from the
  221. /// bank snapshot dir snapshot/<slot>/accounts_hardlinks/. We observed that sometimes the bank snapshot dir
  222. /// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
  223. /// or some legacy code not using the symlinks to clean up the account snapshot hardlink directories.
  224. /// This function cleans up any account snapshot directories that are no longer referenced by the bank
  225. /// snapshot dirs, to ensure proper snapshot operations.
  226. pub fn clean_orphaned_account_snapshot_dirs(
  227. bank_snapshots_dir: impl AsRef<Path>,
  228. account_snapshot_paths: &[PathBuf],
  229. ) -> io::Result<()> {
  230. // Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
  231. // This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
  232. let mut account_snapshot_dirs_referenced = HashSet::new();
  233. let snapshots = get_bank_snapshots(bank_snapshots_dir);
  234. for snapshot in snapshots {
  235. let account_hardlinks_dir = snapshot
  236. .snapshot_dir
  237. .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
  238. // loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
  239. let Ok(read_dir) = fs::read_dir(&account_hardlinks_dir) else {
  240. // The bank snapshot may not have a hard links dir with the storages.
  241. // This is fine, and happens for bank snapshots we do *not* fastboot from.
  242. // In this case, log it and go to the next bank snapshot.
  243. debug!(
  244. "failed to read account hardlinks dir '{}'",
  245. account_hardlinks_dir.display(),
  246. );
  247. continue;
  248. };
  249. for entry in read_dir {
  250. let path = entry?.path();
  251. let target = fs::read_link(&path).map_err(|err| {
  252. IoError::other(format!(
  253. "failed to read symlink '{}': {err}",
  254. path.display(),
  255. ))
  256. })?;
  257. account_snapshot_dirs_referenced.insert(target);
  258. }
  259. }
  260. // loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
  261. for account_snapshot_path in account_snapshot_paths {
  262. let read_dir = fs::read_dir(account_snapshot_path).map_err(|err| {
  263. IoError::other(format!(
  264. "failed to read account snapshot dir '{}': {err}",
  265. account_snapshot_path.display(),
  266. ))
  267. })?;
  268. for entry in read_dir {
  269. let path = entry?.path();
  270. if !account_snapshot_dirs_referenced.contains(&path) {
  271. info!(
  272. "Removing orphaned account snapshot hardlink directory '{}'...",
  273. path.display()
  274. );
  275. move_and_async_delete_path(&path);
  276. }
  277. }
  278. }
  279. Ok(())
  280. }
  281. /// Purges incomplete bank snapshots
  282. pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
  283. let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
  284. // If we cannot read the bank snapshots dir, then there's nothing to do
  285. return;
  286. };
  287. let is_incomplete = |dir: &PathBuf| !is_bank_snapshot_complete(dir);
  288. let incomplete_dirs: Vec<_> = read_dir_iter
  289. .filter_map(|entry| entry.ok())
  290. .map(|entry| entry.path())
  291. .filter(|path| path.is_dir())
  292. .filter(is_incomplete)
  293. .collect();
  294. // attempt to purge all the incomplete directories; do not exit early
  295. for incomplete_dir in incomplete_dirs {
  296. let result = purge_bank_snapshot(&incomplete_dir);
  297. match result {
  298. Ok(_) => info!(
  299. "Purged incomplete snapshot dir: {}",
  300. incomplete_dir.display()
  301. ),
  302. Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
  303. }
  304. }
  305. }
  306. /// Is the bank snapshot complete?
  307. fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
  308. let version_path = bank_snapshot_dir
  309. .as_ref()
  310. .join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
  311. version_path.is_file()
  312. }
  313. /// Writes files that indicate the bank snapshot is loadable by fastboot
  314. pub fn mark_bank_snapshot_as_loadable(bank_snapshot_dir: impl AsRef<Path>) -> io::Result<()> {
  315. let snapshot_fastboot_version_path = bank_snapshot_dir
  316. .as_ref()
  317. .join(snapshot_paths::SNAPSHOT_FASTBOOT_VERSION_FILENAME);
  318. fs::write(
  319. &snapshot_fastboot_version_path,
  320. SNAPSHOT_FASTBOOT_VERSION.to_string(),
  321. )
  322. .map_err(|err| {
  323. IoError::other(format!(
  324. "failed to write fastboot version file '{}': {err}",
  325. snapshot_fastboot_version_path.display(),
  326. ))
  327. })?;
  328. Ok(())
  329. }
  330. /// Is this bank snapshot loadable?
  331. fn is_bank_snapshot_loadable(
  332. fastboot_version: Option<&Version>,
  333. ) -> std::result::Result<bool, SnapshotFastbootError> {
  334. if let Some(fastboot_version) = fastboot_version {
  335. is_snapshot_fastboot_compatible(fastboot_version)
  336. } else {
  337. // No fastboot version file, so this is not a fastbootable
  338. Ok(false)
  339. }
  340. }
  341. /// Is the fastboot snapshot version compatible?
  342. fn is_snapshot_fastboot_compatible(
  343. version: &Version,
  344. ) -> std::result::Result<bool, SnapshotFastbootError> {
  345. if version.major <= SNAPSHOT_FASTBOOT_VERSION.major {
  346. Ok(true)
  347. } else {
  348. Err(SnapshotFastbootError::IncompatibleVersion(version.clone()))
  349. }
  350. }
  351. /// Gets the highest, loadable, bank snapshot
  352. ///
  353. /// The highest bank snapshot is the one with the highest slot.
  354. pub fn get_highest_loadable_bank_snapshot(
  355. snapshot_config: &SnapshotConfig,
  356. ) -> Option<BankSnapshotInfo> {
  357. let highest_bank_snapshot = get_highest_bank_snapshot(&snapshot_config.bank_snapshots_dir)?;
  358. let is_bank_snapshot_loadable =
  359. is_bank_snapshot_loadable(highest_bank_snapshot.fastboot_version.as_ref());
  360. match is_bank_snapshot_loadable {
  361. Ok(true) => Some(highest_bank_snapshot),
  362. Ok(false) => None,
  363. Err(err) => {
  364. warn!(
  365. "Bank snapshot is not loadable '{}': {err}",
  366. highest_bank_snapshot.snapshot_dir.display()
  367. );
  368. None
  369. }
  370. }
  371. }
  372. /// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
  373. /// directory won't be cleaned up. Call this function to clean them up.
  374. pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
  375. if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
  376. for entry in entries.flatten() {
  377. if entry
  378. .file_name()
  379. .to_str()
  380. .map(|file_name| file_name.starts_with(snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX))
  381. .unwrap_or(false)
  382. {
  383. let path = entry.path();
  384. let result = if path.is_dir() {
  385. fs::remove_dir_all(&path)
  386. } else {
  387. fs::remove_file(&path)
  388. };
  389. if let Err(err) = result {
  390. warn!(
  391. "Failed to remove temporary snapshot archive '{}': {err}",
  392. path.display(),
  393. );
  394. }
  395. }
  396. }
  397. }
  398. }
  399. /// Serializes and archives a snapshot package
  400. pub fn serialize_and_archive_snapshot_package(
  401. snapshot_package: SnapshotPackage,
  402. snapshot_config: &SnapshotConfig,
  403. should_flush_and_hard_link_storages: bool,
  404. ) -> Result<SnapshotArchiveInfo> {
  405. let SnapshotPackage {
  406. snapshot_kind,
  407. slot: snapshot_slot,
  408. block_height: _,
  409. hash: snapshot_hash,
  410. mut snapshot_storages,
  411. status_cache_slot_deltas,
  412. bank_fields_to_serialize,
  413. bank_hash_stats,
  414. write_version,
  415. enqueued: _,
  416. } = snapshot_package;
  417. let bank_snapshot_info = serialize_snapshot(
  418. &snapshot_config.bank_snapshots_dir,
  419. snapshot_config.snapshot_version,
  420. snapshot_storages.as_slice(),
  421. status_cache_slot_deltas.as_slice(),
  422. bank_fields_to_serialize,
  423. bank_hash_stats,
  424. write_version,
  425. should_flush_and_hard_link_storages,
  426. )?;
  427. let SnapshotKind::Archive(snapshot_archive_kind) = snapshot_kind;
  428. let snapshot_archive_path = match snapshot_archive_kind {
  429. SnapshotArchiveKind::Full => snapshot_paths::build_full_snapshot_archive_path(
  430. &snapshot_config.full_snapshot_archives_dir,
  431. snapshot_package.slot,
  432. &snapshot_package.hash,
  433. snapshot_config.archive_format,
  434. ),
  435. SnapshotArchiveKind::Incremental(incremental_snapshot_base_slot) => {
  436. // After the snapshot has been serialized, it is now safe (and required) to prune all
  437. // the storages that are *not* to be archived for this incremental snapshot.
  438. snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
  439. snapshot_paths::build_incremental_snapshot_archive_path(
  440. &snapshot_config.incremental_snapshot_archives_dir,
  441. incremental_snapshot_base_slot,
  442. snapshot_package.slot,
  443. &snapshot_package.hash,
  444. snapshot_config.archive_format,
  445. )
  446. }
  447. };
  448. let snapshot_archive_info = archive_snapshot(
  449. snapshot_archive_kind,
  450. snapshot_slot,
  451. snapshot_hash,
  452. snapshot_storages.as_slice(),
  453. &bank_snapshot_info.snapshot_dir,
  454. snapshot_archive_path,
  455. snapshot_config.archive_format,
  456. )?;
  457. Ok(snapshot_archive_info)
  458. }
  459. /// Serializes a snapshot into `bank_snapshots_dir`
  460. #[allow(clippy::too_many_arguments)]
  461. fn serialize_snapshot(
  462. bank_snapshots_dir: impl AsRef<Path>,
  463. snapshot_version: SnapshotVersion,
  464. snapshot_storages: &[Arc<AccountStorageEntry>],
  465. slot_deltas: &[BankSlotDelta],
  466. mut bank_fields: BankFieldsToSerialize,
  467. bank_hash_stats: BankHashStats,
  468. write_version: u64,
  469. should_flush_and_hard_link_storages: bool,
  470. ) -> Result<BankSnapshotInfo> {
  471. let slot = bank_fields.slot;
  472. // this lambda function is to facilitate converting between
  473. // the AddBankSnapshotError and SnapshotError types
  474. let do_serialize_snapshot = || {
  475. let mut measure_everything = Measure::start("");
  476. let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(&bank_snapshots_dir, slot);
  477. if bank_snapshot_dir.exists() {
  478. return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
  479. bank_snapshot_dir,
  480. ));
  481. }
  482. fs::create_dir_all(&bank_snapshot_dir).map_err(|err| {
  483. AddBankSnapshotError::CreateSnapshotDir(err, bank_snapshot_dir.clone())
  484. })?;
  485. // the bank snapshot is stored as bank_snapshots_dir/slot/slot
  486. let bank_snapshot_path =
  487. bank_snapshot_dir.join(snapshot_paths::get_snapshot_file_name(slot));
  488. info!(
  489. "Creating bank snapshot for slot {slot} at '{}'",
  490. bank_snapshot_path.display(),
  491. );
  492. let bank_snapshot_serializer = move |stream: &mut BufWriter<fs::File>| -> Result<()> {
  493. let versioned_epoch_stakes = mem::take(&mut bank_fields.versioned_epoch_stakes);
  494. let extra_fields = ExtraFieldsToSerialize {
  495. lamports_per_signature: bank_fields.fee_rate_governor.lamports_per_signature,
  496. obsolete_incremental_snapshot_persistence: None,
  497. obsolete_epoch_accounts_hash: None,
  498. versioned_epoch_stakes,
  499. accounts_lt_hash: Some(bank_fields.accounts_lt_hash.clone().into()),
  500. };
  501. serde_snapshot::serialize_bank_snapshot_into(
  502. stream,
  503. bank_fields,
  504. bank_hash_stats,
  505. &get_storages_to_serialize(snapshot_storages),
  506. extra_fields,
  507. write_version,
  508. )?;
  509. Ok(())
  510. };
  511. let (bank_snapshot_consumed_size, bank_serialize) = measure_time!(
  512. serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
  513. .map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
  514. "bank serialize"
  515. );
  516. let status_cache_path =
  517. bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
  518. let (status_cache_consumed_size, status_cache_serialize_us) = measure_us!(
  519. serde_snapshot::serialize_status_cache(slot_deltas, &status_cache_path)
  520. .map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?
  521. );
  522. let version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
  523. let (_, write_version_file_us) = measure_us!(fs::write(
  524. &version_path,
  525. snapshot_version.as_str().as_bytes(),
  526. )
  527. .map_err(|err| AddBankSnapshotError::WriteSnapshotVersionFile(err, version_path))?);
  528. let (flush_storages_us, hard_link_storages_us, serialize_obsolete_accounts_us) =
  529. if should_flush_and_hard_link_storages {
  530. let flush_measure = Measure::start("");
  531. for storage in snapshot_storages {
  532. storage.flush().map_err(|err| {
  533. AddBankSnapshotError::FlushStorage(err, storage.path().to_path_buf())
  534. })?;
  535. }
  536. let flush_us = flush_measure.end_as_us();
  537. let (_, hard_link_us) = measure_us!(hard_link_storages_to_snapshot(
  538. &bank_snapshot_dir,
  539. slot,
  540. snapshot_storages
  541. )
  542. .map_err(AddBankSnapshotError::HardLinkStorages)?);
  543. let (_, serialize_obsolete_accounts_us) = measure_us!({
  544. write_obsolete_accounts_to_snapshot(&bank_snapshot_dir, snapshot_storages, slot)
  545. .map_err(|err| {
  546. AddBankSnapshotError::SerializeObsoleteAccounts(Box::new(err))
  547. })?
  548. });
  549. mark_bank_snapshot_as_loadable(&bank_snapshot_dir)
  550. .map_err(AddBankSnapshotError::MarkSnapshotLoadable)?;
  551. (
  552. Some(flush_us),
  553. Some(hard_link_us),
  554. Some(serialize_obsolete_accounts_us),
  555. )
  556. } else {
  557. (None, None, None)
  558. };
  559. measure_everything.stop();
  560. // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
  561. datapoint_info!(
  562. "snapshot_bank",
  563. ("slot", slot, i64),
  564. ("bank_size", bank_snapshot_consumed_size, i64),
  565. ("status_cache_size", status_cache_consumed_size, i64),
  566. ("flush_storages_us", flush_storages_us, Option<i64>),
  567. ("hard_link_storages_us", hard_link_storages_us, Option<i64>),
  568. ("serialize_obsolete_accounts_us", serialize_obsolete_accounts_us, Option<i64>),
  569. ("bank_serialize_us", bank_serialize.as_us(), i64),
  570. ("status_cache_serialize_us", status_cache_serialize_us, i64),
  571. ("write_version_file_us", write_version_file_us, i64),
  572. ("total_us", measure_everything.as_us(), i64),
  573. );
  574. info!(
  575. "{} for slot {} at {}",
  576. bank_serialize,
  577. slot,
  578. bank_snapshot_path.display(),
  579. );
  580. Ok(BankSnapshotInfo {
  581. slot,
  582. snapshot_dir: bank_snapshot_dir,
  583. snapshot_version,
  584. fastboot_version: None,
  585. })
  586. };
  587. do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
  588. }
  589. /// Get the bank snapshots in a directory
  590. pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
  591. let mut bank_snapshots = Vec::default();
  592. match fs::read_dir(&bank_snapshots_dir) {
  593. Err(err) => {
  594. info!(
  595. "Unable to read bank snapshots directory '{}': {err}",
  596. bank_snapshots_dir.as_ref().display(),
  597. );
  598. }
  599. Ok(paths) => paths
  600. .filter_map(|entry| {
  601. // check if this entry is a directory and only a Slot
  602. // bank snapshots are bank_snapshots_dir/slot/slot
  603. entry
  604. .ok()
  605. .filter(|entry| entry.path().is_dir())
  606. .and_then(|entry| {
  607. entry
  608. .path()
  609. .file_name()
  610. .and_then(|file_name| file_name.to_str())
  611. .and_then(|file_name| file_name.parse::<Slot>().ok())
  612. })
  613. })
  614. .for_each(
  615. |slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
  616. Ok(snapshot_info) => bank_snapshots.push(snapshot_info),
  617. // Other threads may be modifying bank snapshots in parallel; only return
  618. // snapshots that are complete as deemed by BankSnapshotInfo::new_from_dir()
  619. Err(err) => debug!("Unable to read bank snapshot for slot {slot}: {err}"),
  620. },
  621. ),
  622. }
  623. bank_snapshots
  624. }
  625. /// Get the bank snapshot with the highest slot in a directory
  626. ///
  627. /// This function gets the highest bank snapshot of any kind
  628. pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
  629. do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
  630. }
  631. fn do_get_highest_bank_snapshot(
  632. mut bank_snapshots: Vec<BankSnapshotInfo>,
  633. ) -> Option<BankSnapshotInfo> {
  634. bank_snapshots.sort_unstable();
  635. bank_snapshots.into_iter().next_back()
  636. }
  637. pub fn write_obsolete_accounts_to_snapshot(
  638. bank_snapshot_dir: impl AsRef<Path>,
  639. snapshot_storages: &[Arc<AccountStorageEntry>],
  640. snapshot_slot: Slot,
  641. ) -> Result<u64> {
  642. let obsolete_accounts =
  643. SerdeObsoleteAccountsMap::new_from_storages(snapshot_storages, snapshot_slot);
  644. serialize_obsolete_accounts(
  645. bank_snapshot_dir,
  646. &obsolete_accounts,
  647. MAX_OBSOLETE_ACCOUNTS_FILE_SIZE,
  648. )
  649. }
  650. fn serialize_obsolete_accounts(
  651. bank_snapshot_dir: impl AsRef<Path>,
  652. obsolete_accounts_map: &SerdeObsoleteAccountsMap,
  653. maximum_obsolete_accounts_file_size: u64,
  654. ) -> Result<u64> {
  655. let obsolete_accounts_path = bank_snapshot_dir
  656. .as_ref()
  657. .join(snapshot_paths::SNAPSHOT_OBSOLETE_ACCOUNTS_FILENAME);
  658. let obsolete_accounts_file = fs::File::create(&obsolete_accounts_path)?;
  659. let mut file_stream = BufWriter::new(obsolete_accounts_file);
  660. serde_snapshot::serialize_into(&mut file_stream, obsolete_accounts_map)?;
  661. file_stream.flush()?;
  662. let consumed_size = file_stream.stream_position()?;
  663. if consumed_size > maximum_obsolete_accounts_file_size {
  664. let error_message = format!(
  665. "too large obsolete accounts file to serialize: '{}' has {consumed_size} bytes, max \
  666. size is {maximum_obsolete_accounts_file_size}",
  667. obsolete_accounts_path.display(),
  668. );
  669. return Err(IoError::other(error_message).into());
  670. }
  671. Ok(consumed_size)
  672. }
  673. fn deserialize_obsolete_accounts(
  674. bank_snapshot_dir: impl AsRef<Path>,
  675. maximum_obsolete_accounts_file_size: u64,
  676. ) -> Result<SerdeObsoleteAccountsMap> {
  677. let obsolete_accounts_path = bank_snapshot_dir
  678. .as_ref()
  679. .join(snapshot_paths::SNAPSHOT_OBSOLETE_ACCOUNTS_FILENAME);
  680. let obsolete_accounts_file = fs::File::open(&obsolete_accounts_path)?;
  681. // If the file is too large return error
  682. let obsolete_accounts_file_metadata = fs::metadata(&obsolete_accounts_path)?;
  683. if obsolete_accounts_file_metadata.len() > maximum_obsolete_accounts_file_size {
  684. let error_message = format!(
  685. "too large obsolete accounts file to deserialize: '{}' has {} bytes (max size is \
  686. {maximum_obsolete_accounts_file_size} bytes)",
  687. obsolete_accounts_path.display(),
  688. obsolete_accounts_file_metadata.len(),
  689. );
  690. return Err(IoError::other(error_message).into());
  691. }
  692. let mut data_file_stream = BufReader::new(obsolete_accounts_file);
  693. let obsolete_accounts = serde_snapshot::deserialize_from(&mut data_file_stream)?;
  694. Ok(obsolete_accounts)
  695. }
  696. pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
  697. where
  698. F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
  699. {
  700. serialize_snapshot_data_file_capped::<F>(
  701. data_file_path,
  702. MAX_SNAPSHOT_DATA_FILE_SIZE,
  703. serializer,
  704. )
  705. }
  706. pub fn deserialize_snapshot_data_file<T: Sized>(
  707. data_file_path: &Path,
  708. deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
  709. ) -> Result<T> {
  710. let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
  711. deserializer(streams.full_snapshot_stream)
  712. };
  713. let wrapped_data_file_path = SnapshotRootPaths {
  714. full_snapshot_root_file_path: data_file_path.to_path_buf(),
  715. incremental_snapshot_root_file_path: None,
  716. };
  717. deserialize_snapshot_data_files_capped(
  718. &wrapped_data_file_path,
  719. MAX_SNAPSHOT_DATA_FILE_SIZE,
  720. wrapped_deserializer,
  721. )
  722. }
  723. pub fn deserialize_snapshot_data_files<T: Sized>(
  724. snapshot_root_paths: &SnapshotRootPaths,
  725. deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
  726. ) -> Result<T> {
  727. deserialize_snapshot_data_files_capped(
  728. snapshot_root_paths,
  729. MAX_SNAPSHOT_DATA_FILE_SIZE,
  730. deserializer,
  731. )
  732. }
  733. fn serialize_snapshot_data_file_capped<F>(
  734. data_file_path: &Path,
  735. maximum_file_size: u64,
  736. serializer: F,
  737. ) -> Result<u64>
  738. where
  739. F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
  740. {
  741. let data_file = fs::File::create(data_file_path)?;
  742. let mut data_file_stream = BufWriter::new(data_file);
  743. serializer(&mut data_file_stream)?;
  744. data_file_stream.flush()?;
  745. let consumed_size = data_file_stream.stream_position()?;
  746. if consumed_size > maximum_file_size {
  747. let error_message = format!(
  748. "too large snapshot data file to serialize: '{}' has {consumed_size} bytes",
  749. data_file_path.display(),
  750. );
  751. return Err(IoError::other(error_message).into());
  752. }
  753. Ok(consumed_size)
  754. }
  755. fn deserialize_snapshot_data_files_capped<T: Sized>(
  756. snapshot_root_paths: &SnapshotRootPaths,
  757. maximum_file_size: u64,
  758. deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
  759. ) -> Result<T> {
  760. let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
  761. create_snapshot_data_file_stream(
  762. &snapshot_root_paths.full_snapshot_root_file_path,
  763. maximum_file_size,
  764. )?;
  765. let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) =
  766. if let Some(ref incremental_snapshot_root_file_path) =
  767. snapshot_root_paths.incremental_snapshot_root_file_path
  768. {
  769. Some(create_snapshot_data_file_stream(
  770. incremental_snapshot_root_file_path,
  771. maximum_file_size,
  772. )?)
  773. } else {
  774. None
  775. }
  776. .unzip();
  777. let mut snapshot_streams = SnapshotStreams {
  778. full_snapshot_stream: &mut full_snapshot_data_file_stream,
  779. incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(),
  780. };
  781. let ret = deserializer(&mut snapshot_streams)?;
  782. check_deserialize_file_consumed(
  783. full_snapshot_file_size,
  784. &snapshot_root_paths.full_snapshot_root_file_path,
  785. &mut full_snapshot_data_file_stream,
  786. )?;
  787. if let Some(ref incremental_snapshot_root_file_path) =
  788. snapshot_root_paths.incremental_snapshot_root_file_path
  789. {
  790. check_deserialize_file_consumed(
  791. incremental_snapshot_file_size.unwrap(),
  792. incremental_snapshot_root_file_path,
  793. incremental_snapshot_data_file_stream.as_mut().unwrap(),
  794. )?;
  795. }
  796. Ok(ret)
  797. }
  798. /// Before running the deserializer function, perform common operations on the snapshot archive
  799. /// files, such as checking the file size and opening the file into a stream.
  800. fn create_snapshot_data_file_stream(
  801. snapshot_root_file_path: impl AsRef<Path>,
  802. maximum_file_size: u64,
  803. ) -> Result<(u64, BufReader<std::fs::File>)> {
  804. let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
  805. if snapshot_file_size > maximum_file_size {
  806. let error_message = format!(
  807. "too large snapshot data file to deserialize: '{}' has {} bytes (max size is {} bytes)",
  808. snapshot_root_file_path.as_ref().display(),
  809. snapshot_file_size,
  810. maximum_file_size,
  811. );
  812. return Err(IoError::other(error_message).into());
  813. }
  814. let snapshot_data_file = fs::File::open(snapshot_root_file_path)?;
  815. let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
  816. Ok((snapshot_file_size, snapshot_data_file_stream))
  817. }
  818. /// After running the deserializer function, perform common checks to ensure the snapshot archive
  819. /// files were consumed correctly.
  820. fn check_deserialize_file_consumed(
  821. file_size: u64,
  822. file_path: impl AsRef<Path>,
  823. file_stream: &mut BufReader<std::fs::File>,
  824. ) -> Result<()> {
  825. let consumed_size = file_stream.stream_position()?;
  826. if consumed_size != file_size {
  827. let error_message = format!(
  828. "invalid snapshot data file: '{}' has {} bytes, however consumed {} bytes to \
  829. deserialize",
  830. file_path.as_ref().display(),
  831. file_size,
  832. consumed_size,
  833. );
  834. return Err(IoError::other(error_message).into());
  835. }
  836. Ok(())
  837. }
  838. /// Return account path from the appendvec path after checking its format.
  839. fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
  840. let run_path = appendvec_path.parent()?;
  841. let run_file_name = run_path.file_name()?;
  842. // All appendvec files should be under <account_path>/run/.
  843. // When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
  844. if run_file_name != ACCOUNTS_RUN_DIR {
  845. error!(
  846. "The account path {} does not have run/ as its immediate parent directory.",
  847. run_path.display()
  848. );
  849. return None;
  850. }
  851. let account_path = run_path.parent()?;
  852. Some(account_path.to_path_buf())
  853. }
  854. /// From an appendvec path, derive the snapshot hardlink path. If the corresponding snapshot hardlink
  855. /// directory does not exist, create it.
  856. fn get_snapshot_accounts_hardlink_dir(
  857. appendvec_path: &Path,
  858. bank_slot: Slot,
  859. account_paths: &mut HashSet<PathBuf>,
  860. hardlinks_dir: impl AsRef<Path>,
  861. ) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
  862. let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
  863. GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
  864. })?;
  865. let snapshot_hardlink_dir = account_path
  866. .join(ACCOUNTS_SNAPSHOT_DIR)
  867. .join(bank_slot.to_string());
  868. // Use the hashset to track, to avoid checking the file system. Only set up the hardlink directory
  869. // and the symlink to it at the first time of seeing the account_path.
  870. if !account_paths.contains(&account_path) {
  871. let idx = account_paths.len();
  872. debug!(
  873. "for appendvec_path {}, create hard-link path {}",
  874. appendvec_path.display(),
  875. snapshot_hardlink_dir.display()
  876. );
  877. fs::create_dir_all(&snapshot_hardlink_dir).map_err(|err| {
  878. GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir(
  879. err,
  880. snapshot_hardlink_dir.clone(),
  881. )
  882. })?;
  883. let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
  884. symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
  885. GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
  886. source: err,
  887. original: snapshot_hardlink_dir.clone(),
  888. link: symlink_path,
  889. }
  890. })?;
  891. account_paths.insert(account_path);
  892. };
  893. Ok(snapshot_hardlink_dir)
  894. }
  895. /// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
  896. /// This keeps the appendvec files alive and with the bank snapshot. The slot and id
  897. /// in the file names are also updated in case its file is a recycled one with inconsistent slot
  898. /// and id.
  899. pub fn hard_link_storages_to_snapshot(
  900. bank_snapshot_dir: impl AsRef<Path>,
  901. bank_slot: Slot,
  902. snapshot_storages: &[Arc<AccountStorageEntry>],
  903. ) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
  904. let accounts_hardlinks_dir = bank_snapshot_dir
  905. .as_ref()
  906. .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
  907. fs::create_dir_all(&accounts_hardlinks_dir).map_err(|err| {
  908. HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir(
  909. err,
  910. accounts_hardlinks_dir.clone(),
  911. )
  912. })?;
  913. let mut account_paths: HashSet<PathBuf> = HashSet::new();
  914. for storage in snapshot_storages {
  915. let storage_path = storage.accounts.path();
  916. let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
  917. storage_path,
  918. bank_slot,
  919. &mut account_paths,
  920. &accounts_hardlinks_dir,
  921. )?;
  922. // The appendvec could be recycled, so its filename may not be consistent to the slot and id.
  923. // Use the storage slot and id to compose a consistent file name for the hard-link file.
  924. let hardlink_filename = AccountsFile::file_name(storage.slot(), storage.id());
  925. let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
  926. fs::hard_link(storage_path, &hard_link_path).map_err(|err| {
  927. HardLinkStoragesToSnapshotError::HardLinkStorage(
  928. err,
  929. storage_path.to_path_buf(),
  930. hard_link_path,
  931. )
  932. })?;
  933. }
  934. Ok(())
  935. }
  936. /// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
  937. /// translates to what we need
  938. pub(crate) fn get_storages_to_serialize(
  939. snapshot_storages: &[Arc<AccountStorageEntry>],
  940. ) -> Vec<Vec<Arc<AccountStorageEntry>>> {
  941. snapshot_storages
  942. .iter()
  943. .map(|storage| vec![Arc::clone(storage)])
  944. .collect::<Vec<_>>()
  945. }
  946. /// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
  947. pub fn verify_and_unarchive_snapshots(
  948. bank_snapshots_dir: impl AsRef<Path>,
  949. full_snapshot_archive_info: &FullSnapshotArchiveInfo,
  950. incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
  951. account_paths: &[PathBuf],
  952. accounts_db_config: &AccountsDbConfig,
  953. ) -> Result<(UnarchivedSnapshots, UnarchivedSnapshotsGuard)> {
  954. check_are_snapshots_compatible(
  955. full_snapshot_archive_info,
  956. incremental_snapshot_archive_info,
  957. )?;
  958. let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
  959. let UnarchivedSnapshot {
  960. unpack_dir: full_unpack_dir,
  961. storage: full_storage,
  962. bank_fields: full_bank_fields,
  963. accounts_db_fields: full_accounts_db_fields,
  964. unpacked_snapshots_dir_and_version: full_unpacked_snapshots_dir_and_version,
  965. measure_untar: full_measure_untar,
  966. } = unarchive_snapshot(
  967. &bank_snapshots_dir,
  968. snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX,
  969. full_snapshot_archive_info.path(),
  970. "snapshot untar",
  971. account_paths,
  972. full_snapshot_archive_info.archive_format(),
  973. next_append_vec_id.clone(),
  974. accounts_db_config,
  975. )?;
  976. let (
  977. incremental_unpack_dir,
  978. incremental_storage,
  979. incremental_bank_fields,
  980. incremental_accounts_db_fields,
  981. incremental_unpacked_snapshots_dir_and_version,
  982. incremental_measure_untar,
  983. ) = if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
  984. let UnarchivedSnapshot {
  985. unpack_dir,
  986. storage,
  987. bank_fields,
  988. accounts_db_fields,
  989. unpacked_snapshots_dir_and_version,
  990. measure_untar,
  991. } = unarchive_snapshot(
  992. &bank_snapshots_dir,
  993. snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX,
  994. incremental_snapshot_archive_info.path(),
  995. "incremental snapshot untar",
  996. account_paths,
  997. incremental_snapshot_archive_info.archive_format(),
  998. next_append_vec_id.clone(),
  999. accounts_db_config,
  1000. )?;
  1001. (
  1002. Some(unpack_dir),
  1003. Some(storage),
  1004. Some(bank_fields),
  1005. Some(accounts_db_fields),
  1006. Some(unpacked_snapshots_dir_and_version),
  1007. Some(measure_untar),
  1008. )
  1009. } else {
  1010. (None, None, None, None, None, None)
  1011. };
  1012. let bank_fields = SnapshotBankFields::new(full_bank_fields, incremental_bank_fields);
  1013. let accounts_db_fields =
  1014. SnapshotAccountsDbFields::new(full_accounts_db_fields, incremental_accounts_db_fields);
  1015. let next_append_vec_id = Arc::try_unwrap(next_append_vec_id).unwrap();
  1016. Ok((
  1017. UnarchivedSnapshots {
  1018. full_storage,
  1019. incremental_storage,
  1020. bank_fields,
  1021. accounts_db_fields,
  1022. full_unpacked_snapshots_dir_and_version,
  1023. incremental_unpacked_snapshots_dir_and_version,
  1024. full_measure_untar,
  1025. incremental_measure_untar,
  1026. next_append_vec_id,
  1027. },
  1028. UnarchivedSnapshotsGuard {
  1029. full_unpack_dir,
  1030. incremental_unpack_dir,
  1031. },
  1032. ))
  1033. }
  1034. /// Used to determine if a filename is structured like a version file, bank file, or storage file
  1035. #[derive(PartialEq, Debug)]
  1036. enum SnapshotFileKind {
  1037. Version,
  1038. BankFields,
  1039. Storage,
  1040. }
  1041. /// Determines `SnapshotFileKind` for `filename` if any
  1042. fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
  1043. static VERSION_FILE_REGEX: LazyLock<Regex> =
  1044. LazyLock::new(|| Regex::new(r"^version$").unwrap());
  1045. static BANK_FIELDS_FILE_REGEX: LazyLock<Regex> =
  1046. LazyLock::new(|| Regex::new(r"^[0-9]+(\.pre)?$").unwrap());
  1047. if VERSION_FILE_REGEX.is_match(filename) {
  1048. Some(SnapshotFileKind::Version)
  1049. } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
  1050. Some(SnapshotFileKind::BankFields)
  1051. } else if get_slot_and_append_vec_id(filename).is_ok() {
  1052. Some(SnapshotFileKind::Storage)
  1053. } else {
  1054. None
  1055. }
  1056. }
  1057. /// Waits for snapshot file
  1058. /// Due to parallel unpacking, we may receive some append_vec files before the snapshot file
  1059. /// This function will push append_vec files into a buffer until we receive the snapshot file
  1060. fn get_version_and_snapshot_files(
  1061. file_receiver: &Receiver<PathBuf>,
  1062. ) -> Result<(PathBuf, PathBuf, Vec<PathBuf>)> {
  1063. let mut append_vec_files = Vec::with_capacity(1024);
  1064. let mut snapshot_version_path = None;
  1065. let mut snapshot_file_path = None;
  1066. loop {
  1067. if let Ok(path) = file_receiver.recv() {
  1068. let filename = path.file_name().unwrap().to_str().unwrap();
  1069. match get_snapshot_file_kind(filename) {
  1070. Some(SnapshotFileKind::Version) => {
  1071. snapshot_version_path = Some(path);
  1072. // break if we have both the snapshot file and the version file
  1073. if snapshot_file_path.is_some() {
  1074. break;
  1075. }
  1076. }
  1077. Some(SnapshotFileKind::BankFields) => {
  1078. snapshot_file_path = Some(path);
  1079. // break if we have both the snapshot file and the version file
  1080. if snapshot_version_path.is_some() {
  1081. break;
  1082. }
  1083. }
  1084. Some(SnapshotFileKind::Storage) => {
  1085. append_vec_files.push(path);
  1086. }
  1087. None => {} // do nothing for other kinds of files
  1088. }
  1089. } else {
  1090. return Err(SnapshotError::RebuildStorages(
  1091. "did not receive snapshot file from unpacking threads".to_string(),
  1092. ));
  1093. }
  1094. }
  1095. let snapshot_version_path = snapshot_version_path.unwrap();
  1096. let snapshot_file_path = snapshot_file_path.unwrap();
  1097. Ok((snapshot_version_path, snapshot_file_path, append_vec_files))
  1098. }
/// Fields and information parsed from the snapshot.
///
/// Produced by `snapshot_fields_from_files()`. It combines the version file, the
/// bank snapshot file, and any storage files that arrived on the channel before
/// both of those.
struct SnapshotFieldsBundle {
    /// Parsed from the (trimmed) contents of the snapshot version file.
    snapshot_version: SnapshotVersion,
    /// Bank fields deserialized from the bank snapshot file.
    bank_fields: BankFieldsToDeserialize,
    /// AccountsDb fields deserialized from the same bank snapshot file.
    accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
    /// Storage file paths received before both the version file and the bank
    /// snapshot file had arrived. Later storage files may still be pending on
    /// the channel.
    append_vec_files: Vec<PathBuf>,
}
  1106. /// Parses fields and information from the snapshot files provided by
  1107. /// `file_receiver`.
  1108. fn snapshot_fields_from_files(file_receiver: &Receiver<PathBuf>) -> Result<SnapshotFieldsBundle> {
  1109. let (snapshot_version_path, snapshot_file_path, append_vec_files) =
  1110. get_version_and_snapshot_files(file_receiver)?;
  1111. let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
  1112. let snapshot_version = snapshot_version_str.parse().map_err(|err| {
  1113. IoError::other(format!(
  1114. "unsupported snapshot version '{snapshot_version_str}': {err}",
  1115. ))
  1116. })?;
  1117. let snapshot_file = fs::File::open(snapshot_file_path).unwrap();
  1118. let mut snapshot_stream = BufReader::new(snapshot_file);
  1119. let (bank_fields, accounts_db_fields) = match snapshot_version {
  1120. SnapshotVersion::V1_2_0 => serde_snapshot::fields_from_stream(&mut snapshot_stream)?,
  1121. };
  1122. Ok(SnapshotFieldsBundle {
  1123. snapshot_version,
  1124. bank_fields,
  1125. accounts_db_fields,
  1126. append_vec_files,
  1127. })
  1128. }
  1129. /// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
  1130. /// as a valid one. A dir unpacked from an archive lacks these files. Fill them here to
  1131. /// allow new_from_dir() checks to pass. These checks are not needed for unpacked dirs,
  1132. /// but it is not clean to add another flag to new_from_dir() to skip them.
  1133. fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
  1134. let snapshots_dir = unpack_dir.as_ref().join(snapshot_paths::BANK_SNAPSHOTS_DIR);
  1135. if !snapshots_dir.is_dir() {
  1136. return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
  1137. }
  1138. // The unpacked dir has a single slot dir, which is the snapshot slot dir.
  1139. let slot_dir = std::fs::read_dir(&snapshots_dir)
  1140. .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
  1141. .find(|entry| entry.as_ref().unwrap().path().is_dir())
  1142. .ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
  1143. .map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
  1144. .path();
  1145. let version_file = unpack_dir
  1146. .as_ref()
  1147. .join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
  1148. fs::hard_link(
  1149. version_file,
  1150. slot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME),
  1151. )?;
  1152. let status_cache_file = snapshots_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
  1153. fs::hard_link(
  1154. status_cache_file,
  1155. slot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME),
  1156. )?;
  1157. Ok(())
  1158. }
/// Perform the common tasks when unarchiving a snapshot. Handles creating the temporary
/// directories, untaring, reading the version file, and then returning those fields plus the
/// rebuilt storage.
///
/// Steps:
/// 1. Create a temporary directory, prefixed with `unpacked_snapshots_dir_prefix`, inside
///    `bank_snapshots_dir` to unpack into.
/// 2. Start `streaming_unarchive_snapshot`, which sends the paths of unpacked files over an
///    unbounded channel.
/// 3. Parse the version and bank fields files from that channel. Then rebuild the account
///    storages from the buffered and remaining storage files, timing the rebuild under
///    `measure_name`.
/// 4. Hard-link the meta files that `BankSnapshotInfo::new_from_dir()` requires.
///
/// The unarchive handle is always joined before returning. If it reports an error, that
/// error is returned instead of the result of steps 3-4. `join().unwrap()` panics if the
/// unpacking thread itself panicked.
fn unarchive_snapshot(
    bank_snapshots_dir: impl AsRef<Path>,
    unpacked_snapshots_dir_prefix: &'static str,
    snapshot_archive_path: impl AsRef<Path>,
    measure_name: &'static str,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    accounts_db_config: &AccountsDbConfig,
) -> Result<UnarchivedSnapshot> {
    let unpack_dir = tempfile::Builder::new()
        .prefix(unpacked_snapshots_dir_prefix)
        .tempdir_in(bank_snapshots_dir)?;
    let unpacked_snapshots_dir = unpack_dir.path().join(snapshot_paths::BANK_SNAPSHOTS_DIR);
    // The unpacker owns the only sender; paths arrive as files are unpacked.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let unarchive_handle = streaming_unarchive_snapshot(
        file_sender,
        account_paths.to_vec(),
        unpack_dir.path().to_path_buf(),
        snapshot_archive_path.as_ref().to_path_buf(),
        archive_format,
        accounts_db_config.memlock_budget_size,
    );
    // Leave one physical core free, but always use at least one thread.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    // `file_receiver` is first borrowed to find the version and bank fields files, then
    // moved into the rebuilder so it can consume the storage files still being unpacked.
    // When rebuilding from an archive, no obsolete accounts are passed (`None`).
    let snapshot_result = snapshot_fields_from_files(&file_receiver).and_then(
        |SnapshotFieldsBundle {
             snapshot_version,
             bank_fields,
             accounts_db_fields,
             append_vec_files,
             ..
         }| {
            let (storage, measure_untar) = measure_time!(
                SnapshotStorageRebuilder::rebuild_storage(
                    &accounts_db_fields,
                    append_vec_files,
                    file_receiver,
                    num_rebuilder_threads,
                    next_append_vec_id,
                    SnapshotFrom::Archive,
                    accounts_db_config.storage_access,
                    None,
                )?,
                measure_name
            );
            info!("{measure_untar}");
            create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
            Ok(UnarchivedSnapshot {
                unpack_dir,
                storage,
                bank_fields,
                accounts_db_fields,
                unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
                    unpacked_snapshots_dir,
                    snapshot_version,
                },
                measure_untar,
            })
        },
    );
    // Join even if parsing/rebuilding failed; an unpacking error takes precedence
    // because it is the likelier root cause.
    unarchive_handle.join().unwrap()?;
    snapshot_result
}
  1225. /// Streams snapshot dir files across channel
  1226. /// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
  1227. fn streaming_snapshot_dir_files(
  1228. file_sender: Sender<PathBuf>,
  1229. snapshot_file_path: impl Into<PathBuf>,
  1230. snapshot_version_path: impl Into<PathBuf>,
  1231. account_paths: &[PathBuf],
  1232. ) -> Result<()> {
  1233. file_sender.send(snapshot_file_path.into())?;
  1234. file_sender.send(snapshot_version_path.into())?;
  1235. for account_path in account_paths {
  1236. for file in fs::read_dir(account_path)? {
  1237. file_sender.send(file?.path())?;
  1238. }
  1239. }
  1240. Ok(())
  1241. }
/// Performs the common tasks when deserializing a snapshot
///
/// Handles reading the snapshot file and version file,
/// then returning those fields plus the rebuilt storages.
///
/// Steps:
/// 1. If the snapshot's fastboot version has major >= 2, deserialize its obsolete accounts.
/// 2. For each symlink in `<snapshot_dir>/<SNAPSHOT_ACCOUNTS_HARDLINKS>`:
///    - resolve its target `<account_path>/snapshot/<slot>`;
///    - require `<account_path>/<ACCOUNTS_RUN_DIR>` to be one of `account_paths`;
///    - hard-link every file in the target into that run dir.
/// 3. Stream the bank snapshot file, the version file, and every entry of `account_paths`
///    through a channel. Parse the fields from it, then rebuild the storages from the
///    remaining paths.
///
/// # Errors
/// Returns `SnapshotError::AccountPathsMismatch` if a snapshot's account path is not among
/// `account_paths` (the snapshot is then unusable). Returns I/O or snapshot errors if any
/// read, link, parse, or rebuild step fails.
pub fn rebuild_storages_from_snapshot_dir(
    snapshot_info: &BankSnapshotInfo,
    account_paths: &[PathBuf],
    next_append_vec_id: Arc<AtomicAccountsFileId>,
    storage_access: StorageAccess,
) -> Result<(
    AccountStorageMap,
    BankFieldsToDeserialize,
    AccountsDbFields<SerializableAccountStorageEntry>,
)> {
    let bank_snapshot_dir = &snapshot_info.snapshot_dir;
    let accounts_hardlinks = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
    let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
    // With fastboot_version >= 2, obsolete accounts are tracked and stored in the snapshot
    // Even if obsolete accounts are not enabled, the snapshot may still contain obsolete accounts
    // as the feature may have been enabled in previous validator runs.
    // NOTE(review): the error message below names the snapshot dir, not the obsolete
    // accounts file itself.
    let obsolete_accounts = snapshot_info
        .fastboot_version
        .as_ref()
        .is_some_and(|fastboot_version| fastboot_version.major >= 2)
        .then(|| deserialize_obsolete_accounts(bank_snapshot_dir, MAX_OBSOLETE_ACCOUNTS_FILE_SIZE))
        .transpose()
        .map_err(|err| {
            IoError::other(format!(
                "failed to read obsolete accounts file '{}': {err}",
                bank_snapshot_dir.display()
            ))
        })?;
    let read_dir = fs::read_dir(&accounts_hardlinks).map_err(|err| {
        IoError::other(format!(
            "failed to read accounts hardlinks dir '{}': {err}",
            accounts_hardlinks.display(),
        ))
    })?;
    for dir_entry in read_dir {
        let symlink_path = dir_entry?.path();
        // The symlink point to <account_path>/snapshot/<slot> which contain the account files hardlinks
        // The corresponding run path should be <account_path>/run/
        let account_snapshot_path = fs::read_link(&symlink_path).map_err(|err| {
            IoError::other(format!(
                "failed to read symlink '{}': {err}",
                symlink_path.display(),
            ))
        })?;
        // Strip `<slot>` and `snapshot` to get `<account_path>`, then append the run dir.
        let account_run_path = account_snapshot_path
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .parent()
            .ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
            .join(ACCOUNTS_RUN_DIR);
        if !account_run_paths.contains(&account_run_path) {
            // The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
            // The account paths have changed so the snapshot is no longer usable.
            return Err(SnapshotError::AccountPathsMismatch);
        }
        // Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
        // paths be in accounts/
        let read_dir = fs::read_dir(&account_snapshot_path).map_err(|err| {
            IoError::other(format!(
                "failed to read account snapshot dir '{}': {err}",
                account_snapshot_path.display(),
            ))
        })?;
        for file in read_dir {
            let file_path = file?.path();
            let file_name = file_path
                .file_name()
                .ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
            let dest_path = account_run_path.join(file_name);
            fs::hard_link(&file_path, &dest_path).map_err(|err| {
                IoError::other(format!(
                    "failed to hard link from '{}' to '{}': {err}",
                    file_path.display(),
                    dest_path.display(),
                ))
            })?;
        }
    }
    // The channel is unbounded, so all paths can be queued up front. `file_sender` is moved
    // into `streaming_snapshot_dir_files` and dropped when it returns.
    let (file_sender, file_receiver) = crossbeam_channel::unbounded();
    let snapshot_file_path = &snapshot_info.snapshot_path();
    let snapshot_version_path = bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
    streaming_snapshot_dir_files(
        file_sender,
        snapshot_file_path,
        snapshot_version_path,
        account_paths,
    )?;
    let SnapshotFieldsBundle {
        bank_fields,
        accounts_db_fields,
        append_vec_files,
        ..
    } = snapshot_fields_from_files(&file_receiver)?;
    // Leave one physical core free, but always use at least one thread.
    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
    let storage = SnapshotStorageRebuilder::rebuild_storage(
        &accounts_db_fields,
        append_vec_files,
        file_receiver,
        num_rebuilder_threads,
        next_append_vec_id,
        SnapshotFrom::Dir,
        storage_access,
        obsolete_accounts,
    )?;
    Ok((storage, bank_fields, accounts_db_fields))
}
  1352. /// Reads the `snapshot_version` from a file. Before opening the file, its size
  1353. /// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
  1354. /// threshold, it is not opened and an error is returned.
  1355. fn snapshot_version_from_file(path: impl AsRef<Path>) -> io::Result<String> {
  1356. // Check file size.
  1357. let file_metadata = fs::metadata(&path).map_err(|err| {
  1358. IoError::other(format!(
  1359. "failed to query snapshot version file metadata '{}': {err}",
  1360. path.as_ref().display(),
  1361. ))
  1362. })?;
  1363. let file_size = file_metadata.len();
  1364. if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
  1365. let error_message = format!(
  1366. "snapshot version file too large: '{}' has {} bytes (max size is {} bytes)",
  1367. path.as_ref().display(),
  1368. file_size,
  1369. MAX_SNAPSHOT_VERSION_FILE_SIZE,
  1370. );
  1371. return Err(IoError::other(error_message));
  1372. }
  1373. // Read snapshot_version from file.
  1374. let mut snapshot_version = String::new();
  1375. let mut file = fs::File::open(&path).map_err(|err| {
  1376. IoError::other(format!(
  1377. "failed to open snapshot version file '{}': {err}",
  1378. path.as_ref().display()
  1379. ))
  1380. })?;
  1381. file.read_to_string(&mut snapshot_version).map_err(|err| {
  1382. IoError::other(format!(
  1383. "failed to read snapshot version from file '{}': {err}",
  1384. path.as_ref().display()
  1385. ))
  1386. })?;
  1387. Ok(snapshot_version.trim().to_string())
  1388. }
  1389. /// Check if an incremental snapshot is compatible with a full snapshot. This is done by checking
  1390. /// if the incremental snapshot's base slot is the same as the full snapshot's slot.
  1391. fn check_are_snapshots_compatible(
  1392. full_snapshot_archive_info: &FullSnapshotArchiveInfo,
  1393. incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
  1394. ) -> Result<()> {
  1395. if incremental_snapshot_archive_info.is_none() {
  1396. return Ok(());
  1397. }
  1398. let incremental_snapshot_archive_info = incremental_snapshot_archive_info.unwrap();
  1399. (full_snapshot_archive_info.slot() == incremental_snapshot_archive_info.base_slot())
  1400. .then_some(())
  1401. .ok_or_else(|| {
  1402. SnapshotError::MismatchedBaseSlot(
  1403. full_snapshot_archive_info.slot(),
  1404. incremental_snapshot_archive_info.base_slot(),
  1405. )
  1406. })
  1407. }
  1408. pub fn purge_old_snapshot_archives(
  1409. full_snapshot_archives_dir: impl AsRef<Path>,
  1410. incremental_snapshot_archives_dir: impl AsRef<Path>,
  1411. maximum_full_snapshot_archives_to_retain: NonZeroUsize,
  1412. maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
  1413. ) {
  1414. info!(
  1415. "Purging old full snapshot archives in {}, retaining up to {} full snapshots",
  1416. full_snapshot_archives_dir.as_ref().display(),
  1417. maximum_full_snapshot_archives_to_retain
  1418. );
  1419. let mut full_snapshot_archives =
  1420. snapshot_paths::get_full_snapshot_archives(&full_snapshot_archives_dir);
  1421. full_snapshot_archives.sort_unstable();
  1422. full_snapshot_archives.reverse();
  1423. let num_to_retain = full_snapshot_archives
  1424. .len()
  1425. .min(maximum_full_snapshot_archives_to_retain.get());
  1426. trace!(
  1427. "There are {} full snapshot archives, retaining {}",
  1428. full_snapshot_archives.len(),
  1429. num_to_retain,
  1430. );
  1431. let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
  1432. if full_snapshot_archives.is_empty() {
  1433. None
  1434. } else {
  1435. Some(full_snapshot_archives.split_at(num_to_retain))
  1436. }
  1437. .unwrap_or_default();
  1438. let retained_full_snapshot_slots = full_snapshot_archives_to_retain
  1439. .iter()
  1440. .map(|ai| ai.slot())
  1441. .collect::<HashSet<_>>();
  1442. fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
  1443. for path in archives.iter().map(|a| a.path()) {
  1444. trace!("Removing snapshot archive: {}", path.display());
  1445. let result = fs::remove_file(path);
  1446. if let Err(err) = result {
  1447. info!(
  1448. "Failed to remove snapshot archive '{}': {err}",
  1449. path.display()
  1450. );
  1451. }
  1452. }
  1453. }
  1454. remove_archives(full_snapshot_archives_to_remove);
  1455. info!(
  1456. "Purging old incremental snapshot archives in {}, retaining up to {} incremental snapshots",
  1457. incremental_snapshot_archives_dir.as_ref().display(),
  1458. maximum_incremental_snapshot_archives_to_retain
  1459. );
  1460. let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
  1461. for incremental_snapshot_archive in
  1462. get_incremental_snapshot_archives(&incremental_snapshot_archives_dir)
  1463. {
  1464. incremental_snapshot_archives_by_base_slot
  1465. .entry(incremental_snapshot_archive.base_slot())
  1466. .or_default()
  1467. .push(incremental_snapshot_archive)
  1468. }
  1469. let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
  1470. for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
  1471. {
  1472. incremental_snapshot_archives.sort_unstable();
  1473. let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
  1474. maximum_incremental_snapshot_archives_to_retain.get()
  1475. } else {
  1476. usize::from(retained_full_snapshot_slots.contains(&base_slot))
  1477. };
  1478. trace!(
  1479. "There are {} incremental snapshot archives for base slot {}, removing {} of them",
  1480. incremental_snapshot_archives.len(),
  1481. base_slot,
  1482. incremental_snapshot_archives
  1483. .len()
  1484. .saturating_sub(num_to_retain),
  1485. );
  1486. incremental_snapshot_archives.truncate(
  1487. incremental_snapshot_archives
  1488. .len()
  1489. .saturating_sub(num_to_retain),
  1490. );
  1491. remove_archives(&incremental_snapshot_archives);
  1492. }
  1493. }
  1494. pub fn verify_unpacked_snapshots_dir_and_version(
  1495. unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
  1496. ) -> Result<(SnapshotVersion, BankSnapshotInfo)> {
  1497. info!(
  1498. "snapshot version: {}",
  1499. &unpacked_snapshots_dir_and_version.snapshot_version
  1500. );
  1501. let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
  1502. let mut bank_snapshots =
  1503. get_bank_snapshots(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
  1504. if bank_snapshots.len() > 1 {
  1505. return Err(IoError::other(format!(
  1506. "invalid snapshot format: only one snapshot allowed, but found {}",
  1507. bank_snapshots.len(),
  1508. ))
  1509. .into());
  1510. }
  1511. let root_paths = bank_snapshots.pop().ok_or_else(|| {
  1512. IoError::other(format!(
  1513. "no snapshots found in snapshots directory '{}'",
  1514. unpacked_snapshots_dir_and_version
  1515. .unpacked_snapshots_dir
  1516. .display(),
  1517. ))
  1518. })?;
  1519. Ok((snapshot_version, root_paths))
  1520. }
  1521. #[derive(Debug, Copy, Clone)]
  1522. /// allow tests to specify what happened to the serialized format
  1523. pub enum VerifyBank {
  1524. /// the bank's serialized format is expected to be identical to what we are comparing against
  1525. Deterministic,
  1526. /// the serialized bank was 'reserialized' into a non-deterministic format
  1527. /// so, deserialize both files and compare deserialized results
  1528. NonDeterministic,
  1529. }
  1530. /// Purges all bank snapshots
  1531. pub fn purge_all_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
  1532. let bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
  1533. purge_bank_snapshots(&bank_snapshots);
  1534. }
  1535. /// Purges bank snapshots, retaining the newest `num_bank_snapshots_to_retain`
  1536. pub fn purge_old_bank_snapshots(
  1537. bank_snapshots_dir: impl AsRef<Path>,
  1538. num_bank_snapshots_to_retain: usize,
  1539. ) {
  1540. let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
  1541. bank_snapshots.sort_unstable();
  1542. purge_bank_snapshots(
  1543. bank_snapshots
  1544. .iter()
  1545. .rev()
  1546. .skip(num_bank_snapshots_to_retain),
  1547. );
  1548. }
  1549. /// At startup, purge old (i.e. unusable) bank snapshots
  1550. pub fn purge_old_bank_snapshots_at_startup(bank_snapshots_dir: impl AsRef<Path>) {
  1551. purge_old_bank_snapshots(&bank_snapshots_dir, 1);
  1552. let highest_bank_snapshot = get_highest_bank_snapshot(&bank_snapshots_dir);
  1553. if let Some(highest_bank_snapshot) = highest_bank_snapshot {
  1554. debug!(
  1555. "Retained bank snapshot for slot {}, and purged the rest.",
  1556. highest_bank_snapshot.slot
  1557. );
  1558. }
  1559. }
  1560. /// Purges bank snapshots that are older than `slot`
  1561. pub fn purge_bank_snapshots_older_than_slot(bank_snapshots_dir: impl AsRef<Path>, slot: Slot) {
  1562. let mut bank_snapshots = get_bank_snapshots(&bank_snapshots_dir);
  1563. bank_snapshots.retain(|bank_snapshot| bank_snapshot.slot < slot);
  1564. purge_bank_snapshots(&bank_snapshots);
  1565. }
  1566. /// Purges all `bank_snapshots`
  1567. ///
  1568. /// Does not exit early if there is an error while purging a bank snapshot.
  1569. fn purge_bank_snapshots<'a>(bank_snapshots: impl IntoIterator<Item = &'a BankSnapshotInfo>) {
  1570. for snapshot_dir in bank_snapshots.into_iter().map(|s| &s.snapshot_dir) {
  1571. if purge_bank_snapshot(snapshot_dir).is_err() {
  1572. warn!("Failed to purge bank snapshot: {}", snapshot_dir.display());
  1573. }
  1574. }
  1575. }
  1576. /// Remove the bank snapshot at this path
  1577. pub fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
  1578. const FN_ERR: &str = "failed to purge bank snapshot";
  1579. let accounts_hardlinks_dir = bank_snapshot_dir
  1580. .as_ref()
  1581. .join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
  1582. if accounts_hardlinks_dir.is_dir() {
  1583. // This directory contain symlinks to all accounts snapshot directories.
  1584. // They should all be removed.
  1585. let read_dir = fs::read_dir(&accounts_hardlinks_dir).map_err(|err| {
  1586. IoError::other(format!(
  1587. "{FN_ERR}: failed to read accounts hardlinks dir '{}': {err}",
  1588. accounts_hardlinks_dir.display(),
  1589. ))
  1590. })?;
  1591. for entry in read_dir {
  1592. let accounts_hardlink_dir = entry?.path();
  1593. let accounts_hardlink_dir = fs::read_link(&accounts_hardlink_dir).map_err(|err| {
  1594. IoError::other(format!(
  1595. "{FN_ERR}: failed to read symlink '{}': {err}",
  1596. accounts_hardlink_dir.display(),
  1597. ))
  1598. })?;
  1599. move_and_async_delete_path(&accounts_hardlink_dir);
  1600. }
  1601. }
  1602. fs::remove_dir_all(&bank_snapshot_dir).map_err(|err| {
  1603. IoError::other(format!(
  1604. "{FN_ERR}: failed to remove dir '{}': {err}",
  1605. bank_snapshot_dir.as_ref().display(),
  1606. ))
  1607. })?;
  1608. Ok(())
  1609. }
  1610. pub fn should_take_full_snapshot(
  1611. block_height: Slot,
  1612. full_snapshot_archive_interval_slots: Slot,
  1613. ) -> bool {
  1614. block_height.is_multiple_of(full_snapshot_archive_interval_slots)
  1615. }
  1616. pub fn should_take_incremental_snapshot(
  1617. block_height: Slot,
  1618. incremental_snapshot_archive_interval_slots: Slot,
  1619. latest_full_snapshot_slot: Option<Slot>,
  1620. ) -> bool {
  1621. block_height.is_multiple_of(incremental_snapshot_archive_interval_slots)
  1622. && latest_full_snapshot_slot.is_some()
  1623. }
  1624. /// Creates an "accounts path" directory for tests
  1625. ///
  1626. /// This temporary directory will contain the "run" and "snapshot"
  1627. /// sub-directories required by a validator.
  1628. #[cfg(feature = "dev-context-only-utils")]
  1629. pub fn create_tmp_accounts_dir_for_tests() -> (TempDir, PathBuf) {
  1630. let tmp_dir = tempfile::TempDir::new().unwrap();
  1631. let account_dir = create_accounts_run_and_snapshot_dirs(&tmp_dir).unwrap().0;
  1632. (tmp_dir, account_dir)
  1633. }
  1634. #[cfg(test)]
  1635. mod tests {
  1636. use {
  1637. super::*,
  1638. agave_snapshots::{
  1639. paths::{
  1640. get_full_snapshot_archives, get_highest_full_snapshot_archive_slot,
  1641. get_highest_incremental_snapshot_archive_slot,
  1642. },
  1643. snapshot_config::{
  1644. DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
  1645. DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
  1646. },
  1647. },
  1648. assert_matches::assert_matches,
  1649. bincode::{deserialize_from, serialize_into},
  1650. solana_accounts_db::accounts_file::AccountsFileProvider,
  1651. solana_hash::Hash,
  1652. std::{convert::TryFrom, mem::size_of},
  1653. tempfile::NamedTempFile,
  1654. test_case::test_case,
  1655. };
  1656. #[test]
  1657. fn test_serialize_snapshot_data_file_under_limit() {
  1658. let temp_dir = tempfile::TempDir::new().unwrap();
  1659. let expected_consumed_size = size_of::<u32>() as u64;
  1660. let consumed_size = serialize_snapshot_data_file_capped(
  1661. &temp_dir.path().join("data-file"),
  1662. expected_consumed_size,
  1663. |stream| {
  1664. serialize_into(stream, &2323_u32)?;
  1665. Ok(())
  1666. },
  1667. )
  1668. .unwrap();
  1669. assert_eq!(consumed_size, expected_consumed_size);
  1670. }
  1671. #[test]
  1672. fn test_serialize_snapshot_data_file_over_limit() {
  1673. let temp_dir = tempfile::TempDir::new().unwrap();
  1674. let expected_consumed_size = size_of::<u32>() as u64;
  1675. let result = serialize_snapshot_data_file_capped(
  1676. &temp_dir.path().join("data-file"),
  1677. expected_consumed_size - 1,
  1678. |stream| {
  1679. serialize_into(stream, &2323_u32)?;
  1680. Ok(())
  1681. },
  1682. );
  1683. assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
  1684. }
  1685. #[test]
  1686. fn test_deserialize_snapshot_data_file_under_limit() {
  1687. let expected_data = 2323_u32;
  1688. let expected_consumed_size = size_of::<u32>() as u64;
  1689. let temp_dir = tempfile::TempDir::new().unwrap();
  1690. serialize_snapshot_data_file_capped(
  1691. &temp_dir.path().join("data-file"),
  1692. expected_consumed_size,
  1693. |stream| {
  1694. serialize_into(stream, &expected_data)?;
  1695. Ok(())
  1696. },
  1697. )
  1698. .unwrap();
  1699. let snapshot_root_paths = SnapshotRootPaths {
  1700. full_snapshot_root_file_path: temp_dir.path().join("data-file"),
  1701. incremental_snapshot_root_file_path: None,
  1702. };
  1703. let actual_data = deserialize_snapshot_data_files_capped(
  1704. &snapshot_root_paths,
  1705. expected_consumed_size,
  1706. |stream| {
  1707. Ok(deserialize_from::<_, u32>(
  1708. &mut stream.full_snapshot_stream,
  1709. )?)
  1710. },
  1711. )
  1712. .unwrap();
  1713. assert_eq!(actual_data, expected_data);
  1714. }
  1715. #[test]
  1716. fn test_deserialize_snapshot_data_file_over_limit() {
  1717. let expected_data = 2323_u32;
  1718. let expected_consumed_size = size_of::<u32>() as u64;
  1719. let temp_dir = tempfile::TempDir::new().unwrap();
  1720. serialize_snapshot_data_file_capped(
  1721. &temp_dir.path().join("data-file"),
  1722. expected_consumed_size,
  1723. |stream| {
  1724. serialize_into(stream, &expected_data)?;
  1725. Ok(())
  1726. },
  1727. )
  1728. .unwrap();
  1729. let snapshot_root_paths = SnapshotRootPaths {
  1730. full_snapshot_root_file_path: temp_dir.path().join("data-file"),
  1731. incremental_snapshot_root_file_path: None,
  1732. };
  1733. let result = deserialize_snapshot_data_files_capped(
  1734. &snapshot_root_paths,
  1735. expected_consumed_size - 1,
  1736. |stream| {
  1737. Ok(deserialize_from::<_, u32>(
  1738. &mut stream.full_snapshot_stream,
  1739. )?)
  1740. },
  1741. );
  1742. assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
  1743. }
  1744. #[test]
  1745. fn test_deserialize_snapshot_data_file_extra_data() {
  1746. let expected_data = 2323_u32;
  1747. let expected_consumed_size = size_of::<u32>() as u64;
  1748. let temp_dir = tempfile::TempDir::new().unwrap();
  1749. serialize_snapshot_data_file_capped(
  1750. &temp_dir.path().join("data-file"),
  1751. expected_consumed_size * 2,
  1752. |stream| {
  1753. serialize_into(stream.by_ref(), &expected_data)?;
  1754. serialize_into(stream.by_ref(), &expected_data)?;
  1755. Ok(())
  1756. },
  1757. )
  1758. .unwrap();
  1759. let snapshot_root_paths = SnapshotRootPaths {
  1760. full_snapshot_root_file_path: temp_dir.path().join("data-file"),
  1761. incremental_snapshot_root_file_path: None,
  1762. };
  1763. let result = deserialize_snapshot_data_files_capped(
  1764. &snapshot_root_paths,
  1765. expected_consumed_size * 2,
  1766. |stream| {
  1767. Ok(deserialize_from::<_, u32>(
  1768. &mut stream.full_snapshot_stream,
  1769. )?)
  1770. },
  1771. );
  1772. assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
  1773. }
  1774. #[test]
  1775. fn test_snapshot_version_from_file_under_limit() {
  1776. let file_content = SnapshotVersion::default().as_str();
  1777. let mut file = NamedTempFile::new().unwrap();
  1778. file.write_all(file_content.as_bytes()).unwrap();
  1779. let version_from_file = snapshot_version_from_file(file.path()).unwrap();
  1780. assert_eq!(version_from_file, file_content);
  1781. }
  1782. #[test]
  1783. fn test_snapshot_version_from_file_over_limit() {
  1784. let over_limit_size = usize::try_from(MAX_SNAPSHOT_VERSION_FILE_SIZE + 1).unwrap();
  1785. let file_content = vec![7u8; over_limit_size];
  1786. let mut file = NamedTempFile::new().unwrap();
  1787. file.write_all(&file_content).unwrap();
  1788. assert_matches!(
  1789. snapshot_version_from_file(file.path()),
  1790. Err(ref message) if message.to_string().starts_with("snapshot version file too large")
  1791. );
  1792. }
  1793. #[test]
  1794. fn test_check_are_snapshots_compatible() {
  1795. let slot1: Slot = 1234;
  1796. let slot2: Slot = 5678;
  1797. let slot3: Slot = 999_999;
  1798. let full_snapshot_archive_info = FullSnapshotArchiveInfo::new_from_path(PathBuf::from(
  1799. format!("/dir/snapshot-{}-{}.tar.zst", slot1, Hash::new_unique()),
  1800. ))
  1801. .unwrap();
  1802. assert!(check_are_snapshots_compatible(&full_snapshot_archive_info, None,).is_ok());
  1803. let incremental_snapshot_archive_info =
  1804. IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
  1805. "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
  1806. slot1,
  1807. slot2,
  1808. Hash::new_unique()
  1809. )))
  1810. .unwrap();
  1811. assert!(check_are_snapshots_compatible(
  1812. &full_snapshot_archive_info,
  1813. Some(&incremental_snapshot_archive_info)
  1814. )
  1815. .is_ok());
  1816. let incremental_snapshot_archive_info =
  1817. IncrementalSnapshotArchiveInfo::new_from_path(PathBuf::from(format!(
  1818. "/dir/incremental-snapshot-{}-{}-{}.tar.zst",
  1819. slot2,
  1820. slot3,
  1821. Hash::new_unique()
  1822. )))
  1823. .unwrap();
  1824. assert!(check_are_snapshots_compatible(
  1825. &full_snapshot_archive_info,
  1826. Some(&incremental_snapshot_archive_info)
  1827. )
  1828. .is_err());
  1829. }
  1830. /// A test heler function that creates bank snapshot files
  1831. fn common_create_bank_snapshot_files(
  1832. bank_snapshots_dir: &Path,
  1833. min_slot: Slot,
  1834. max_slot: Slot,
  1835. ) {
  1836. for slot in min_slot..max_slot {
  1837. let snapshot_dir = snapshot_paths::get_bank_snapshot_dir(bank_snapshots_dir, slot);
  1838. fs::create_dir_all(&snapshot_dir).unwrap();
  1839. let snapshot_filename = snapshot_paths::get_snapshot_file_name(slot);
  1840. let snapshot_path = snapshot_dir.join(snapshot_filename);
  1841. fs::File::create(snapshot_path).unwrap();
  1842. let status_cache_file =
  1843. snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
  1844. fs::File::create(status_cache_file).unwrap();
  1845. let version_path = snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
  1846. fs::write(version_path, SnapshotVersion::default().as_str().as_bytes()).unwrap();
  1847. }
  1848. }
  1849. #[test]
  1850. fn test_get_bank_snapshots() {
  1851. let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
  1852. let min_slot = 10;
  1853. let max_slot = 20;
  1854. common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
  1855. let bank_snapshots = get_bank_snapshots(temp_snapshots_dir.path());
  1856. assert_eq!(bank_snapshots.len() as Slot, max_slot - min_slot);
  1857. }
  1858. #[test]
  1859. fn test_get_highest_bank_snapshot() {
  1860. let temp_snapshots_dir = tempfile::TempDir::new().unwrap();
  1861. let min_slot = 99;
  1862. let max_slot = 123;
  1863. common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot);
  1864. let highest_bank_snapshot = get_highest_bank_snapshot(temp_snapshots_dir.path());
  1865. assert!(highest_bank_snapshot.is_some());
  1866. assert_eq!(highest_bank_snapshot.unwrap().slot, max_slot - 1);
  1867. }
  1868. /// A test helper function that creates full and incremental snapshot archive files. Creates
  1869. /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and
  1870. /// incremental snapshot files in the range (`min_incremental_snapshot_slot`,
  1871. /// `max_incremental_snapshot_slot`]. Additionally, "bad" files are created for both full and
  1872. /// incremental snapshots to ensure the tests properly filter them out.
  1873. fn common_create_snapshot_archive_files(
  1874. full_snapshot_archives_dir: &Path,
  1875. incremental_snapshot_archives_dir: &Path,
  1876. min_full_snapshot_slot: Slot,
  1877. max_full_snapshot_slot: Slot,
  1878. min_incremental_snapshot_slot: Slot,
  1879. max_incremental_snapshot_slot: Slot,
  1880. ) {
  1881. fs::create_dir_all(full_snapshot_archives_dir).unwrap();
  1882. fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
  1883. for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
  1884. for incremental_snapshot_slot in
  1885. min_incremental_snapshot_slot..max_incremental_snapshot_slot
  1886. {
  1887. let snapshot_filename = format!(
  1888. "incremental-snapshot-{}-{}-{}.tar.zst",
  1889. full_snapshot_slot,
  1890. incremental_snapshot_slot,
  1891. Hash::default()
  1892. );
  1893. let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
  1894. fs::File::create(snapshot_filepath).unwrap();
  1895. }
  1896. let snapshot_filename = format!(
  1897. "snapshot-{}-{}.tar.zst",
  1898. full_snapshot_slot,
  1899. Hash::default()
  1900. );
  1901. let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
  1902. fs::File::create(snapshot_filepath).unwrap();
  1903. // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
  1904. let bad_filename = format!(
  1905. "incremental-snapshot-{}-{}-bad!hash.tar.zst",
  1906. full_snapshot_slot,
  1907. max_incremental_snapshot_slot + 1,
  1908. );
  1909. let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
  1910. fs::File::create(bad_filepath).unwrap();
  1911. }
  1912. // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
  1913. // sorted correctly
  1914. let bad_filename = format!("snapshot-{}-bad!hash.tar.zst", max_full_snapshot_slot + 1);
  1915. let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
  1916. fs::File::create(bad_filepath).unwrap();
  1917. }
  1918. #[test]
  1919. fn test_get_full_snapshot_archives() {
  1920. let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1921. let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1922. let min_slot = 123;
  1923. let max_slot = 456;
  1924. common_create_snapshot_archive_files(
  1925. full_snapshot_archives_dir.path(),
  1926. incremental_snapshot_archives_dir.path(),
  1927. min_slot,
  1928. max_slot,
  1929. 0,
  1930. 0,
  1931. );
  1932. let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
  1933. assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
  1934. }
  1935. #[test]
  1936. fn test_get_full_snapshot_archives_remote() {
  1937. let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1938. let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1939. let min_slot = 123;
  1940. let max_slot = 456;
  1941. common_create_snapshot_archive_files(
  1942. &full_snapshot_archives_dir
  1943. .path()
  1944. .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
  1945. &incremental_snapshot_archives_dir
  1946. .path()
  1947. .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
  1948. min_slot,
  1949. max_slot,
  1950. 0,
  1951. 0,
  1952. );
  1953. let snapshot_archives = get_full_snapshot_archives(full_snapshot_archives_dir);
  1954. assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
  1955. assert!(snapshot_archives.iter().all(|info| info.is_remote()));
  1956. }
  1957. #[test]
  1958. fn test_get_incremental_snapshot_archives() {
  1959. let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1960. let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1961. let min_full_snapshot_slot = 12;
  1962. let max_full_snapshot_slot = 23;
  1963. let min_incremental_snapshot_slot = 34;
  1964. let max_incremental_snapshot_slot = 45;
  1965. common_create_snapshot_archive_files(
  1966. full_snapshot_archives_dir.path(),
  1967. incremental_snapshot_archives_dir.path(),
  1968. min_full_snapshot_slot,
  1969. max_full_snapshot_slot,
  1970. min_incremental_snapshot_slot,
  1971. max_incremental_snapshot_slot,
  1972. );
  1973. let incremental_snapshot_archives =
  1974. get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
  1975. assert_eq!(
  1976. incremental_snapshot_archives.len() as Slot,
  1977. (max_full_snapshot_slot - min_full_snapshot_slot)
  1978. * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
  1979. );
  1980. }
  1981. #[test]
  1982. fn test_get_incremental_snapshot_archives_remote() {
  1983. let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1984. let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  1985. let min_full_snapshot_slot = 12;
  1986. let max_full_snapshot_slot = 23;
  1987. let min_incremental_snapshot_slot = 34;
  1988. let max_incremental_snapshot_slot = 45;
  1989. common_create_snapshot_archive_files(
  1990. &full_snapshot_archives_dir
  1991. .path()
  1992. .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
  1993. &incremental_snapshot_archives_dir
  1994. .path()
  1995. .join(snapshot_paths::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR),
  1996. min_full_snapshot_slot,
  1997. max_full_snapshot_slot,
  1998. min_incremental_snapshot_slot,
  1999. max_incremental_snapshot_slot,
  2000. );
  2001. let incremental_snapshot_archives =
  2002. get_incremental_snapshot_archives(incremental_snapshot_archives_dir);
  2003. assert_eq!(
  2004. incremental_snapshot_archives.len() as Slot,
  2005. (max_full_snapshot_slot - min_full_snapshot_slot)
  2006. * (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
  2007. );
  2008. assert!(incremental_snapshot_archives
  2009. .iter()
  2010. .all(|info| info.is_remote()));
  2011. }
  2012. #[test]
  2013. fn test_get_highest_full_snapshot_archive_slot() {
  2014. let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  2015. let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  2016. let min_slot = 123;
  2017. let max_slot = 456;
  2018. common_create_snapshot_archive_files(
  2019. full_snapshot_archives_dir.path(),
  2020. incremental_snapshot_archives_dir.path(),
  2021. min_slot,
  2022. max_slot,
  2023. 0,
  2024. 0,
  2025. );
  2026. assert_eq!(
  2027. get_highest_full_snapshot_archive_slot(full_snapshot_archives_dir.path()),
  2028. Some(max_slot - 1)
  2029. );
  2030. }
  2031. #[test]
  2032. fn test_get_highest_incremental_snapshot_slot() {
  2033. let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  2034. let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  2035. let min_full_snapshot_slot = 12;
  2036. let max_full_snapshot_slot = 23;
  2037. let min_incremental_snapshot_slot = 34;
  2038. let max_incremental_snapshot_slot = 45;
  2039. common_create_snapshot_archive_files(
  2040. full_snapshot_archives_dir.path(),
  2041. incremental_snapshot_archives_dir.path(),
  2042. min_full_snapshot_slot,
  2043. max_full_snapshot_slot,
  2044. min_incremental_snapshot_slot,
  2045. max_incremental_snapshot_slot,
  2046. );
  2047. for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
  2048. assert_eq!(
  2049. get_highest_incremental_snapshot_archive_slot(
  2050. incremental_snapshot_archives_dir.path(),
  2051. full_snapshot_slot
  2052. ),
  2053. Some(max_incremental_snapshot_slot - 1)
  2054. );
  2055. }
  2056. assert_eq!(
  2057. get_highest_incremental_snapshot_archive_slot(
  2058. incremental_snapshot_archives_dir.path(),
  2059. max_full_snapshot_slot
  2060. ),
  2061. None
  2062. );
  2063. }
  2064. fn common_test_purge_old_snapshot_archives(
  2065. snapshot_names: &[&String],
  2066. maximum_full_snapshot_archives_to_retain: NonZeroUsize,
  2067. maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
  2068. expected_snapshots: &[&String],
  2069. ) {
  2070. let temp_snap_dir = tempfile::TempDir::new().unwrap();
  2071. for snap_name in snapshot_names {
  2072. let snap_path = temp_snap_dir.path().join(snap_name);
  2073. let mut _snap_file = fs::File::create(snap_path);
  2074. }
  2075. purge_old_snapshot_archives(
  2076. temp_snap_dir.path(),
  2077. temp_snap_dir.path(),
  2078. maximum_full_snapshot_archives_to_retain,
  2079. maximum_incremental_snapshot_archives_to_retain,
  2080. );
  2081. let mut retained_snaps = HashSet::new();
  2082. for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
  2083. let entry_path_buf = entry.unwrap().path();
  2084. let entry_path = entry_path_buf.as_path();
  2085. let snapshot_name = entry_path
  2086. .file_name()
  2087. .unwrap()
  2088. .to_str()
  2089. .unwrap()
  2090. .to_string();
  2091. retained_snaps.insert(snapshot_name);
  2092. }
  2093. for snap_name in expected_snapshots {
  2094. assert!(
  2095. retained_snaps.contains(snap_name.as_str()),
  2096. "{snap_name} not found"
  2097. );
  2098. }
  2099. assert_eq!(retained_snaps.len(), expected_snapshots.len());
  2100. }
  2101. #[test]
  2102. fn test_purge_old_full_snapshot_archives() {
  2103. let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default());
  2104. let snap2_name = format!("snapshot-3-{}.tar.zst", Hash::default());
  2105. let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
  2106. let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
  2107. // expecting only the newest to be retained
  2108. let expected_snapshots = vec![&snap3_name];
  2109. common_test_purge_old_snapshot_archives(
  2110. &snapshot_names,
  2111. NonZeroUsize::new(1).unwrap(),
  2112. DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
  2113. &expected_snapshots,
  2114. );
  2115. // retaining 2, expecting the 2 newest to be retained
  2116. let expected_snapshots = vec![&snap2_name, &snap3_name];
  2117. common_test_purge_old_snapshot_archives(
  2118. &snapshot_names,
  2119. NonZeroUsize::new(2).unwrap(),
  2120. DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
  2121. &expected_snapshots,
  2122. );
  2123. // retaining 3, all three should be retained
  2124. let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
  2125. common_test_purge_old_snapshot_archives(
  2126. &snapshot_names,
  2127. NonZeroUsize::new(3).unwrap(),
  2128. DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
  2129. &expected_snapshots,
  2130. );
  2131. }
  2132. /// Mimic a running node's behavior w.r.t. purging old snapshot archives. Take snapshots in a
  2133. /// loop, and periodically purge old snapshot archives. After purging, check to make sure the
  2134. /// snapshot archives on disk are correct.
  2135. #[test]
  2136. fn test_purge_old_full_snapshot_archives_in_the_loop() {
  2137. let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  2138. let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
  2139. let maximum_snapshots_to_retain = NonZeroUsize::new(5).unwrap();
  2140. let starting_slot: Slot = 42;
  2141. for slot in (starting_slot..).take(100) {
  2142. let full_snapshot_archive_file_name =
  2143. format!("snapshot-{}-{}.tar.zst", slot, Hash::default());
  2144. let full_snapshot_archive_path = full_snapshot_archives_dir
  2145. .as_ref()
  2146. .join(full_snapshot_archive_file_name);
  2147. fs::File::create(full_snapshot_archive_path).unwrap();
  2148. // don't purge-and-check until enough snapshot archives have been created
  2149. if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
  2150. continue;
  2151. }
  2152. // purge infrequently, so there will always be snapshot archives to purge
  2153. if slot % (maximum_snapshots_to_retain.get() as Slot * 2) != 0 {
  2154. continue;
  2155. }
  2156. purge_old_snapshot_archives(
  2157. &full_snapshot_archives_dir,
  2158. &incremental_snapshot_archives_dir,
  2159. maximum_snapshots_to_retain,
  2160. NonZeroUsize::new(usize::MAX).unwrap(),
  2161. );
  2162. let mut full_snapshot_archives =
  2163. get_full_snapshot_archives(&full_snapshot_archives_dir);
  2164. full_snapshot_archives.sort_unstable();
  2165. assert_eq!(
  2166. full_snapshot_archives.len(),
  2167. maximum_snapshots_to_retain.get()
  2168. );
  2169. assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
  2170. for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
  2171. assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
  2172. }
  2173. }
  2174. }
    /// Tests how `purge_old_snapshot_archives` retains full and incremental snapshot archives.
    ///
    /// Setup: `2 * maximum_full_snapshot_archives_to_retain` full snapshot archives are created,
    /// each followed by a run of incremental snapshot archives based on it.
    ///
    /// After purging with the default retention limits, the test checks which full and
    /// incremental archives remain.
    #[test]
    fn test_purge_old_incremental_snapshot_archives() {
        let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
        let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
        let starting_slot = 100_000;
        let maximum_incremental_snapshot_archives_to_retain =
            DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
        let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
        let incremental_snapshot_interval = 100;
        // Step through twice as many incremental slots per full snapshot as are retained, so
        // the purge has incremental archives to remove.
        let num_incremental_snapshots_per_full_snapshot =
            maximum_incremental_snapshot_archives_to_retain.get() * 2;
        let full_snapshot_interval =
            incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
        // NOTE(review): `snapshot_filenames` is only pushed to and never read afterwards.
        let mut snapshot_filenames = vec![];
        // Create twice as many full snapshot archives as are retained.
        (starting_slot..)
            .step_by(full_snapshot_interval)
            .take(
                maximum_full_snapshot_archives_to_retain
                    .checked_mul(NonZeroUsize::new(2).unwrap())
                    .unwrap()
                    .get(),
            )
            .for_each(|full_snapshot_slot| {
                let snapshot_filename = format!(
                    "snapshot-{}-{}.tar.zst",
                    full_snapshot_slot,
                    Hash::default()
                );
                let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
                fs::File::create(snapshot_path).unwrap();
                snapshot_filenames.push(snapshot_filename);
                // The first stepped slot equals `full_snapshot_slot` itself and is skipped.
                // Each full snapshot therefore gets
                // `num_incremental_snapshots_per_full_snapshot - 1` incremental archives.
                (full_snapshot_slot..)
                    .step_by(incremental_snapshot_interval)
                    .take(num_incremental_snapshots_per_full_snapshot)
                    .skip(1)
                    .for_each(|incremental_snapshot_slot| {
                        let snapshot_filename = format!(
                            "incremental-snapshot-{}-{}-{}.tar.zst",
                            full_snapshot_slot,
                            incremental_snapshot_slot,
                            Hash::default()
                        );
                        let snapshot_path = incremental_snapshot_archives_dir
                            .path()
                            .join(&snapshot_filename);
                        fs::File::create(snapshot_path).unwrap();
                        snapshot_filenames.push(snapshot_filename);
                    });
            });
        purge_old_snapshot_archives(
            full_snapshot_archives_dir.path(),
            incremental_snapshot_archives_dir.path(),
            maximum_full_snapshot_archives_to_retain,
            maximum_incremental_snapshot_archives_to_retain,
        );
        // Ensure the correct number of full snapshot archives are purged/retained
        let mut remaining_full_snapshot_archives =
            get_full_snapshot_archives(full_snapshot_archives_dir.path());
        assert_eq!(
            remaining_full_snapshot_archives.len(),
            maximum_full_snapshot_archives_to_retain.get(),
        );
        remaining_full_snapshot_archives.sort_unstable();
        let latest_full_snapshot_archive_slot =
            remaining_full_snapshot_archives.last().unwrap().slot();
        // Ensure the correct number of incremental snapshot archives are purged/retained.
        // For each additional full snapshot archive, one additional (the newest)
        // incremental snapshot archive is retained. This is accounted for by the
        // `+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)`
        let mut remaining_incremental_snapshot_archives =
            get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
        assert_eq!(
            remaining_incremental_snapshot_archives.len(),
            maximum_incremental_snapshot_archives_to_retain
                .get()
                .saturating_add(
                    maximum_full_snapshot_archives_to_retain
                        .get()
                        .saturating_sub(1)
                )
        );
        // Sort descending, so that `pop()` below yields the smallest archive (per `Ord`) first.
        remaining_incremental_snapshot_archives.sort_unstable();
        remaining_incremental_snapshot_archives.reverse();
        // Ensure there exists one incremental snapshot for each full snapshot except the latest
        for i in (1..maximum_full_snapshot_archives_to_retain.get()).rev() {
            let incremental_snapshot_archive =
                remaining_incremental_snapshot_archives.pop().unwrap();
            let expected_base_slot =
                latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
            assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
            // The retained one is the last incremental slot created for that base slot.
            let expected_slot = expected_base_slot
                + (full_snapshot_interval - incremental_snapshot_interval) as u64;
            assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
        }
        // Ensure all remaining incremental snapshots are only for the latest full snapshot
        for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
            assert_eq!(
                incremental_snapshot_archive.base_slot(),
                latest_full_snapshot_archive_slot
            );
        }
        // Ensure the remaining incremental snapshots are at the right slots: the last
        // `maximum_incremental_snapshot_archives_to_retain` stepped slots of the latest full
        // snapshot.
        let expected_remaining_incremental_snapshot_archive_slots =
            (latest_full_snapshot_archive_slot..)
                .step_by(incremental_snapshot_interval)
                .take(num_incremental_snapshots_per_full_snapshot)
                .skip(
                    num_incremental_snapshots_per_full_snapshot
                        - maximum_incremental_snapshot_archives_to_retain.get(),
                )
                .collect::<HashSet<_>>();
        let actual_remaining_incremental_snapshot_archive_slots =
            remaining_incremental_snapshot_archives
                .iter()
                .map(|snapshot| snapshot.slot())
                .collect::<HashSet<_>>();
        assert_eq!(
            actual_remaining_incremental_snapshot_archive_slots,
            expected_remaining_incremental_snapshot_archive_slots
        );
    }
  // With no full snapshot archives present, purging must delete every incremental
  // snapshot archive, even though both retention limits are effectively unlimited.
  #[test]
  fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
      let full_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
      let incremental_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
      let snapshot_filenames = [
          format!("incremental-snapshot-100-120-{}.tar.zst", Hash::default()),
          format!("incremental-snapshot-100-140-{}.tar.zst", Hash::default()),
          format!("incremental-snapshot-100-160-{}.tar.zst", Hash::default()),
          format!("incremental-snapshot-100-180-{}.tar.zst", Hash::default()),
          format!("incremental-snapshot-200-220-{}.tar.zst", Hash::default()),
          format!("incremental-snapshot-200-240-{}.tar.zst", Hash::default()),
          format!("incremental-snapshot-200-260-{}.tar.zst", Hash::default()),
          format!("incremental-snapshot-200-280-{}.tar.zst", Hash::default()),
      ];
      for snapshot_filename in &snapshot_filenames {
          let snapshot_path = incremental_snapshot_archives_dir
              .path()
              .join(snapshot_filename);
          fs::File::create(snapshot_path).unwrap();
      }

      // Precondition: every file created above must be recognized as an incremental
      // snapshot archive. Otherwise the emptiness check at the end would pass
      // vacuously without exercising the purge at all.
      assert_eq!(
          get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path()).len(),
          snapshot_filenames.len(),
      );

      purge_old_snapshot_archives(
          full_snapshot_archives_dir.path(),
          incremental_snapshot_archives_dir.path(),
          NonZeroUsize::new(usize::MAX).unwrap(),
          NonZeroUsize::new(usize::MAX).unwrap(),
      );
      let remaining_incremental_snapshot_archives =
          get_incremental_snapshot_archives(incremental_snapshot_archives_dir.path());
      assert!(remaining_incremental_snapshot_archives.is_empty());
  }
  // `get_snapshot_accounts_hardlink_dir` succeeds for a storage file placed inside the
  // directory returned by `create_tmp_accounts_dir_for_tests`, and fails with
  // `GetAccountPath` when the same file name is moved two directory levels up.
  #[test]
  fn test_get_snapshot_accounts_hardlink_dir() {
      let slot: Slot = 1;
      let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
      let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
      let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
      let accounts_hardlinks_dir =
          bank_snapshot_dir.join(snapshot_paths::SNAPSHOT_ACCOUNTS_HARDLINKS);
      fs::create_dir_all(&accounts_hardlinks_dir).unwrap();

      let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
      let appendvec_filename = format!("{slot}.0");
      let appendvec_path = accounts_dir.join(appendvec_filename);

      // Unwrap instead of `assert!(ret.is_ok())` so a failure reports the actual error.
      get_snapshot_accounts_hardlink_dir(
          &appendvec_path,
          slot,
          &mut account_paths_set,
          &accounts_hardlinks_dir,
      )
      .unwrap();

      // Same file name, but rooted at the grandparent of the original storage file.
      let wrong_appendvec_path = appendvec_path
          .parent()
          .unwrap()
          .parent()
          .unwrap()
          .join(appendvec_path.file_name().unwrap());
      let ret = get_snapshot_accounts_hardlink_dir(
          &wrong_appendvec_path,
          slot,
          &mut account_paths_set,
          &accounts_hardlinks_dir,
      );
      assert_matches!(
          ret,
          Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
      );
  }
  // Table of (file name, expected kind): the version file, a bare slot number, and a
  // `slot.id` storage name are each classified; an unrelated name yields `None`.
  #[test]
  fn test_get_snapshot_file_kind() {
      let cases = [
          ("file.txt", None),
          (
              snapshot_paths::SNAPSHOT_VERSION_FILENAME,
              Some(SnapshotFileKind::Version),
          ),
          ("1234", Some(SnapshotFileKind::BankFields)),
          ("1000.999", Some(SnapshotFileKind::Storage)),
      ];
      for (file_name, expected_kind) in cases {
          assert_eq!(expected_kind, get_snapshot_file_kind(file_name));
      }
  }
  2377. #[test_case(0)]
  2378. #[test_case(1)]
  2379. #[test_case(10)]
  2380. fn test_serialize_deserialize_account_storage_entries(num_storages: u64) {
  2381. let temp_dir = tempfile::tempdir().unwrap();
  2382. let bank_snapshot_dir = temp_dir.path();
  2383. let snapshot_slot = num_storages + 1 as Slot;
  2384. // Create AccountStorageEntries
  2385. let mut snapshot_storages = Vec::new();
  2386. for i in 0..num_storages {
  2387. let storage = Arc::new(AccountStorageEntry::new(
  2388. &PathBuf::new(),
  2389. i, // Incrementing slot
  2390. i as u32, // Incrementing id
  2391. 1024,
  2392. AccountsFileProvider::AppendVec,
  2393. StorageAccess::File,
  2394. ));
  2395. snapshot_storages.push(storage);
  2396. }
  2397. // write obsolete accounts to snapshot
  2398. write_obsolete_accounts_to_snapshot(bank_snapshot_dir, &snapshot_storages, snapshot_slot)
  2399. .unwrap();
  2400. // Deserialize
  2401. let deserialized_accounts =
  2402. deserialize_obsolete_accounts(bank_snapshot_dir, MAX_OBSOLETE_ACCOUNTS_FILE_SIZE)
  2403. .unwrap();
  2404. // Verify
  2405. for storage in &snapshot_storages {
  2406. assert!(deserialized_accounts.remove(&storage.slot()).unwrap().2 == 0);
  2407. }
  2408. }
  // Serializing the obsolete-accounts map of ten storages with a 100-byte size cap must
  // panic with the "too large ... to serialize" error.
  #[test]
  #[should_panic(expected = "too large obsolete accounts file to serialize")]
  fn test_serialize_obsolete_accounts_too_large_file() {
      let temp_dir = tempfile::tempdir().unwrap();
      let bank_snapshot_dir = temp_dir.path();
      let num_storages: Slot = 10;
      let snapshot_slot = num_storages + 1;

      // One storage per slot in `0..num_storages`, using the slot as the storage id.
      let snapshot_storages: Vec<_> = (0..num_storages)
          .map(|slot| {
              Arc::new(AccountStorageEntry::new(
                  &PathBuf::new(),
                  slot,
                  slot as u32,
                  1024,
                  AccountsFileProvider::AppendVec,
                  StorageAccess::File,
              ))
          })
          .collect();

      let obsolete_accounts =
          SerdeObsoleteAccountsMap::new_from_storages(&snapshot_storages, snapshot_slot);
      // The 100-byte cap is expected to be exceeded; the unwrap below is what panics.
      serialize_obsolete_accounts(bank_snapshot_dir, &obsolete_accounts, 100).unwrap();
  }
  // A successfully written obsolete-accounts file for ten storages must be rejected when
  // read back with a 100-byte size cap, panicking with the "too large ... to
  // deserialize" error.
  #[test]
  #[should_panic(expected = "too large obsolete accounts file to deserialize")]
  fn test_deserialize_obsolete_accounts_too_large_file() {
      let temp_dir = tempfile::tempdir().unwrap();
      let bank_snapshot_dir = temp_dir.path();
      let num_storages: Slot = 10;
      let snapshot_slot = num_storages + 1;

      // One storage per slot in `0..num_storages`, using the slot as the storage id.
      let snapshot_storages: Vec<_> = (0..num_storages)
          .map(|slot| {
              Arc::new(AccountStorageEntry::new(
                  &PathBuf::new(),
                  slot,
                  slot as u32,
                  1024,
                  AccountsFileProvider::AppendVec,
                  StorageAccess::File,
              ))
          })
          .collect();

      // The write itself must succeed (it is unwrapped).
      write_obsolete_accounts_to_snapshot(bank_snapshot_dir, &snapshot_storages, snapshot_slot)
          .unwrap();

      // Reading back with a tiny cap is expected to fail; the unwrap is what panics.
      deserialize_obsolete_accounts(bank_snapshot_dir, 100).unwrap();
  }
  2462. }