Bläddra i källkod

Handle already discarded packets in gpu sigverify path (#22680)

sakridge 3 år sedan
förälder
incheckning
2e56c59bcb

+ 5 - 1
core/src/cluster_info_vote_listener.rs

@@ -297,7 +297,11 @@ impl ClusterInfoVoteListener {
         let mut packet_batches = packet::to_packet_batches(&votes, 1);
 
         // Votes should already be filtered by this point.
-        sigverify::ed25519_verify_cpu(&mut packet_batches, /*reject_non_vote=*/ false);
+        sigverify::ed25519_verify_cpu(
+            &mut packet_batches,
+            /*reject_non_vote=*/ false,
+            votes.len(),
+        );
         let root_bank = bank_forks.read().unwrap().root_bank();
         let epoch_schedule = root_bank.epoch_schedule();
         votes

+ 6 - 1
core/src/sigverify.rs

@@ -40,12 +40,17 @@ impl Default for TransactionSigVerifier {
 }
 
 impl SigVerifier for TransactionSigVerifier {
-    fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
+    fn verify_batches(
+        &self,
+        mut batches: Vec<PacketBatch>,
+        valid_packets: usize,
+    ) -> Vec<PacketBatch> {
         sigverify::ed25519_verify(
             &mut batches,
             &self.recycler,
             &self.recycler_out,
             self.reject_non_vote,
+            valid_packets,
         );
         batches
     }

+ 7 - 2
core/src/sigverify_shreds.rs

@@ -41,7 +41,11 @@ impl ShredSigVerifier {
 }
 
 impl SigVerifier for ShredSigVerifier {
-    fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
+    fn verify_batches(
+        &self,
+        mut batches: Vec<PacketBatch>,
+        _valid_packets: usize,
+    ) -> Vec<PacketBatch> {
         let r_bank = self.bank_forks.read().unwrap().working_bank();
         let slots: HashSet<u64> = Self::read_slots(&batches);
         let mut leader_slots: HashMap<u64, [u8; 32]> = slots
@@ -161,7 +165,8 @@ pub mod tests {
         batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
         batches[0].packets[1].meta.size = shred.payload.len();
 
-        let rv = verifier.verify_batches(batches);
+        let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
+        let rv = verifier.verify_batches(batches, num_packets);
         assert!(!rv[0].packets[0].meta.discard());
         assert!(rv[0].packets[1].meta.discard());
     }

+ 11 - 5
core/src/sigverify_stage.rs

@@ -40,7 +40,7 @@ pub struct SigVerifyStage {
 }
 
 pub trait SigVerifier {
-    fn verify_batches(&self, batches: Vec<PacketBatch>) -> Vec<PacketBatch>;
+    fn verify_batches(&self, batches: Vec<PacketBatch>, valid_packets: usize) -> Vec<PacketBatch>;
 }
 
 #[derive(Default, Clone)]
@@ -171,7 +171,11 @@ impl SigVerifierStats {
 }
 
 impl SigVerifier for DisabledSigVerifier {
-    fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
+    fn verify_batches(
+        &self,
+        mut batches: Vec<PacketBatch>,
+        _valid_packets: usize,
+    ) -> Vec<PacketBatch> {
         sigverify::ed25519_verify_disabled(&mut batches);
         batches
     }
@@ -235,14 +239,16 @@ impl SigVerifyStage {
         let num_unique = num_packets.saturating_sub(dedup_fail);
 
         let mut discard_time = Measure::start("sigverify_discard_time");
+        let mut num_valid_packets = num_unique;
         if num_unique > MAX_SIGVERIFY_BATCH {
-            Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH)
-        };
+            Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH);
+            num_valid_packets = MAX_SIGVERIFY_BATCH;
+        }
         let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH);
         discard_time.stop();
 
         let mut verify_batch_time = Measure::start("sigverify_batch_time");
-        let batches = verifier.verify_batches(batches);
+        let batches = verifier.verify_batches(batches, num_valid_packets);
         sendr.send(batches)?;
         verify_batch_time.stop();
 

+ 7 - 1
entry/src/entry.rs

@@ -35,7 +35,10 @@ use {
         cell::RefCell,
         cmp,
         ffi::OsStr,
-        sync::{Arc, Mutex, Once},
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc, Mutex, Once,
+        },
         thread::{self, JoinHandle},
         time::Instant,
     },
@@ -497,11 +500,13 @@ pub fn start_verify_transactions(
                 })
                 .flatten()
                 .collect::<Vec<_>>();
+            let total_packets = AtomicUsize::new(0);
             let mut packet_batches = entry_txs
                 .par_iter()
                 .chunks(PACKETS_PER_BATCH)
                 .map(|slice| {
                     let vec_size = slice.len();
+                    total_packets.fetch_add(vec_size, Ordering::Relaxed);
                     let mut packet_batch = PacketBatch::new_with_recycler(
                         verify_recyclers.packet_recycler.clone(),
                         vec_size,
@@ -544,6 +549,7 @@ pub fn start_verify_transactions(
                     &tx_offset_recycler,
                     &out_recycler,
                     false,
+                    total_packets.load(Ordering::Relaxed),
                 );
                 let verified = packet_batches
                     .iter()

+ 66 - 4
perf/benches/sigverify.rs

@@ -3,22 +3,84 @@
 extern crate test;
 
 use {
-    solana_perf::{packet::to_packet_batches, recycler::Recycler, sigverify, test_tx::test_tx},
+    log::*,
+    rand::{thread_rng, Rng},
+    solana_perf::{
+        packet::{to_packet_batches, Packet, PacketBatch},
+        recycler::Recycler,
+        sigverify,
+        test_tx::{test_multisig_tx, test_tx},
+    },
     test::Bencher,
 };
 
+const NUM: usize = 256;
+
 #[bench]
-fn bench_sigverify(bencher: &mut Bencher) {
+fn bench_sigverify_simple(bencher: &mut Bencher) {
     let tx = test_tx();
+    let num_packets = NUM;
+
+    // generate packet vector
+    let mut batches = to_packet_batches(
+        &std::iter::repeat(tx).take(num_packets).collect::<Vec<_>>(),
+        128,
+    );
+
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
+    // verify packets
+    bencher.iter(|| {
+        let _ans =
+            sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
+    })
+}
+
+#[bench]
+#[ignore]
+fn bench_sigverify_uneven(bencher: &mut Bencher) {
+    solana_logger::setup();
+    let simple_tx = test_tx();
+    let multi_tx = test_multisig_tx();
+    let mut tx;
 
+    let num_packets = NUM * 50;
+    let mut num_valid = 0;
+    let mut current_packets = 0;
     // generate packet vector
-    let mut batches = to_packet_batches(&std::iter::repeat(tx).take(128).collect::<Vec<_>>(), 128);
+    let mut batches = vec![];
+    while current_packets < num_packets {
+        let mut len: usize = thread_rng().gen_range(1, 128);
+        current_packets += len;
+        if current_packets > num_packets {
+            len -= current_packets - num_packets;
+            current_packets = num_packets;
+        }
+        let mut batch = PacketBatch::with_capacity(len);
+        batch.packets.resize(len, Packet::default());
+        for packet in batch.packets.iter_mut() {
+            if thread_rng().gen_ratio(1, 2) {
+                tx = simple_tx.clone();
+            } else {
+                tx = multi_tx.clone();
+            };
+            Packet::populate_packet(packet, None, &tx).expect("serialize request");
+            if thread_rng().gen_ratio((num_packets - NUM) as u32, num_packets as u32) {
+                packet.meta.set_discard(true);
+            } else {
+                num_valid += 1;
+            }
+        }
+        batches.push(batch);
+    }
+    info!("num_packets: {} valid: {}", num_packets, num_valid);
 
     let recycler = Recycler::default();
     let recycler_out = Recycler::default();
     // verify packets
     bencher.iter(|| {
-        let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false);
+        let _ans =
+            sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets);
     })
 }
 

+ 10 - 10
perf/src/packet.rs

@@ -82,16 +82,16 @@ impl PacketBatch {
 }
 
 pub fn to_packet_batches<T: Serialize>(xs: &[T], chunks: usize) -> Vec<PacketBatch> {
-    let mut out = vec![];
-    for x in xs.chunks(chunks) {
-        let mut batch = PacketBatch::with_capacity(x.len());
-        batch.packets.resize(x.len(), Packet::default());
-        for (i, packet) in x.iter().zip(batch.packets.iter_mut()) {
-            Packet::populate_packet(packet, None, i).expect("serialize request");
-        }
-        out.push(batch);
-    }
-    out
+    xs.chunks(chunks)
+        .map(|x| {
+            let mut batch = PacketBatch::with_capacity(x.len());
+            batch.packets.resize(x.len(), Packet::default());
+            for (i, packet) in x.iter().zip(batch.packets.iter_mut()) {
+                Packet::populate_packet(packet, None, i).expect("serialize request");
+            }
+            batch
+        })
+        .collect()
 }
 
 #[cfg(test)]

+ 81 - 41
perf/src/sigverify.rs

@@ -385,41 +385,45 @@ pub fn generate_offsets(
     let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets");
     msg_sizes.set_pinnable();
     let mut current_offset: usize = 0;
-    let mut v_sig_lens = Vec::new();
-    batches.iter_mut().for_each(|batch| {
-        let mut sig_lens = Vec::new();
-        batch.packets.iter_mut().for_each(|packet| {
-            let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote);
-
-            sig_lens.push(packet_offsets.sig_len);
+    let offsets = batches
+        .iter_mut()
+        .map(|batch| {
+            batch
+                .packets
+                .iter_mut()
+                .map(|packet| {
+                    let packet_offsets =
+                        get_packet_offsets(packet, current_offset, reject_non_vote);
 
-            trace!("pubkey_offset: {}", packet_offsets.pubkey_start);
+                    trace!("pubkey_offset: {}", packet_offsets.pubkey_start);
 
-            let mut pubkey_offset = packet_offsets.pubkey_start;
-            let mut sig_offset = packet_offsets.sig_start;
-            let msg_size = current_offset.saturating_add(packet.meta.size) as u32;
-            for _ in 0..packet_offsets.sig_len {
-                signature_offsets.push(sig_offset);
-                sig_offset = sig_offset.saturating_add(size_of::<Signature>() as u32);
+                    let mut pubkey_offset = packet_offsets.pubkey_start;
+                    let mut sig_offset = packet_offsets.sig_start;
+                    let msg_size = current_offset.saturating_add(packet.meta.size) as u32;
+                    for _ in 0..packet_offsets.sig_len {
+                        signature_offsets.push(sig_offset);
+                        sig_offset = sig_offset.saturating_add(size_of::<Signature>() as u32);
 
-                pubkey_offsets.push(pubkey_offset);
-                pubkey_offset = pubkey_offset.saturating_add(size_of::<Pubkey>() as u32);
+                        pubkey_offsets.push(pubkey_offset);
+                        pubkey_offset = pubkey_offset.saturating_add(size_of::<Pubkey>() as u32);
 
-                msg_start_offsets.push(packet_offsets.msg_start);
+                        msg_start_offsets.push(packet_offsets.msg_start);
 
-                let msg_size = msg_size.saturating_sub(packet_offsets.msg_start);
-                msg_sizes.push(msg_size);
-            }
-            current_offset = current_offset.saturating_add(size_of::<Packet>());
-        });
-        v_sig_lens.push(sig_lens);
-    });
+                        let msg_size = msg_size.saturating_sub(packet_offsets.msg_start);
+                        msg_sizes.push(msg_size);
+                    }
+                    current_offset = current_offset.saturating_add(size_of::<Packet>());
+                    packet_offsets.sig_len
+                })
+                .collect()
+        })
+        .collect();
     (
         signature_offsets,
         pubkey_offsets,
         msg_start_offsets,
         msg_sizes,
-        v_sig_lens,
+        offsets,
     )
 }
 
@@ -492,9 +496,8 @@ impl Deduper {
     }
 }
 
-pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) {
+pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) {
     use rayon::prelude::*;
-    let packet_count = count_packets_in_batches(batches);
     debug!("CPU ECDSA for {}", packet_count);
     PAR_THREAD_POOL.install(|| {
         batches.into_par_iter().for_each(|batch| {
@@ -574,7 +577,9 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> {
 pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec<u8>]) {
     for (batch, v) in batches.iter_mut().zip(r) {
         for (pkt, f) in batch.packets.iter_mut().zip(v) {
-            pkt.meta.set_discard(*f == 0);
+            if !pkt.meta.discard() {
+                pkt.meta.set_discard(*f == 0);
+            }
         }
     }
 }
@@ -584,29 +589,35 @@ pub fn ed25519_verify(
     recycler: &Recycler<TxOffset>,
     recycler_out: &Recycler<PinnedVec<u8>>,
     reject_non_vote: bool,
+    valid_packet_count: usize,
 ) {
     let api = perf_libs::api();
     if api.is_none() {
-        return ed25519_verify_cpu(batches, reject_non_vote);
+        return ed25519_verify_cpu(batches, reject_non_vote, valid_packet_count);
     }
     let api = api.unwrap();
 
     use crate::packet::PACKET_DATA_SIZE;
-    let packet_count = count_packets_in_batches(batches);
 
+    let total_packet_count = count_packets_in_batches(batches);
     // micro-benchmarks show GPU time for smallest batch around 15-20ms
     // and CPU speed for 64-128 sigverifies around 10-20ms. 64 is a nice
     // power-of-two number around that accounting for the fact that the CPU
     // may be busy doing other things while being a real validator
     // TODO: dynamically adjust this crossover
-    if packet_count < 64 {
-        return ed25519_verify_cpu(batches, reject_non_vote);
+    if valid_packet_count < 64
+        || 100usize
+            .wrapping_mul(valid_packet_count)
+            .wrapping_div(total_packet_count)
+            < 90
+    {
+        return ed25519_verify_cpu(batches, reject_non_vote, valid_packet_count);
     }
 
     let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) =
         generate_offsets(batches, recycler, reject_non_vote);
 
-    debug!("CUDA ECDSA for {}", packet_count);
+    debug!("CUDA ECDSA for {}", valid_packet_count);
     debug!("allocating out..");
     let mut out = recycler_out.allocate("out_buffer");
     out.set_pinnable();
@@ -619,8 +630,7 @@ pub fn ed25519_verify(
             elems: batch.packets.as_ptr(),
             num: batch.packets.len() as u32,
         });
-        let mut v = Vec::new();
-        v.resize(batch.packets.len(), 0);
+        let v = vec![0u8; batch.packets.len()];
         rvs.push(v);
         num_packets = num_packets.saturating_add(batch.packets.len());
     }
@@ -651,7 +661,7 @@ pub fn ed25519_verify(
     trace!("done verify");
     copy_return_values(&sig_lens, &out, &mut rvs);
     mark_disabled(batches, &rvs);
-    inc_new_counter_debug!("ed25519_verify_gpu", packet_count);
+    inc_new_counter_debug!("ed25519_verify_gpu", valid_packet_count);
 }
 
 #[cfg(test)]
@@ -704,6 +714,7 @@ mod tests {
         let mut batches: Vec<PacketBatch> = vec![batch];
         mark_disabled(&mut batches, &[vec![0]]);
         assert!(batches[0].packets[0].meta.discard());
+        batches[0].packets[0].meta.set_discard(false);
         mark_disabled(&mut batches, &[vec![1]]);
         assert!(!batches[0].packets[0].meta.discard());
     }
@@ -1005,6 +1016,29 @@ mod tests {
         );
     }
 
+    fn generate_packet_batches_random_size(
+        packet: &Packet,
+        max_packets_per_batch: usize,
+        num_batches: usize,
+    ) -> Vec<PacketBatch> {
+        // generate packet vector
+        let batches: Vec<_> = (0..num_batches)
+            .map(|_| {
+                let mut packet_batch = PacketBatch::default();
+                packet_batch.packets.resize(0, Packet::default());
+                let num_packets_per_batch = thread_rng().gen_range(1, max_packets_per_batch);
+                for _ in 0..num_packets_per_batch {
+                    packet_batch.packets.push(packet.clone());
+                }
+                assert_eq!(packet_batch.packets.len(), num_packets_per_batch);
+                packet_batch
+            })
+            .collect();
+        assert_eq!(batches.len(), num_batches);
+
+        batches
+    }
+
     fn generate_packet_batches(
         packet: &Packet,
         num_packets_per_batch: usize,
@@ -1052,7 +1086,8 @@ mod tests {
     fn ed25519_verify(batches: &mut [PacketBatch]) {
         let recycler = Recycler::default();
         let recycler_out = Recycler::default();
-        sigverify::ed25519_verify(batches, &recycler, &recycler_out, false);
+        let packet_count = sigverify::count_packets_in_batches(batches);
+        sigverify::ed25519_verify(batches, &recycler, &recycler_out, false, packet_count);
     }
 
     #[test]
@@ -1133,9 +1168,8 @@ mod tests {
         let recycler = Recycler::default();
         let recycler_out = Recycler::default();
         for _ in 0..50 {
-            let n = thread_rng().gen_range(1, 30);
             let num_batches = thread_rng().gen_range(2, 30);
-            let mut batches = generate_packet_batches(&packet, n, num_batches);
+            let mut batches = generate_packet_batches_random_size(&packet, 128, num_batches);
 
             let num_modifications = thread_rng().gen_range(0, 5);
             for _ in 0..num_modifications {
@@ -1147,11 +1181,17 @@ mod tests {
                     batches[batch].packets[packet].data[offset].wrapping_add(add);
             }
 
+            let batch_to_disable = thread_rng().gen_range(0, batches.len());
+            for p in batches[batch_to_disable].packets.iter_mut() {
+                p.meta.set_discard(true);
+            }
+
             // verify from GPU verification pipeline (when GPU verification is enabled) are
             // equivalent to the CPU verification pipeline.
             let mut batches_cpu = batches.clone();
-            sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false);
-            ed25519_verify_cpu(&mut batches_cpu, false);
+            let packet_count = sigverify::count_packets_in_batches(&batches);
+            sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, packet_count);
+            ed25519_verify_cpu(&mut batches_cpu, false, packet_count);
 
             // check result
             batches