
Use io_uring for creating files when unpacking snapshot (#6671)

* [breaking change] a memlock limit of around 800 MiB is now a hard requirement for starting the validator (when it performs snapshot unpacking) - added an entry in the Changelog
* Introduce IoUringFileCreator (plus a compatibility trait for non-Linux platforms) and use it to create files while unpacking snapshots (see the usage sketch below the commit header).
* Remove ArchiveChunker and perform the whole unpacking in a single thread - all IO is done by background kernel threads (with io_uring), and unless we run out of disk write bandwidth this thread spends its time on decompression.
* Change entry_processor into file_path_processor and only execute it for files (so the is_file() call can be avoided at the only non-trivial call site, which filters for files)
* Refactor auxiliary code shared by the io_uring sequential file reader and file creator
Kamil Skalski 3 months ago
parent
commit
6c80057133
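
As a rough illustration of the new API (a sketch, not part of the diff; the buffer size, paths, and contents below are hypothetical), the platform-independent file_creator factory returns the io_uring-backed creator on Linux hosts that support it, and the synchronous fallback elsewhere:

    use std::{fs::File, io::Cursor, sync::Arc};

    // Completion callback receives the path of each fully persisted file.
    let mut creator = file_creator(64 * 1024 * 1024, |path| {
        println!("persisted {}", path.display());
    })?;

    // Open the destination directory once; files are created relative to it.
    let dir = Arc::new(File::open("/tmp/unpack-dest")?);
    creator.schedule_create_at_dir(
        "/tmp/unpack-dest/file.bin".into(),
        0o644,
        dir,
        &mut Cursor::new(b"contents".to_vec()),
    )?;
    creator.drain()?; // wait until all scheduled writes are persisted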

+ 1 - 1
CHANGELOG.md

@@ -27,6 +27,7 @@ Release channels have their own copy of this changelog:
 ### Validator
 
 #### Breaking
+* Require increased `memlock` limits - the recommended setting is `LimitMEMLOCK=2000000000` in the systemd service configuration. An insufficient limit (on Linux) will cause a startup error.
 * Remove deprecated arguments
   * `--accounts-index-memory-limit-mb`
   * `--accountsdb-repl-bind-address`, `--accountsdb-repl-port`, `--accountsdb-repl-threads`, `--enable-accountsdb-repl`
@@ -41,7 +42,6 @@ Release channels have their own copy of this changelog:
 * Using `--snapshot-interval-slots 0` to disable generating snapshots has been removed. Use `--no-snapshots` instead.
 
 #### Changes
-* Reading snapshot archives requires increased `memlock` limits - recommended setting is `LimitMEMLOCK=2000000000` in systemd service configuration. Lack of sufficient limit will result slower startup times.
 * `--transaction-structure view` is now the default.
 * The default full snapshot interval is now 100,000 slots.
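
For operators, the matching systemd override could look like this (illustrative; the drop-in path and unit name depend on your deployment):

    # /etc/systemd/system/sol.service.d/memlock.conf (hypothetical unit name)
    [Service]
    LimitMEMLOCK=2000000000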
 

+ 0 - 1
Cargo.lock

@@ -6868,7 +6868,6 @@ dependencies = [
  "solana-message",
  "solana-metrics",
  "solana-nohash-hasher",
- "solana-perf",
  "solana-pubkey",
  "solana-rayon-threadlimit",
  "solana-rent",

+ 0 - 1
accounts-db/Cargo.toml

@@ -85,7 +85,6 @@ solana-measure = { workspace = true }
 solana-message = { workspace = true }
 solana-metrics = { workspace = true }
 solana-nohash-hasher = { workspace = true }
-solana-perf = { workspace = true }
 solana-pubkey = { workspace = true }
 solana-rayon-threadlimit = { workspace = true }
 solana-rent = { workspace = true, optional = true }

+ 3 - 7
accounts-db/src/buffered_reader.rs

@@ -375,13 +375,9 @@ pub fn large_file_buf_reader(
     if agave_io_uring::io_uring_supported() {
         use crate::io_uring::sequential_file_reader::SequentialFileReader;
 
-        let io_uring_reader = SequentialFileReader::with_capacity(buf_size, &path);
-        match io_uring_reader {
-            Ok(reader) => return Ok(Box::new(reader)),
-            Err(error) => {
-                log::warn!("unable to create io_uring reader: {error}");
-            }
-        }
+        return Ok(Box::new(SequentialFileReader::with_capacity(
+            buf_size, path,
+        )?));
     }
     let file = File::open(path)?;
     Ok(Box::new(BufReader::with_capacity(buf_size, file)))

+ 168 - 3
accounts-db/src/file_io.rs

@@ -1,5 +1,11 @@
 //! File i/o helper functions.
-use std::{fs::File, ops::Range};
+use std::{
+    fs::{File, OpenOptions},
+    io::{self, BufWriter, Write},
+    ops::Range,
+    path::PathBuf,
+    sync::Arc,
+};
 
 /// `buffer` contains `valid_bytes` of data at its end.
 /// Move those valid bytes to the beginning of `buffer`, then read from `offset` to fill the rest of `buffer`.
@@ -83,10 +89,105 @@ pub fn read_into_buffer(
     Ok(total_bytes_read)
 }
 
+/// An asynchronous queue for file creation.
+pub trait FileCreator {
+    /// Schedule creating a file at `path` with `mode` permissions and bytes read from `contents`.
+    ///
+    /// `parent_dir_handle` is assumed to be the parent directory of `path`, so that the file
+    /// may be created using an optimized kernel API as `path.file_name()` inside `parent_dir_handle`.
+    fn schedule_create_at_dir(
+        &mut self,
+        path: PathBuf,
+        mode: u32,
+        parent_dir_handle: Arc<File>,
+        contents: &mut dyn io::Read,
+    ) -> io::Result<()>;
+
+    /// Invoke implementation specific logic to handle file creation completion.
+    fn file_complete(&mut self, path: PathBuf);
+
+    /// Waits for all operations to be completed
+    fn drain(&mut self) -> io::Result<()>;
+}
+
+pub fn file_creator<'a>(
+    buf_size: usize,
+    file_complete: impl FnMut(PathBuf) + 'a,
+) -> io::Result<Box<dyn FileCreator + 'a>> {
+    #[cfg(target_os = "linux")]
+    if agave_io_uring::io_uring_supported() {
+        use crate::io_uring::file_creator::IoUringFileCreator;
+
+        let io_uring_creator = IoUringFileCreator::with_buffer_capacity(buf_size, file_complete)?;
+        return Ok(Box::new(io_uring_creator));
+    }
+    Ok(Box::new(SyncIoFileCreator::new(buf_size, file_complete)))
+}
+
+pub struct SyncIoFileCreator<'a> {
+    file_complete: Box<dyn FnMut(PathBuf) + 'a>,
+}
+
+impl<'a> SyncIoFileCreator<'a> {
+    fn new(_buf_size: usize, file_complete: impl FnMut(PathBuf) + 'a) -> Self {
+        Self {
+            file_complete: Box::new(file_complete),
+        }
+    }
+}
+
+#[cfg(not(unix))]
+pub(super) fn set_file_readonly(path: &std::path::Path, readonly: bool) -> io::Result<()> {
+    let mut perm = std::fs::metadata(path)?.permissions();
+    perm.set_readonly(readonly);
+    std::fs::set_permissions(path, perm)
+}
+
+impl FileCreator for SyncIoFileCreator<'_> {
+    fn schedule_create_at_dir(
+        &mut self,
+        path: PathBuf,
+        mode: u32,
+        _parent_dir_handle: Arc<File>,
+        contents: &mut dyn io::Read,
+    ) -> io::Result<()> {
+        // Open for writing (also allows overwrite) and apply `mode`
+        let mut options = OpenOptions::new();
+        options.create(true).truncate(true).write(true);
+
+        #[cfg(unix)]
+        std::os::unix::fs::OpenOptionsExt::mode(&mut options, mode);
+
+        let mut file_buf = BufWriter::new(options.open(&path)?);
+        io::copy(contents, &mut file_buf)?;
+        file_buf.flush()?;
+
+        #[cfg(not(unix))]
+        set_file_readonly(&path, mode & 0o200 == 0)?;
+
+        self.file_complete(path);
+        Ok(())
+    }
+
+    fn file_complete(&mut self, path: PathBuf) {
+        (self.file_complete)(path)
+    }
+
+    fn drain(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
-
-    use {super::*, std::io::Write, tempfile::tempfile};
+    use {
+        super::*,
+        std::{
+            fs,
+            io::{Cursor, Write},
+        },
+        tempfile::tempfile,
+    };
 
     #[test]
     fn test_read_into_buffer() {
@@ -193,4 +294,68 @@ mod tests {
             bytes[start_offset..file_size]
         );
     }
+
+    fn read_file_to_string(path: &PathBuf) -> String {
+        String::from_utf8(fs::read(path).expect("Failed to read file"))
+            .expect("Failed to decode file contents")
+    }
+
+    #[test]
+    fn test_create_writes_contents() -> io::Result<()> {
+        let temp_dir = tempfile::tempdir()?;
+        let file_path = temp_dir.path().join("test.txt");
+        let contents = "Hello, world!";
+
+        // Shared state to capture callback invocations
+        let mut callback_invoked_path = None;
+
+        // Instantiate FileCreator
+        let mut creator = file_creator(2 << 20, |path| {
+            callback_invoked_path.replace(path);
+        })?;
+
+        let dir = Arc::new(File::open(temp_dir.path())?);
+        creator.schedule_create_at_dir(
+            file_path.clone(),
+            0o644,
+            dir,
+            &mut Cursor::new(contents),
+        )?;
+        creator.drain()?;
+        drop(creator);
+
+        assert_eq!(read_file_to_string(&file_path), contents);
+        assert_eq!(callback_invoked_path, Some(file_path));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_multiple_file_creations() -> io::Result<()> {
+        let temp_dir = tempfile::tempdir()?;
+        let mut callback_counter = 0;
+
+        let mut creator = file_creator(2 << 20, |path: PathBuf| {
+            let contents = read_file_to_string(&path);
+            assert!(contents.starts_with("File "));
+            callback_counter += 1;
+        })?;
+
+        let dir = Arc::new(File::open(temp_dir.path())?);
+        for i in 0..5 {
+            let file_path = temp_dir.path().join(format!("file_{i}.txt"));
+            let data = format!("File {i}");
+            creator.schedule_create_at_dir(
+                file_path,
+                0o600,
+                dir.clone(),
+                &mut Cursor::new(data),
+            )?;
+        }
+        creator.drain()?;
+        drop(creator);
+
+        assert_eq!(callback_counter, 5);
+        Ok(())
+    }
 }

+ 114 - 288
accounts-db/src/hardened_unpack.rs

@@ -1,17 +1,19 @@
 use {
+    crate::file_io::{file_creator, FileCreator},
     bzip2::bufread::BzDecoder,
+    crossbeam_channel::Sender,
     log::*,
     rand::{thread_rng, Rng},
     solana_genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
-    solana_perf::packet::bytes::{Buf, Bytes, BytesMut},
     std::{
-        collections::{HashMap, VecDeque},
+        collections::HashMap,
         fs::{self, File},
         io::{self, BufReader, Read},
         path::{
             Component::{self, CurDir, Normal},
             Path, PathBuf,
         },
+        sync::Arc,
         time::Instant,
     },
     tar::{
@@ -46,225 +48,13 @@ const MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT: u64 = 5_000_000;
 pub const MAX_GENESIS_ARCHIVE_UNPACKED_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
 const MAX_GENESIS_ARCHIVE_UNPACKED_COUNT: u64 = 100;
 
-/// Collection of shareable byte slices forming a chain of bytes to read (using `std::io::Read`)
-pub struct MultiBytes(VecDeque<Bytes>);
-
-impl MultiBytes {
-    pub fn new() -> Self {
-        // Typically we expect 2 entries:
-        // archive spanning until end of decode buffer +
-        // short continuation of last entry from next buffer
-        Self(VecDeque::with_capacity(2))
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.0.is_empty()
-    }
-
-    pub fn push(&mut self, bytes: Bytes) {
-        self.0.push_back(bytes);
-    }
-}
-
-impl Default for MultiBytes {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Read for MultiBytes {
-    fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
-        let mut copied_len = 0;
-        while let Some(bytes) = self.0.front_mut() {
-            let to_copy_len = bytes.len().min(buf.len());
-            let (to_copy_dst_buf, remaining_buf) = buf.split_at_mut(to_copy_len);
-            bytes.copy_to_slice(to_copy_dst_buf);
-            copied_len += to_copy_len;
-            if bytes.is_empty() {
-                self.0.pop_front();
-            }
-            if remaining_buf.is_empty() {
-                break;
-            }
-            buf = remaining_buf;
-        }
-        Ok(copied_len)
-    }
-}
-
-pub struct BytesChannelReader {
-    current_bytes: MultiBytes,
-    receiver: crossbeam_channel::Receiver<MultiBytes>,
-}
-
-impl BytesChannelReader {
-    pub fn new(receiver: crossbeam_channel::Receiver<MultiBytes>) -> Self {
-        Self {
-            current_bytes: MultiBytes::new(),
-            receiver,
-        }
-    }
-}
-
-impl Read for BytesChannelReader {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        while self.current_bytes.is_empty() {
-            let Ok(new_bytes) = self.receiver.recv() else {
-                return Ok(0);
-            };
-            self.current_bytes = new_bytes;
-        }
-        self.current_bytes.read(buf)
-    }
-}
-
-#[derive(Debug)]
-pub struct ArchiveChunker<R> {
-    input: R,
-    /// Intermediate buffer with tar contents to seek and split on entry boundaries
-    current_decoded: Bytes,
-    /// Number of bytes from last entry that were not available in decoded buffer
-    num_started_entry_bytes: usize,
-    mempool: VecDeque<Bytes>,
-}
-
-impl<R: Read> ArchiveChunker<R> {
-    const TAR_BLOCK_SIZE: usize = size_of::<tar::Header>();
-    // Buffer size will influence typical amount of bytes sent as single work item.
-    // Pick value significantly larger than majority of entries, yet not too large to keep
-    // the work-queue non-empty as much as possible.
-    const DECODE_BUF_SIZE: usize = 64 * 1024 * 1024;
-
-    pub fn new(input: R) -> Self {
-        Self {
-            input,
-            current_decoded: Bytes::new(),
-            num_started_entry_bytes: 0,
-            mempool: VecDeque::new(),
-        }
-    }
-
-    /// Read `self.input`, split it at TAR archive boundaries and send chunks consisting
-    /// of complete, independent tar archives into `chunk_sender`.
-    pub fn decode_and_send_chunks(
-        mut self,
-        chunk_sender: crossbeam_channel::Sender<MultiBytes>,
-    ) -> io::Result<()> {
-        // Bytes for chunk of archive to be sent to workers for unpacking
-        let mut current_chunk = MultiBytes::new();
-        while self.refill_decoded_buf()? {
-            let (new_bytes, was_archive_completion) = if self.has_started_entry() {
-                let started_entry_bytes = self.take_started_entry_bytes();
-                let did_finish_entry = !self.has_started_entry();
-                (started_entry_bytes, did_finish_entry)
-            } else {
-                (self.take_complete_archive()?, true)
-            };
-            if !new_bytes.is_empty() {
-                current_chunk.push(new_bytes);
-                if was_archive_completion {
-                    let chunk = std::mem::take(&mut current_chunk);
-                    if chunk_sender.send(chunk).is_err() {
-                        break;
-                    }
-                }
-            }
-        }
-        Ok(())
-    }
-
-    /// Take as many bytes as possible from decoded data until last entry boundary.
-    fn take_complete_archive(&mut self) -> io::Result<Bytes> {
-        let mut archive = Archive::new(self.current_decoded.as_ref());
-
-        let mut completed_entry_end = 0;
-        let mut entry_end = 0;
-        for entry in archive.entries()? {
-            let entry = entry?;
-            // End of file data
-            assert_ne!(tar::EntryType::GNUSparse, entry.header().entry_type());
-            entry_end = (entry.raw_file_position() + entry.size()) as usize;
-
-            // Padding to block size
-            entry_end = Self::TAR_BLOCK_SIZE * entry_end.div_ceil(Self::TAR_BLOCK_SIZE);
-            if entry_end <= self.current_decoded.len() {
-                // Entry ends within decoded input, we can consume it
-                completed_entry_end = entry_end;
-            }
-            if entry_end + Self::TAR_BLOCK_SIZE > self.current_decoded.len() {
-                // Next entry's header spans beyond input - can't decode it,
-                // so terminate at last completed entry and keep remaining input after it
-                break;
-            }
-        }
-        // Either we run out of entries or last entry crosses input
-        let completed_entry = self.current_decoded.split_to(completed_entry_end);
-        if completed_entry.is_empty() && entry_end == completed_entry_end {
-            // Archive ended, clear any tar footer from remaining input
-            assert!(
-                self.current_decoded.len() <= 1024,
-                "Footer should be at most 1024 len"
-            );
-            self.current_decoded.clear();
-        }
-        self.num_started_entry_bytes = entry_end - completed_entry_end;
-        Ok(completed_entry)
-    }
-
-    fn has_started_entry(&self) -> bool {
-        self.num_started_entry_bytes > 0
-    }
-
-    fn take_started_entry_bytes(&mut self) -> Bytes {
-        let num_bytes = self.num_started_entry_bytes.min(self.current_decoded.len());
-        self.num_started_entry_bytes -= num_bytes;
-        self.current_decoded.split_to(num_bytes)
-    }
-
-    /// Re-fill decoded buffer such that it has minimum bytes to decode TAR header.
-    ///
-    /// Return `false` on EOF
-    fn refill_decoded_buf(&mut self) -> io::Result<bool> {
-        if self.current_decoded.len() < Self::TAR_BLOCK_SIZE {
-            let mut next_buffer = self.get_next_buffer();
-            if !self.current_decoded.is_empty() {
-                next_buffer.extend_from_slice(&self.current_decoded);
-            }
-            self.current_decoded = self.decode_bytes(next_buffer)?;
-        }
-        Ok(!self.current_decoded.is_empty())
-    }
-
-    /// Acquire memory buffer for decoding input reusing already consumed chunks.
-    fn get_next_buffer(&mut self) -> BytesMut {
-        if self.mempool.front().is_some_and(Bytes::is_unique) {
-            let mut reclaimed: BytesMut = self.mempool.pop_front().unwrap().into();
-            reclaimed.clear();
-            reclaimed
-        } else {
-            BytesMut::with_capacity(Self::DECODE_BUF_SIZE)
-        }
-    }
-
-    /// Fill `decode_buf` with data from `self.input`.
-    fn decode_bytes(&mut self, mut decode_buf: BytesMut) -> io::Result<Bytes> {
-        let mut_slice = unsafe {
-            std::slice::from_raw_parts_mut(decode_buf.as_mut_ptr(), decode_buf.capacity())
-        };
-        let mut current_len = decode_buf.len();
-        while current_len < decode_buf.capacity() {
-            let new_bytes = self.input.read(&mut mut_slice[current_len..])?;
-            if new_bytes == 0 {
-                break;
-            }
-            current_len += new_bytes;
-        }
-        unsafe { decode_buf.set_len(current_len) };
-        let bytes: Bytes = decode_buf.into();
-        self.mempool.push_back(bytes.clone());
-        Ok(bytes)
-    }
-}
+// The buffer should be large enough to saturate write I/O bandwidth, while also accommodating:
+// - Many small files: each file consumes at least one write-capacity-sized chunk (0.5-1 MiB).
+// - Large files: their data may accumulate in backlog buffers while waiting for file open
+//   operations to complete.
+const MAX_UNPACK_WRITE_BUF_SIZE: usize = 512 * 1024 * 1024;
+// Minimum for unpacking small archives - allows ~2-4 write-capacity-sized operations concurrently.
+const MIN_UNPACK_WRITE_BUF_SIZE: usize = 2 * 1024 * 1024;
 
 fn checked_total_size_sum(total_size: u64, entry_size: u64, limit_size: u64) -> Result<u64> {
     trace!("checked_total_size_sum: {total_size} + {entry_size} < {limit_size}");
@@ -287,9 +77,11 @@ fn checked_total_count_increment(total_count: u64, limit_count: u64) -> Result<u
     Ok(total_count)
 }
 
-fn check_unpack_result(unpack_result: bool, path: String) -> Result<()> {
-    if !unpack_result {
-        return Err(UnpackError::Archive(format!("failed to unpack: {path:?}")));
+fn check_unpack_result(unpack_result: Result<()>, path: String) -> Result<()> {
+    if let Err(err) = unpack_result {
+        return Err(UnpackError::Archive(format!(
+            "failed to unpack {path:?}: {err}"
+        )));
     }
     Ok(())
 }
@@ -306,23 +98,30 @@ fn unpack_archive<'a, A, C, D>(
     apparent_limit_size: u64,
     actual_limit_size: u64,
     limit_count: u64,
-    mut entry_checker: C, // checks if entry is valid
-    entry_processor: D,   // processes entry after setting permissions
+    mut entry_checker: C,   // checks if entry is valid
+    file_path_processor: D, // processes file paths after writing
 ) -> Result<()>
 where
     A: Read,
     C: FnMut(&[&str], tar::EntryType) -> UnpackPath<'a>,
-    D: Fn(PathBuf),
+    D: FnMut(PathBuf),
 {
     let mut apparent_total_size: u64 = 0;
     let mut actual_total_size: u64 = 0;
     let mut total_count: u64 = 0;
 
     let mut total_entries = 0;
-    let mut sanitized_paths_cache = Vec::new();
+    let mut open_dirs = Vec::new();
+
+    // Bound the buffer based on the provided limit of unpacked data (buffering more than a
+    // fraction, e.g. 25%, of the absolute maximum won't be necessary) - this works well for
+    // genesis, while the normal case hits MAX_UNPACK_WRITE_BUF_SIZE, which is tuned for
+    // production snapshot archives.
+    let buf_size = (apparent_limit_size.div_ceil(4) as usize)
+        .clamp(MIN_UNPACK_WRITE_BUF_SIZE, MAX_UNPACK_WRITE_BUF_SIZE);
+    let mut files_creator = file_creator(buf_size, file_path_processor)?;
 
     for entry in archive.entries()? {
-        let mut entry = entry?;
+        let entry = entry?;
         let path = entry.path()?;
         let path_str = path.display().to_string();
 
@@ -385,30 +184,47 @@ where
             // account_paths returned by `entry_checker`. We want to unpack into
             // account_path/<account> instead of account_path/accounts/<account> so we strip the
             // accounts/ prefix.
-            sanitize_path(&account, unpack_dir, &mut sanitized_paths_cache)
+            sanitize_path_and_open_dir(&account, unpack_dir, &mut open_dirs)
         } else {
-            sanitize_path(&path, unpack_dir, &mut sanitized_paths_cache)
+            sanitize_path_and_open_dir(&path, unpack_dir, &mut open_dirs)
         }?; // ? handles file system errors
-        let Some(entry_path) = entry_path else {
+        let Some((entry_path, open_dir)) = entry_path else {
             continue; // skip it
         };
 
-        let unpack = entry.unpack(&entry_path);
-        check_unpack_result(unpack.map(|_unpack| true)?, path_str)?;
-
-        // Sanitize permissions.
-        let mode = match entry.header().entry_type() {
-            GNUSparse | Regular => 0o644,
-            _ => 0o755,
-        };
-        set_perms(&entry_path, mode)?;
-
-        // Process entry after setting permissions
-        entry_processor(entry_path);
+        let unpack = unpack_entry(&mut files_creator, entry, entry_path, open_dir);
+        check_unpack_result(unpack, path_str)?;
 
         total_entries += 1;
     }
+    files_creator.drain()?;
+
     info!("unpacked {total_entries} entries total");
+    Ok(())
+}
+
+fn unpack_entry<'a, R: Read>(
+    files_creator: &mut Box<dyn FileCreator + 'a>,
+    mut entry: tar::Entry<'_, R>,
+    dst: PathBuf,
+    dst_open_dir: Arc<File>,
+) -> Result<()> {
+    let mode = match entry.header().entry_type() {
+        GNUSparse | Regular => 0o644,
+        _ => 0o755,
+    };
+    if should_fallback_to_tar_unpack(&entry) {
+        entry.unpack(&dst)?;
+        // Sanitize permissions.
+        set_perms(&dst, mode)?;
+
+        if !entry.header().entry_type().is_dir() {
+            // Process file after setting permissions
+            files_creator.file_complete(dst);
+        }
+        return Ok(());
+    }
+    files_creator.schedule_create_at_dir(dst, mode, dst_open_dir, &mut entry)?;
 
     return Ok(());
 
@@ -422,28 +238,38 @@ where
 
     #[cfg(windows)]
     fn set_perms(dst: &Path, _mode: u32) -> io::Result<()> {
-        let mut perm = fs::metadata(dst)?.permissions();
-        // This is OK for Windows, but clippy doesn't realize we're doing this
-        // only on Windows.
-        #[allow(clippy::permissions_set_readonly_false)]
-        perm.set_readonly(false);
-        fs::set_permissions(dst, perm)
+        super::file_io::set_file_readonly(dst, false)
     }
 }
 
+fn should_fallback_to_tar_unpack<R: io::Read>(entry: &tar::Entry<'_, R>) -> bool {
+    // Covers cases that are handled as directories or in a special way by the tar-rs library;
+    // we want to handle only the cases where the library would write plain files with the entry's contents.
+    matches!(
+        entry.header().entry_type(),
+        tar::EntryType::Directory
+            | tar::EntryType::Link
+            | tar::EntryType::Symlink
+            | tar::EntryType::XGlobalHeader
+            | tar::EntryType::XHeader
+            | tar::EntryType::GNULongName
+            | tar::EntryType::GNULongLink
+    ) || entry.header().as_ustar().is_none() && entry.path_bytes().ends_with(b"/")
+}
+
 // return Err on file system error
-// return Some(path) if path is good
+// return Some((path, open_dir)) if path is good
 // return None if we should skip this file
-fn sanitize_path(
+fn sanitize_path_and_open_dir(
     entry_path: &Path,
     dst: &Path,
-    cache: &mut Vec<(PathBuf, PathBuf)>,
-) -> Result<Option<PathBuf>> {
+    open_dirs: &mut Vec<(PathBuf, Arc<File>)>,
+) -> Result<Option<(PathBuf, Arc<File>)>> {
     // We cannot call unpack_in because it errors if we try to use 2 account paths.
     // So, this code is borrowed from unpack_in
     // ref: https://docs.rs/tar/*/tar/struct.Entry.html#method.unpack_in
     let mut file_dst = dst.to_path_buf();
-    const SKIP: Result<Option<PathBuf>> = Ok(None);
+    const SKIP: Result<Option<(PathBuf, Arc<File>)>> = Ok(None);
     {
         let path = entry_path;
         for part in path.components() {
@@ -475,19 +301,22 @@ fn sanitize_path(
         return SKIP;
     };
 
-    if let Err(insert_at) = cache.binary_search_by(|(dst_cached, parent_cached)| {
-        parent.cmp(parent_cached).then_with(|| dst.cmp(dst_cached))
-    }) {
-        fs::create_dir_all(parent)?;
+    let open_dst_dir = match open_dirs.binary_search_by(|(key, _)| parent.cmp(key)) {
+        Err(insert_at) => {
+            fs::create_dir_all(parent)?;
 
-        // Here we are different than untar_in. The code for tar::unpack_in internally calling unpack is a little different.
-        // ignore return value here
-        validate_inside_dst(dst, parent)?;
-        cache.insert(insert_at, (dst.to_path_buf(), parent.to_path_buf()));
-    }
-    let target = parent.join(entry_path.file_name().unwrap());
+            // Here we are different than untar_in. The code for tar::unpack_in internally calling unpack is a little different.
+            // ignore return value here
+            validate_inside_dst(dst, parent)?;
 
-    Ok(Some(target))
+            let opened_dir = Arc::new(File::open(parent)?);
+            open_dirs.insert(insert_at, (parent.to_path_buf(), opened_dir.clone()));
+            opened_dir
+        }
+        Ok(index) => open_dirs[index].1.clone(),
+    };
+
+    Ok(Some((file_dst, open_dst_dir)))
 }
 
 // copied from:
@@ -495,14 +324,10 @@ fn sanitize_path(
 fn validate_inside_dst(dst: &Path, file_dst: &Path) -> Result<PathBuf> {
     // Abort if target (canonical) parent is outside of `dst`
     let canon_parent = file_dst.canonicalize().map_err(|err| {
-        UnpackError::Archive(format!(
-            "{} while canonicalizing {}",
-            err,
-            file_dst.display()
-        ))
+        UnpackError::Archive(format!("{err} while canonicalizing {}", file_dst.display()))
     })?;
     let canon_target = dst.canonicalize().map_err(|err| {
-        UnpackError::Archive(format!("{} while canonicalizing {}", err, dst.display()))
+        UnpackError::Archive(format!("{err} while canonicalizing {}", dst.display()))
     })?;
     if !canon_parent.starts_with(&canon_target) {
         return Err(UnpackError::Archive(format!(
@@ -541,22 +366,20 @@ pub fn streaming_unpack_snapshot<A: Read>(
     archive: Archive<A>,
     ledger_dir: &Path,
     account_paths: &[PathBuf],
-    sender: &crossbeam_channel::Sender<PathBuf>,
+    sender: &Sender<PathBuf>,
 ) -> Result<()> {
     unpack_snapshot_with_processors(
         archive,
         ledger_dir,
         account_paths,
         |_, _| {},
-        |entry_path_buf| {
-            if entry_path_buf.is_file() {
-                let result = sender.send(entry_path_buf);
-                if let Err(err) = result {
-                    panic!(
-                        "failed to send path '{}' from unpacker to rebuilder: {err}",
-                        err.0.display(),
-                    );
-                }
+        |file_path| {
+            let result = sender.send(file_path);
+            if let Err(err) = result {
+                panic!(
+                    "failed to send path '{}' from unpacker to rebuilder: {err}",
+                    err.0.display(),
+                );
             }
         },
     )
@@ -567,12 +390,12 @@ fn unpack_snapshot_with_processors<A, F, G>(
     ledger_dir: &Path,
     account_paths: &[PathBuf],
     mut accounts_path_processor: F,
-    entry_processor: G,
+    file_path_processor: G,
 ) -> Result<()>
 where
     A: Read,
     F: FnMut(&str, &Path),
-    G: Fn(PathBuf),
+    G: FnMut(PathBuf),
 {
     assert!(!account_paths.is_empty());
 
@@ -603,7 +426,7 @@ where
                 UnpackPath::Invalid
             }
         },
-        entry_processor,
+        file_path_processor,
     )
 }
 
@@ -981,7 +804,7 @@ mod tests {
     {
         let data = archive.into_inner().unwrap();
         let reader = BufReader::new(&data[..]);
-        let archive: Archive<std::io::BufReader<&[u8]>> = Archive::new(reader);
+        let archive = Archive::new(reader);
         let temp_dir = tempfile::TempDir::new().unwrap();
 
         checker(archive, temp_dir.path())?;
@@ -1199,8 +1022,11 @@ mod tests {
 
     #[test]
     fn test_archive_unpack_snapshot_bad_unpack() {
-        let result = check_unpack_result(false, "abc".to_string());
-        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "failed to unpack: \"abc\"");
+        let result = check_unpack_result(
+            Err(UnpackError::Io(io::ErrorKind::FileTooLarge.into())),
+            "abc".to_string(),
+        );
+        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "failed to unpack \"abc\": IO error: file too large");
     }
 
     #[test]
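
To make the write-buffer sizing above concrete (a sketch, not part of the diff): the 10 MiB genesis limit yields a 2.5 MiB buffer, comfortably inside the clamp bounds.

    const MIN_UNPACK_WRITE_BUF_SIZE: usize = 2 * 1024 * 1024;
    const MAX_UNPACK_WRITE_BUF_SIZE: usize = 512 * 1024 * 1024;

    // MAX_GENESIS_ARCHIVE_UNPACKED_SIZE is the apparent limit when unpacking genesis
    let apparent_limit_size: u64 = 10 * 1024 * 1024;
    let buf_size = (apparent_limit_size.div_ceil(4) as usize)
        .clamp(MIN_UNPACK_WRITE_BUF_SIZE, MAX_UNPACK_WRITE_BUF_SIZE);
    assert_eq!(buf_size, 2_621_440); // 2.5 MiB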

+ 568 - 0
accounts-db/src/io_uring/file_creator.rs

@@ -0,0 +1,568 @@
+use {
+    crate::{
+        file_io::FileCreator,
+        io_uring::{
+            memory::{FixedIoBuffer, LargeBuffer},
+            IO_PRIO_BE_HIGHEST,
+        },
+    },
+    agave_io_uring::{Completion, FixedSlab, Ring, RingOp},
+    core::slice,
+    io_uring::{opcode, squeue, types, IoUring},
+    libc::{O_CREAT, O_NOATIME, O_NOFOLLOW, O_NONBLOCK, O_TRUNC, O_WRONLY},
+    smallvec::SmallVec,
+    std::{
+        collections::VecDeque,
+        fs::File,
+        io::{self, Read},
+        mem,
+        os::{fd::AsRawFd, unix::ffi::OsStrExt as _},
+        path::PathBuf,
+        pin::Pin,
+        ptr,
+        sync::Arc,
+        time::Duration,
+    },
+};
+
+// Based on transfers seen with `dd bs=SIZE` for NVME drives: values >=64KiB are fine,
+// but usually peak around 256KiB-1MiB. Also compare with particular NVME parameters, e.g.
+// 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB.
+const DEFAULT_WRITE_SIZE: usize = 512 * 1024;
+
+// 99.9% of accounts storage files are < 8MiB
+type BacklogVec = SmallVec<[PendingWrite; 8 * 1024 * 1024 / DEFAULT_WRITE_SIZE]>;
+
+// Sanity limit for slab size and number of concurrent operations, in practice with 0.5-1GiB
+// buffer this is also close to the number of available buffers that small files will use up.
+// Also, permitting too many open files results in many submitted open ops, which will contend
+// on the directory inode lock.
+const MAX_OPEN_FILES: usize = 512;
+
+// We need a few threads to saturate the disk bandwidth, especially since we are writing lots
+// of small files, so the ratio of ops to bytes written is high. We also need open ops and writes
+// to run concurrently.
+// We shouldn't use too many threads, as they will contend a lot to lock the directory inode
+// (on open, since in accounts-db most files land in a single dir).
+const MAX_IOWQ_WORKERS: u32 = 4;
+
+const CHECK_PROGRESS_AFTER_SUBMIT_TIMEOUT: Option<Duration> = Some(Duration::from_millis(10));
+
+/// Creator of multiple files backed by an `io_uring` queue of open -> write -> close
+/// operations.
+pub struct IoUringFileCreator<'a, B = LargeBuffer> {
+    ring: Ring<FileCreatorState<'a>, FileCreatorOp>,
+    /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `ring`
+    /// (should get dropped last)
+    _backing_buffer: B,
+}
+
+impl<'a> IoUringFileCreator<'a, LargeBuffer> {
+    /// Create a new `IoUringFileCreator` using internally allocated buffer of specified
+    /// `buf_size` and default write size.
+    pub fn with_buffer_capacity<F: FnMut(PathBuf) + 'a>(
+        buf_size: usize,
+        file_complete: F,
+    ) -> io::Result<Self> {
+        Self::with_buffer(
+            LargeBuffer::new(buf_size),
+            DEFAULT_WRITE_SIZE,
+            file_complete,
+        )
+    }
+}
+
+impl<'a, B: AsMut<[u8]>> IoUringFileCreator<'a, B> {
+    /// Create a new `IoUringFileCreator` using provided `buffer` and `file_complete`
+    /// to notify caller when file contents are already persisted.
+    ///
+    /// `buffer` is the internal buffer used for writing scheduled file contents.
+    /// It must be at least `write_capacity` long. The creator will execute multiple
+    /// `write_capacity` sized writes in parallel to empty the work queue of files to create.
+    pub fn with_buffer<F: FnMut(PathBuf) + 'a>(
+        mut buffer: B,
+        write_capacity: usize,
+        file_complete: F,
+    ) -> io::Result<Self> {
+        // Let the submission queue hold half of the buffers before we explicitly syscall
+        // to submit them for writing (this lets the kernel start processing before we run
+        // out of buffers, and also amortizes the number of `submit` syscalls made).
+        let ring_qsize = (buffer.as_mut().len() / write_capacity / 2).max(1) as u32;
+        let ring = IoUring::builder().build(ring_qsize)?;
+        // Maximum number of spawned [bounded IO, unbounded IO] kernel threads, we don't expect
+        // any unbounded work, but limit it to 1 just in case (0 leaves it unlimited).
+        ring.submitter()
+            .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 1])?;
+        Self::with_buffer_and_ring(ring, buffer, write_capacity, file_complete)
+    }
+
+    fn with_buffer_and_ring<F: FnMut(PathBuf) + 'a>(
+        ring: IoUring,
+        mut backing_buffer: B,
+        write_capacity: usize,
+        file_complete: F,
+    ) -> io::Result<Self> {
+        let buffer = backing_buffer.as_mut();
+        // Take prefix of buffer that is aligned to write_capacity
+        assert!(buffer.len() >= write_capacity);
+        let write_aligned_buf_len = buffer.len() / write_capacity * write_capacity;
+        let buffer = &mut buffer[..write_aligned_buf_len];
+
+        // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are
+        // dropped before `backing_buffer` is dropped.
+        let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, write_capacity) };
+        let state = FileCreatorState::new(buffers.collect(), file_complete);
+        let ring = Ring::new(ring, state);
+
+        // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order
+        // guarantees that the ring is destroyed before `_backing_buffer` is dropped.
+        unsafe { FixedIoBuffer::register(buffer, &ring)? };
+
+        // Fixed file descriptor slots. OpenAt will update them to valid fds. Length of registered
+        // slots must match the `state.files` slab whose indices are used as fd slot indices.
+        let fds = vec![-1; MAX_OPEN_FILES];
+        ring.register_files(&fds)?;
+
+        Ok(Self {
+            ring,
+            _backing_buffer: backing_buffer,
+        })
+    }
+}
+
+impl<B> FileCreator for IoUringFileCreator<'_, B> {
+    fn schedule_create_at_dir(
+        &mut self,
+        path: PathBuf,
+        mode: u32,
+        parent_dir_handle: Arc<File>,
+        contents: &mut dyn Read,
+    ) -> io::Result<()> {
+        let file_key = self.open(path, mode, Some(parent_dir_handle))?;
+        self.write_and_close(contents, file_key)
+    }
+
+    fn file_complete(&mut self, path: PathBuf) {
+        (self.ring.context_mut().file_complete)(path)
+    }
+
+    fn drain(&mut self) -> io::Result<()> {
+        let res = self.ring.drain();
+        self.ring.context().log_stats();
+        res
+    }
+}
+
+impl<B> IoUringFileCreator<'_, B> {
+    /// Schedule opening a file at `path` with `mode` permissions.
+    ///
+    /// Returns key that can be used for scheduling writes for it.
+    fn open(
+        &mut self,
+        path: PathBuf,
+        mode: u32,
+        dir_handle: Option<Arc<File>>,
+    ) -> io::Result<usize> {
+        let file = PendingFile::from_path(path);
+        let path_bytes = Pin::new(file.zero_terminated_path_bytes(dir_handle.is_some()));
+
+        let file_key = self.wait_add_file(file)?;
+
+        let op = FileCreatorOp::Open(OpenOp {
+            dir_handle,
+            path_bytes,
+            mode,
+            file_key,
+        });
+        self.ring.push(op)?;
+
+        Ok(file_key)
+    }
+
+    fn wait_add_file(&mut self, file: PendingFile) -> io::Result<usize> {
+        loop {
+            self.ring.process_completions()?;
+            if self.ring.context().files.len() < self.ring.context().files.capacity() {
+                break;
+            }
+            self.ring
+                .submit_and_wait(1, CHECK_PROGRESS_AFTER_SUBMIT_TIMEOUT)?;
+        }
+        let file_key = self.ring.context_mut().files.insert(file);
+        Ok(file_key)
+    }
+
+    fn write_and_close(&mut self, mut src: impl Read, file_key: usize) -> io::Result<()> {
+        let mut offset = 0;
+        loop {
+            let buf = self.wait_free_buf()?;
+
+            let state = self.ring.context_mut();
+            let file = state.files.get_mut(file_key).unwrap();
+
+            // Safety: the buffer points to valid memory backed by `self._backing_buffer`.
+            // It's obtained from the queue of free buffers and is written to exclusively
+            // here before being handed to the kernel or to the backlog in `file`.
+            let mut_slice = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len()) };
+            let len = src.read(mut_slice)?;
+
+            if len == 0 {
+                file.eof = true;
+
+                state.buffers.push_front(buf);
+                if file.is_completed() {
+                    (state.file_complete)(mem::take(&mut file.path));
+                    self.ring
+                        .push(FileCreatorOp::Close(CloseOp::new(file_key)))?;
+                }
+                break;
+            }
+
+            file.writes_started += 1;
+            if file.completed_open {
+                let op = WriteOp {
+                    file_key,
+                    offset,
+                    buf,
+                    write_len: len,
+                };
+                state.submitted_writes_size += len;
+                self.ring.push(FileCreatorOp::Write(op))?;
+            } else {
+                file.backlog.push((buf, offset, len));
+            }
+
+            offset += len;
+        }
+
+        Ok(())
+    }
+
+    fn wait_free_buf(&mut self) -> io::Result<FixedIoBuffer> {
+        loop {
+            self.ring.process_completions()?;
+            let state = self.ring.context_mut();
+            if let Some(buf) = state.buffers.pop_front() {
+                return Ok(buf);
+            }
+            state.stats.no_buf_count += 1;
+            state.stats.no_buf_sum_submitted_write_sizes += state.submitted_writes_size;
+
+            self.ring
+                .submit_and_wait(1, CHECK_PROGRESS_AFTER_SUBMIT_TIMEOUT)?;
+        }
+    }
+}
+
+struct FileCreatorState<'a> {
+    files: FixedSlab<PendingFile>,
+    buffers: VecDeque<FixedIoBuffer>,
+    /// Externally provided callback to be called on paths of files that were written
+    file_complete: Box<dyn FnMut(PathBuf) + 'a>,
+    open_fds: usize,
+    /// Total write length of submitted writes
+    submitted_writes_size: usize,
+    stats: FileCreatorStats,
+}
+
+impl<'a> FileCreatorState<'a> {
+    fn new(buffers: VecDeque<FixedIoBuffer>, file_complete: impl FnMut(PathBuf) + 'a) -> Self {
+        Self {
+            files: FixedSlab::with_capacity(MAX_OPEN_FILES),
+            buffers,
+            file_complete: Box::new(file_complete),
+            open_fds: 0,
+            submitted_writes_size: 0,
+            stats: FileCreatorStats::default(),
+        }
+    }
+
+    /// Returns write backlog that needs to be submitted to IO ring
+    fn mark_file_opened(&mut self, file_key: usize) -> BacklogVec {
+        let file = self.files.get_mut(file_key).unwrap();
+        file.completed_open = true;
+        self.open_fds += 1;
+        if self.buffers.len() * 2 > self.buffers.capacity() {
+            self.stats.large_buf_headroom_count += 1;
+        }
+        mem::take(&mut file.backlog)
+    }
+
+    /// Returns true if all of the writes are now done
+    fn mark_write_completed(
+        &mut self,
+        file_key: usize,
+        write_len: usize,
+        buf: FixedIoBuffer,
+    ) -> bool {
+        self.submitted_writes_size -= write_len;
+        self.buffers.push_front(buf);
+
+        let file = self.files.get_mut(file_key).unwrap();
+        file.writes_completed += 1;
+        if file.is_completed() {
+            (self.file_complete)(mem::take(&mut file.path));
+            return true;
+        }
+        false
+    }
+
+    fn mark_file_closed(&mut self, file_key: usize) {
+        let _ = self.files.remove(file_key);
+        self.open_fds -= 1;
+    }
+
+    fn log_stats(&self) {
+        self.stats.log();
+    }
+}
+
+#[derive(Debug, Default)]
+struct FileCreatorStats {
+    /// Count of cases when more than half of the buffers are free (files are written
+    /// faster than submitted - consider fewer buffers or speeding up submission)
+    large_buf_headroom_count: u32,
+    /// Count of cases when we run out of free buffers (files are not written fast
+    /// enough - consider more buffers or tuning write bandwidth / patterns)
+    no_buf_count: u32,
+    /// Sum of all outstanding write sizes at moments of encountering no free buf
+    no_buf_sum_submitted_write_sizes: usize,
+}
+
+impl FileCreatorStats {
+    fn log(&self) {
+        let avg_writes_at_no_buf = self
+            .no_buf_sum_submitted_write_sizes
+            .checked_div(self.no_buf_count as usize)
+            .unwrap_or_default();
+        log::info!(
+            "files creation stats - large buf headroom: {}, no buf count: {},\
+            avg pending writes at no buf: {avg_writes_at_no_buf}",
+            self.large_buf_headroom_count,
+            self.no_buf_count,
+        );
+    }
+}
+
+#[derive(Debug)]
+struct OpenOp {
+    dir_handle: Option<Arc<File>>,
+    path_bytes: Pin<Vec<u8>>,
+    mode: libc::mode_t,
+    file_key: usize,
+}
+
+impl OpenOp {
+    fn entry(&mut self) -> squeue::Entry {
+        let at_dir_fd = types::Fd(
+            self.dir_handle
+                .as_ref()
+                .map(AsRawFd::as_raw_fd)
+                .unwrap_or(libc::AT_FDCWD),
+        );
+        opcode::OpenAt::new(at_dir_fd, self.path_bytes.as_ptr() as _)
+            .flags(O_CREAT | O_TRUNC | O_NOFOLLOW | O_WRONLY | O_NOATIME | O_NONBLOCK)
+            .mode(self.mode)
+            .file_index(Some(
+                types::DestinationSlot::try_from_slot_target(self.file_key as u32).unwrap(),
+            ))
+            .build()
+    }
+
+    fn complete(
+        &mut self,
+        ring: &mut Completion<FileCreatorState, FileCreatorOp>,
+        res: io::Result<i32>,
+    ) -> io::Result<()>
+    where
+        Self: Sized,
+    {
+        res?;
+
+        let backlog = ring.context_mut().mark_file_opened(self.file_key);
+        for (buf, offset, len) in backlog {
+            let op = WriteOp {
+                file_key: self.file_key,
+                offset,
+                buf,
+                write_len: len,
+            };
+            ring.context_mut().submitted_writes_size += len;
+            ring.push(FileCreatorOp::Write(op));
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+struct CloseOp {
+    file_key: usize,
+}
+
+impl<'a> CloseOp {
+    fn new(file_key: usize) -> Self {
+        Self { file_key }
+    }
+
+    fn entry(&mut self) -> squeue::Entry {
+        opcode::Close::new(types::Fixed(self.file_key as u32)).build()
+    }
+
+    fn complete(
+        &mut self,
+        ring: &mut Completion<FileCreatorState<'a>, FileCreatorOp>,
+        res: io::Result<i32>,
+    ) -> io::Result<()>
+    where
+        Self: Sized,
+    {
+        let _ = res?;
+        ring.context_mut().mark_file_closed(self.file_key);
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+struct WriteOp {
+    file_key: usize,
+    offset: usize,
+    buf: FixedIoBuffer,
+    write_len: usize,
+}
+
+impl<'a> WriteOp {
+    fn entry(&mut self) -> squeue::Entry {
+        let WriteOp {
+            file_key,
+            offset,
+            buf,
+            write_len,
+        } = self;
+
+        // Safety: buf is owned by `WriteOp` during the operation handling by the kernel and
+        // reclaimed after completion passed in a call to `mark_write_completed`.
+        opcode::WriteFixed::new(
+            types::Fixed(*file_key as u32),
+            unsafe { buf.as_mut_ptr() },
+            *write_len as u32,
+            buf.io_buf_index()
+                .expect("should have a valid fixed buffer"),
+        )
+        .offset(*offset as u64)
+        .ioprio(IO_PRIO_BE_HIGHEST)
+        .build()
+    }
+
+    fn complete(
+        &mut self,
+        ring: &mut Completion<FileCreatorState<'a>, FileCreatorOp>,
+        res: io::Result<i32>,
+    ) -> io::Result<()>
+    where
+        Self: Sized,
+    {
+        let written = res? as usize;
+
+        let WriteOp {
+            file_key,
+            offset: _,
+            ref mut buf,
+            write_len,
+        } = self;
+
+        // unless specified otherwise, the io uring worker will retry automatically on EAGAIN
+        assert_eq!(written, *write_len, "short write");
+
+        let buf = mem::replace(buf, FixedIoBuffer::empty());
+        if ring
+            .context_mut()
+            .mark_write_completed(*file_key, *write_len, buf)
+        {
+            ring.push(FileCreatorOp::Close(CloseOp::new(*file_key)));
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+enum FileCreatorOp {
+    Open(OpenOp),
+    Close(CloseOp),
+    Write(WriteOp),
+}
+
+impl RingOp<FileCreatorState<'_>> for FileCreatorOp {
+    fn entry(&mut self) -> squeue::Entry {
+        match self {
+            Self::Open(op) => op.entry(),
+            Self::Close(op) => op.entry(),
+            Self::Write(op) => op.entry(),
+        }
+    }
+
+    fn complete(
+        &mut self,
+        ring: &mut Completion<FileCreatorState, Self>,
+        res: io::Result<i32>,
+    ) -> io::Result<()>
+    where
+        Self: Sized,
+    {
+        match self {
+            Self::Open(op) => op.complete(ring, res),
+            Self::Close(op) => op.complete(ring, res),
+            Self::Write(op) => op.complete(ring, res),
+        }
+    }
+}
+
+type PendingWrite = (FixedIoBuffer, usize, usize);
+
+#[derive(Debug)]
+struct PendingFile {
+    path: PathBuf,
+    completed_open: bool,
+    backlog: BacklogVec,
+    eof: bool,
+    writes_started: usize,
+    writes_completed: usize,
+}
+
+impl PendingFile {
+    fn from_path(path: PathBuf) -> Self {
+        Self {
+            path,
+            completed_open: false,
+            backlog: SmallVec::new(),
+            writes_started: 0,
+            writes_completed: 0,
+            eof: false,
+        }
+    }
+
+    fn zero_terminated_path_bytes(&self, only_filename: bool) -> Vec<u8> {
+        let mut path_bytes = Vec::with_capacity(libc::PATH_MAX as usize);
+        let buf_ptr = path_bytes.as_mut_ptr();
+        let bytes = if only_filename {
+            self.path.file_name().unwrap_or_default().as_bytes()
+        } else {
+            self.path.as_os_str().as_bytes()
+        };
+        assert!(bytes.len() < path_bytes.capacity());
+        // Safety:
+        // We know that the buffer is large enough to hold the copy and the
+        // pointers don't overlap.
+        unsafe {
+            ptr::copy_nonoverlapping(bytes.as_ptr(), buf_ptr, bytes.len());
+            buf_ptr.add(bytes.len()).write(0);
+            path_bytes.set_len(bytes.len() + 1);
+        }
+        path_bytes
+    }
+
+    fn is_completed(&self) -> bool {
+        self.eof && self.writes_started == self.writes_completed
+    }
+}

+ 117 - 18
accounts-db/src/io_uring/memory.rs

@@ -1,9 +1,19 @@
-use std::{
-    ops::{Deref, DerefMut},
-    ptr::{self, NonNull},
-    slice,
+use {
+    agave_io_uring::{Ring, RingOp},
+    std::{
+        io,
+        ops::{Deref, DerefMut},
+        ptr::{self, NonNull},
+        slice,
+    },
 };
 
+// We use fixed buffers to save the cost of mapping/unmapping them at each operation.
+//
+// Instead of doing many large allocations and registering those, we do a single large one
+// and chunk it in slices of up to 1G each.
+const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024;
+
 pub enum LargeBuffer {
     Vec(Vec<u8>),
     HugeTable(PageAlignedMemory),
@@ -43,6 +53,7 @@ impl LargeBuffer {
     /// using HugeTable when it is available on the host.
     pub fn new(size: usize) -> Self {
         if size > PageAlignedMemory::page_size() {
+            let size = size.next_power_of_two();
             if let Ok(alloc) = PageAlignedMemory::alloc_huge_table(size) {
                 log::info!("obtained hugetable io_uring buffer (len={size})");
                 return Self::HugeTable(alloc);
@@ -120,49 +131,137 @@ impl DerefMut for PageAlignedMemory {
     }
 }
 
-/// Fixed mutable view into externally allocated bytes buffer
+/// Fixed mutable view into externally allocated IO bytes buffer
+/// registered in `io_uring` for access in scheduled IO operations.
 ///
-/// It is an unsafe (no lifetime tracking) equivalent of `&mut [u8]`
-pub struct BorrowedBytesMut {
+/// It is used as an unsafe (no lifetime tracking) equivalent of `&mut [u8]`.
+#[derive(Debug)]
+pub(super) struct FixedIoBuffer {
     ptr: *mut u8,
     size: usize,
+    io_buf_index: Option<u16>,
 }
 
-impl BorrowedBytesMut {
+impl FixedIoBuffer {
     pub const fn empty() -> Self {
         Self {
             ptr: std::ptr::null_mut(),
             size: 0,
+            io_buf_index: None,
         }
     }
 
-    pub fn from_mut_slice(buf: &mut [u8]) -> Self {
-        Self {
-            ptr: buf.as_mut_ptr(),
-            size: buf.len(),
-        }
-    }
+    /// Split buffer into `chunk_size` sized `FixedIoBuffer` buffers for use as registered
+    /// buffer in io_uring operations.
+    pub unsafe fn split_buffer_chunks(
+        buffer: &mut [u8],
+        chunk_size: usize,
+    ) -> impl Iterator<Item = Self> + use<'_> {
+        assert!(
+            buffer.len() / FIXED_BUFFER_LEN <= u16::MAX as usize,
+            "buffer too large to register in io_uring"
+        );
+        let buf_start = buffer.as_ptr() as usize;
 
-    pub fn as_mut_ptr(&self) -> *mut u8 {
-        self.ptr
+        buffer.chunks_exact_mut(chunk_size).map(move |buf| {
+            let io_buf_index = (buf.as_ptr() as usize - buf_start) / FIXED_BUFFER_LEN;
+            Self {
+                ptr: buf.as_mut_ptr(),
+                size: buf.len(),
+                io_buf_index: Some(io_buf_index as u16),
+            }
+        })
     }
 
     pub fn len(&self) -> usize {
         self.size
     }
 
+    /// Safety: while just returning without dereferencing a pointer is safe, this is marked unsafe
+    /// so that the callers are encouraged to reason about the lifetime of the buffer.
+    pub unsafe fn as_mut_ptr(&self) -> *mut u8 {
+        self.ptr
+    }
+
+    /// The index of the fixed buffer in the ring. See register_buffers().
+    pub fn io_buf_index(&self) -> Option<u16> {
+        self.io_buf_index
+    }
+
     /// Return a clone of `self` reduced to specified `size`
-    pub fn sub_buf_to(&self, size: usize) -> Self {
+    pub fn into_shrinked(self, size: usize) -> Self {
         assert!(size <= self.size);
         Self {
             ptr: self.ptr,
             size,
+            io_buf_index: self.io_buf_index,
         }
     }
+
+    /// Register the provided buffer as a fixed buffer in `io_uring`.
+    pub unsafe fn register<S, E: RingOp<S>>(
+        buffer: &mut [u8],
+        ring: &Ring<S, E>,
+    ) -> io::Result<()> {
+        adjust_ulimit_memlock(buffer.len())?;
+        let iovecs = buffer
+            .chunks(FIXED_BUFFER_LEN)
+            .map(|buf| libc::iovec {
+                iov_base: buf.as_ptr() as _,
+                iov_len: buf.len(),
+            })
+            .collect::<Vec<_>>();
+        unsafe { ring.register_buffers(&iovecs) }
+    }
 }
 
-impl AsRef<[u8]> for BorrowedBytesMut {
+impl AsRef<[u8]> for FixedIoBuffer {
     fn as_ref(&self) -> &[u8] {
         unsafe { slice::from_raw_parts(self.ptr, self.size) }
     }
 }
+
+pub fn adjust_ulimit_memlock(min_required: usize) -> io::Result<()> {
+    // This value reflects recommended memory lock limit documented in the validator's
+    // setup instructions at docs/src/operations/guides/validator-start.md
+    const DESIRED_MEMLOCK: u64 = 2_000_000_000;
+
+    fn get_memlock() -> libc::rlimit {
+        let mut memlock = libc::rlimit {
+            rlim_cur: 0,
+            rlim_max: 0,
+        };
+        if unsafe { libc::getrlimit(libc::RLIMIT_MEMLOCK, &mut memlock) } != 0 {
+            log::warn!("getrlimit(RLIMIT_MEMLOCK) failed");
+        }
+        memlock
+    }
+
+    let mut memlock = get_memlock();
+    let current = memlock.rlim_cur as usize;
+    if current < min_required {
+        memlock.rlim_cur = DESIRED_MEMLOCK;
+        memlock.rlim_max = DESIRED_MEMLOCK;
+        if unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &memlock) } != 0 {
+            log::error!(
+                "Unable to increase the maximum memory lock limit to {} from {current}",
+                memlock.rlim_cur
+            );
+
+            if cfg!(target_os = "macos") {
+                log::error!(
+                    "On mac OS you may need to run \
+                    |sudo launchctl limit memlock {DESIRED_MEMLOCK} {DESIRED_MEMLOCK}| first"
+                );
+            }
+            return Err(io::Error::new(
+                io::ErrorKind::OutOfMemory,
+                "unable to set memory lock limit",
+            ));
+        }
+
+        memlock = get_memlock();
+        log::info!("Bumped maximum memory lock limit: {}", memlock.rlim_cur);
+    }
+    Ok(())
+}
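
A quick check of the chunk-to-iovec index math used by split_buffer_chunks above (a sketch; the 1.5 GiB offset is hypothetical): every chunk inherits the index of the 1 GiB registered region it starts in.

    const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024;

    // A chunk starting 1.5 GiB into the backing buffer lies in the second
    // registered 1 GiB iovec, so it is tagged with io_buf_index 1.
    let chunk_offset: usize = 3 * 512 * 1024 * 1024;
    assert_eq!(chunk_offset / FIXED_BUFFER_LEN, 1);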

+ 7 - 0
accounts-db/src/io_uring/mod.rs

@@ -1,5 +1,12 @@
 #![cfg(target_os = "linux")]
 
 pub mod dir_remover;
+pub mod file_creator;
 pub mod memory;
 pub mod sequential_file_reader;
+
+// Based on Linux <uapi/linux/ioprio.h>
+const IO_PRIO_CLASS_SHIFT: u16 = 13;
+const IO_PRIO_CLASS_BE: u16 = 2;
+const IO_PRIO_LEVEL_HIGHEST: u16 = 0;
+const IO_PRIO_BE_HIGHEST: u16 = IO_PRIO_CLASS_BE << IO_PRIO_CLASS_SHIFT | IO_PRIO_LEVEL_HIGHEST;
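
This mirrors the kernel's IOPRIO_PRIO_VALUE(class, data) encoding; as a quick sanity check (a sketch, not part of the diff), the constant resolves to 0x4000:

    // best-effort class (2) in bits 13 and up, highest level (0) in the low bits
    assert_eq!(IO_PRIO_BE_HIGHEST, 2u16 << 13);
    assert_eq!(IO_PRIO_BE_HIGHEST, 0x4000);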

+ 60 - 112
accounts-db/src/io_uring/sequential_file_reader.rs

@@ -1,32 +1,30 @@
 use {
-    crate::io_uring::memory::{BorrowedBytesMut, LargeBuffer},
+    super::{
+        memory::{FixedIoBuffer, LargeBuffer},
+        IO_PRIO_BE_HIGHEST,
+    },
     agave_io_uring::{Completion, Ring, RingOp},
     io_uring::{opcode, squeue, types, IoUring},
     std::{
-        fs::File,
+        fs::{File, OpenOptions},
         io::{self, BufRead, Cursor, Read},
         mem,
-        os::fd::{AsRawFd as _, RawFd},
+        os::{
+            fd::{AsRawFd as _, RawFd},
+            unix::fs::OpenOptionsExt,
+        },
         path::Path,
     },
 };
 
+// Based on transfers seen with `dd bs=SIZE` for NVME drives: values >=64KiB are fine,
+// but peak at 1MiB. Also compare with particular NVME parameters, e.g.
+// 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB.
 const DEFAULT_READ_SIZE: usize = 1024 * 1024;
-#[allow(dead_code)]
-const DEFAULT_BUFFER_SIZE: usize = 64 * DEFAULT_READ_SIZE;
 const SQPOLL_IDLE_TIMEOUT: u32 = 50;
-const MAX_IOWQ_WORKERS: u32 = 4;
-
-// Based on Linux <uapi/linux/ioprio.h>
-const IO_PRIO_CLASS_SHIFT: u16 = 13;
-const IO_PRIO_CLASS_BE: u16 = 2;
-const IO_PRIO_LEVEL_HIGHEST: u16 = 0;
-const IO_PRIO_BE_HIGHEST: u16 = IO_PRIO_CLASS_BE << IO_PRIO_CLASS_SHIFT | IO_PRIO_LEVEL_HIGHEST;
-
-// We register fixed buffers in chunks of up to 1GB as this is faster than registering many
-// `read_capacity` buffers. Registering fixed buffers saves the kernel some work in
-// checking/mapping/unmapping buffers for each read operation.
-const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024;
+// For large files we don't really need workers, as the few regularly submitted requests get
+// handled within the sqpoll thread. Allow some workers just in case, but limit them.
+const MAX_IOWQ_WORKERS: u32 = 2;
 
 /// Reader for non-seekable files.
 ///
@@ -34,21 +32,12 @@ const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024;
 pub struct SequentialFileReader<B> {
     // Note: state is tied to `backing_buffer` and contains unsafe pointer references to it
     inner: Ring<SequentialFileReaderState, ReadOp>,
-    /// Owned buffer used across lifespan of `inner` (should get dropped last)
-    #[allow(dead_code)]
-    backing_buffer: B,
+    /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner`
+    /// (should get dropped last)
+    _backing_buffer: B,
 }
 
 impl SequentialFileReader<LargeBuffer> {
-    /// Create a new `SequentialFileReader` for the given `path` using internally allocated
-    /// large buffer and default read size.
-    ///
-    /// See [SequentialFileReader::with_buffer] for more information.
-    #[allow(dead_code)]
-    pub fn new(path: impl AsRef<Path>) -> io::Result<Self> {
-        Self::with_capacity(DEFAULT_BUFFER_SIZE, path)
-    }
-
     /// Create a new `SequentialFileReader` for the given `path` using internally allocated
     /// buffer of specified `buf_size` and default read size.
     pub fn with_capacity(buf_size: usize, path: impl AsRef<Path>) -> io::Result<Self> {
@@ -76,16 +65,16 @@ impl<B: AsMut<[u8]>> SequentialFileReader<B> {
         mut buffer: B,
         read_capacity: usize,
     ) -> io::Result<Self> {
-        let buf_len = buffer.as_mut().len();
-
         // Let submission queue hold half of buffers before we explicitly syscall
         // to submit them for reading.
-        let ring_qsize = (buf_len / read_capacity / 2).max(1) as u32;
+        let ring_qsize = (buffer.as_mut().len() / read_capacity / 2).max(1) as u32;
         let ring = IoUring::builder()
             .setup_sqpoll(SQPOLL_IDLE_TIMEOUT)
             .build(ring_qsize)?;
+        // Maximum number of spawned [bounded IO, unbounded IO] kernel threads. We don't
+        // expect any unbounded work, but cap it at 1 just in case (0 leaves it unlimited).
         ring.submitter()
-            .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 0])?;
+            .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 1])?;
         Self::with_buffer_and_ring(buffer, ring, path, read_capacity)
     }
 
@@ -104,25 +93,15 @@ impl<B: AsMut<[u8]>> SequentialFileReader<B> {
             "buffer size must be a multiple of read_capacity"
         );
 
-        // Split the buffer into `read_capacity` sized chunks.
-        let buf_start = buffer.as_ptr() as usize;
-        let buffers = buffer
-            .chunks_exact_mut(read_capacity)
-            .map(|buf| {
-                let io_buf_index = (buf.as_ptr() as usize - buf_start) / FIXED_BUFFER_LEN;
-                ReadBufState::Uninit {
-                    io_buf_index,
-                    buf: BorrowedBytesMut::from_mut_slice(buf),
-                }
-            })
-            .collect::<Vec<_>>();
-
-        let file = std::os::unix::fs::OpenOptionsExt::custom_flags(
-            std::fs::OpenOptions::new().read(true),
-            libc::O_NOATIME,
-        )
-        .open(path)?;
-
+        let file = OpenOptions::new()
+            .read(true)
+            .custom_flags(libc::O_NOATIME)
+            .open(path)?;
+        // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are
+        // dropped before `backing_buffer` is dropped.
+        let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, read_capacity) }
+            .map(ReadBufState::Uninit)
+            .collect();
         let ring = Ring::new(
             ring,
             SequentialFileReaderState {
@@ -134,21 +113,14 @@ impl<B: AsMut<[u8]>> SequentialFileReader<B> {
                 current_buf: 0,
             },
         );
-        let iovecs = buffer
-            .chunks(FIXED_BUFFER_LEN)
-            .map(|buf| libc::iovec {
-                iov_base: buf.as_ptr() as _,
-                iov_len: buf.len(),
-            })
-            .collect::<Vec<_>>();
-        // Safety:
-        // The iovecs point to a buffer which is guaranteed to be valid for the
-        // lifetime of the reader
-        unsafe { ring.register_buffers(&iovecs)? };
+
+        // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order
+        // guarantees that the ring is destroyed before `_backing_buffer` is dropped.
+        unsafe { FixedIoBuffer::register(buffer, &ring)? };
 
         let mut reader = Self {
             inner: ring,
-            backing_buffer,
+            _backing_buffer: backing_buffer,
         };
 
         // Start reading all buffers.
@@ -180,12 +152,11 @@ impl<B: AsMut<[u8]>> SequentialFileReader<B> {
         } = &mut self.inner.context_mut();
         let read_buf = mem::replace(&mut buffers[index], ReadBufState::Reading);
         match read_buf {
-            ReadBufState::Uninit { buf, io_buf_index } => {
+            ReadBufState::Uninit(buf) => {
                 let op = ReadOp {
                     fd: file.as_raw_fd(),
                     buf,
                     buf_off: 0,
-                    io_buf_index,
                     file_off: *offset,
                     read_len: *read_capacity,
                     reader_buf_index: index,
@@ -198,7 +169,7 @@ impl<B: AsMut<[u8]>> SequentialFileReader<B> {
                 // Safety:
                 // The op points to a buffer which is guaranteed to be valid for
                 // the lifetime of the operation
-                self.inner.push(op)?
+                self.inner.push(op)?;
             }
             _ => unreachable!("called start_reading_buf on a non-empty buffer"),
         }
@@ -228,10 +199,7 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
             let num_buffers = state.buffers.len();
             let read_buf = &mut state.buffers[state.current_buf];
             match read_buf {
-                ReadBufState::Full {
-                    ref mut cursor,
-                    io_buf_index,
-                } => {
+                ReadBufState::Full(ref mut cursor) => {
                     if !cursor.fill_buf()?.is_empty() {
                         // we have some data available
                         break true;
@@ -247,7 +215,7 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
                         state.current_buf = (state.current_buf + 1) % num_buffers;
                     } else {
                         // we have finished consuming this buffer, queue the next read
-                        let cursor = mem::replace(cursor, Cursor::new(BorrowedBytesMut::empty()));
+                        let cursor = mem::replace(cursor, Cursor::new(FixedIoBuffer::empty()));
                         let buf = cursor.into_inner();
 
                         // The very last read when we hit EOF could return less than `read_capacity`, in
@@ -258,10 +226,7 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
                         // didn't reset the length it wouldn't matter.
                         debug_assert!(buf.len() == state.read_capacity);
 
-                        state.buffers[index] = ReadBufState::Uninit {
-                            buf,
-                            io_buf_index: *io_buf_index,
-                        };
+                        state.buffers[index] = ReadBufState::Uninit(buf);
                         state.current_buf = (state.current_buf + 1) % num_buffers;
 
                         self.start_reading_buf(index)?;
@@ -270,7 +235,7 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
                     // move to the next buffer and check again whether we have data
                     continue;
                 }
-                ReadBufState::Uninit { .. } => unreachable!("should be initialized"),
+                ReadBufState::Uninit(_) => unreachable!("should be initialized"),
                 _ => break false,
             }
         };
@@ -280,8 +245,8 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
             let state = self.inner.context();
 
             match &state.buffers[state.current_buf] {
-                ReadBufState::Full { .. } => break,
-                ReadBufState::Uninit { .. } => unreachable!("should be initialized"),
+                ReadBufState::Full(_) => break,
+                ReadBufState::Uninit(_) => unreachable!("should be initialized"),
                 // Still no data, wait for more completions, but submit in case the SQPOLL
                 // thread is asleep and there are queued entries in the submission queue.
                 ReadBufState::Reading => self.inner.submit()?,
@@ -291,7 +256,7 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
         // At this point we must have data or be at EOF.
         let state = self.inner.context_mut();
         match &mut state.buffers[state.current_buf] {
-            ReadBufState::Full { cursor, .. } => Ok(cursor.fill_buf()?),
+            ReadBufState::Full(cursor) => Ok(cursor.fill_buf()?),
             // after the loop above we either have some data or we must be at EOF
             _ => unreachable!(),
         }
@@ -300,7 +265,7 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
     fn consume(&mut self, amt: usize) {
         let state = self.inner.context_mut();
         match &mut state.buffers[state.current_buf] {
-            ReadBufState::Full { cursor, .. } => cursor.consume(amt),
+            ReadBufState::Full(cursor) => cursor.consume(amt),
             _ => assert_eq!(amt, 0),
         }
     }
@@ -309,37 +274,25 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
 enum ReadBufState {
     /// The buffer is pending submission to read queue (on initialization and
     /// in transition from `Full` to `Reading`).
-    Uninit {
-        buf: BorrowedBytesMut,
-        io_buf_index: usize,
-    },
+    Uninit(FixedIoBuffer),
     /// The buffer is currently being read and there's a corresponding ReadOp in
     /// the ring.
     Reading,
     /// The buffer is filled and ready to be consumed.
-    Full {
-        cursor: Cursor<BorrowedBytesMut>,
-        io_buf_index: usize,
-    },
+    Full(Cursor<FixedIoBuffer>),
 }
 
 impl std::fmt::Debug for ReadBufState {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::Uninit {
-                buf: _,
-                io_buf_index,
-            } => f
+            Self::Uninit(buf) => f
                 .debug_struct("Uninit")
-                .field("io_buf_index", io_buf_index)
+                .field("io_buf_index", &buf.io_buf_index())
                 .finish(),
             Self::Reading => write!(f, "Reading"),
-            Self::Full {
-                cursor: _,
-                io_buf_index,
-            } => f
+            Self::Full(cursor) => f
                 .debug_struct("Full")
-                .field("io_buf_index", io_buf_index)
+                .field("io_buf_index", &cursor.get_ref().io_buf_index())
                 .finish(),
         }
     }
@@ -347,13 +300,11 @@ impl std::fmt::Debug for ReadBufState {
 
 struct ReadOp {
     fd: RawFd,
-    buf: BorrowedBytesMut,
+    buf: FixedIoBuffer,
     /// This is the offset inside the buffer. It's typically 0, but can be non-zero if a previous
     /// read returned less data than requested (because of EINTR or whatever) and we submitted a new
     /// read for the remaining data.
     buf_off: usize,
-    /// The index of the fixed buffer in the ring. See register_buffers().
-    io_buf_index: usize,
     /// The offset in the file.
     file_off: usize,
     /// The length of the read. This is typically `read_capacity` but can be less if a previous read
@@ -369,7 +320,7 @@ impl std::fmt::Debug for ReadOp {
         f.debug_struct("ReadOp")
             .field("fd", &self.fd)
             .field("buf_off", &self.buf_off)
-            .field("io_buf_index", &self.io_buf_index)
+            .field("io_buf_index", &self.buf.io_buf_index())
             .field("file_off", &self.file_off)
             .field("read_len", &self.read_len)
             .field("reader_buf_index", &self.reader_buf_index)
@@ -383,7 +334,6 @@ impl RingOp<SequentialFileReaderState> for ReadOp {
             fd,
             buf,
             buf_off,
-            io_buf_index,
             file_off,
             read_len,
             reader_buf_index: _,
@@ -394,7 +344,8 @@ impl RingOp<SequentialFileReaderState> for ReadOp {
             // Safety: we assert that the buffer is large enough to hold the read.
             unsafe { buf.as_mut_ptr().byte_add(*buf_off) },
             *read_len as u32,
-            *io_buf_index as u16,
+            buf.io_buf_index()
+                .expect("should have a valid fixed buffer"),
         )
         .offset(*file_off as u64)
         .ioprio(IO_PRIO_BE_HIGHEST)
@@ -410,7 +361,6 @@ impl RingOp<SequentialFileReaderState> for ReadOp {
             fd,
             buf,
             buf_off,
-            io_buf_index,
             file_off,
             read_len,
             reader_buf_index,
@@ -423,14 +373,14 @@ impl RingOp<SequentialFileReaderState> for ReadOp {
         }
 
         let total_read_len = *buf_off + last_read_len;
+        let buf = mem::replace(buf, FixedIoBuffer::empty());
 
         if last_read_len > 0 && last_read_len < *read_len {
             // Partial read, retry the op with updated offsets
             let op: ReadOp = ReadOp {
                 fd: *fd,
-                buf: buf.sub_buf_to(buf.len()), // Still use the full buf
+                buf,
                 buf_off: total_read_len,
-                io_buf_index: *io_buf_index,
                 file_off: *file_off + last_read_len,
                 read_len: *read_len - last_read_len,
                 reader_buf_index: *reader_buf_index,
@@ -440,10 +390,8 @@ impl RingOp<SequentialFileReaderState> for ReadOp {
             // lifetime of the operation
             completion.push(op);
         } else {
-            reader_state.buffers[*reader_buf_index] = ReadBufState::Full {
-                cursor: Cursor::new(buf.sub_buf_to(total_read_len)),
-                io_buf_index: *io_buf_index,
-            };
+            reader_state.buffers[*reader_buf_index] =
+                ReadBufState::Full(Cursor::new(buf.into_shrinked(total_read_len)));
         }
 
         Ok(())
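
Note: to make the retry arithmetic above concrete, a short read leaves `total_read_len` bytes at the start of the buffer, and the requeued op appends after them while advancing the file offset by the same amount. A standalone sketch with illustrative names (not the patch's types):

```rust
/// Offsets for a follow-up read after a partial completion (sketch).
struct Retry {
    buf_off: usize,  // where the next bytes land in the buffer
    file_off: usize, // where the next read starts in the file
    read_len: usize, // how many bytes are still outstanding
}

fn retry_after_partial(buf_off: usize, file_off: usize, read_len: usize, got: usize) -> Option<Retry> {
    // got == 0 is EOF and got == read_len is a complete read; neither requeues.
    (got > 0 && got < read_len).then(|| Retry {
        buf_off: buf_off + got,
        file_off: file_off + got,
        read_len: read_len - got,
    })
}

fn main() {
    // 1MiB requested, 256KiB returned: retry the remaining 768KiB.
    let r = retry_after_partial(0, 0, 1 << 20, 256 << 10).unwrap();
    assert_eq!((r.buf_off, r.file_off, r.read_len), (256 << 10, 256 << 10, 768 << 10));
}
```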

+ 1 - 1
io-uring/src/lib.rs

@@ -1,11 +1,11 @@
 #![cfg(target_os = "linux")]
 mod ring;
 mod slab;
-pub use ring::*;
 use {
     io_uring::IoUring,
     std::{io, sync::Once},
 };
+pub use {ring::*, slab::FixedSlab};
 
 pub fn io_uring_supported() -> bool {
     static mut IO_URING_SUPPORTED: bool = false;

+ 12 - 1
io-uring/src/ring.rs

@@ -6,7 +6,7 @@ use {
         IoUring,
     },
     smallvec::{smallvec, SmallVec},
-    std::{io, time::Duration},
+    std::{io, os::fd::RawFd, time::Duration},
 };
 
 /// An io_uring instance.
@@ -53,6 +53,17 @@ impl<T, E: RingOp<T>> Ring<T, E> {
         self.ring.submitter().register_buffers(iovecs)
     }
 
+    /// Registers file descriptors with the kernel as fixed for I/O.
+    ///
+    /// Operations may then use `types::Fixed(index)`, where `index` is a position in `fds`,
+    /// to refer to the registered file descriptor.
+    ///
+    /// `-1` values serve as empty slots for kernel-managed fixed file descriptors (e.g. ones
+    /// created by an open operation).
+    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
+        self.ring.submitter().register_files(fds)
+    }
+
     /// Pushes an operation to the submission queue.
     ///
     /// Once completed, [RingOp::complete] will be called with the result.
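
Note: for reference, a hedged sketch of how registered files pair with fixed-index operations in the `io_uring` crate; the file, slot layout, and single read are illustrative, not how this patch wires it up:

```rust
use {
    io_uring::{opcode, types, IoUring},
    std::{fs::File, os::fd::AsRawFd},
};

fn read_via_fixed_fd(file: &File, buf: &mut [u8]) -> std::io::Result<()> {
    let mut ring = IoUring::new(8)?;
    // Slot 0 holds our descriptor; slot 1 (-1) is left empty for the
    // kernel to fill, e.g. by an open op targeting a fixed file slot.
    ring.submitter().register_files(&[file.as_raw_fd(), -1])?;

    // Address the registered descriptor by slot index, not raw fd.
    let read = opcode::Read::new(types::Fixed(0), buf.as_mut_ptr(), buf.len() as u32).build();
    // Safety: `buf` outlives the operation; we wait for completion below.
    unsafe { ring.submission().push(&read) }.expect("submission queue full");
    ring.submit_and_wait(1)?;
    Ok(())
}
```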

+ 1 - 1
io-uring/src/slab.rs

@@ -1,6 +1,6 @@
 use slab::Slab;
 
-pub(crate) struct FixedSlab<T> {
+pub struct FixedSlab<T> {
     inner: Slab<T>,
 }
 

+ 0 - 1
programs/sbf/Cargo.lock

@@ -5492,7 +5492,6 @@ dependencies = [
  "solana-message",
  "solana-metrics",
  "solana-nohash-hasher",
- "solana-perf",
  "solana-pubkey",
  "solana-rayon-threadlimit",
  "solana-rent-collector",

+ 30 - 107
runtime/src/snapshot_utils.rs

@@ -25,7 +25,7 @@ use {
         account_storage_reader::AccountStorageReader,
         accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
         accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
-        hardened_unpack::{self, ArchiveChunker, BytesChannelReader, MultiBytes, UnpackError},
+        hardened_unpack::{self, UnpackError},
         utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
     },
     solana_clock::{Epoch, Slot},
@@ -35,7 +35,7 @@ use {
         cmp::Ordering,
         collections::{HashMap, HashSet},
         fmt, fs,
-        io::{self, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
+        io::{self, BufRead, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
         mem,
         num::{NonZeroU64, NonZeroUsize},
         ops::RangeInclusive,
@@ -51,7 +51,7 @@ use {
 };
 #[cfg(feature = "dev-context-only-utils")]
 use {
-    hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*,
+    hardened_unpack::UnpackedAppendVecMap,
     solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
 };
 
@@ -1564,9 +1564,6 @@ pub(crate) fn get_storages_to_serialize(
         .collect::<Vec<_>>()
 }
 
-// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
-const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
-
 /// Unarchives the given full and incremental snapshot archives, as long as they are compatible.
 pub fn verify_and_unarchive_snapshots(
     bank_snapshots_dir: impl AsRef<Path>,
@@ -1580,8 +1577,6 @@ pub fn verify_and_unarchive_snapshots(
         incremental_snapshot_archive_info,
     )?;
 
-    let num_worker_threads = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
-
     let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0));
     let UnarchivedSnapshot {
         unpack_dir: full_unpack_dir,
@@ -1597,7 +1592,6 @@ pub fn verify_and_unarchive_snapshots(
         "snapshot untar",
         account_paths,
         full_snapshot_archive_info.archive_format(),
-        num_worker_threads,
         next_append_vec_id.clone(),
         storage_access,
     )?;
@@ -1624,7 +1618,6 @@ pub fn verify_and_unarchive_snapshots(
             "incremental snapshot untar",
             account_paths,
             incremental_snapshot_archive_info.archive_format(),
-            num_worker_threads,
             next_append_vec_id.clone(),
             storage_access,
         )?;
@@ -1664,19 +1657,20 @@ pub fn verify_and_unarchive_snapshots(
     ))
 }
 
-/// Spawns a thread for unpacking a snapshot
-fn spawn_unpack_snapshot_thread(
-    chunks_receiver: crossbeam_channel::Receiver<MultiBytes>,
+/// Streams unpacked files across a channel
+fn streaming_unarchive_snapshot(
     file_sender: Sender<PathBuf>,
-    account_paths: Arc<Vec<PathBuf>>,
-    ledger_dir: Arc<PathBuf>,
-    thread_index: usize,
+    account_paths: Vec<PathBuf>,
+    ledger_dir: PathBuf,
+    snapshot_archive_path: PathBuf,
+    archive_format: ArchiveFormat,
 ) -> JoinHandle<Result<()>> {
     Builder::new()
-        .name(format!("solUnpkSnpsht{thread_index:02}"))
+        .name("solTarUnpack".to_string())
         .spawn(move || {
+            let decompressor = decompressed_tar_reader(archive_format, snapshot_archive_path)?;
             hardened_unpack::streaming_unpack_snapshot(
-                Archive::new(BytesChannelReader::new(chunks_receiver)),
+                Archive::new(decompressor),
                 ledger_dir.as_path(),
                 &account_paths,
                 &file_sender,
@@ -1686,70 +1680,20 @@ fn spawn_unpack_snapshot_thread(
         .unwrap()
 }
 
-/// Streams unpacked files across channel
-fn streaming_unarchive_snapshot(
-    file_sender: Sender<PathBuf>,
-    account_paths: Vec<PathBuf>,
-    ledger_dir: PathBuf,
-    snapshot_archive_path: PathBuf,
-    archive_format: ArchiveFormat,
-    num_threads: usize,
-) -> Vec<JoinHandle<Result<()>>> {
-    let account_paths = Arc::new(account_paths);
-    let ledger_dir = Arc::new(ledger_dir);
-
-    let mut handles = vec![];
-
-    let (chunk_sender, chunk_receiver) = crossbeam_channel::bounded(num_threads * 2);
-    handles.push(spawn_archive_chunker_thread(
-        snapshot_archive_path,
-        archive_format,
-        chunk_sender,
-    ));
-
-    for thread_index in 0..num_threads {
-        handles.push(spawn_unpack_snapshot_thread(
-            chunk_receiver.clone(),
-            file_sender.clone(),
-            account_paths.clone(),
-            ledger_dir.clone(),
-            thread_index,
-        ))
-    }
-
-    handles
-}
-
-fn archive_chunker_from_path(
-    archive_path: &Path,
+fn decompressed_tar_reader(
     archive_format: ArchiveFormat,
-) -> io::Result<ArchiveChunker<ArchiveFormatDecompressor<Box<dyn std::io::BufRead>>>> {
-    const INPUT_READER_BUF_SIZE: usize = 128 * 1024 * 1024;
-    let buf_reader = solana_accounts_db::large_file_buf_reader(archive_path, INPUT_READER_BUF_SIZE)
-        .map_err(|err| {
-            IoError::other(format!(
-                "failed to open snapshot archive '{}': {err}",
-                archive_path.display(),
-            ))
-        })?;
-    let decompressor = ArchiveFormatDecompressor::new(archive_format, buf_reader)?;
-    Ok(ArchiveChunker::new(decompressor))
-}
-
-fn spawn_archive_chunker_thread(
     archive_path: impl AsRef<Path>,
-    archive_format: ArchiveFormat,
-    chunk_sender: Sender<MultiBytes>,
-) -> JoinHandle<Result<()>> {
-    let archive_path = archive_path.as_ref().to_path_buf();
-    Builder::new()
-        .name("solTarDecompr".to_string())
-        .spawn(move || {
-            let chunker = archive_chunker_from_path(&archive_path, archive_format)?;
-            chunker.decode_and_send_chunks(chunk_sender)?;
-            Ok(())
-        })
-        .unwrap()
+) -> Result<ArchiveFormatDecompressor<Box<dyn BufRead + 'static>>> {
+    const INPUT_READER_BUF_SIZE: usize = 128 * 1024 * 1024;
+    let buf_reader =
+        solana_accounts_db::large_file_buf_reader(archive_path.as_ref(), INPUT_READER_BUF_SIZE)
+            .map_err(|err| {
+                io::Error::other(format!(
+                    "failed to open snapshot archive '{}': {err}",
+                    archive_path.as_ref().display(),
+                ))
+            })?;
+    Ok(ArchiveFormatDecompressor::new(archive_format, buf_reader)?)
 }
 
 /// Used to determine if a filename is structured like a version file, bank file, or storage file
@@ -1901,7 +1845,6 @@ fn unarchive_snapshot(
     measure_name: &'static str,
     account_paths: &[PathBuf],
     archive_format: ArchiveFormat,
-    num_untar_threads: usize,
     next_append_vec_id: Arc<AtomicAccountsFileId>,
     storage_access: StorageAccess,
 ) -> Result<UnarchivedSnapshot> {
@@ -1911,18 +1854,15 @@ fn unarchive_snapshot(
     let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
 
     let (file_sender, file_receiver) = crossbeam_channel::unbounded();
-    let unarchive_handles = streaming_unarchive_snapshot(
+    let unarchive_handle = streaming_unarchive_snapshot(
         file_sender,
         account_paths.to_vec(),
         unpack_dir.path().to_path_buf(),
         snapshot_archive_path.as_ref().to_path_buf(),
         archive_format,
-        num_untar_threads,
     );
 
-    let num_rebuilder_threads = num_cpus::get_physical()
-        .saturating_sub(num_untar_threads)
-        .max(1);
+    let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
     let snapshot_result = snapshot_fields_from_files(&file_receiver).and_then(
         |SnapshotFieldsBundle {
              snapshot_version,
@@ -1959,9 +1899,7 @@ fn unarchive_snapshot(
             })
         },
     );
-    for handle in unarchive_handles {
-        handle.join().unwrap()?;
-    }
+    unarchive_handle.join().unwrap()?;
     snapshot_result
 }
 
@@ -2465,24 +2403,9 @@ fn unpack_snapshot_local(
     num_threads: usize,
 ) -> Result<UnpackedAppendVecMap> {
     assert!(num_threads > 0);
-
-    let (chunk_sender, chunk_receiver) = crossbeam_channel::bounded(num_threads);
-    let handle = spawn_archive_chunker_thread(snapshot_path, archive_format, chunk_sender);
-
-    // create 'num_threads' # of parallel workers, each receiving chunks of archive to extract.
-    let all_unpacked_append_vec_map = (0..num_threads)
-        .into_par_iter()
-        .map(|_| {
-            let archive_subset = Archive::new(BytesChannelReader::new(chunk_receiver.clone()));
-            hardened_unpack::unpack_snapshot(archive_subset, ledger_dir, account_paths)
-        })
-        .collect::<Vec<_>>();
-    handle.join().unwrap()?;
-
-    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
-    for h in all_unpacked_append_vec_map {
-        unpacked_append_vec_map.extend(h?);
-    }
+    let archive = Archive::new(decompressed_tar_reader(archive_format, snapshot_path)?);
+    let unpacked_append_vec_map =
+        hardened_unpack::unpack_snapshot(archive, ledger_dir, account_paths)?;
 
     Ok(unpacked_append_vec_map)
 }
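
Note: the net effect of this refactor is that a single `solTarUnpack` thread drives decompression and tar parsing while file writes ride on io_uring kernel threads, so the extra untar workers and the chunk channel feeding them go away. A simplified sketch of the streaming shape using the `tar` crate directly; the real path goes through `hardened_unpack`, which validates entries before writing:

```rust
use {
    std::{fs::File, io::BufReader, path::Path},
    tar::Archive,
};

fn unpack_all(archive_path: &Path, out_dir: &Path) -> std::io::Result<()> {
    // In the patch this is an ArchiveFormatDecompressor over an io_uring-backed
    // BufRead (see decompressed_tar_reader); a plain BufReader keeps the sketch small.
    let reader = BufReader::new(File::open(archive_path)?);
    let mut archive = Archive::new(reader);
    for entry in archive.entries()? {
        // hardened_unpack sanitizes each entry before writing; unpack_in is
        // the plain tar-crate equivalent, used here for brevity.
        entry?.unpack_in(out_dir)?;
    }
    Ok(())
}
```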

+ 0 - 1
svm/examples/Cargo.lock

@@ -5325,7 +5325,6 @@ dependencies = [
  "solana-message",
  "solana-metrics",
  "solana-nohash-hasher",
- "solana-perf",
  "solana-pubkey",
  "solana-rayon-threadlimit",
  "solana-rent-collector",