
v1.18: gossip: notify state machine of duplicate proofs (backport of #32963) (#35006)

gossip: notify state machine of duplicate proofs (#32963)

* gossip: notify state machine of duplicate proofs

* Add feature flag for ingesting duplicate proofs from Gossip.

* Use the Epoch the shred is in instead of the root bank epoch.

* Fix unit test by activating the feature.

* Add a test for feature disabled case.

* EpochSchedule is no longer copyable, clone it explicitly.

* pr feedback: read epoch schedule on startup, add guard for ff recache

* pr feedback: bank_forks lock, -cached_slots_in_epoch, init ff

* pr feedback: bank.forks_try_read() -> read()

* pr feedback: fix local-cluster setup

* local-cluster: do not expose gossip internals, use retry mechanism instead

* local-cluster: split out case 4b into separate test and ignore

* pr feedback: avoid taking lock if ff is already found

* pr feedback: do not cache ff epoch

* pr feedback: bank_forks lock, revert to cached_slots_in_epoch

* pr feedback: move local variable into helper function

* pr feedback: use let else, remove epoch 0 hack

---------

Co-authored-by: Wen <crocoxu@gmail.com>
(cherry picked from commit 93271d91b0a379a3144c68a8baed6bc6eb33af69)

Co-authored-by: Ashwin Sekar <ashwin@solana.com>
mergify[bot] · 1 year ago · ed34e4be32
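
At a high level, this change threads a crossbeam channel from the gossip duplicate-shred handler into the duplicate-slot consensus state machine, gated by the new enable_gossip_duplicate_proof_ingestion feature: a proof is only forwarded for slots in epochs strictly after the feature's activation epoch. Below is a minimal, self-contained sketch of that flow; it assumes only crossbeam_channel and collapses the feature/epoch lookup into plain numbers (DuplicateProofNotifier, activation_epoch, and slots_per_epoch are illustrative names, not from the codebase).

    use crossbeam_channel::{unbounded, Sender};

    type Slot = u64;

    // Simplified stand-in for DuplicateShredHandler: once a duplicate proof for
    // `slot` is verified and stored, forward the slot to the consensus state
    // machine, but only if the feature is active for that slot's epoch.
    struct DuplicateProofNotifier {
        duplicate_slots_sender: Sender<Slot>,
        // Epoch in which the feature flag was activated (None = not active).
        activation_epoch: Option<u64>,
        slots_per_epoch: u64,
    }

    impl DuplicateProofNotifier {
        fn should_notify(&self, slot: Slot) -> bool {
            // Notify only for slots in epochs strictly after the activation epoch,
            // so every validator switches behavior at the same epoch boundary.
            match self.activation_epoch {
                Some(activation_epoch) => slot / self.slots_per_epoch > activation_epoch,
                None => false,
            }
        }

        fn on_duplicate_proof(&self, slot: Slot) -> Result<(), &'static str> {
            if self.should_notify(slot) {
                self.duplicate_slots_sender
                    .send(slot)
                    .map_err(|_| "unable to send duplicate slot to state machine")?;
            }
            Ok(())
        }
    }

    fn main() {
        let (sender, receiver) = unbounded();
        let notifier = DuplicateProofNotifier {
            duplicate_slots_sender: sender,
            activation_epoch: Some(0),
            slots_per_epoch: 32,
        };
        notifier.on_duplicate_proof(40).unwrap(); // epoch 1 > activation epoch 0 => forwarded
        assert_eq!(receiver.try_recv().unwrap(), 40);
        notifier.on_duplicate_proof(10).unwrap(); // epoch 0 => filtered out
        assert!(receiver.try_recv().is_err());
    }

In the real handler the activation epoch comes from the root bank's feature_set and EpochSchedule, as shown in the gossip/src/duplicate_shred_handler.rs hunks below.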

core/src/tvu.rs (+2, -1)

@@ -228,7 +228,7 @@ impl Tvu {
                 leader_schedule_cache.clone(),
                 verified_vote_receiver,
                 completed_data_sets_sender,
-                duplicate_slots_sender,
+                duplicate_slots_sender.clone(),
                 ancestor_hashes_replay_update_receiver,
                 dumped_slots_receiver,
                 popular_pruned_forks_sender,
@@ -337,6 +337,7 @@ impl Tvu {
                 blockstore,
                 leader_schedule_cache.clone(),
                 bank_forks.clone(),
+                duplicate_slots_sender,
             ),
         );
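
Note on the wiring above: a crossbeam Sender is cheaply cloneable, so the same duplicate_slots channel can be fed from both the window-service path and the new gossip duplicate-shred handler. A minimal illustration of that pattern (types simplified to u64 slots; the variable names are illustrative, not from Tvu):

    use crossbeam_channel::unbounded;

    fn main() {
        let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded::<u64>();
        // One clone per producer; the receiver sees a single merged stream.
        let for_window_service = duplicate_slots_sender.clone();
        let for_gossip_handler = duplicate_slots_sender;

        for_window_service.send(7).unwrap();
        for_gossip_handler.send(8).unwrap();
        assert_eq!(
            duplicate_slots_receiver.try_iter().collect::<Vec<_>>(),
            vec![7, 8]
        );
    }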
 

gossip/src/duplicate_shred.rs (+2, -0)

@@ -56,6 +56,8 @@ pub enum Error {
     BlockstoreInsertFailed(#[from] BlockstoreError),
     #[error("data chunk mismatch")]
     DataChunkMismatch,
+    #[error("unable to send duplicate slot to state machine")]
+    DuplicateSlotSenderFailure,
     #[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
     InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
     #[error("invalid duplicate shreds")]

gossip/src/duplicate_shred_handler.rs (+129, -18)

@@ -3,11 +3,13 @@ use {
         duplicate_shred::{self, DuplicateShred, Error},
         duplicate_shred_listener::DuplicateShredHandlerTrait,
     },
+    crossbeam_channel::Sender,
     log::error,
     solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{
         clock::{Epoch, Slot},
+        feature_set,
         pubkey::Pubkey,
     },
     std::{
@@ -44,6 +46,8 @@ pub struct DuplicateShredHandler {
     cached_on_epoch: Epoch,
     cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
     cached_slots_in_epoch: u64,
+    // Used to notify duplicate consensus state machine
+    duplicate_slots_sender: Sender<Slot>,
 }
 
 impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -63,6 +67,7 @@ impl DuplicateShredHandler {
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
         bank_forks: Arc<RwLock<BankForks>>,
+        duplicate_slots_sender: Sender<Slot>,
     ) -> Self {
         Self {
             buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
@@ -74,6 +79,7 @@ impl DuplicateShredHandler {
             blockstore,
             leader_schedule_cache,
             bank_forks,
+            duplicate_slots_sender,
         }
     }
 
@@ -131,12 +137,30 @@ impl DuplicateShredHandler {
                     shred1.into_payload(),
                     shred2.into_payload(),
                 )?;
+                if self.should_notify_state_machine(slot) {
+                    // Notify duplicate consensus state machine
+                    self.duplicate_slots_sender
+                        .send(slot)
+                        .map_err(|_| Error::DuplicateSlotSenderFailure)?;
+                }
             }
             self.consumed.insert(slot, true);
         }
         Ok(())
     }
 
+    fn should_notify_state_machine(&self, slot: Slot) -> bool {
+        let root_bank = self.bank_forks.read().unwrap().root_bank();
+        let Some(activated_slot) = root_bank
+            .feature_set
+            .activated_slot(&feature_set::enable_gossip_duplicate_proof_ingestion::id())
+        else {
+            return false;
+        };
+        root_bank.epoch_schedule().get_epoch(slot)
+            > root_bank.epoch_schedule().get_epoch(activated_slot)
+    }
+
     fn should_consume_slot(&mut self, slot: Slot) -> bool {
         slot > self.last_root
             && slot < self.last_root.saturating_add(self.cached_slots_in_epoch)
@@ -211,12 +235,14 @@ mod tests {
             cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
             duplicate_shred::{from_shred, tests::new_rand_shred},
         },
+        crossbeam_channel::unbounded,
+        itertools::Itertools,
         solana_ledger::{
             genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
             get_tmp_ledger_path_auto_delete,
             shred::Shredder,
         },
-        solana_runtime::bank::Bank,
+        solana_runtime::{accounts_background_service::AbsRequestSender, bank::Bank},
         solana_sdk::{
             signature::{Keypair, Signer},
             timing::timestamp,
@@ -271,16 +297,34 @@ mod tests {
         let my_pubkey = my_keypair.pubkey();
         let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
         let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
-        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+        let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
+        let bank_forks_arc = BankForks::new_rw_arc(bank);
+        {
+            let mut bank_forks = bank_forks_arc.write().unwrap();
+            let bank0 = bank_forks.get(0).unwrap();
+            bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
+            bank_forks.set_root(9, &AbsRequestSender::default(), None);
+        }
+        blockstore.set_roots([0, 9].iter()).unwrap();
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
-            &bank_forks.read().unwrap().working_bank(),
+            &bank_forks_arc.read().unwrap().working_bank(),
         ));
-        let mut duplicate_shred_handler =
-            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
+        let (sender, receiver) = unbounded();
+        // The feature will only be activated at Epoch 1.
+        let start_slot: Slot = slots_in_epoch + 1;
+
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            bank_forks_arc,
+            sender,
+        );
         let chunks = create_duplicate_proof(
             my_keypair.clone(),
             None,
-            1,
+            start_slot,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
@@ -288,20 +332,24 @@ mod tests {
         let chunks1 = create_duplicate_proof(
             my_keypair.clone(),
             None,
-            2,
+            start_slot + 1,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
         .unwrap();
-        assert!(!blockstore.has_duplicate_shreds_in_slot(1));
-        assert!(!blockstore.has_duplicate_shreds_in_slot(2));
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
         // Test that two proofs are mixed together, but we can store the proofs fine.
         for (chunk1, chunk2) in chunks.zip(chunks1) {
             duplicate_shred_handler.handle(chunk1);
             duplicate_shred_handler.handle(chunk2);
         }
-        assert!(blockstore.has_duplicate_shreds_in_slot(1));
-        assert!(blockstore.has_duplicate_shreds_in_slot(2));
+        assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
+        assert_eq!(
+            receiver.try_iter().collect_vec(),
+            vec![start_slot, start_slot + 1]
+        );
 
         // Test all kinds of bad proofs.
         for error in [
@@ -312,7 +360,7 @@ mod tests {
             match create_duplicate_proof(
                 my_keypair.clone(),
                 None,
-                3,
+                start_slot + 2,
                 Some(error),
                 DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
             ) {
@@ -321,7 +369,8 @@ mod tests {
                     for chunk in chunks {
                         duplicate_shred_handler.handle(chunk);
                     }
-                    assert!(!blockstore.has_duplicate_shreds_in_slot(3));
+                    assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 2));
+                    assert!(receiver.is_empty());
                 }
             }
         }
@@ -337,13 +386,29 @@ mod tests {
         let my_pubkey = my_keypair.pubkey();
         let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
         let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
-        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+        let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
+        let bank_forks_arc = BankForks::new_rw_arc(bank);
+        {
+            let mut bank_forks = bank_forks_arc.write().unwrap();
+            let bank0 = bank_forks.get(0).unwrap();
+            bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
+            bank_forks.set_root(9, &AbsRequestSender::default(), None);
+        }
+        blockstore.set_roots([0, 9].iter()).unwrap();
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
-            &bank_forks.read().unwrap().working_bank(),
+            &bank_forks_arc.read().unwrap().working_bank(),
         ));
-        let mut duplicate_shred_handler =
-            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
-        let start_slot: Slot = 1;
+        let (sender, receiver) = unbounded();
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            bank_forks_arc,
+            sender,
+        );
+        // The feature will only be activated at Epoch 1.
+        let start_slot: Slot = slots_in_epoch + 1;
 
         // This proof will not be accepted because num_chunks is too large.
         let chunks = create_duplicate_proof(
@@ -358,6 +423,7 @@ mod tests {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(receiver.is_empty());
 
         // This proof will be rejected because the slot is too far away in the future.
         let future_slot =
@@ -374,6 +440,7 @@ mod tests {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
+        assert!(receiver.is_empty());
 
         // Send in two proofs, the first proof showing up will be accepted, the following
         // proofs will be discarded.
@@ -388,10 +455,54 @@ mod tests {
         // handle chunk 0 of the first proof.
         duplicate_shred_handler.handle(chunks.next().unwrap());
         assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(receiver.is_empty());
         // Now send in the rest of the first proof, it will succeed.
         for chunk in chunks {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
+    }
+
+    #[test]
+    fn test_feature_disabled() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let my_keypair = Arc::new(Keypair::new());
+        let my_pubkey = my_keypair.pubkey();
+        let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
+        let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        bank.deactivate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+        assert!(!bank
+            .feature_set
+            .is_active(&feature_set::enable_gossip_duplicate_proof_ingestion::id()));
+        let bank_forks_arc = BankForks::new_rw_arc(bank);
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
+            &bank_forks_arc.read().unwrap().working_bank(),
+        ));
+        let (sender, receiver) = unbounded();
+
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            bank_forks_arc,
+            sender,
+        );
+        let chunks = create_duplicate_proof(
+            my_keypair.clone(),
+            None,
+            1,
+            None,
+            DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+        )
+        .unwrap();
+        assert!(!blockstore.has_duplicate_shreds_in_slot(1));
+        for chunk in chunks {
+            duplicate_shred_handler.handle(chunk);
+        }
+        // If feature disabled, blockstore gets signal but state machine doesn't see it.
+        assert!(blockstore.has_duplicate_shreds_in_slot(1));
+        assert!(receiver.try_iter().collect_vec().is_empty());
     }
 }
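
The test setup above leans on a bit of epoch arithmetic: the feature is activated in the genesis bank (epoch 0), and the handler only notifies for slots in epochs strictly after the activation epoch, so the first notifiable slot is the start of epoch 1 ("The feature will only be activated at Epoch 1"). A small sketch of that reasoning, assuming a fixed-size epoch schedule with no warmup (first_notifiable_slot is a hypothetical helper, not from the codebase):

    // Illustrative only: assumes a fixed-size, no-warmup epoch schedule.
    fn first_notifiable_slot(activation_epoch: u64, slots_per_epoch: u64) -> u64 {
        (activation_epoch + 1) * slots_per_epoch
    }

    fn main() {
        let slots_in_epoch = 32;
        // Feature active since epoch 0 => notifications begin with epoch 1.
        assert_eq!(first_notifiable_slot(0, slots_in_epoch), slots_in_epoch);
        // Hence the tests pick start_slot = slots_in_epoch + 1, safely inside epoch 1.
        assert!(slots_in_epoch + 1 >= first_notifiable_slot(0, slots_in_epoch));
    }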

local-cluster/src/cluster_tests.rs (+35, -13)

@@ -41,7 +41,7 @@ use {
     solana_vote_program::vote_transaction,
     std::{
         borrow::Borrow,
-        collections::{HashMap, HashSet},
+        collections::{HashMap, HashSet, VecDeque},
         net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
         path::Path,
         sync::{
@@ -489,6 +489,9 @@ pub fn start_gossip_voter(
         + std::marker::Send
         + 'static,
     sleep_ms: u64,
+    num_expected_peers: usize,
+    refresh_ms: u64,
+    max_votes_to_refresh: usize,
 ) -> GossipVoter {
     let exit = Arc::new(AtomicBool::new(false));
     let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node(
@@ -503,6 +506,15 @@ pub fn start_gossip_voter(
         SocketAddrSpace::Unspecified,
     );
 
+    // Wait for peer discovery
+    while cluster_info.gossip_peers().len() < num_expected_peers {
+        sleep(Duration::from_millis(sleep_ms));
+    }
+
+    let mut latest_voted_slot = 0;
+    let mut refreshable_votes: VecDeque<(Transaction, VoteTransaction)> = VecDeque::new();
+    let mut latest_push_attempt = Instant::now();
+
     let t_voter = {
         let exit = exit.clone();
         let cluster_info = cluster_info.clone();
@@ -514,6 +526,18 @@ pub fn start_gossip_voter(
                 }
 
                 let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor);
+                if labels.is_empty() {
+                    if latest_push_attempt.elapsed() > Duration::from_millis(refresh_ms) {
+                        for (leader_vote_tx, parsed_vote) in refreshable_votes.iter().rev() {
+                            let vote_slot = parsed_vote.last_voted_slot().unwrap();
+                            info!("gossip voter refreshing vote {}", vote_slot);
+                            process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
+                            latest_push_attempt = Instant::now();
+                        }
+                    }
+                    sleep(Duration::from_millis(sleep_ms));
+                    continue;
+                }
                 let mut parsed_vote_iter: Vec<_> = labels
                     .into_iter()
                     .zip(votes)
@@ -527,22 +551,20 @@ pub fn start_gossip_voter(
                 });
 
                 for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
-                    if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
-                        info!("received vote for {}", latest_vote_slot);
-                        process_vote_tx(
-                            latest_vote_slot,
-                            leader_vote_tx,
-                            parsed_vote,
-                            &cluster_info,
-                        )
+                    if let Some(vote_slot) = parsed_vote.last_voted_slot() {
+                        info!("received vote for {}", vote_slot);
+                        if vote_slot > latest_voted_slot {
+                            latest_voted_slot = vote_slot;
+                            refreshable_votes
+                                .push_front((leader_vote_tx.clone(), parsed_vote.clone()));
+                            refreshable_votes.truncate(max_votes_to_refresh);
+                        }
+                        process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
+                        latest_push_attempt = Instant::now();
                     }
                     // Give vote some time to propagate
                     sleep(Duration::from_millis(sleep_ms));
                 }
-
-                if parsed_vote_iter.is_empty() {
-                    sleep(Duration::from_millis(sleep_ms));
-                }
             }
         })
     };
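
The refresh logic added here boils down to: remember the newest max_votes_to_refresh votes in a VecDeque, and when gossip yields nothing for longer than refresh_ms, re-push them so the cluster under test does not stall on a dropped vote. A condensed, self-contained sketch of that bookkeeping under those assumptions (Vote stands in for the real (Transaction, VoteTransaction) pair; GossipVoterState is an illustrative name):

    use std::{
        collections::VecDeque,
        time::{Duration, Instant},
    };

    #[derive(Clone, Debug, PartialEq)]
    struct Vote {
        slot: u64,
    }

    struct GossipVoterState {
        latest_voted_slot: u64,
        refreshable_votes: VecDeque<Vote>,
        latest_push_attempt: Instant,
    }

    impl GossipVoterState {
        // Called for each vote seen in gossip: remember it if it is newer than
        // anything seen so far, keeping only the most recent N votes.
        fn record(&mut self, vote: Vote, max_votes_to_refresh: usize) {
            if vote.slot > self.latest_voted_slot {
                self.latest_voted_slot = vote.slot;
                self.refreshable_votes.push_front(vote);
                self.refreshable_votes.truncate(max_votes_to_refresh);
            }
            self.latest_push_attempt = Instant::now();
        }

        // Called when gossip returned no new votes: if idle for refresh_ms,
        // return the remembered votes (oldest first) to be re-pushed.
        fn votes_to_refresh(&mut self, refresh_ms: u64) -> Vec<Vote> {
            if self.latest_push_attempt.elapsed() > Duration::from_millis(refresh_ms) {
                self.latest_push_attempt = Instant::now();
                self.refreshable_votes.iter().rev().cloned().collect()
            } else {
                Vec::new()
            }
        }
    }

    fn main() {
        let mut state = GossipVoterState {
            latest_voted_slot: 0,
            refreshable_votes: VecDeque::new(),
            latest_push_attempt: Instant::now(),
        };
        state.record(Vote { slot: 3 }, 5);
        state.record(Vote { slot: 4 }, 5);
        // With refresh_ms = 0 any idle gap qualifies, so both votes are replayed.
        std::thread::sleep(Duration::from_millis(1));
        assert_eq!(
            state.votes_to_refresh(0),
            vec![Vote { slot: 3 }, Vote { slot: 4 }]
        );
    }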

local-cluster/tests/local_cluster.rs (+32, -16)

@@ -2740,6 +2740,9 @@ fn test_oc_bad_signatures() {
             }
         },
         voter_thread_sleep_ms as u64,
+        cluster.validators.len().saturating_sub(1),
+        0,
+        0,
     );
 
     let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe(
@@ -3740,6 +3743,18 @@ fn test_kill_partition_switch_threshold_progress() {
 #[serial]
 #[allow(unused_attributes)]
 fn test_duplicate_shreds_broadcast_leader() {
+    run_duplicate_shreds_broadcast_leader(true);
+}
+#[test]
+#[serial]
+#[ignore]
+#[allow(unused_attributes)]
+fn test_duplicate_shreds_broadcast_leader_ancestor_hashes() {
+    run_duplicate_shreds_broadcast_leader(false);
+}
+
+fn run_duplicate_shreds_broadcast_leader(vote_on_duplicate: bool) {
+    solana_logger::setup_with_default(RUST_LOG_FILTER);
     // Create 4 nodes:
     // 1) Bad leader sending different versions of shreds to both of the other nodes
     // 2) 1 node who's voting behavior in gossip
@@ -3790,11 +3805,13 @@ fn test_duplicate_shreds_broadcast_leader() {
     // for the partition.
     assert!(partition_node_stake < our_node_stake && partition_node_stake < good_node_stake);
 
+    let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
+
     // 1) Set up the cluster
     let (mut cluster, validator_keys) = test_faulty_node(
         BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig {
             partition: ClusterPartition::Stake(partition_node_stake),
-            duplicate_slot_sender: None,
+            duplicate_slot_sender: Some(duplicate_slot_sender),
         }),
         node_stakes,
         None,
@@ -3836,27 +3853,23 @@ fn test_duplicate_shreds_broadcast_leader() {
         {
             let node_keypair = node_keypair.insecure_clone();
             let vote_keypair = vote_keypair.insecure_clone();
-            let mut max_vote_slot = 0;
             let mut gossip_vote_index = 0;
+            let mut duplicate_slots = vec![];
             move |latest_vote_slot, leader_vote_tx, parsed_vote, cluster_info| {
                 info!("received vote for {}", latest_vote_slot);
                 // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
-                if latest_vote_slot > max_vote_slot {
-                    let new_epoch_slots: Vec<Slot> =
-                        (max_vote_slot + 1..latest_vote_slot + 1).collect();
-                    info!(
-                        "Simulating epoch slots from our node: {:?}",
-                        new_epoch_slots
-                    );
-                    cluster_info.push_epoch_slots(&new_epoch_slots);
-                    max_vote_slot = latest_vote_slot;
-                }
+                let new_epoch_slots: Vec<Slot> = (0..latest_vote_slot + 1).collect();
+                info!(
+                    "Simulating epoch slots from our node: {:?}",
+                    new_epoch_slots
+                );
+                cluster_info.push_epoch_slots(&new_epoch_slots);
 
-                // Only vote on even slots. Note this may violate lockouts if the
-                // validator started voting on a different fork before we could exit
-                // it above.
+                for slot in duplicate_slot_receiver.try_iter() {
+                    duplicate_slots.push(slot);
+                }
                 let vote_hash = parsed_vote.hash();
-                if latest_vote_slot % 2 == 0 {
+                if vote_on_duplicate || !duplicate_slots.contains(&latest_vote_slot) {
                     info!(
                         "Simulating vote from our node on slot {}, hash {}",
                         latest_vote_slot, vote_hash
@@ -3894,6 +3907,9 @@ fn test_duplicate_shreds_broadcast_leader() {
             }
         },
         voter_thread_sleep_ms as u64,
+        cluster.validators.len().saturating_sub(1),
+        5000, // Refresh if 5 seconds of inactivity
+        5,    // Refresh the past 5 votes
     );
 
     // 4) Check that the cluster is making progress
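
The behavioral difference between the two test variants is the voting rule in the gossip voter closure: the original test keeps voting even on slots the faulty leader made duplicate, while the new (ignored) ancestor-hashes variant withholds those votes so recovery must go through the ancestor-hashes/duplicate-repair path. A one-line distillation of that rule (should_vote is a hypothetical helper, not in the test itself):

    fn should_vote(vote_on_duplicate: bool, duplicate_slots: &[u64], latest_vote_slot: u64) -> bool {
        vote_on_duplicate || !duplicate_slots.contains(&latest_vote_slot)
    }

    fn main() {
        assert!(should_vote(true, &[5], 5));   // test_duplicate_shreds_broadcast_leader
        assert!(!should_vote(false, &[5], 5)); // ..._ancestor_hashes variant (currently ignored)
    }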

sdk/src/feature_set.rs (+5, -0)

@@ -772,6 +772,10 @@ pub mod cost_model_requested_write_lock_cost {
     solana_sdk::declare_id!("wLckV1a64ngtcKPRGU4S4grVTestXjmNjxBjaKZrAcn");
 }
 
+pub mod enable_gossip_duplicate_proof_ingestion {
+    solana_sdk::declare_id!("FNKCMBzYUdjhHyPdsKG2LSmdzH8TCHXn3ytj8RNBS4nG");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -960,6 +964,7 @@ lazy_static! {
         (enable_zk_proof_from_account::id(), "Enable zk token proof program to read proof from accounts instead of instruction data #34750"),
         (curve25519_restrict_msm_length::id(), "restrict curve25519 multiscalar multiplication vector lengths #34763"),
         (cost_model_requested_write_lock_cost::id(), "cost model uses number of requested write locks #34819"),
+        (enable_gossip_duplicate_proof_ingestion::id(), "enable gossip duplicate proof ingestion #32963"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()

vote/src/vote_transaction.rs (+1, -1)

@@ -6,7 +6,7 @@ use {
     solana_vote_program::vote_state::{Vote, VoteStateUpdate},
 };
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone)]
 pub enum VoteTransaction {
     Vote(Vote),
     VoteStateUpdate(VoteStateUpdate),