Browse Source

Splits candidates between flush and evict (#9158)

* Splits candidates between flush and evict

* pr: tests check dirty/clean better
Brooks 1 day ago
parent
commit
c7b691570c
1 changed files with 175 additions and 134 deletions
  1. 175 134
      accounts-db/src/accounts_index/in_mem_accounts_index.rs

+ 175 - 134
accounts-db/src/accounts_index/in_mem_accounts_index.rs

@@ -864,15 +864,15 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
         }
     }
 
-    /// Collect possible evictions from `iter` by checking age
-    /// Filter as much as possible and capture dirty flag
+    /// Collect candidates to flush/evict from `iter` by checking age
     /// Skip entries with ref_count != 1 since they will be rejected later anyway
     fn gather_possible_evictions<'a>(
         iter: impl Iterator<Item = (&'a Pubkey, &'a Box<AccountMapEntry<T>>)>,
         current_age: Age,
         ages_flushing_now: Age,
-    ) -> Vec<(Pubkey, /*is_dirty*/ bool)> {
-        let mut possible_evictions = Vec::new();
+    ) -> (CandidatesToFlush, CandidatesToEvict) {
+        let mut candidates_to_flush = Vec::new();
+        let mut candidates_to_evict = Vec::new();
         for (k, v) in iter {
             if current_age.wrapping_sub(v.age()) > ages_flushing_now {
                 // not planning to evict this item from memory within 'ages_flushing_now' ages
@@ -886,22 +886,28 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
                 continue;
             }
 
-            possible_evictions.push((*k, v.dirty()));
+            if v.dirty() {
+                candidates_to_flush.push(*k);
+            } else {
+                candidates_to_evict.push(*k);
+            }
         }
-        possible_evictions
+        (
+            CandidatesToFlush(candidates_to_flush),
+            CandidatesToEvict(candidates_to_evict),
+        )
     }
 
     /// scan loop
     /// holds read lock
