|
|
@@ -19,7 +19,9 @@ use {
|
|
|
get_slot_and_append_vec_id, SnapshotStorageRebuilder,
|
|
|
},
|
|
|
},
|
|
|
- agave_snapshots::{ArchiveFormat, ArchiveFormatDecompressor, SnapshotVersion},
|
|
|
+ agave_snapshots::{
|
|
|
+ hardened_unpack::UnpackError, streaming_unarchive_snapshot, ArchiveFormat, SnapshotVersion,
|
|
|
+ },
|
|
|
crossbeam_channel::{Receiver, Sender},
|
|
|
log::*,
|
|
|
regex::Regex,
|
|
|
@@ -31,7 +33,6 @@ use {
|
|
|
AccountStorageEntry, AccountsDbConfig, AccountsFileId, AtomicAccountsFileId,
|
|
|
},
|
|
|
accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
|
|
|
- hardened_unpack::{self, UnpackError},
|
|
|
utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
|
|
|
},
|
|
|
solana_clock::{Epoch, Slot},
|
|
|
@@ -41,7 +42,7 @@ use {
|
|
|
cmp::Ordering,
|
|
|
collections::{HashMap, HashSet},
|
|
|
fs,
|
|
|
- io::{self, BufRead, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
|
|
|
+ io::{self, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
|
|
|
mem,
|
|
|
num::{NonZeroU64, NonZeroUsize},
|
|
|
ops::RangeInclusive,
|
|
|
@@ -49,9 +50,8 @@ use {
|
|
|
process::ExitStatus,
|
|
|
str::FromStr,
|
|
|
sync::{Arc, LazyLock},
|
|
|
- thread::{Builder, JoinHandle},
|
|
|
},
|
|
|
- tar::{self, Archive},
|
|
|
+ tar,
|
|
|
tempfile::TempDir,
|
|
|
thiserror::Error,
|
|
|
};
|
|
|
@@ -94,9 +94,6 @@ pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str =
|
|
|
r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar\.zst|tar\.lz4)$";
|
|
|
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar\.zst|tar\.lz4)$";
|
|
|
|
|
|
-// Allows scheduling a large number of reads such that temporary disk access delays
|
|
|
-// shouldn't block decompression (unless read bandwidth is saturated).
|
|
|
-const MAX_SNAPSHOT_READER_BUF_SIZE: u64 = 128 * 1024 * 1024;
|
|
|
// Balance large and small files order in snapshot tar with bias towards small (4 small + 1 large),
|
|
|
// such that during unpacking large writes are mixed with file metadata operations
|
|
|
// and towards the end of archive (sizes equalize) writes are >256KiB / file.
|
|
|
@@ -1688,51 +1685,6 @@ pub fn verify_and_unarchive_snapshots(
|
|
|
))
|
|
|
}
|
|
|
|
|
|
-/// Streams unpacked files across channel
|
|
|
-fn streaming_unarchive_snapshot(
|
|
|
- file_sender: Sender<PathBuf>,
|
|
|
- account_paths: Vec<PathBuf>,
|
|
|
- ledger_dir: PathBuf,
|
|
|
- snapshot_archive_path: PathBuf,
|
|
|
- archive_format: ArchiveFormat,
|
|
|
- memlock_budget_size: usize,
|
|
|
-) -> JoinHandle<Result<()>> {
|
|
|
- Builder::new()
|
|
|
- .name("solTarUnpack".to_string())
|
|
|
- .spawn(move || {
|
|
|
- let archive_size = fs::metadata(&snapshot_archive_path)?.len() as usize;
|
|
|
- let read_write_budget_size = (memlock_budget_size / 2).min(archive_size);
|
|
|
- let read_buf_size = MAX_SNAPSHOT_READER_BUF_SIZE.min(read_write_budget_size as u64);
|
|
|
- let decompressor =
|
|
|
- decompressed_tar_reader(archive_format, snapshot_archive_path, read_buf_size)?;
|
|
|
- hardened_unpack::streaming_unpack_snapshot(
|
|
|
- Archive::new(decompressor),
|
|
|
- read_write_budget_size,
|
|
|
- ledger_dir.as_path(),
|
|
|
- &account_paths,
|
|
|
- &file_sender,
|
|
|
- )?;
|
|
|
- Ok(())
|
|
|
- })
|
|
|
- .unwrap()
|
|
|
-}
|
|
|
-
|
|
|
-fn decompressed_tar_reader(
|
|
|
- archive_format: ArchiveFormat,
|
|
|
- archive_path: impl AsRef<Path>,
|
|
|
- buf_size: u64,
|
|
|
-) -> Result<ArchiveFormatDecompressor<impl BufRead>> {
|
|
|
- let buf_reader =
|
|
|
- solana_accounts_db::large_file_buf_reader(archive_path.as_ref(), buf_size as usize)
|
|
|
- .map_err(|err| {
|
|
|
- io::Error::other(format!(
|
|
|
- "failed to open snapshot archive '{}': {err}",
|
|
|
- archive_path.as_ref().display(),
|
|
|
- ))
|
|
|
- })?;
|
|
|
- Ok(ArchiveFormatDecompressor::new(archive_format, buf_reader)?)
|
|
|
-}
|
|
|
-
|
|
|
/// Used to determine if a filename is structured like a version file, bank file, or storage file
|
|
|
#[derive(PartialEq, Debug)]
|
|
|
enum SnapshotFileKind {
|