Move Deduper into a separate file (#29891)

steviez 2 years ago
parent
commit
fa39bfef6b
5 changed files with 181 additions and 166 deletions
  1. core/src/sigverify_stage.rs (+1 -1)
  2. perf/benches/dedup.rs (+3 -3)
  3. perf/src/deduper.rs (+175 -0)
  4. perf/src/lib.rs (+1 -0)
  5. perf/src/sigverify.rs (+1 -162)

+ 1 - 1
core/src/sigverify_stage.rs

@@ -12,10 +12,10 @@ use {
     itertools::Itertools,
     solana_measure::measure::Measure,
     solana_perf::{
+        deduper::Deduper,
         packet::{Packet, PacketBatch},
         sigverify::{
             count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches,
-            Deduper,
         },
     },
     solana_sdk::timing,
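
For downstream code this is an import-only change; the Deduper API itself is unchanged. A representative caller-side update, mirroring the hunk above:

    // before: Deduper lived in the sigverify module
    use solana_perf::sigverify::Deduper;

    // after: it now lives in its own module
    use solana_perf::deduper::Deduper;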

+ 3 - 3
perf/benches/dedup.rs

@@ -6,8 +6,8 @@ extern crate test;
 use {
     rand::prelude::*,
     solana_perf::{
+        deduper::Deduper,
         packet::{to_packet_batches, PacketBatch},
-        sigverify,
     },
     std::time::Duration,
     test::Bencher,
@@ -24,7 +24,7 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
 
 fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
     // verify packets
-    let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(2_000));
+    let mut deduper = Deduper::new(1_000_000, Duration::from_millis(2_000));
     bencher.iter(|| {
         let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ());
         deduper.reset();
@@ -112,7 +112,7 @@ fn bench_dedup_baseline(bencher: &mut Bencher) {
 #[bench]
 #[ignore]
 fn bench_dedup_reset(bencher: &mut Bencher) {
-    let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(0));
+    let mut deduper = Deduper::new(1_000_000, Duration::from_millis(0));
     bencher.iter(|| {
         deduper.reset();
     });
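
Outside the nightly-only Bencher harness, the same path can be exercised with a plain timing loop. A minimal sketch, assuming the solana_perf APIs shown in this diff (Deduper::new, dedup_packets_and_count_discards, reset, to_packet_batches); the payloads are illustrative only:

    use {
        solana_perf::{deduper::Deduper, packet::to_packet_batches},
        std::time::{Duration, Instant},
    };

    fn main() {
        // 1024 distinct payloads, chunked into batches of 128 packets each
        let payloads: Vec<Vec<u8>> = (0..1024u32).map(|i| i.to_le_bytes().to_vec()).collect();
        let mut batches = to_packet_batches(&payloads, 128);

        let mut deduper = Deduper::new(1_000_000, Duration::from_millis(2_000));
        let start = Instant::now();
        // The closure sees each packet plus its pre-dedup discard flag and duplicate flag
        let discards = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ());
        println!("{} discards in {:?}", discards, start.elapsed());
        // reset() only clears the filter once saturated or after max_age has elapsed
        deduper.reset();
    }

With distinct payloads the first pass should report zero discards, modulo the filter's small false-positive rate.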

+ 175 - 0
perf/src/deduper.rs

@@ -0,0 +1,175 @@
+//! Utility to deduplicate batches of incoming network packets.
+
+use {
+    crate::packet::{Packet, PacketBatch},
+    ahash::AHasher,
+    rand::{thread_rng, Rng},
+    solana_sdk::saturating_add_assign,
+    std::{
+        convert::TryFrom,
+        hash::Hasher,
+        sync::atomic::{AtomicBool, AtomicU64, Ordering},
+        time::{Duration, Instant},
+    },
+};
+
+pub struct Deduper {
+    filter: Vec<AtomicU64>,
+    seed: (u128, u128),
+    age: Instant,
+    max_age: Duration,
+    pub saturated: AtomicBool,
+}
+
+impl Deduper {
+    pub fn new(size: u32, max_age: Duration) -> Self {
+        let mut filter: Vec<AtomicU64> = Vec::with_capacity(size as usize);
+        filter.resize_with(size as usize, Default::default);
+        let seed = thread_rng().gen();
+        Self {
+            filter,
+            seed,
+            age: Instant::now(),
+            max_age,
+            saturated: AtomicBool::new(false),
+        }
+    }
+
+    pub fn reset(&mut self) {
+        let now = Instant::now();
+        // This should reset roughly every 500k unique packets for a 1M-slot deduper;
+        // the false positive rate is ~1/1000 at that point.
+        let saturated = self.saturated.load(Ordering::Relaxed);
+        if saturated || now.duration_since(self.age) > self.max_age {
+            let len = self.filter.len();
+            self.filter.clear();
+            self.filter.resize_with(len, AtomicU64::default);
+            self.seed = thread_rng().gen();
+            self.age = now;
+            self.saturated.store(false, Ordering::Relaxed);
+        }
+    }
+
+    /// Computes a hash of the packet data; returns (hash, bin_pos).
+    fn compute_hash(&self, packet: &Packet) -> (u64, usize) {
+        let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1);
+        hasher.write(packet.data(..).unwrap_or_default());
+        let h = hasher.finish();
+        let len = self.filter.len();
+        let pos = (usize::try_from(h).unwrap()).wrapping_rem(len);
+        (h, pos)
+    }
+
+    // Deduplicates a packet; returns 1 if the packet should be discarded, 0 otherwise.
+    fn dedup_packet(&self, packet: &mut Packet) -> u64 {
+        // If this packet was already marked for discard, drop it
+        if packet.meta().discard() {
+            return 1;
+        }
+        let (hash, pos) = self.compute_hash(packet);
+        // Saturate the slot by OR-ing in the hash bits
+        let prev = self.filter[pos].fetch_or(hash, Ordering::Relaxed);
+        if prev == u64::MAX {
+            self.saturated.store(true, Ordering::Relaxed);
+            // The slot is fully saturated; reset it to just this hash
+            self.filter[pos].store(hash, Ordering::Relaxed);
+        }
+        // All of the hash bits were already set: treat the packet as a duplicate
+        if hash == prev & hash {
+            packet.meta_mut().set_discard(true);
+            return 1;
+        }
+        0
+    }
+
+    pub fn dedup_packets_and_count_discards(
+        &self,
+        batches: &mut [PacketBatch],
+        mut process_received_packet: impl FnMut(&mut Packet, bool, bool),
+    ) -> u64 {
+        let mut num_removed: u64 = 0;
+        batches.iter_mut().for_each(|batch| {
+            batch.iter_mut().for_each(|p| {
+                let removed_before_sigverify = p.meta().discard();
+                let is_duplicate = self.dedup_packet(p);
+                if is_duplicate == 1 {
+                    saturating_add_assign!(num_removed, 1);
+                }
+                process_received_packet(p, removed_before_sigverify, is_duplicate == 1);
+            })
+        });
+        num_removed
+    }
+}
+
+#[cfg(test)]
+#[allow(clippy::integer_arithmetic)]
+mod tests {
+    use {
+        super::*,
+        crate::{packet::to_packet_batches, sigverify, test_tx::test_tx},
+    };
+
+    #[test]
+    fn test_dedup_same() {
+        let tx = test_tx();
+
+        let mut batches =
+            to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
+        let packet_count = sigverify::count_packets_in_batches(&batches);
+        let filter = Deduper::new(1_000_000, Duration::from_millis(0));
+        let mut num_deduped = 0;
+        let discard = filter.dedup_packets_and_count_discards(
+            &mut batches,
+            |_deduped_packet, _removed_before_sigverify_stage, _is_dup| {
+                num_deduped += 1;
+            },
+        ) as usize;
+        assert_eq!(num_deduped, discard + 1);
+        assert_eq!(packet_count, discard + 1);
+    }
+
+    #[test]
+    fn test_dedup_diff() {
+        let mut filter = Deduper::new(1_000_000, Duration::from_millis(0));
+        let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
+        let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
+        // because dedup uses a threadpool, there may be up to N threads of txs that go through
+        assert_eq!(discard, 0);
+        filter.reset();
+        for i in filter.filter {
+            assert_eq!(i.load(Ordering::Relaxed), 0);
+        }
+    }
+
+    #[test]
+    #[ignore]
+    fn test_dedup_saturated() {
+        let filter = Deduper::new(1_000_000, Duration::from_millis(0));
+        let mut discard = 0;
+        assert!(!filter.saturated.load(Ordering::Relaxed));
+        for i in 0..1000 {
+            let mut batches =
+                to_packet_batches(&(0..1000).map(|_| test_tx()).collect::<Vec<_>>(), 128);
+            discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
+            trace!("{} {}", i, discard);
+            if filter.saturated.load(Ordering::Relaxed) {
+                break;
+            }
+        }
+        assert!(filter.saturated.load(Ordering::Relaxed));
+    }
+
+    #[test]
+    fn test_dedup_false_positive() {
+        let filter = Deduper::new(1_000_000, Duration::from_millis(0));
+        let mut discard = 0;
+        for i in 0..10 {
+            let mut batches =
+                to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
+            discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
+            debug!("false positive rate: {}/{}", discard, i * 1024);
+        }
+        // allow for 1 false positive even if extremely unlikely
+        assert!(discard < 2);
+    }
+}
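
The filter behaves like a compact Bloom-style set: a packet's 64-bit hash picks one slot, the hash bits are OR-ed into that slot, and the packet counts as a suspected duplicate when all of its bits were already present. A self-contained sketch of just that core test, simplified from the code above (std's DefaultHasher standing in for the seeded AHasher; no aging, saturation handling, or packet metadata):

    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
        sync::atomic::{AtomicU64, Ordering},
    };

    /// Returns true if `data` was (probably) seen before.
    fn probably_seen(filter: &[AtomicU64], data: &[u8]) -> bool {
        let mut hasher = DefaultHasher::new();
        data.hash(&mut hasher);
        let hash = hasher.finish();
        let pos = (hash as usize) % filter.len();
        // OR our bits into the slot; `prev` holds the bits set before this call
        let prev = filter[pos].fetch_or(hash, Ordering::Relaxed);
        // Duplicate iff every bit of our hash was already set (can false-positive)
        prev & hash == hash
    }

    fn main() {
        let filter: Vec<AtomicU64> = (0..1_000).map(|_| AtomicU64::default()).collect();
        assert!(!probably_seen(&filter, b"hello")); // first sighting
        assert!(probably_seen(&filter, b"hello")); // flagged as duplicate
    }

Because slots only ever accumulate bits, the filter degrades as it fills; that is why reset() reseeds and clears it once a slot saturates to u64::MAX or max_age elapses.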

+ 1 - 0
perf/src/lib.rs

@@ -1,5 +1,6 @@
 pub mod cuda_runtime;
 pub mod data_budget;
+pub mod deduper;
 pub mod discard;
 pub mod packet;
 pub mod perf_libs;

+ 1 - 162
perf/src/sigverify.rs

@@ -10,8 +10,6 @@ use {
         perf_libs,
         recycler::Recycler,
     },
-    ahash::AHasher,
-    rand::{thread_rng, Rng},
     rayon::{prelude::*, ThreadPool},
     solana_metrics::inc_new_counter_debug,
     solana_rayon_threadlimit::get_thread_count,
@@ -19,17 +17,10 @@ use {
         hash::Hash,
         message::{MESSAGE_HEADER_LENGTH, MESSAGE_VERSION_PREFIX},
         pubkey::Pubkey,
-        saturating_add_assign,
         short_vec::decode_shortu16_len,
         signature::Signature,
     },
-    std::{
-        convert::TryFrom,
-        hash::Hasher,
-        mem::size_of,
-        sync::atomic::{AtomicBool, AtomicU64, Ordering},
-        time::{Duration, Instant},
-    },
+    std::{convert::TryFrom, mem::size_of},
 };
 
 // Representing key tKeYE4wtowRb8yRroZShTipE18YVnqwXjsSAoNsFU6g
@@ -486,94 +477,6 @@ pub fn generate_offsets(
     )
 }
 
-pub struct Deduper {
-    filter: Vec<AtomicU64>,
-    seed: (u128, u128),
-    age: Instant,
-    max_age: Duration,
-    pub saturated: AtomicBool,
-}
-
-impl Deduper {
-    pub fn new(size: u32, max_age: Duration) -> Self {
-        let mut filter: Vec<AtomicU64> = Vec::with_capacity(size as usize);
-        filter.resize_with(size as usize, Default::default);
-        let seed = thread_rng().gen();
-        Self {
-            filter,
-            seed,
-            age: Instant::now(),
-            max_age,
-            saturated: AtomicBool::new(false),
-        }
-    }
-
-    pub fn reset(&mut self) {
-        let now = Instant::now();
-        //this should reset every 500k unique packets per 1m sized deduper
-        //false positive rate is 1/1000 at that point
-        let saturated = self.saturated.load(Ordering::Relaxed);
-        if saturated || now.duration_since(self.age) > self.max_age {
-            let len = self.filter.len();
-            self.filter.clear();
-            self.filter.resize_with(len, AtomicU64::default);
-            self.seed = thread_rng().gen();
-            self.age = now;
-            self.saturated.store(false, Ordering::Relaxed);
-        }
-    }
-
-    /// Compute hash from packet data, returns (hash, bin_pos).
-    fn compute_hash(&self, packet: &Packet) -> (u64, usize) {
-        let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1);
-        hasher.write(packet.data(..).unwrap_or_default());
-        let h = hasher.finish();
-        let len = self.filter.len();
-        let pos = (usize::try_from(h).unwrap()).wrapping_rem(len);
-        (h, pos)
-    }
-
-    // Deduplicates packets and returns 1 if packet is to be discarded. Else, 0.
-    fn dedup_packet(&self, packet: &mut Packet) -> u64 {
-        // If this packet was already marked as discard, drop it
-        if packet.meta().discard() {
-            return 1;
-        }
-        let (hash, pos) = self.compute_hash(packet);
-        // saturate each position with or
-        let prev = self.filter[pos].fetch_or(hash, Ordering::Relaxed);
-        if prev == u64::MAX {
-            self.saturated.store(true, Ordering::Relaxed);
-            //reset this value
-            self.filter[pos].store(hash, Ordering::Relaxed);
-        }
-        if hash == prev & hash {
-            packet.meta_mut().set_discard(true);
-            return 1;
-        }
-        0
-    }
-
-    pub fn dedup_packets_and_count_discards(
-        &self,
-        batches: &mut [PacketBatch],
-        mut process_received_packet: impl FnMut(&mut Packet, bool, bool),
-    ) -> u64 {
-        let mut num_removed: u64 = 0;
-        batches.iter_mut().for_each(|batch| {
-            batch.iter_mut().for_each(|p| {
-                let removed_before_sigverify = p.meta().discard();
-                let is_duplicate = self.dedup_packet(p);
-                if is_duplicate == 1 {
-                    saturating_add_assign!(num_removed, 1);
-                }
-                process_received_packet(p, removed_before_sigverify, is_duplicate == 1);
-            })
-        });
-        num_removed
-    }
-}
-
 //inplace shrink a batch of packets
 pub fn shrink_batches(batches: &mut Vec<PacketBatch>) {
     let mut valid_batch_ix = 0;
@@ -1529,70 +1432,6 @@ mod tests {
         });
     }
 
-    #[test]
-    fn test_dedup_same() {
-        let tx = test_tx();
-
-        let mut batches =
-            to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
-        let packet_count = sigverify::count_packets_in_batches(&batches);
-        let filter = Deduper::new(1_000_000, Duration::from_millis(0));
-        let mut num_deduped = 0;
-        let discard = filter.dedup_packets_and_count_discards(
-            &mut batches,
-            |_deduped_packet, _removed_before_sigverify_stage, _is_dup| {
-                num_deduped += 1;
-            },
-        ) as usize;
-        assert_eq!(num_deduped, discard + 1);
-        assert_eq!(packet_count, discard + 1);
-    }
-
-    #[test]
-    fn test_dedup_diff() {
-        let mut filter = Deduper::new(1_000_000, Duration::from_millis(0));
-        let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
-        let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
-        // because dedup uses a threadpool, there maybe up to N threads of txs that go through
-        assert_eq!(discard, 0);
-        filter.reset();
-        for i in filter.filter {
-            assert_eq!(i.load(Ordering::Relaxed), 0);
-        }
-    }
-
-    #[test]
-    #[ignore]
-    fn test_dedup_saturated() {
-        let filter = Deduper::new(1_000_000, Duration::from_millis(0));
-        let mut discard = 0;
-        assert!(!filter.saturated.load(Ordering::Relaxed));
-        for i in 0..1000 {
-            let mut batches =
-                to_packet_batches(&(0..1000).map(|_| test_tx()).collect::<Vec<_>>(), 128);
-            discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
-            trace!("{} {}", i, discard);
-            if filter.saturated.load(Ordering::Relaxed) {
-                break;
-            }
-        }
-        assert!(filter.saturated.load(Ordering::Relaxed));
-    }
-
-    #[test]
-    fn test_dedup_false_positive() {
-        let filter = Deduper::new(1_000_000, Duration::from_millis(0));
-        let mut discard = 0;
-        for i in 0..10 {
-            let mut batches =
-                to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
-            discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
-            debug!("false positive rate: {}/{}", discard, i * 1024);
-        }
-        //allow for 1 false positive even if extremely unlikely
-        assert!(discard < 2);
-    }
-
     #[test]
     fn test_shrink_fuzz() {
         for _ in 0..5 {