Browse Source

Factor out setting up FileCreator to agave-snapshots::unarchive (#8718)

Kamil Skalski 3 weeks ago
parent
commit
ffe2a12c55
2 changed files with 59 additions and 56 deletions
  1. 27 52
      snapshots/src/hardened_unpack.rs
  2. 32 4
      snapshots/src/unarchive.rs

+ 27 - 52
snapshots/src/hardened_unpack.rs

@@ -1,6 +1,5 @@
 use {
     agave_fs::file_io::{self, FileCreator},
-    crossbeam_channel::Sender,
     log::*,
     rand::{thread_rng, Rng},
     solana_genesis_config::DEFAULT_GENESIS_FILE,
@@ -46,12 +45,6 @@ const MAX_SNAPSHOT_ARCHIVE_UNPACKED_ACTUAL_SIZE: u64 = 4 * 1024 * 1024 * 1024 *
 const MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT: u64 = 5_000_000;
 const MAX_GENESIS_ARCHIVE_UNPACKED_COUNT: u64 = 100;
 
-// The buffer should be large enough to saturate write I/O bandwidth, while also accommodating:
-// - Many small files: each file consumes at least one write-capacity-sized chunk (0.5-1 MiB).
-// - Large files: their data may accumulate in backlog buffers while waiting for file open
-//   operations to complete.
-const MAX_UNPACK_WRITE_BUF_SIZE: usize = 512 * 1024 * 1024;
-
 fn checked_total_size_sum(total_size: u64, entry_size: u64, limit_size: u64) -> Result<u64> {
     trace!("checked_total_size_sum: {total_size} + {entry_size} < {limit_size}");
     let total_size = total_size.saturating_add(entry_size);
@@ -91,18 +84,16 @@ enum UnpackPath<'a> {
 }
 
 #[allow(clippy::arithmetic_side_effects)]
-fn unpack_archive<'a, C, D>(
+fn unpack_archive<'a, C>(
     input: impl Read,
-    memlock_budget_size: usize,
+    mut file_creator: Box<dyn FileCreator>,
     apparent_limit_size: u64,
     actual_limit_size: u64,
     limit_count: u64,
-    mut entry_checker: C,   // checks if entry is valid
-    file_path_processor: D, // processes file paths after writing
+    mut entry_checker: C, // checks if entry is valid
 ) -> Result<()>
 where
     C: FnMut(&[&str], tar::EntryType) -> UnpackPath<'a>,
-    D: FnMut(PathBuf),
 {
     let mut apparent_total_size: u64 = 0;
     let mut actual_total_size: u64 = 0;
@@ -111,12 +102,6 @@ where
     let mut total_entries = 0;
     let mut open_dirs = Vec::new();
 
-    // Bound the buffer based on provided limit of unpacked data and input archive size
-    // (decompression multiplies content size, but buffering more than origin isn't necessary).
-    let buf_size =
-        (memlock_budget_size.min(actual_limit_size as usize)).min(MAX_UNPACK_WRITE_BUF_SIZE);
-    let mut files_creator = file_io::file_creator(buf_size, file_path_processor)?;
-
     let mut archive = Archive::new(input);
     for entry in archive.entries()? {
         let entry = entry?;
@@ -190,12 +175,12 @@ where
             continue; // skip it
         };
 
-        let unpack = unpack_entry(&mut files_creator, entry, entry_path, open_dir);
+        let unpack = unpack_entry(&mut file_creator, entry, entry_path, open_dir);
         check_unpack_result(unpack, path_str)?;
 
         total_entries += 1;
     }
-    files_creator.drain()?;
+    file_creator.drain()?;
 
     info!("unpacked {total_entries} entries total");
     Ok(())
@@ -327,46 +312,28 @@ fn validate_inside_dst(dst: &Path, file_dst: &Path) -> Result<PathBuf> {
 /// sends entry file paths through the `sender` channel
 pub(super) fn streaming_unpack_snapshot(
     input: impl Read,
-    memlock_budget_size: usize,
+    file_creator: Box<dyn FileCreator>,
     ledger_dir: &Path,
     account_paths: &[PathBuf],
-    sender: &Sender<PathBuf>,
 ) -> Result<()> {
-    unpack_snapshot_with_processors(
-        input,
-        memlock_budget_size,
-        ledger_dir,
-        account_paths,
-        |_, _| {},
-        |file_path| {
-            let result = sender.send(file_path);
-            if let Err(err) = result {
-                panic!(
-                    "failed to send path '{}' from unpacker to rebuilder: {err}",
-                    err.0.display(),
-                );
-            }
-        },
-    )
+    unpack_snapshot_with_processors(input, file_creator, ledger_dir, account_paths, |_, _| {})
 }
 
-fn unpack_snapshot_with_processors<F, G>(
+fn unpack_snapshot_with_processors<F>(
     input: impl Read,
-    memlock_budget_size: usize,
+    file_creator: Box<dyn FileCreator>,
     ledger_dir: &Path,
     account_paths: &[PathBuf],
     mut accounts_path_processor: F,
-    file_path_processor: G,
 ) -> Result<()>
 where
     F: FnMut(&str, &Path),
-    G: FnMut(PathBuf),
 {
     assert!(!account_paths.is_empty());
 
     unpack_archive(
         input,
-        memlock_budget_size,
+        file_creator,
         MAX_SNAPSHOT_ARCHIVE_UNPACKED_APPARENT_SIZE,
         MAX_SNAPSHOT_ARCHIVE_UNPACKED_ACTUAL_SIZE,
         MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT,
@@ -392,7 +359,6 @@ where
                 UnpackPath::Invalid
             }
         },
-        file_path_processor,
     )
 }
 
@@ -448,17 +414,17 @@ fn is_valid_snapshot_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool
 
 pub(super) fn unpack_genesis(
     input: impl Read,
+    file_creator: Box<dyn FileCreator>,
     unpack_dir: &Path,
     max_genesis_archive_unpacked_size: u64,
 ) -> Result<()> {
     unpack_archive(
         input,
-        0, /* don't provide memlock budget (forces sync IO), since genesis archives are small */
+        file_creator,
         max_genesis_archive_unpacked_size,
         max_genesis_archive_unpacked_size,
         MAX_GENESIS_ARCHIVE_UNPACKED_COUNT,
         |p, k| is_valid_genesis_archive_entry(unpack_dir, p, k),
-        |_| {},
     )
 }
 
@@ -486,6 +452,7 @@ fn is_valid_genesis_archive_entry<'a>(
 mod tests {
     use {
         super::*,
+        agave_fs::file_io::file_creator,
         assert_matches::assert_matches,
         std::io::BufReader,
         tar::{Builder, Header},
@@ -717,7 +684,7 @@ mod tests {
 
     fn with_finalize_and_unpack<C>(archive: tar::Builder<Vec<u8>>, checker: C) -> Result<()>
     where
-        C: Fn(&[u8], &Path) -> Result<()>,
+        C: FnOnce(&[u8], &Path) -> Result<()>,
     {
         let data = archive.into_inner().unwrap();
         let temp_dir = tempfile::TempDir::new().unwrap();
@@ -730,14 +697,18 @@ mod tests {
     }
 
     fn finalize_and_unpack_snapshot(archive: tar::Builder<Vec<u8>>) -> Result<()> {
-        with_finalize_and_unpack(archive, |a, b| {
-            unpack_snapshot_with_processors(a, 256, b, &[PathBuf::new()], |_, _| {}, |_| {})
+        let file_creator = file_creator(256, |_| {})?;
+        with_finalize_and_unpack(archive, move |a, b| {
+            unpack_snapshot_with_processors(a, file_creator, b, &[PathBuf::new()], |_, _| {})
                 .map(|_| ())
         })
     }
 
     fn finalize_and_unpack_genesis(archive: tar::Builder<Vec<u8>>) -> Result<()> {
-        with_finalize_and_unpack(archive, |a, b| unpack_genesis(a, b, 1024))
+        let file_creator = file_creator(0, |_| {})?;
+        with_finalize_and_unpack(archive, move |a, b| {
+            unpack_genesis(a, file_creator, b, 1024)
+        })
     }
 
     #[test]
@@ -985,13 +956,17 @@ mod tests {
         let mut archive = Builder::new(Vec::new());
         archive.append(&header, data).unwrap();
         let result = with_finalize_and_unpack(archive, |ar, tmp| {
+            let tmp_path_buf = tmp.to_path_buf();
+            let file_creator = file_creator(256, move |path| {
+                assert_eq!(path, tmp_path_buf.join("accounts_dest/123.456"))
+            })
+            .expect("must make file_creator");
             unpack_snapshot_with_processors(
                 ar,
-                256,
+                file_creator,
                 tmp,
                 &[tmp.join("accounts_dest")],
                 |_, _| {},
-                |path| assert_eq!(path, tmp.join("accounts_dest/123.456")),
             )
         });
         assert_matches!(result, Ok(()));

+ 32 - 4
snapshots/src/unarchive.rs

@@ -3,7 +3,7 @@ use {
         hardened_unpack::{self, UnpackError},
         ArchiveFormat, ArchiveFormatDecompressor,
     },
-    agave_fs::buffered_reader,
+    agave_fs::{buffered_reader, file_io::file_creator},
     bzip2::bufread::BzDecoder,
     crossbeam_channel::Sender,
     std::{
@@ -18,6 +18,11 @@ use {
 // Allows scheduling a large number of reads such that temporary disk access delays
 // shouldn't block decompression (unless read bandwidth is saturated).
 const MAX_SNAPSHOT_READER_BUF_SIZE: u64 = 128 * 1024 * 1024;
+// The buffer should be large enough to saturate write I/O bandwidth, while also accommodating:
+// - Many small files: each file consumes at least one write-capacity-sized chunk (0.5-1 MiB).
+// - Large files: their data may accumulate in backlog buffers while waiting for file open
+//   operations to complete.
+const MAX_UNPACK_WRITE_BUF_SIZE: usize = 512 * 1024 * 1024;
 
 /// Streams unpacked files across channel
 pub fn streaming_unarchive_snapshot(
@@ -30,15 +35,29 @@ pub fn streaming_unarchive_snapshot(
 ) -> JoinHandle<Result<(), UnpackError>> {
     let do_unpack = move |archive_path: &Path| {
         let archive_size = fs::metadata(archive_path)?.len() as usize;
+        // Bound the buffer based on available memlock budget (reader and writer might use it to
+        // register buffer in kernel) and input archive size (decompression multiplies content size,
+        // but buffering more than origin isn't necessary).
         let read_write_budget_size = (memlock_budget_size / 2).min(archive_size);
         let read_buf_size = MAX_SNAPSHOT_READER_BUF_SIZE.min(read_write_budget_size as u64);
         let decompressor = decompressed_tar_reader(archive_format, archive_path, read_buf_size)?;
+
+        let write_buf_size = MAX_UNPACK_WRITE_BUF_SIZE.min(read_write_budget_size);
+        let file_creator = file_creator(write_buf_size, move |file_path| {
+            let result = file_sender.send(file_path);
+            if let Err(err) = result {
+                panic!(
+                    "failed to send path '{}' from unpacker to rebuilder: {err}",
+                    err.0.display(),
+                );
+            }
+        })?;
+
         hardened_unpack::streaming_unpack_snapshot(
             decompressor,
-            read_write_budget_size,
+            file_creator,
             ledger_dir.as_path(),
             &account_paths,
-            &file_sender,
         )
     };
 
@@ -62,7 +81,16 @@ pub fn unpack_genesis_archive(
     fs::create_dir_all(destination_dir)?;
     let tar_bz2 = fs::File::open(archive_filename)?;
     let tar = BzDecoder::new(BufReader::new(tar_bz2));
-    hardened_unpack::unpack_genesis(tar, destination_dir, max_genesis_archive_unpacked_size)?;
+    let file_creator = file_creator(
+        0, /* don't provide memlock budget (forces sync IO), since genesis archives are small */
+        |_| {},
+    )?;
+    hardened_unpack::unpack_genesis(
+        tar,
+        file_creator,
+        destination_dir,
+        max_genesis_archive_unpacked_size,
+    )?;
     log::info!(
         "Extracted {:?} in {:?}",
         archive_filename,