-    /// identifies items which are potential candidates to evict
-    /// Returns pubkeys whose age indicates they may be evicted now, pending further checks.
+    /// Returns candidates to flush/evict now, pending further checks.
     /// Entries with ref_count != 1 are filtered out during scan
     fn flush_scan(
         &self,
         current_age: Age,
         _flush_guard: &FlushGuard,
         ages_flushing_now: Age,
-    ) -> Vec<(Pubkey, /*is_dirty*/ bool)> {
+    ) -> (CandidatesToFlush, CandidatesToEvict) {
         let (possible_evictions, m) = {
             let map = self.map_internal.read().unwrap();
             let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
@@ -1075,120 +1081,107 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
 
         Self::update_stat(&self.stats().buckets_scanned, 1);
 
-        // scan in-mem map for items that we may evict
-        let evictions_age_possible = self.flush_scan(current_age, flush_guard, ages_flushing_now);
+        // scan in-mem map for candidates to flush/evict
+        let (candidates_to_flush, candidates_to_evict) =
+            self.flush_scan(current_age, flush_guard, ages_flushing_now);
 
-        if !evictions_age_possible.is_empty() {
-            // write to disk outside in-mem map read lock
-            let disk = self.bucket.as_ref().unwrap();
-            let mut flush_stats = DiskFlushStats::new();
-            // we don't care about lock time in this metric - bg threads can wait
-            let flush_update_measure = Measure::start("flush_update");
+        // write to disk outside in-mem map read lock
+        let disk = self.bucket.as_ref().unwrap();
+        let mut flush_stats = DiskFlushStats::new();
 
-            // Process each eviction candidate
-            // For dirty entries: lock map briefly, get entry, calculate disk value, release lock, then write to disk
-            // For clean entries: skip checks and pass to evict_from_cache
-            let evictions_age: Vec<_> = evictions_age_possible
-                .into_iter()
-                .filter_map(|(key, is_dirty)| {
-                    if !is_dirty {
-                        // Entry was not dirty at scan time and had ref_count == 1
-                        // Skip all checks (including should_evict_from_mem) and do not do any disk ops
-                        // Pass directly to evict_from_cache, which will re-check conditions under write lock
-                        Some(key)
-                    } else {
-                        // Entry was dirty at scan time, need to write to disk
-                        let lock_measure = Measure::start("flush_read_lock");
-                        let (disk_entry, disk_ref_count) = {
-                            let map_read_guard = self.map_internal.read().unwrap();
-                            let entry = map_read_guard.get(&key)?;
-
-                            let mut mse = Measure::start("flush_should_evict");
-                            let should_evict = self.should_evict_from_mem(
-                                current_age,
-                                entry,
-                                true,
-                                ages_flushing_now,
-                            );
-                            mse.stop();
-                            flush_stats.flush_should_evict_us += mse.as_us();
-
-                            if !should_evict {
-                                // not evicting, so don't write, even if dirty
-                                flush_stats.flush_read_lock_us += lock_measure.end_as_us();
-                                return None;
-                            }
-
-                            // Step 1: Clear the dirty flag
-                            // Step 2: Extract data and perform disk update outside the lock
-                            // Race condition handling: If a parallel operation dirties the item again after scanning,
-                            // then we will set_dirty(true) and skip the disk update. The dirty flag will ensure the
-                            // next flush picks up the item again. If the item becomes dirty during our disk write,
-                            // that's ok - the dirty flag will be picked up on the next flush and prevent us from
-                            // evicting the item from the cache.
-                            if !entry.clear_dirty() {
-                                // Entry was not dirty anymore, skip disk write
-                                flush_stats.flush_read_lock_us += lock_measure.end_as_us();
-                                return Some(key);
-                            }
-
-                            // Check the refcount before grabbing the slot list read lock
-                            let mut ref_count = entry.ref_count();
-                            if ref_count != 1 {
-                                entry.set_dirty(true);
-                                flush_stats.flush_read_lock_us += lock_measure.end_as_us();
-                                return None;
-                            }
-
-                            let slot_list = entry.slot_list_read_lock();
-                            ref_count = entry.ref_count(); // re-check ref count after grabbing slot list lock
-                            if ref_count != 1 || slot_list.len() != 1 {
-                                entry.set_dirty(true);
-                                flush_stats.flush_read_lock_us += lock_measure.end_as_us();
-                                return None;
-                            }
-
-                            // since we know slot_list.len() == 1, we can create a stack-allocated array for single element
-                            let (slot, info) = slot_list[0];
-                            let disk_entry = [(slot, info.into())];
-
-                            (disk_entry, ref_count)
-                        };
+        // Process each candidate to flush
+        // For each entry: lock map briefly, get entry, calculate disk value, release lock, then write to disk
+        let flush_update_measure = Measure::start("flush_update");
+        let flushed_keys_to_evict: Vec<_> = candidates_to_flush
+            .0
+            .into_iter()
+            .filter_map(|key| {
+                // Entry was dirty at scan time, need to write to disk
+                let lock_measure = Measure::start("flush_read_lock");
+                let (disk_entry, disk_ref_count) = {
+                    let map_read_guard = self.map_internal.read().unwrap();
+                    let entry = map_read_guard.get(&key)?;
+
+                    let mut mse = Measure::start("flush_should_evict");
+                    let should_evict =
+                        self.should_evict_from_mem(current_age, entry, true, ages_flushing_now);
+                    mse.stop();
+                    flush_stats.flush_should_evict_us += mse.as_us();
+
+                    if !should_evict {
+                        // not evicting, so don't write, even if dirty
+                        flush_stats.flush_read_lock_us += lock_measure.end_as_us();
+                        return None;
+                    }
 
+                    // Step 1: Clear the dirty flag
+                    // Step 2: Extract data and perform disk update outside the lock
+                    // Race condition handling: If a parallel operation dirties the item again after scanning,
+                    // then we will set_dirty(true) and skip the disk update. The dirty flag will ensure the
+                    // next flush picks up the item again. If the item becomes dirty during our disk write,
+                    // that's ok - the dirty flag will be picked up on the next flush and prevent us from
+                    // evicting the item from the cache.
+                    if !entry.clear_dirty() {
+                        // Entry was not dirty anymore, skip disk write
                         flush_stats.flush_read_lock_us += lock_measure.end_as_us();
+                        return Some(key);
+                    }
 
-                        // Now write to disk WITHOUT holding any locks
-                        // may have to loop if disk has to grow and we have to retry the write
-                        loop {
-                            let disk_resize =
-                                disk.try_write(&key, (&disk_entry, disk_ref_count.into()));
-                            match disk_resize {
-                                Ok(_) => {
-                                    // successfully written to disk
-                                    flush_stats.flush_entries_updated_on_disk += 1;
-                                    break;
-                                }
-                                Err(err) => {
-                                    // disk needs to resize. This item did not get written. Resize and try again.
-                                    let m = Measure::start("flush_grow");
-                                    disk.grow(err);
-                                    flush_stats.flush_grow_us += m.end_as_us();
-                                }
-                            }
-                        }
+                    // Check the refcount before grabbing the slot list read lock
+                    let mut ref_count = entry.ref_count();
+                    if ref_count != 1 {
+                        entry.set_dirty(true);
+                        flush_stats.flush_read_lock_us += lock_measure.end_as_us();
+                        return None;
+                    }
 
-                        Some(key)
+                    let slot_list = entry.slot_list_read_lock();
+                    ref_count = entry.ref_count(); // re-check ref count after grabbing slot list lock
+                    if ref_count != 1 || slot_list.len() != 1 {
+                        entry.set_dirty(true);
+                        flush_stats.flush_read_lock_us += lock_measure.end_as_us();
+                        return None;
                     }
-                })
-                .collect();
 
-            flush_stats.flush_update_us = flush_update_measure.end_as_us();
-            flush_stats.update_to_stats(self.stats());
+                    // since we know slot_list.len() == 1, we can create a stack-allocated array for single element
+                    let (slot, info) = slot_list[0];
+                    let disk_entry = [(slot, info.into())];
+
+                    (disk_entry, ref_count)
+                };
+
+                flush_stats.flush_read_lock_us += lock_measure.end_as_us();
+
+                // Now write to disk WITHOUT holding any locks
+                // may have to loop if disk has to grow and we have to retry the write
+                loop {
+                    let disk_resize = disk.try_write(&key, (&disk_entry, disk_ref_count.into()));
+                    match disk_resize {
+                        Ok(_) => {
+                            // successfully written to disk
+                            flush_stats.flush_entries_updated_on_disk += 1;
+                            break;
+                        }
+                        Err(err) => {
+                            // disk needs to resize. This item did not get written. Resize and try again.
+                            let m = Measure::start("flush_grow");
+                            disk.grow(err);
+                            flush_stats.flush_grow_us += m.end_as_us();
+                        }
+                    }
+                }
+
+                Some(key)
+            })
+            .collect();
+        flush_stats.flush_update_us = flush_update_measure.end_as_us();
+        flush_stats.update_to_stats(self.stats());
+
+        let m = Measure::start("flush_evict");
+        self.evict_from_cache(&flushed_keys_to_evict, current_age, ages_flushing_now);
+        self.evict_from_cache(&candidates_to_evict.0, current_age, ages_flushing_now);
+        Self::update_time_stat(&self.stats().flush_evict_us, m);
 
-            let m = Measure::start("flush_evict");
-            self.evict_from_cache(evictions_age, current_age, ages_flushing_now);
-            Self::update_time_stat(&self.stats().flush_evict_us, m);
-        }
         if iterate_for_age {
             // completed iteration of the buckets at the current age
             assert_eq!(current_age, self.storage.current_age());
@@ -1197,7 +1190,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
     }
 
     // evict keys in 'evictions' from in-mem cache, likely due to age
-    fn evict_from_cache(&self, evictions: Vec<Pubkey>, current_age: Age, ages_flushing_now: Age) {
+    fn evict_from_cache(&self, evictions: &[Pubkey], current_age: Age, ages_flushing_now: Age) {
         if evictions.is_empty() {
             return;
         }
@@ -1327,6 +1320,18 @@ impl Drop for FlushGuard<'_> {
     }
 }
 
+/// Candidates in the in-mem index that may be flushed to disk, pending further checks.
+///
+/// Note, entries must be 'dirty' to be a candidate for flush.
+#[derive(Debug)]
+struct CandidatesToFlush(Vec<Pubkey>);
+
+/// Candidates in the in-mem index that may be evicted, pending further checks.
+///
+/// Note, entries must be 'clean' to be a candidate for eviction.
+#[derive(Debug)]
+struct CandidatesToEvict(Vec<Pubkey>);
+
 #[cfg(test)]
 mod tests {
     use {
@@ -1667,31 +1672,48 @@ mod tests {
 
     #[test]
     fn test_gather_possible_evictions() {
-        agave_logger::setup();
+        const AGE_MAX: Age = 255;
         let ref_count = 1;
-        let map: HashMap<_, _> = (0..=255)
+        // The values in the slot list elements do not matter.
+        // They are different so we can distinguish between 'dirty' and 'clean' for the test.
+        let slot_list_dirty = [(0xD1, 0xD2)];
+        let slot_list_clean = [(0xC3, 0xC4)];
+        let map_dirty: HashMap<_, _> = (0..=AGE_MAX)
+            .map(|age| {
+                let entry = Box::new(AccountMapEntry::new(
+                    SlotList::from(slot_list_dirty),
+                    ref_count,
+                    AccountMapEntryMeta::default(),
+                ));
+                entry.set_dirty(true);
+                entry.set_age(age);
+                (Pubkey::new_unique(), entry)
+            })
+            .collect();
+        let map_clean: HashMap<_, _> = (0..=AGE_MAX)
             .map(|age| {
-                let pk = Pubkey::from([age; 32]);
-                let one_element_slot_list = SlotList::from([(0, 0)]);
-                let one_element_slot_list_entry = Box::new(AccountMapEntry::new(
-                    one_element_slot_list,
+                let entry = Box::new(AccountMapEntry::new(
+                    SlotList::from(slot_list_clean),
                     ref_count,
                     AccountMapEntryMeta::default(),
                 ));
-                one_element_slot_list_entry.set_age(age);
-                (pk, one_element_slot_list_entry)
+                entry.set_dirty(false);
+                entry.set_age(age);
+                (Pubkey::new_unique(), entry)
             })
             .collect();
 
-        for current_age in 0..=255 {
-            for ages_flushing_now in 0..=255 {
-                let possible_evictions = InMemAccountsIndex::<u64, u64>::gather_possible_evictions(
-                    map.iter(),
-                    current_age,
-                    ages_flushing_now,
-                );
+        for current_age in 0..=AGE_MAX {
+            for ages_flushing_now in 0..=AGE_MAX {
+                let (candidates_to_flush, candidates_to_evict) =
+                    InMemAccountsIndex::<u64, u64>::gather_possible_evictions(
+                        map_dirty.iter().chain(&map_clean),
+                        current_age,
+                        ages_flushing_now,
+                    );
                 // Verify that the number of entries selected for eviction matches the expected count.
-                // Test setup: map contains 256 entries with ages 0-255 (one entry per age value).
+                // Test setup: map contains 256 dirty entries and 256 clean entries.
+                // Each with ages 0-255 (one entry per age value).
                 //
                 // gather_possible_evictions includes entries where:
                 //   current_age.wrapping_sub(entry.age) <= ages_flushing_now
@@ -1702,9 +1724,28 @@ mod tests {
                 // The window size is (ages_flushing_now + 1) because both endpoints are inclusive.
                 //
                 // Example: If current_age=10 and ages_flushing_now=3, we select ages 7,8,9,10 = 4 entries.
-                assert_eq!(possible_evictions.len(), 1 + ages_flushing_now as usize);
-                possible_evictions.iter().for_each(|(key, _is_dirty)| {
-                    let entry = map.get(key).unwrap();
+                assert_eq!(candidates_to_flush.0.len(), 1 + ages_flushing_now as usize);
+                assert_eq!(candidates_to_evict.0.len(), 1 + ages_flushing_now as usize);
+                candidates_to_flush.0.iter().for_each(|key| {
+                    let entry = map_dirty.get(key).unwrap();
+                    assert!(entry.dirty());
+                    assert_eq!(*entry.slot_list_read_lock(), slot_list_dirty);
+                    assert!(
+                        InMemAccountsIndex::<u64, u64>::should_evict_based_on_age(
+                            current_age,
+                            entry,
+                            ages_flushing_now,
+                        ),
+                        "current_age: {}, age: {}, ages_flushing_now: {}",
+                        current_age,
+                        entry.age(),
+                        ages_flushing_now
+                    );
+                });
+                candidates_to_evict.0.iter().for_each(|key| {
+                    let entry = map_clean.get(key).unwrap();
+                    assert!(!entry.dirty());
+                    assert_eq!(*entry.slot_list_read_lock(), slot_list_clean);
                     assert!(
                         InMemAccountsIndex::<u64, u64>::should_evict_based_on_age(
                             current_age,