
Move archive_snapshot to agave-snapshots (#8745)

Kamil Skalski · 3 weeks ago
commit 37d4c19a6b

+ 3 - 2
Cargo.lock

@@ -289,7 +289,10 @@ dependencies = [
  "solana-genesis-config",
  "solana-hash",
  "solana-lattice-hash",
+ "solana-measure",
+ "solana-metrics",
  "strum",
+ "symlink",
  "tar",
  "tempfile",
  "thiserror 2.0.17",
@@ -10272,11 +10275,9 @@ dependencies = [
  "strum",
  "strum_macros",
  "symlink",
- "tar",
  "tempfile",
  "test-case",
  "thiserror 2.0.17",
- "zstd",
 ]
 
 [[package]]

+ 4 - 2
dev-bins/Cargo.lock

@@ -271,8 +271,12 @@ dependencies = [
  "solana-genesis-config",
  "solana-hash",
  "solana-lattice-hash",
+ "solana-measure",
+ "solana-metrics",
  "strum",
+ "symlink",
  "tar",
+ "tempfile",
  "thiserror 2.0.17",
  "zstd",
 ]
@@ -8572,10 +8576,8 @@ dependencies = [
  "strum",
  "strum_macros",
  "symlink",
- "tar",
  "tempfile",
  "thiserror 2.0.17",
- "zstd",
 ]
 
 [[package]]

+ 4 - 2
programs/sbf/Cargo.lock

@@ -186,8 +186,12 @@ dependencies = [
  "solana-genesis-config",
  "solana-hash",
  "solana-lattice-hash",
+ "solana-measure",
+ "solana-metrics",
  "strum",
+ "symlink",
  "tar",
+ "tempfile",
  "thiserror 2.0.17",
  "zstd",
 ]
@@ -8318,10 +8322,8 @@ dependencies = [
  "strum",
  "strum_macros",
  "symlink",
- "tar",
  "tempfile",
  "thiserror 2.0.17",
- "zstd",
 ]
 
 [[package]]

+ 0 - 2
runtime/Cargo.toml

@@ -186,10 +186,8 @@ static_assertions = { workspace = true }
 strum = { workspace = true, features = ["derive"] }
 strum_macros = { workspace = true }
 symlink = { workspace = true }
-tar = { workspace = true }
 tempfile = { workspace = true }
 thiserror = { workspace = true }
-zstd = { workspace = true }
 
 [dev-dependencies]
 agave-logger = { workspace = true }

+ 3 - 193
runtime/src/snapshot_utils.rs

@@ -14,8 +14,9 @@ use {
         },
     },
     agave_snapshots::{
+        archive_snapshot,
         error::{
-            AddBankSnapshotError, ArchiveSnapshotPackageError, GetSnapshotAccountsHardLinkDirError,
+            AddBankSnapshotError, GetSnapshotAccountsHardLinkDirError,
             HardLinkStoragesToSnapshotError, SnapshotError, SnapshotFastbootError,
             SnapshotNewFromDirError,
         },
@@ -25,7 +26,6 @@ use {
             SnapshotArchiveInfoGetter,
         },
         snapshot_config::SnapshotConfig,
-        snapshot_hash::SnapshotHash,
         streaming_unarchive_snapshot, ArchiveFormat, Result, SnapshotKind, SnapshotVersion,
     },
     crossbeam_channel::{Receiver, Sender},
@@ -33,8 +33,7 @@ use {
     regex::Regex,
     semver::Version,
     solana_accounts_db::{
-        account_storage::{AccountStorageMap, AccountStoragesOrderer},
-        account_storage_reader::AccountStorageReader,
+        account_storage::AccountStorageMap,
         accounts_db::{AccountStorageEntry, AccountsDbConfig, AtomicAccountsFileId},
         accounts_file::{AccountsFile, StorageAccess},
         utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
@@ -52,7 +51,6 @@ use {
         str::FromStr,
         sync::{Arc, LazyLock},
     },
-    tar,
     tempfile::TempDir,
 };
 
@@ -74,11 +72,6 @@ const MAX_SNAPSHOT_VERSION_FILE_SIZE: u64 = 8; // byte
 //         Snapshots created with versions <2.0.0 will fastboot to version 2.0.0
 const SNAPSHOT_FASTBOOT_VERSION: Version = Version::new(2, 0, 0);
 
-// Balance large and small files order in snapshot tar with bias towards small (4 small + 1 large),
-// such that during unpacking large writes are mixed with file metadata operations
-// and towards the end of archive (sizes equalize) writes are >256KiB / file.
-const INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO: (usize, usize) = (4, 1);
-
 /// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
 /// the kind of the snapshot.
 #[derive(PartialEq, Eq, Debug)]
@@ -718,189 +711,6 @@ fn serialize_snapshot(
     do_serialize_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, slot))
 }
 
-/// Archives a snapshot into `archive_path`
-fn archive_snapshot(
-    snapshot_kind: SnapshotKind,
-    snapshot_slot: Slot,
-    snapshot_hash: SnapshotHash,
-    snapshot_storages: &[Arc<AccountStorageEntry>],
-    bank_snapshot_dir: impl AsRef<Path>,
-    archive_path: impl AsRef<Path>,
-    archive_format: ArchiveFormat,
-) -> Result<SnapshotArchiveInfo> {
-    use ArchiveSnapshotPackageError as E;
-    const ACCOUNTS_DIR: &str = "accounts";
-    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
-
-    let mut timer = Measure::start("snapshot_package-package_snapshots");
-    let tar_dir = archive_path
-        .as_ref()
-        .parent()
-        .expect("Tar output path is invalid");
-
-    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
-
-    // Create the staging directories
-    let staging_dir_prefix = snapshot_paths::TMP_SNAPSHOT_ARCHIVE_PREFIX;
-    let staging_dir = tempfile::Builder::new()
-        .prefix(&format!("{staging_dir_prefix}{snapshot_slot}-"))
-        .tempdir_in(tar_dir)
-        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
-    let staging_snapshots_dir = staging_dir.path().join(snapshot_paths::BANK_SNAPSHOTS_DIR);
-
-    let slot_str = snapshot_slot.to_string();
-    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
-    // Creates staging snapshots/<slot>/
-    fs::create_dir_all(&staging_snapshot_dir)
-        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
-
-    // To be a source for symlinking and archiving, the path need to be an absolute path
-    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
-        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
-    })?;
-    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
-    let src_snapshot_file = src_snapshot_dir.join(slot_str);
-    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
-        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
-
-    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
-    // like in the snapshot dir.
-    let staging_status_cache =
-        staging_snapshots_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
-    let src_status_cache = src_snapshot_dir.join(snapshot_paths::SNAPSHOT_STATUS_CACHE_FILENAME);
-    symlink::symlink_file(&src_status_cache, &staging_status_cache)
-        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
-
-    // The bank snapshot has the version file, so symlink it to the correct staging path
-    let staging_version_file = staging_dir
-        .path()
-        .join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
-    let src_version_file = src_snapshot_dir.join(snapshot_paths::SNAPSHOT_VERSION_FILENAME);
-    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
-        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
-    })?;
-
-    // Tar the staging directory into the archive at `staging_archive_path`
-    let staging_archive_path = tar_dir.join(format!(
-        "{}{}.{}",
-        staging_dir_prefix,
-        snapshot_slot,
-        archive_format.extension(),
-    ));
-
-    {
-        let archive_file = fs::File::create(&staging_archive_path)
-            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
-
-        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
-            let mut archive = tar::Builder::new(encoder);
-            // Disable sparse file handling.  This seems to be the root cause of an issue when
-            // upgrading v2.0 to v2.1, and the tar crate from 0.4.41 to 0.4.42.
-            // Since the tarball will still go through compression (zstd/etc) afterwards, disabling
-            // sparse handling in the tar itself should be fine.
-            //
-            // Likely introduced in [^1].  Tracking resolution in [^2].
-            // [^1] https://github.com/alexcrichton/tar-rs/pull/375
-            // [^2] https://github.com/alexcrichton/tar-rs/issues/403
-            archive.sparse(false);
-            // Serialize the version and snapshots files before accounts so we can quickly determine the version
-            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
-            archive
-                .append_path_with_name(
-                    &staging_version_file,
-                    snapshot_paths::SNAPSHOT_VERSION_FILENAME,
-                )
-                .map_err(E::ArchiveVersionFile)?;
-            archive
-                .append_dir_all(snapshot_paths::BANK_SNAPSHOTS_DIR, &staging_snapshots_dir)
-                .map_err(E::ArchiveSnapshotsDir)?;
-
-            let storages_orderer = AccountStoragesOrderer::with_small_to_large_ratio(
-                snapshot_storages,
-                INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO,
-            );
-            for storage in storages_orderer.iter() {
-                let path_in_archive = Path::new(ACCOUNTS_DIR)
-                    .join(AccountsFile::file_name(storage.slot(), storage.id()));
-
-                let reader =
-                    AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
-                        E::AccountStorageReaderError(err, storage.path().to_path_buf())
-                    })?;
-                let mut header = tar::Header::new_gnu();
-                header.set_path(path_in_archive).map_err(|err| {
-                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
-                })?;
-                header.set_size(reader.len() as u64);
-                header.set_cksum();
-                archive.append(&header, reader).map_err(|err| {
-                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
-                })?;
-            }
-
-            archive.into_inner().map_err(E::FinishArchive)?;
-            Ok(())
-        };
-
-        match archive_format {
-            ArchiveFormat::TarZstd { config } => {
-                let mut encoder =
-                    zstd::stream::Encoder::new(archive_file, config.compression_level)
-                        .map_err(E::CreateEncoder)?;
-                do_archive_files(&mut encoder)?;
-                encoder.finish().map_err(E::FinishEncoder)?;
-            }
-            ArchiveFormat::TarLz4 => {
-                let mut encoder = lz4::EncoderBuilder::new()
-                    .level(1)
-                    .build(archive_file)
-                    .map_err(E::CreateEncoder)?;
-                do_archive_files(&mut encoder)?;
-                let (_output, result) = encoder.finish();
-                result.map_err(E::FinishEncoder)?;
-            }
-        };
-    }
-
-    // Atomically move the archive into position for other validators to find
-    let metadata = fs::metadata(&staging_archive_path)
-        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
-    let archive_path = archive_path.as_ref().to_path_buf();
-    fs::rename(&staging_archive_path, &archive_path)
-        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
-
-    timer.stop();
-    info!(
-        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
-        archive_path.display(),
-        snapshot_slot,
-        timer.as_ms(),
-        metadata.len()
-    );
-
-    datapoint_info!(
-        "archive-snapshot-package",
-        ("slot", snapshot_slot, i64),
-        ("archive_format", archive_format.to_string(), String),
-        ("duration_ms", timer.as_ms(), i64),
-        (
-            if snapshot_kind.is_full_snapshot() {
-                "full-snapshot-archive-size"
-            } else {
-                "incremental-snapshot-archive-size"
-            },
-            metadata.len(),
-            i64
-        ),
-    );
-    Ok(SnapshotArchiveInfo {
-        path: archive_path,
-        slot: snapshot_slot,
-        hash: snapshot_hash,
-        archive_format,
-    })
-}
-
 /// Get the bank snapshots in a directory
 pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
     let mut bank_snapshots = Vec::default();
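
Note: the hunk above swaps the local definition for the crate-level `archive_snapshot` import, but the call site itself is outside the diff context. A minimal, hypothetical caller sketch follows; the wrapper name `package_snapshot_archive` is illustrative only, and the assumption is that the caller already has the storages, hash, and paths from the existing packaging flow. Only the import path changes; the signature is the same as before the move.

use {
    agave_snapshots::{
        archive_snapshot, snapshot_archive_info::SnapshotArchiveInfo,
        snapshot_hash::SnapshotHash, ArchiveFormat, SnapshotKind,
    },
    solana_accounts_db::accounts_db::AccountStorageEntry,
    solana_clock::Slot,
    std::{path::Path, sync::Arc},
};

// Hypothetical wrapper; the real call site remains in runtime's snapshot_utils
// and is not shown in this diff.
fn package_snapshot_archive(
    snapshot_kind: SnapshotKind,
    slot: Slot,
    hash: SnapshotHash,
    storages: &[Arc<AccountStorageEntry>],
    bank_snapshot_dir: &Path,
    archive_path: &Path,
    archive_format: ArchiveFormat,
) -> agave_snapshots::Result<SnapshotArchiveInfo> {
    // Resolves to the public agave_snapshots::archive_snapshot after this commit.
    archive_snapshot(
        snapshot_kind,
        slot,
        hash,
        storages,
        bank_snapshot_dir,
        archive_path,
        archive_format,
    )
}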

+ 4 - 1
snapshots/Cargo.toml

@@ -34,12 +34,15 @@ solana-clock = { workspace = true }
 solana-genesis-config = { workspace = true }
 solana-hash = { workspace = true }
 solana-lattice-hash = { workspace = true }
+solana-measure = { workspace = true }
+solana-metrics = { workspace = true }
 strum = { workspace = true, features = ["derive"] }
+symlink = { workspace = true }
 tar = { workspace = true }
+tempfile = { workspace = true }
 thiserror = { workspace = true }
 zstd = { workspace = true }
 
 [dev-dependencies]
 agave-logger = { workspace = true }
 assert_matches = { workspace = true }
-tempfile = { workspace = true }

+ 197 - 0
snapshots/src/archive.rs

@@ -0,0 +1,197 @@
+use {
+    crate::{
+        error::ArchiveSnapshotPackageError, paths, snapshot_archive_info::SnapshotArchiveInfo,
+        snapshot_hash::SnapshotHash, ArchiveFormat, Result, SnapshotKind,
+    },
+    log::info,
+    solana_accounts_db::{
+        account_storage::AccountStoragesOrderer, account_storage_reader::AccountStorageReader,
+        accounts_db::AccountStorageEntry, accounts_file::AccountsFile,
+    },
+    solana_clock::Slot,
+    solana_measure::measure::Measure,
+    solana_metrics::datapoint_info,
+    std::{fs, io::Write, path::Path, sync::Arc},
+};
+
+// Balance large and small files order in snapshot tar with bias towards small (4 small + 1 large),
+// such that during unpacking large writes are mixed with file metadata operations
+// and towards the end of archive (sizes equalize) writes are >256KiB / file.
+const INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO: (usize, usize) = (4, 1);
+
+/// Archives a snapshot into `archive_path`
+pub fn archive_snapshot(
+    snapshot_kind: SnapshotKind,
+    snapshot_slot: Slot,
+    snapshot_hash: SnapshotHash,
+    snapshot_storages: &[Arc<AccountStorageEntry>],
+    bank_snapshot_dir: impl AsRef<Path>,
+    archive_path: impl AsRef<Path>,
+    archive_format: ArchiveFormat,
+) -> Result<SnapshotArchiveInfo> {
+    use ArchiveSnapshotPackageError as E;
+    const ACCOUNTS_DIR: &str = "accounts";
+    info!("Generating snapshot archive for slot {snapshot_slot}, kind: {snapshot_kind:?}");
+
+    let mut timer = Measure::start("snapshot_package-package_snapshots");
+    let tar_dir = archive_path
+        .as_ref()
+        .parent()
+        .expect("Tar output path is invalid");
+
+    fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
+
+    // Create the staging directories
+    let staging_dir_prefix = paths::TMP_SNAPSHOT_ARCHIVE_PREFIX;
+    let staging_dir = tempfile::Builder::new()
+        .prefix(&format!("{staging_dir_prefix}{snapshot_slot}-"))
+        .tempdir_in(tar_dir)
+        .map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
+    let staging_snapshots_dir = staging_dir.path().join(paths::BANK_SNAPSHOTS_DIR);
+
+    let slot_str = snapshot_slot.to_string();
+    let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
+    // Creates staging snapshots/<slot>/
+    fs::create_dir_all(&staging_snapshot_dir)
+        .map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
+
+    // To be a source for symlinking and archiving, the path needs to be an absolute path
+    let src_snapshot_dir = bank_snapshot_dir.as_ref().canonicalize().map_err(|err| {
+        E::CanonicalizeSnapshotSourceDir(err, bank_snapshot_dir.as_ref().to_path_buf())
+    })?;
+    let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
+    let src_snapshot_file = src_snapshot_dir.join(slot_str);
+    symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
+        .map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
+
+    // Following the existing archive format, the status cache is under snapshots/, not under <slot>/
+    // like in the snapshot dir.
+    let staging_status_cache = staging_snapshots_dir.join(paths::SNAPSHOT_STATUS_CACHE_FILENAME);
+    let src_status_cache = src_snapshot_dir.join(paths::SNAPSHOT_STATUS_CACHE_FILENAME);
+    symlink::symlink_file(&src_status_cache, &staging_status_cache)
+        .map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
+
+    // The bank snapshot has the version file, so symlink it to the correct staging path
+    let staging_version_file = staging_dir.path().join(paths::SNAPSHOT_VERSION_FILENAME);
+    let src_version_file = src_snapshot_dir.join(paths::SNAPSHOT_VERSION_FILENAME);
+    symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
+        E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
+    })?;
+
+    // Tar the staging directory into the archive at `staging_archive_path`
+    let staging_archive_path = tar_dir.join(format!(
+        "{}{}.{}",
+        staging_dir_prefix,
+        snapshot_slot,
+        archive_format.extension(),
+    ));
+
+    {
+        let archive_file = fs::File::create(&staging_archive_path)
+            .map_err(|err| E::CreateArchiveFile(err, staging_archive_path.clone()))?;
+
+        let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
+            let mut archive = tar::Builder::new(encoder);
+            // Disable sparse file handling.  This seems to be the root cause of an issue when
+            // upgrading v2.0 to v2.1, and the tar crate from 0.4.41 to 0.4.42.
+            // Since the tarball will still go through compression (zstd/etc) afterwards, disabling
+            // sparse handling in the tar itself should be fine.
+            //
+            // Likely introduced in [^1].  Tracking resolution in [^2].
+            // [^1] https://github.com/alexcrichton/tar-rs/pull/375
+            // [^2] https://github.com/alexcrichton/tar-rs/issues/403
+            archive.sparse(false);
+            // Serialize the version and snapshots files before accounts so we can quickly determine the version
+            // and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
+            archive
+                .append_path_with_name(&staging_version_file, paths::SNAPSHOT_VERSION_FILENAME)
+                .map_err(E::ArchiveVersionFile)?;
+            archive
+                .append_dir_all(paths::BANK_SNAPSHOTS_DIR, &staging_snapshots_dir)
+                .map_err(E::ArchiveSnapshotsDir)?;
+
+            let storages_orderer = AccountStoragesOrderer::with_small_to_large_ratio(
+                snapshot_storages,
+                INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO,
+            );
+            for storage in storages_orderer.iter() {
+                let path_in_archive = Path::new(ACCOUNTS_DIR)
+                    .join(AccountsFile::file_name(storage.slot(), storage.id()));
+
+                let reader =
+                    AccountStorageReader::new(storage, Some(snapshot_slot)).map_err(|err| {
+                        E::AccountStorageReaderError(err, storage.path().to_path_buf())
+                    })?;
+                let mut header = tar::Header::new_gnu();
+                header.set_path(path_in_archive).map_err(|err| {
+                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
+                })?;
+                header.set_size(reader.len() as u64);
+                header.set_cksum();
+                archive.append(&header, reader).map_err(|err| {
+                    E::ArchiveAccountStorageFile(err, storage.path().to_path_buf())
+                })?;
+            }
+
+            archive.into_inner().map_err(E::FinishArchive)?;
+            Ok(())
+        };
+
+        match archive_format {
+            ArchiveFormat::TarZstd { config } => {
+                let mut encoder =
+                    zstd::stream::Encoder::new(archive_file, config.compression_level)
+                        .map_err(E::CreateEncoder)?;
+                do_archive_files(&mut encoder)?;
+                encoder.finish().map_err(E::FinishEncoder)?;
+            }
+            ArchiveFormat::TarLz4 => {
+                let mut encoder = lz4::EncoderBuilder::new()
+                    .level(1)
+                    .build(archive_file)
+                    .map_err(E::CreateEncoder)?;
+                do_archive_files(&mut encoder)?;
+                let (_output, result) = encoder.finish();
+                result.map_err(E::FinishEncoder)?;
+            }
+        };
+    }
+
+    // Atomically move the archive into position for other validators to find
+    let metadata = fs::metadata(&staging_archive_path)
+        .map_err(|err| E::QueryArchiveMetadata(err, staging_archive_path.clone()))?;
+    let archive_path = archive_path.as_ref().to_path_buf();
+    fs::rename(&staging_archive_path, &archive_path)
+        .map_err(|err| E::MoveArchive(err, staging_archive_path, archive_path.clone()))?;
+
+    timer.stop();
+    info!(
+        "Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
+        archive_path.display(),
+        snapshot_slot,
+        timer.as_ms(),
+        metadata.len()
+    );
+
+    datapoint_info!(
+        "archive-snapshot-package",
+        ("slot", snapshot_slot, i64),
+        ("archive_format", archive_format.to_string(), String),
+        ("duration_ms", timer.as_ms(), i64),
+        (
+            if snapshot_kind.is_full_snapshot() {
+                "full-snapshot-archive-size"
+            } else {
+                "incremental-snapshot-archive-size"
+            },
+            metadata.len(),
+            i64
+        ),
+    );
+    Ok(SnapshotArchiveInfo {
+        path: archive_path,
+        slot: snapshot_slot,
+        hash: snapshot_hash,
+        archive_format,
+    })
+}
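
Note: the comment on INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO carried over above describes the intended entry ordering; the ordering itself is produced by AccountStoragesOrderer::with_small_to_large_ratio in solana-accounts-db, which is not part of this diff. As a rough standalone illustration of that ordering only (not the orderer's real implementation, and with a made-up function name), a 4:1 small-to-large interleave over a list of file sizes could look like:

// Illustrative sketch of a (small, large) = (4, 1) interleave; not the
// AccountStoragesOrderer implementation, just the ordering it targets.
fn interleave_small_to_large(mut sizes: Vec<u64>, (n_small, n_large): (usize, usize)) -> Vec<u64> {
    sizes.sort_unstable();
    let (mut lo, mut hi) = (0, sizes.len());
    let mut out = Vec::with_capacity(sizes.len());
    while lo < hi {
        // Emit up to `n_small` entries from the small end...
        for _ in 0..n_small {
            if lo >= hi {
                break;
            }
            out.push(sizes[lo]);
            lo += 1;
        }
        // ...then up to `n_large` entries from the large end, so large writes
        // stay mixed with many small, metadata-heavy entries during unpacking.
        for _ in 0..n_large {
            if lo >= hi {
                break;
            }
            hi -= 1;
            out.push(sizes[hi]);
        }
    }
    out
}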

+ 2 - 0
snapshots/src/lib.rs

@@ -8,6 +8,7 @@
     )
 )]
 
+mod archive;
 mod archive_format;
 pub mod error;
 pub mod hardened_unpack;
@@ -23,6 +24,7 @@ mod unarchive;
 pub type Result<T> = std::result::Result<T, error::SnapshotError>;
 
 pub use {
+    archive::archive_snapshot,
     archive_format::*,
     kind::SnapshotKind,
     snapshot_interval::SnapshotInterval,