verifies retransmitter signature on chained Merkle shreds (#1735)

behzad nouri · 1 year ago
commit 6f94686181

6 changed files with 163 additions and 26 deletions
  1. Cargo.lock (+1 -0)
  2. ledger/src/shred.rs (+19 -6)
  3. programs/sbf/Cargo.lock (+1 -0)
  4. turbine/Cargo.toml (+1 -0)
  5. turbine/src/cluster_nodes.rs (+1 -2)
  6. turbine/src/sigverify_shreds.rs (+140 -18)

Cargo.lock (+1 -0)

@@ -7717,6 +7717,7 @@ dependencies = [
  "solana-runtime",
  "solana-sdk",
  "solana-streamer",
+ "static_assertions",
  "test-case",
  "thiserror",
  "tokio",

ledger/src/shred.rs (+19 -6)

@@ -757,11 +757,8 @@ pub mod layout {
             .map(Hash::new)
     }
 
-    pub(crate) fn set_retransmitter_signature(
-        shred: &mut [u8],
-        signature: &Signature,
-    ) -> Result<(), Error> {
-        let offset = match get_shred_variant(shred)? {
+    fn get_retransmitter_signature_offset(shred: &[u8]) -> Result<usize, Error> {
+        match get_shred_variant(shred)? {
             ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant),
             ShredVariant::MerkleCode {
                 proof_size,
@@ -777,7 +774,23 @@ pub mod layout {
             } => {
                 merkle::ShredData::get_retransmitter_signature_offset(proof_size, chained, resigned)
             }
-        }?;
+        }
+    }
+
+    pub fn get_retransmitter_signature(shred: &[u8]) -> Result<Signature, Error> {
+        let offset = get_retransmitter_signature_offset(shred)?;
+        shred
+            .get(offset..offset + SIZE_OF_SIGNATURE)
+            .map(|bytes| <[u8; SIZE_OF_SIGNATURE]>::try_from(bytes).unwrap())
+            .map(Signature::from)
+            .ok_or(Error::InvalidPayloadSize(shred.len()))
+    }
+
+    pub(crate) fn set_retransmitter_signature(
+        shred: &mut [u8],
+        signature: &Signature,
+    ) -> Result<(), Error> {
+        let offset = get_retransmitter_signature_offset(shred)?;
         let Some(buffer) = shred.get_mut(offset..offset + SIZE_OF_SIGNATURE) else {
             return Err(Error::InvalidPayloadSize(shred.len()));
         };
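
The refactor above extracts the variant-dependent offset computation into get_retransmitter_signature_offset so the new getter and the existing setter cannot disagree, and makes the getter pub for use by turbine/src/sigverify_shreds.rs below. As a reading aid (not part of the commit), a minimal sketch of the bounds-checked trailer read it performs; the payload size and offset here are made up:

    const SIZE_OF_SIGNATURE: usize = 64; // Ed25519 signatures are 64 bytes

    // Bounds-checked read of a 64-byte trailer at `offset`; returns None
    // instead of panicking when the payload is too short.
    fn read_trailing_signature(shred: &[u8], offset: usize) -> Option<[u8; SIZE_OF_SIGNATURE]> {
        shred
            .get(offset..offset + SIZE_OF_SIGNATURE)
            .map(|bytes| <[u8; SIZE_OF_SIGNATURE]>::try_from(bytes).unwrap())
    }

    fn main() {
        let shred = vec![0u8; 1203]; // hypothetical shred payload size
        assert!(read_trailing_signature(&shred, shred.len() - SIZE_OF_SIGNATURE).is_some());
        assert!(read_trailing_signature(&shred, shred.len()).is_none()); // out of bounds
    }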

programs/sbf/Cargo.lock (+1 -0)

@@ -6440,6 +6440,7 @@ dependencies = [
  "solana-runtime",
  "solana-sdk",
  "solana-streamer",
+ "static_assertions",
  "thiserror",
  "tokio",
 ]

turbine/Cargo.toml (+1 -0)

@@ -36,6 +36,7 @@ solana-rpc-client-api = { workspace = true }
 solana-runtime = { workspace = true }
 solana-sdk = { workspace = true }
 solana-streamer = { workspace = true }
+static_assertions = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 

turbine/src/cluster_nodes.rs (+1 -2)

@@ -242,8 +242,7 @@ impl ClusterNodes<RetransmitStage> {
 
     // Returns the parent node in the turbine broadcast tree.
     // Returns None if the node is the root of the tree or if it is not staked.
-    #[allow(unused)]
-    fn get_retransmit_parent(
+    pub(crate) fn get_retransmit_parent(
         &self,
         leader: &Pubkey,
         shred: &ShredId,
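
Dropping #[allow(unused)] and widening visibility to pub(crate) lets the sigverify stage recompute, for each incoming shred, which node should have retransmitted it. For orientation (not part of the commit), a sketch of the index arithmetic in a fanout-k broadcast tree; the real ClusterNodes orders nodes by a stake-weighted shuffle seeded from the leader and the ShredId, and also special-cases the leader and unstaked nodes, all of which this simplification omits:

    // Illustrative layout only: index 0 is the root, and the children of
    // index i are k*i + 1 ..= k*i + k, so the parent of index i > 0 is
    // (i - 1) / k. The root has no parent.
    fn retransmit_parent_index(index: usize, fanout: usize) -> Option<usize> {
        (index != 0).then(|| (index - 1) / fanout)
    }

    fn main() {
        assert_eq!(retransmit_parent_index(0, 200), None);
        assert_eq!(retransmit_parent_index(200, 200), Some(0)); // last child of the root
        assert_eq!(retransmit_parent_index(201, 200), Some(1)); // first child of node 1
    }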

turbine/src/sigverify_shreds.rs (+140 -18)

@@ -1,4 +1,8 @@
 use {
+    crate::{
+        cluster_nodes::{self, ClusterNodesCache},
+        retransmit_stage::RetransmitStage,
+    },
     crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_gossip::cluster_info::ClusterInfo,
@@ -9,15 +13,22 @@ use {
     },
     solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
     solana_rayon_threadlimit::get_thread_count,
-    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_runtime::{
+        bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},
+        bank_forks::BankForks,
+    },
     solana_sdk::{
         clock::Slot,
         pubkey::Pubkey,
         signature::{Keypair, Signer},
     },
+    static_assertions::const_assert_eq,
     std::{
         collections::HashMap,
-        sync::{Arc, RwLock},
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc, RwLock,
+        },
         thread::{Builder, JoinHandle},
         time::{Duration, Instant},
     },
@@ -30,6 +41,16 @@ const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
 const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
 const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
 
+// Num epochs capacity should be at least 2 because near the epoch boundary we
+// may receive shreds from the other side of the epoch boundary. Because of the
+// TTL based eviction it does not make sense to cache more than
+// MAX_LEADER_SCHEDULE_STAKES epochs.
+const_assert_eq!(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, 5);
+const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = MAX_LEADER_SCHEDULE_STAKES as usize;
+// Because for ClusterNodes::get_retransmit_parent only pubkeys of staked nodes
+// are needed, we can use longer durations for cache TTL.
+const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(30);
+
 #[allow(clippy::enum_variant_names)]
 enum Error {
     RecvDisconnected,
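
The const_assert_eq! pins the cache capacity at compile time: if MAX_LEADER_SCHEDULE_STAKES ever changes, the build breaks and the sizing rationale in the comment has to be revisited. A self-contained sketch of the pattern (not the commit's code; the constant value is a stand-in, and the static_assertions crate added to turbine/Cargo.toml above is required):

    use static_assertions::const_assert_eq;

    const MAX_LEADER_SCHEDULE_STAKES: u64 = 5; // stand-in for the solana-runtime constant
    const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = MAX_LEADER_SCHEDULE_STAKES as usize;
    // Fails to compile if the derived capacity silently drifts from 5.
    const_assert_eq!(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, 5);

    fn main() {}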
@@ -48,6 +69,10 @@ pub fn spawn_shred_sigverify(
     let recycler_cache = RecyclerCache::warmed();
     let mut stats = ShredSigVerifyStats::new(Instant::now());
     let cache = RwLock::new(LruCache::new(SIGVERIFY_LRU_CACHE_CAPACITY));
+    let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
+        CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+        CLUSTER_NODES_CACHE_TTL,
+    );
     let thread_pool = ThreadPoolBuilder::new()
         .num_threads(get_thread_count())
         .thread_name(|i| format!("solSvrfyShred{i:02}"))
@@ -66,6 +91,7 @@ pub fn spawn_shred_sigverify(
             match run_shred_sigverify(
                 &thread_pool,
                 &keypair,
+                &cluster_info,
                 &bank_forks,
                 &leader_schedule_cache,
                 &recycler_cache,
@@ -73,6 +99,7 @@ pub fn spawn_shred_sigverify(
                 &shred_fetch_receiver,
                 &retransmit_sender,
                 &verified_sender,
+                &cluster_nodes_cache,
                 &cache,
                 &mut stats,
             ) {
@@ -94,6 +121,7 @@ pub fn spawn_shred_sigverify(
 fn run_shred_sigverify<const K: usize>(
     thread_pool: &ThreadPool,
     keypair: &Keypair,
+    cluster_info: &ClusterInfo,
     bank_forks: &RwLock<BankForks>,
     leader_schedule_cache: &LeaderScheduleCache,
     recycler_cache: &RecyclerCache,
@@ -101,6 +129,7 @@ fn run_shred_sigverify<const K: usize>(
     shred_fetch_receiver: &Receiver<PacketBatch>,
     retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
     verified_sender: &Sender<Vec<PacketBatch>>,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
     cache: &RwLock<LruCache>,
     stats: &mut ShredSigVerifyStats,
 ) -> Result<(), Error> {
@@ -128,17 +157,22 @@ fn run_shred_sigverify<const K: usize>(
             .map(|packet| packet.meta_mut().set_discard(true))
             .count()
     });
+    let (working_bank, root_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.working_bank(), bank_forks.root_bank())
+    };
     verify_packets(
         thread_pool,
         &keypair.pubkey(),
-        bank_forks,
+        &working_bank,
         leader_schedule_cache,
         recycler_cache,
         &mut packets,
         cache,
     );
     stats.num_discards_post += count_discards(&packets);
-    // Resign shreds Merkle root as the retransmitter node.
+    // Verify retransmitter's signature, and resign shreds
+    // Merkle root as the retransmitter node.
     let resign_start = Instant::now();
     thread_pool.install(|| {
         packets
@@ -146,16 +180,36 @@ fn run_shred_sigverify<const K: usize>(
             .flatten()
             .filter(|packet| !packet.meta().discard())
             .for_each(|packet| {
-                if let Some(shred) = shred::layout::get_shred_mut(packet) {
-                    // We can ignore Error::InvalidShredVariant because that
-                    // basically means that the shred is of a variant which
-                    // cannot be signed by the retransmitter node.
-                    if !matches!(
-                        shred::layout::resign_shred(shred, keypair),
-                        Ok(()) | Err(shred::Error::InvalidShredVariant)
-                    ) {
-                        packet.meta_mut().set_discard(true);
-                    }
+                let repair = packet.meta().repair();
+                let Some(shred) = shred::layout::get_shred_mut(packet) else {
+                    packet.meta_mut().set_discard(true);
+                    return;
+                };
+                // Repair packets do not follow turbine tree and
+                // are verified using the trailing nonce.
+                if !repair
+                    && !verify_retransmitter_signature(
+                        shred,
+                        &root_bank,
+                        &working_bank,
+                        cluster_info,
+                        leader_schedule_cache,
+                        cluster_nodes_cache,
+                        stats,
+                    )
+                {
+                    stats
+                        .num_invalid_retransmitter
+                        .fetch_add(1, Ordering::Relaxed);
+                }
+                // We can ignore Error::InvalidShredVariant because that
+                // basically means that the shred is of a variant which
+                // cannot be signed by the retransmitter node.
+                if !matches!(
+                    shred::layout::resign_shred(shred, keypair),
+                    Ok(()) | Err(shred::Error::InvalidShredVariant)
+                ) {
+                    packet.meta_mut().set_discard(true);
                 }
             })
     });
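
Two details of the new flow are easy to miss: the working and root banks are taken under a single read-lock acquisition so they reflect one consistent view of BankForks, and a failed retransmitter check only increments num_invalid_retransmitter while the packet still proceeds to resigning rather than being discarded; repair shreds skip the check entirely since they are verified via their trailing nonce. A condensed sketch with stand-in types (not the real Bank or packet structures):

    use std::sync::RwLock;

    struct Forks { working: u64, root: u64 } // stand-in for BankForks

    // `packets` holds (repair, retransmitter_sig_ok) pairs.
    fn process(forks: &RwLock<Forks>, packets: &[(bool, bool)]) -> u64 {
        // One lock acquisition yields a consistent (working, root) pair,
        // released before the per-packet work begins.
        let (_working, _root) = {
            let forks = forks.read().unwrap();
            (forks.working, forks.root)
        };
        let mut num_invalid_retransmitter = 0;
        for &(repair, sig_ok) in packets {
            if !repair && !sig_ok {
                num_invalid_retransmitter += 1; // metric only: no discard
            }
            // ...Merkle-root resigning would follow here for every packet...
        }
        num_invalid_retransmitter
    }

    fn main() {
        let forks = RwLock::new(Forks { working: 42, root: 40 });
        let packets = [(false, false), (true, false), (false, true)];
        assert_eq!(process(&forks, &packets), 1); // only the turbine shred with a bad sig counts
    }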
@@ -175,18 +229,64 @@ fn run_shred_sigverify<const K: usize>(
     Ok(())
 }
 
+#[must_use]
+fn verify_retransmitter_signature(
+    shred: &[u8],
+    root_bank: &Bank,
+    working_bank: &Bank,
+    cluster_info: &ClusterInfo,
+    leader_schedule_cache: &LeaderScheduleCache,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
+    stats: &ShredSigVerifyStats,
+) -> bool {
+    let signature = match shred::layout::get_retransmitter_signature(shred) {
+        Ok(signature) => signature,
+        // If the shred is not of resigned variant,
+        // then there is nothing to verify.
+        Err(shred::Error::InvalidShredVariant) => return true,
+        Err(_) => return false,
+    };
+    let Some(merkle_root) = shred::layout::get_merkle_root(shred) else {
+        return false;
+    };
+    let Some(shred) = shred::layout::get_shred_id(shred) else {
+        return false;
+    };
+    let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), Some(working_bank))
+    else {
+        stats
+            .num_unknown_slot_leader
+            .fetch_add(1, Ordering::Relaxed);
+        return false;
+    };
+    let cluster_nodes =
+        cluster_nodes_cache.get(shred.slot(), root_bank, working_bank, cluster_info);
+    let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), root_bank);
+    let parent = match cluster_nodes.get_retransmit_parent(&leader, &shred, data_plane_fanout) {
+        Ok(Some(parent)) => parent,
+        Ok(None) => return true,
+        Err(err) => {
+            error!("get_retransmit_parent: {err:?}");
+            stats
+                .num_unknown_turbine_parent
+                .fetch_add(1, Ordering::Relaxed);
+            return false;
+        }
+    };
+    signature.verify(parent.as_ref(), merkle_root.as_ref())
+}
+
 fn verify_packets(
     thread_pool: &ThreadPool,
     self_pubkey: &Pubkey,
-    bank_forks: &RwLock<BankForks>,
+    working_bank: &Bank,
     leader_schedule_cache: &LeaderScheduleCache,
     recycler_cache: &RecyclerCache,
     packets: &mut [PacketBatch],
     cache: &RwLock<LruCache>,
 ) {
-    let working_bank = bank_forks.read().unwrap().working_bank();
     let leader_slots: HashMap<Slot, Pubkey> =
-        get_slot_leaders(self_pubkey, packets, leader_schedule_cache, &working_bank)
+        get_slot_leaders(self_pubkey, packets, leader_schedule_cache, working_bank)
             .into_iter()
             .filter_map(|(slot, pubkey)| Some((slot, pubkey?)))
             .chain(std::iter::once((Slot::MAX, Pubkey::default())))
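
The last line of verify_retransmitter_signature is a plain Ed25519 check of the chained Merkle root against the expected turbine parent's pubkey. A self-contained round trip using the same solana-sdk types (the keypair and 32-byte root below are fabricated for illustration):

    use solana_sdk::signature::{Keypair, Signer};

    fn main() {
        let parent = Keypair::new(); // stands in for the expected turbine parent
        let merkle_root = [7u8; 32]; // hypothetical chained Merkle root
        let signature = parent.sign_message(&merkle_root);
        // Mirrors `signature.verify(parent.as_ref(), merkle_root.as_ref())` above.
        assert!(signature.verify(parent.pubkey().as_ref(), &merkle_root));
    }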
@@ -262,7 +362,10 @@ struct ShredSigVerifyStats {
     num_discards_post: usize,
     num_discards_pre: usize,
     num_duplicates: usize,
+    num_invalid_retransmitter: AtomicUsize,
     num_retransmit_shreds: usize,
+    num_unknown_slot_leader: AtomicUsize,
+    num_unknown_turbine_parent: AtomicUsize,
     elapsed_micros: u64,
     resign_micros: u64,
 }
@@ -280,7 +383,10 @@ impl ShredSigVerifyStats {
             num_deduper_saturations: 0usize,
             num_discards_post: 0usize,
             num_duplicates: 0usize,
+            num_invalid_retransmitter: AtomicUsize::default(),
             num_retransmit_shreds: 0usize,
+            num_unknown_slot_leader: AtomicUsize::default(),
+            num_unknown_turbine_parent: AtomicUsize::default(),
             elapsed_micros: 0u64,
             resign_micros: 0u64,
         }
@@ -299,7 +405,22 @@ impl ShredSigVerifyStats {
             ("num_deduper_saturations", self.num_deduper_saturations, i64),
             ("num_discards_post", self.num_discards_post, i64),
             ("num_duplicates", self.num_duplicates, i64),
+            (
+                "num_invalid_retransmitter",
+                self.num_invalid_retransmitter.load(Ordering::Relaxed),
+                i64
+            ),
             ("num_retransmit_shreds", self.num_retransmit_shreds, i64),
+            (
+                "num_unknown_slot_leader",
+                self.num_unknown_slot_leader.load(Ordering::Relaxed),
+                i64
+            ),
+            (
+                "num_unknown_turbine_parent",
+                self.num_unknown_turbine_parent.load(Ordering::Relaxed),
+                i64
+            ),
             ("elapsed_micros", self.elapsed_micros, i64),
             ("resign_micros", self.resign_micros, i64),
         );
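
Unlike the struct's existing plain-integer fields, the three new counters are AtomicUsize because they are bumped from rayon worker threads inside thread_pool.install; Ordering::Relaxed is sufficient since each increment is independent and the values are only read when the stats are reported. A minimal illustration:

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        let counter = AtomicUsize::new(0);
        std::thread::scope(|s| {
            for _ in 0..4 {
                // Relaxed suffices: no ordering with other memory is needed,
                // and the final value is read only after all threads join.
                s.spawn(|| { counter.fetch_add(1, Ordering::Relaxed); });
            }
        });
        assert_eq!(counter.load(Ordering::Relaxed), 4);
    }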
@@ -365,10 +486,11 @@ mod tests {
 
         let cache = RwLock::new(LruCache::new(/*capacity:*/ 128));
         let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+        let working_bank = bank_forks.read().unwrap().working_bank();
         verify_packets(
             &thread_pool,
             &Pubkey::new_unique(), // self_pubkey
-            &bank_forks,
+            &working_bank,
             &leader_schedule_cache,
             &RecyclerCache::warmed(),
             &mut batches,