Explorar o código

ledger: Check whether the shred is resigned before calling `resign_shred` (#6735)

Currently, Turbine is calling `get_shred_mut` and `resign_shred` on all
packets received through the `shred_fetch_receiver` channel. That
imposes a requirement for all packets to be mutable.

`BytesPacket`, which is used in QUIC servers, and is going to be
gradually introduced in other servers as well, is immutable (it uses
`Bytes`, not `BytesMut`). Therefore, mutating it requires making a copy.

`get_shred_mut` is throwing an error if it encounters
`PacketRefMut::Bytes`.

To fix these issues:

* Check whether the shred is resigned before expecting mutability of the
  packet.
  * If the shred contained in `BytesPacket` is resigned, make a copy.
    Resigned packets are a minority among all turbine packets. For
    50mbps coming to turbine, only around 2mbps are resigned.
* Change `get_shred_mut` to take a buffer, not a packet, to get rid
  of erroneus expectation for all packet types to be mutable. Turbine
  is the only caller, let it do the match statement over all packet
  types.

The change is non-functional on its own, but it's a prerequisite for
making Turbine networking stack zero-copy (attempted in #6309).
Michal Rostecki hai 3 meses
pai
achega
9793f17004

+ 1 - 1
ledger/Cargo.toml

@@ -17,7 +17,7 @@ crate-type = ["lib"]
 name = "solana_ledger"
 
 [features]
-dev-context-only-utils = []
+dev-context-only-utils = ["solana-perf/dev-context-only-utils"]
 frozen-abi = [
     "dep:solana-frozen-abi",
     "dep:solana-frozen-abi-macro",

+ 3 - 11
ledger/src/shred.rs

@@ -56,7 +56,7 @@ pub(crate) use self::{
     payload::serde_bytes_payload,
 };
 #[cfg(any(test, feature = "dev-context-only-utils"))]
-use solana_perf::packet::{bytes::Bytes, BytesPacket, Meta, Packet};
+use solana_perf::packet::Packet;
 pub use {
     self::{
         payload::Payload,
@@ -201,6 +201,8 @@ pub enum Error {
     InvalidShredType,
     #[error("Invalid shred variant")]
     InvalidShredVariant,
+    #[error("Invalid packet size, could not get the shred")]
+    InvalidPacketSize,
     #[error(transparent)]
     IoError(#[from] std::io::Error),
     #[error("Unknown proof size")]
@@ -419,16 +421,6 @@ impl Shred {
         packet.meta_mut().size = size;
     }
 
-    #[cfg(any(test, feature = "dev-context-only-utils"))]
-    pub fn to_packet(&self) -> BytesPacket {
-        let buffer: &[u8] = match self.payload() {
-            Payload::Shared(bytes) => bytes.as_ref(),
-            Payload::Unique(bytes) => bytes.as_ref(),
-        };
-        let buffer = Bytes::copy_from_slice(buffer);
-        BytesPacket::new(buffer, Meta::default())
-    }
-
     // TODO: Should this sanitize output?
     pub fn new_from_data(
         slot: Slot,

+ 43 - 0
ledger/src/shred/payload.rs

@@ -2,6 +2,15 @@ use std::{
     ops::{Deref, DerefMut},
     sync::Arc,
 };
+#[cfg(any(test, feature = "dev-context-only-utils"))]
+use {
+    crate::shred::Nonce,
+    solana_perf::packet::{
+        bytes::{BufMut, BytesMut},
+        BytesPacket, Meta, Packet,
+    },
+    std::mem,
+};
 
 #[derive(Clone, Debug, Eq)]
 pub enum Payload {
@@ -60,6 +69,40 @@ impl Payload {
     }
 }
 
+#[cfg(any(test, feature = "dev-context-only-utils"))]
+impl Payload {
+    pub fn copy_to_packet(&self, packet: &mut Packet) {
+        let size = self.len();
+        packet.buffer_mut()[..size].copy_from_slice(&self[..]);
+        packet.meta_mut().size = size;
+    }
+
+    pub fn to_packet(&self, nonce: Option<Nonce>) -> Packet {
+        let mut packet = Packet::default();
+        let size = self.len();
+        packet.buffer_mut()[..size].copy_from_slice(self);
+        let size = if let Some(nonce) = nonce {
+            let full_size = size + mem::size_of::<Nonce>();
+            packet.buffer_mut()[size..full_size].copy_from_slice(&nonce.to_le_bytes());
+            full_size
+        } else {
+            size
+        };
+        packet.meta_mut().size = size;
+        packet
+    }
+
+    pub fn to_bytes_packet(&self, nonce: Option<Nonce>) -> BytesPacket {
+        let cap = self.len() + nonce.map(|_| mem::size_of::<Nonce>()).unwrap_or(0);
+        let mut buffer = BytesMut::with_capacity(cap);
+        buffer.put_slice(&self[..]);
+        if let Some(nonce) = nonce {
+            buffer.put_u32(nonce);
+        }
+        BytesPacket::new(buffer.freeze(), Meta::default())
+    }
+}
+
 pub(crate) mod serde_bytes_payload {
     use {
         super::Payload,

+ 91 - 54
ledger/src/shred/wire.rs

@@ -43,17 +43,8 @@ where
 }
 
 #[inline]
-pub fn get_shred_mut<'a>(packet: &'a mut PacketRefMut) -> Option<&'a mut [u8]> {
-    // This function is used only in turbine for re-signing shreds.
-    match packet {
-        // Currently, turbine uses only `Packet`, which allows mutability.
-        PacketRefMut::Packet(packet) => {
-            let buffer = packet.buffer_mut();
-            buffer.get_mut(..get_shred_size(buffer)?)
-        }
-        // `BytesPacket` is immutable, but not used in turbine.
-        PacketRefMut::Bytes(_) => unreachable!("`BytesPacket` is not used in turbine"),
-    }
+pub fn get_shred_mut(buffer: &mut [u8]) -> Option<&mut [u8]> {
+    buffer.get_mut(..get_shred_size(buffer)?)
 }
 
 #[inline]
@@ -303,7 +294,7 @@ pub fn get_retransmitter_signature(shred: &[u8]) -> Result<Signature, Error> {
     Ok(Signature::from(<[u8; 64]>::try_from(bytes).unwrap()))
 }
 
-pub(crate) fn is_retransmitter_signed_variant(shred: &[u8]) -> Result<bool, Error> {
+pub fn is_retransmitter_signed_variant(shred: &[u8]) -> Result<bool, Error> {
     match get_shred_variant(shred)? {
         ShredVariant::LegacyCode | ShredVariant::LegacyData => Ok(false),
         ShredVariant::MerkleCode {
@@ -328,6 +319,34 @@ pub fn set_retransmitter_signature(shred: &mut [u8], signature: &Signature) -> R
     Ok(())
 }
 
+/// Resigns the packet's Merkle root as the retransmitter node in the
+/// Turbine broadcast tree. This signature is in addition to leader's
+/// signature which is left intact.
+pub fn resign_packet(packet: &mut PacketRefMut, keypair: &Keypair) -> Result<(), Error> {
+    match packet {
+        PacketRefMut::Packet(packet) => {
+            let shred = get_shred_mut(packet.buffer_mut()).ok_or(Error::InvalidPacketSize)?;
+            resign_shred(shred, keypair)
+        }
+        // `Bytes` are immutable. Therefore, to resign the shred from
+        // `BytesPacket`, we need to copy the packet's buffer, then modify that
+        // copy and assign it to the packet.
+        // We resign only the last FEC set in the block. For 50mbps coming to
+        // turbine, only around 2mbps are resigned. For now, we accept the
+        // necessity of copying that minority of packets.
+        PacketRefMut::Bytes(packet) => {
+            let mut buffer = packet.buffer().to_vec();
+            let shred = get_shred_mut(&mut buffer).ok_or(Error::InvalidPacketSize)?;
+
+            resign_shred(shred, keypair)?;
+
+            packet.set_buffer(buffer);
+
+            Ok(())
+        }
+    }
+}
+
 /// Resigns the shred's Merkle root as the retransmitter node in the
 /// Turbine broadcast tree. This signature is in addition to leader's
 /// signature which is left intact.
@@ -446,7 +465,6 @@ mod tests {
         assert_matches::assert_matches,
         rand::Rng,
         solana_perf::packet::PacketFlags,
-        std::io::{Cursor, Write},
         test_case::test_matrix,
     };
 
@@ -456,34 +474,61 @@ mod tests {
         Signature::from(signature)
     }
 
-    fn write_shred<R: Rng>(
-        rng: &mut R,
-        shred: impl AsRef<[u8]>,
-        nonce: Option<Nonce>,
-        packet: &mut Packet,
-    ) {
-        let buffer = packet.buffer_mut();
-        let capacity = buffer.len();
-        let mut cursor = Cursor::new(buffer);
-        cursor.write_all(shred.as_ref()).unwrap();
-        // Write some random many bytes trailing shred payload.
-        let mut bytes = {
-            let size = capacity
-                - cursor.position() as usize
-                - if nonce.is_some() {
-                    std::mem::size_of::<Nonce>()
-                } else {
-                    0
-                };
-            vec![0u8; rng.gen_range(0..=size)]
-        };
-        rng.fill(&mut bytes[..]);
-        cursor.write_all(&bytes).unwrap();
-        // Write nonce after random trailing bytes.
-        if let Some(nonce) = nonce {
-            cursor.write_all(&nonce.to_le_bytes()).unwrap();
+    #[test_matrix(
+        [true, false],
+        [true, false],
+        [true, false]
+    )]
+    fn test_resign_packet(repaired: bool, chained: bool, is_last_in_slot: bool) {
+        let mut rng = rand::thread_rng();
+        let slot = 318_230_963 + rng.gen_range(0..318_230_963);
+        let data_size = 1200 * rng.gen_range(32..64);
+        let mut shreds =
+            make_merkle_shreds_for_tests(&mut rng, slot, data_size, chained, is_last_in_slot)
+                .unwrap();
+        for shred in shreds.iter_mut() {
+            let keypair = Keypair::new();
+            let signature = make_dummy_signature(&mut rng);
+            let nonce = repaired.then(|| rng.gen::<Nonce>());
+            if chained && is_last_in_slot {
+                shred.set_retransmitter_signature(&signature).unwrap();
+
+                let packet = &mut shred.payload().to_packet(nonce);
+                if repaired {
+                    packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                resign_packet(&mut packet.into(), &keypair).unwrap();
+
+                let packet = &mut shred.payload().to_bytes_packet(nonce);
+                if repaired {
+                    packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                resign_packet(&mut packet.as_mut(), &keypair).unwrap();
+            } else {
+                assert_matches!(
+                    shred.set_retransmitter_signature(&signature),
+                    Err(Error::InvalidShredVariant)
+                );
+
+                let packet = &mut shred.payload().to_packet(nonce);
+                if repaired {
+                    packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                assert_matches!(
+                    resign_packet(&mut packet.into(), &keypair),
+                    Err(Error::InvalidShredVariant)
+                );
+
+                let packet = &mut shred.payload().to_bytes_packet(nonce);
+                if repaired {
+                    packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                assert_matches!(
+                    resign_packet(&mut packet.as_mut(), &keypair),
+                    Err(Error::InvalidShredVariant)
+                );
+            }
         }
-        packet.meta_mut().size = usize::try_from(cursor.position()).unwrap();
     }
 
     #[test_matrix(
@@ -510,30 +555,22 @@ mod tests {
             }
         }
         for shred in &shreds {
-            let mut packet = Packet::default();
+            let nonce = repaired.then(|| rng.gen::<Nonce>());
+            let mut packet = shred.payload().to_packet(nonce);
             if repaired {
                 packet.meta_mut().flags |= PacketFlags::REPAIR;
             }
-            let nonce = repaired.then(|| rng.gen::<Nonce>());
-            write_shred(&mut rng, shred.payload(), nonce, &mut packet);
-            let mut packet = PacketRefMut::Packet(&mut packet);
+            let packet = PacketRef::Packet(&packet);
             assert_eq!(
                 packet.data(..).map(get_shred_size).unwrap().unwrap(),
                 shred.payload().len()
             );
+            let bytes = get_shred(packet).unwrap();
+            assert_eq!(bytes, shred.payload().as_ref());
             assert_eq!(
-                get_shred(packet.as_ref()).unwrap(),
-                shred.payload().as_ref()
-            );
-            assert_eq!(
-                get_shred_mut(&mut packet).unwrap(),
-                shred.payload().as_ref(),
-            );
-            assert_eq!(
-                get_shred_and_repair_nonce(packet.as_ref()).unwrap(),
+                get_shred_and_repair_nonce(packet).unwrap(),
                 (shred.payload().as_ref(), nonce),
             );
-            let bytes = get_shred(packet.as_ref()).unwrap();
             let shred_common_header = shred.common_header();
             assert_eq!(
                 get_common_header_bytes(bytes).unwrap(),

+ 14 - 1
perf/src/packet.rs

@@ -50,7 +50,8 @@ impl BytesPacket {
     }
 
     #[cfg(feature = "dev-context-only-utils")]
-    pub fn from_bytes(dest: Option<&SocketAddr>, buffer: Bytes) -> Self {
+    pub fn from_bytes(dest: Option<&SocketAddr>, buffer: impl Into<Bytes>) -> Self {
+        let buffer = buffer.into();
         let mut meta = Meta {
             size: buffer.len(),
             ..Default::default()
@@ -133,6 +134,18 @@ impl BytesPacket {
     pub fn as_mut(&mut self) -> PacketRefMut<'_> {
         PacketRefMut::Bytes(self)
     }
+
+    #[inline]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    #[inline]
+    pub fn set_buffer(&mut self, buffer: impl Into<Bytes>) {
+        let buffer = buffer.into();
+        self.meta.size = buffer.len();
+        self.buffer = buffer;
+    }
 }
 
 #[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]

+ 2 - 0
turbine/Cargo.toml

@@ -63,8 +63,10 @@ assert_matches = { workspace = true }
 bencher = { workspace = true }
 bs58 = { workspace = true }
 solana-genesis-config = { workspace = true }
+solana-ledger = { workspace = true, features = ["dev-context-only-utils"] }
 solana-logger = { workspace = true }
 solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
+solana-signature = { workspace = true, features = ["rand"] }
 solana-transaction = { workspace = true }
 test-case = { workspace = true }
 

+ 235 - 53
turbine/src/sigverify_shreds.rs

@@ -12,10 +12,19 @@ use {
     solana_keypair::Keypair,
     solana_ledger::{
         leader_schedule_cache::LeaderScheduleCache,
-        shred,
+        shred::{
+            self,
+            layout::{get_shred, resign_packet},
+            wire::is_retransmitter_signed_variant,
+        },
         sigverify_shreds::{verify_shreds_gpu, LruCache},
     },
-    solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
+    solana_perf::{
+        self,
+        deduper::Deduper,
+        packet::{PacketBatch, PacketRefMut},
+        recycler_cache::RecyclerCache,
+    },
     solana_pubkey::Pubkey,
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_signer::Signer,
@@ -30,6 +39,7 @@ use {
         thread::{Builder, JoinHandle},
         time::{Duration, Instant},
     },
+    thiserror::Error,
 };
 
 // 34MB where each cache entry is 136 bytes.
@@ -51,12 +61,20 @@ const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(30);
 const SIGVERIFY_SHRED_BATCH_SIZE: usize = 1024;
 
 #[allow(clippy::enum_variant_names)]
-enum Error {
+enum ShredSigverifyError {
     RecvDisconnected,
     RecvTimeout,
     SendError,
 }
 
+#[derive(Debug, Error)]
+enum ResignError {
+    #[error("verification of retransmitter signature failed")]
+    VerifyRetransmitterSignature,
+    #[error(transparent)]
+    Shred(#[from] shred::Error),
+}
+
 pub fn spawn_shred_sigverify(
     cluster_info: Arc<ClusterInfo>,
     bank_forks: Arc<RwLock<BankForks>>,
@@ -106,9 +124,9 @@ pub fn spawn_shred_sigverify(
                 &mut shred_buffer,
             ) {
                 Ok(()) => (),
-                Err(Error::RecvTimeout) => (),
-                Err(Error::RecvDisconnected) => break,
-                Err(Error::SendError) => break,
+                Err(ShredSigverifyError::RecvTimeout) => (),
+                Err(ShredSigverifyError::RecvDisconnected) => break,
+                Err(ShredSigverifyError::SendError) => break,
             }
             stats.maybe_submit();
         }
@@ -135,7 +153,7 @@ fn run_shred_sigverify<const K: usize>(
     cache: &RwLock<LruCache>,
     stats: &mut ShredSigVerifyStats,
     shred_buffer: &mut Vec<PacketBatch>,
-) -> Result<(), Error> {
+) -> Result<(), ShredSigverifyError> {
     const RECV_TIMEOUT: Duration = Duration::from_secs(1);
     let packets = shred_fetch_receiver.recv_timeout(RECV_TIMEOUT)?;
     stats.num_packets += packets.len();
@@ -202,48 +220,18 @@ fn run_shred_sigverify<const K: usize>(
             .flatten()
             .filter(|packet| !packet.meta().discard())
             .for_each(|mut packet| {
-                let repair = packet.meta().repair();
-                let Some(shred) = shred::layout::get_shred_mut(&mut packet) else {
-                    packet.meta_mut().set_discard(true);
-                    return;
-                };
-                // Repair packets do not follow turbine tree and
-                // are verified using the trailing nonce.
-                if !repair
-                    && !verify_retransmitter_signature(
-                        shred,
-                        &root_bank,
-                        &working_bank,
-                        cluster_info,
-                        leader_schedule_cache,
-                        cluster_nodes_cache,
-                        stats,
-                    )
+                if maybe_verify_and_resign_packet(
+                    &mut packet,
+                    &root_bank,
+                    &working_bank,
+                    cluster_info,
+                    leader_schedule_cache,
+                    cluster_nodes_cache,
+                    stats,
+                    keypair,
+                )
+                .is_err()
                 {
-                    stats
-                        .num_invalid_retransmitter
-                        .fetch_add(1, Ordering::Relaxed);
-                    if shred::layout::get_slot(shred)
-                        .map(|slot| {
-                            check_feature_activation(
-                                &feature_set::verify_retransmitter_signature::id(),
-                                slot,
-                                &root_bank,
-                            )
-                        })
-                        .unwrap_or_default()
-                    {
-                        packet.meta_mut().set_discard(true);
-                        return;
-                    }
-                }
-                // We can ignore Error::InvalidShredVariant because that
-                // basically means that the shred is of a variant which
-                // cannot be signed by the retransmitter node.
-                if !matches!(
-                    shred::layout::resign_shred(shred, keypair),
-                    Ok(()) | Err(shred::Error::InvalidShredVariant)
-                ) {
                     packet.meta_mut().set_discard(true);
                 }
             })
@@ -292,6 +280,58 @@ fn run_shred_sigverify<const K: usize>(
     Ok(())
 }
 
+/// Checks whether the shred in the given `packet` is of resigned variant. If
+/// yes, it calls [`verify_and_resign_shred`].
+fn maybe_verify_and_resign_packet(
+    packet: &mut PacketRefMut,
+    root_bank: &Bank,
+    working_bank: &Bank,
+    cluster_info: &ClusterInfo,
+    leader_schedule_cache: &LeaderScheduleCache,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
+    stats: &ShredSigVerifyStats,
+    keypair: &Keypair,
+) -> Result<(), ResignError> {
+    let repair = packet.meta().repair();
+    let shred = get_shred(packet.as_ref()).ok_or(shred::Error::InvalidPacketSize)?;
+    let is_signed = is_retransmitter_signed_variant(shred)?;
+    if is_signed {
+        // Repair packets do not follow turbine tree and
+        // are verified using the trailing nonce.
+        if !repair
+            && !verify_retransmitter_signature(
+                shred,
+                root_bank,
+                working_bank,
+                cluster_info,
+                leader_schedule_cache,
+                cluster_nodes_cache,
+                stats,
+            )
+        {
+            stats
+                .num_invalid_retransmitter
+                .fetch_add(1, Ordering::Relaxed);
+            if shred::layout::get_slot(shred)
+                .map(|slot| {
+                    check_feature_activation(
+                        &feature_set::verify_retransmitter_signature::id(),
+                        slot,
+                        root_bank,
+                    )
+                })
+                .unwrap_or_default()
+            {
+                return Err(ResignError::VerifyRetransmitterSignature);
+            }
+        }
+
+        resign_packet(packet, keypair)?;
+    }
+
+    Ok(())
+}
+
 #[must_use]
 fn verify_retransmitter_signature(
     shred: &[u8],
@@ -413,7 +453,7 @@ fn count_discards(packets: &[PacketBatch]) -> usize {
         .count()
 }
 
-impl From<RecvTimeoutError> for Error {
+impl From<RecvTimeoutError> for ShredSigverifyError {
     fn from(err: RecvTimeoutError) -> Self {
         match err {
             RecvTimeoutError::Timeout => Self::RecvTimeout,
@@ -422,7 +462,7 @@ impl From<RecvTimeoutError> for Error {
     }
 }
 
-impl<T> From<SendError<T>> for Error {
+impl<T> From<SendError<T>> for ShredSigverifyError {
     fn from(_: SendError<T>) -> Self {
         Self::SendError
     }
@@ -530,16 +570,21 @@ impl ShredSigVerifyStats {
 mod tests {
     use {
         super::*,
-        solana_entry::entry::create_ticks,
+        rand::Rng,
+        solana_entry::entry::{create_ticks, Entry},
+        solana_gossip::contact_info::ContactInfo,
         solana_hash::Hash,
         solana_keypair::Keypair,
         solana_ledger::{
             genesis_utils::create_genesis_config_with_leader,
-            shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
+            shred::{Nonce, ProcessShredsStats, ReedSolomonCache, Shredder},
         },
-        solana_perf::packet::{Packet, PinnedPacketBatch},
+        solana_perf::packet::{Packet, PacketFlags, PinnedPacketBatch},
         solana_runtime::bank::Bank,
         solana_signer::Signer,
+        solana_streamer::socket::SocketAddrSpace,
+        solana_time_utils::timestamp,
+        test_case::test_matrix,
     };
 
     #[test]
@@ -607,4 +652,141 @@ mod tests {
         assert!(!batches[0].get(0).unwrap().meta().discard());
         assert!(batches[0].get(1).unwrap().meta().discard());
     }
+
+    #[test_matrix(
+        [true, false],
+        [true, false]
+    )]
+    fn test_maybe_verify_and_resign_packet(repaired: bool, is_last_in_slot: bool) {
+        let mut rng = rand::thread_rng();
+
+        let leader_keypair = Arc::new(Keypair::new());
+        let leader_pubkey = leader_keypair.pubkey();
+        let bank = Bank::new_for_tests(
+            &create_genesis_config_with_leader(100, &leader_pubkey, 10).genesis_config,
+        );
+        let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
+        let bank_forks = BankForks::new_rw_arc(bank);
+        let (working_bank, root_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.working_bank(), bank_forks.root_bank())
+        };
+
+        let chained_merkle_root = Some(Hash::new_from_array(rng.gen()));
+
+        let shredder = Shredder::new(root_bank.slot(), root_bank.parent_slot(), 0, 0).unwrap();
+        let entries = vec![Entry::new(&Hash::default(), 0, vec![])];
+        let mut shreds: Vec<_> = shredder
+            .make_merkle_shreds_from_entries(
+                &leader_keypair,
+                &entries,
+                is_last_in_slot,
+                chained_merkle_root,
+                0,
+                0,
+                &ReedSolomonCache::default(),
+                &mut ProcessShredsStats::default(),
+            )
+            .collect();
+
+        let cluster_info = ClusterInfo::new(
+            ContactInfo::new_localhost(&leader_pubkey, timestamp()),
+            leader_keypair,
+            SocketAddrSpace::Unspecified,
+        );
+
+        let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
+            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+            CLUSTER_NODES_CACHE_TTL,
+        );
+        let stats = ShredSigVerifyStats::new(Instant::now());
+
+        for shred in shreds.iter_mut() {
+            let keypair = Keypair::new();
+            let nonce = repaired.then(|| rng.gen::<Nonce>());
+            if is_last_in_slot {
+                let packet = &mut shred.payload().to_packet(nonce);
+                let buf_before = packet.buffer_mut().to_vec();
+                if repaired {
+                    packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                maybe_verify_and_resign_packet(
+                    &mut packet.into(),
+                    &root_bank,
+                    &working_bank,
+                    &cluster_info,
+                    &leader_schedule_cache,
+                    &cluster_nodes_cache,
+                    &stats,
+                    &keypair,
+                )
+                .expect("packet should pass the verification");
+                assert!(!packet.meta().discard());
+
+                // Check whether the packet was modified.
+                assert_ne!(&buf_before, &packet.data(..).unwrap());
+
+                let mut bytes_packet = shred.payload().to_bytes_packet(nonce);
+                if repaired {
+                    bytes_packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                let buf_addr = bytes_packet.buffer().as_ptr().addr();
+                maybe_verify_and_resign_packet(
+                    &mut bytes_packet.as_mut(),
+                    &root_bank,
+                    &working_bank,
+                    &cluster_info,
+                    &leader_schedule_cache,
+                    &cluster_nodes_cache,
+                    &stats,
+                    &keypair,
+                )
+                .expect("packet should pass the verification");
+                assert!(!bytes_packet.meta().discard());
+
+                // Check whether the packet was modified.
+                let buf_addr_after = bytes_packet.buffer().as_ptr().addr();
+                assert_ne!(buf_addr, buf_addr_after);
+            } else {
+                let packet = &mut shred.payload().to_packet(nonce);
+                if repaired {
+                    packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                maybe_verify_and_resign_packet(
+                    &mut packet.into(),
+                    &root_bank,
+                    &working_bank,
+                    &cluster_info,
+                    &leader_schedule_cache,
+                    &cluster_nodes_cache,
+                    &stats,
+                    &keypair,
+                )
+                .expect("packet should pass the verification");
+                assert!(!packet.meta().discard());
+
+                let mut bytes_packet = shred.payload().to_bytes_packet(nonce);
+                if repaired {
+                    bytes_packet.meta_mut().flags |= PacketFlags::REPAIR;
+                }
+                let buf_addr = bytes_packet.buffer().as_ptr().addr();
+                maybe_verify_and_resign_packet(
+                    &mut bytes_packet.as_mut(),
+                    &root_bank,
+                    &working_bank,
+                    &cluster_info,
+                    &leader_schedule_cache,
+                    &cluster_nodes_cache,
+                    &stats,
+                    &keypair,
+                )
+                .expect("packet should pass the verification");
+                assert!(!packet.meta().discard());
+
+                // Packet should not be modified.
+                let buf_addr_after = bytes_packet.buffer().as_ptr().addr();
+                assert_eq!(buf_addr, buf_addr_after);
+            }
+        }
+    }
 }