Jelajahi Sumber

Remove pubkey from DecisionMaker (#7077)

Andrew Fitzgerald 3 bulan lalu
induk
melakukan
eba01dde77

+ 0 - 9
banking-bench/src/main.rs

@@ -13,7 +13,6 @@ use {
         banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
         validator::{BlockProductionMethod, TransactionStructure},
     },
-    solana_gossip::cluster_info::{ClusterInfo, Node},
     solana_hash::Hash,
     solana_keypair::Keypair,
     solana_ledger::{
@@ -32,7 +31,6 @@ use {
     },
     solana_signature::Signature,
     solana_signer::Signer,
-    solana_streamer::socket::SocketAddrSpace,
     solana_system_interface::instruction as system_instruction,
     solana_system_transaction as system_transaction,
     solana_time_utils::timestamp,
@@ -448,12 +446,6 @@ fn main() {
         )))
         .unwrap();
     let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
-    let cluster_info = {
-        let keypair = Arc::new(Keypair::new());
-        let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-        ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
-    };
-    let cluster_info = Arc::new(cluster_info);
     let Channels {
         non_vote_sender,
         non_vote_receiver,
@@ -465,7 +457,6 @@ fn main() {
     let banking_stage = BankingStage::new_num_threads(
         block_production_method,
         transaction_struct,
-        &cluster_info,
         &poh_recorder,
         transaction_recorder,
         non_vote_receiver,

+ 0 - 9
core/benches/banking_stage.rs

@@ -21,7 +21,6 @@ use {
     solana_core::{banking_stage::BankingStage, banking_trace::BankingTracer},
     solana_entry::entry::{next_hash, Entry},
     solana_genesis_config::GenesisConfig,
-    solana_gossip::cluster_info::{ClusterInfo, Node},
     solana_hash::Hash,
     solana_keypair::Keypair,
     solana_ledger::{
@@ -39,7 +38,6 @@ use {
     },
     solana_signature::Signature,
     solana_signer::Signer,
-    solana_streamer::socket::SocketAddrSpace,
     solana_system_interface::instruction as system_instruction,
     solana_system_transaction as system_transaction,
     solana_time_utils::timestamp,
@@ -236,17 +234,10 @@ fn bench_banking(
     );
     let (exit, poh_recorder, transaction_recorder, poh_service, signal_receiver) =
         create_test_recorder(bank.clone(), blockstore, None, None);
-    let cluster_info = {
-        let keypair = Arc::new(Keypair::new());
-        let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-        ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
-    };
-    let cluster_info = Arc::new(cluster_info);
     let (s, _r) = unbounded();
     let _banking_stage = BankingStage::new(
         block_production_method,
         transaction_struct,
-        &cluster_info,
         &poh_recorder,
         transaction_recorder,
         non_vote_receiver,

+ 0 - 5
core/src/banking_simulation.rs

@@ -774,10 +774,6 @@ impl BankingSimulator {
         assert!(retracer.is_enabled());
         info!("Enabled banking retracer (dir_byte_limit: {BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT})",);
 
-        // Create a partially-dummy ClusterInfo for the banking stage.
-        let cluster_info_for_banking = Arc::new(DummyClusterInfo {
-            id: simulated_leader.into(),
-        });
         let Channels {
             non_vote_sender,
             non_vote_receiver,
@@ -830,7 +826,6 @@ impl BankingSimulator {
         let banking_stage = BankingStage::new_num_threads(
             block_production_method.clone(),
             transaction_struct.clone(),
-            &cluster_info_for_banking,
             &poh_recorder,
             transaction_recorder,
             non_vote_receiver,

+ 1 - 31
core/src/banking_stage.rs

@@ -366,7 +366,6 @@ impl BankingStage {
     pub fn new(
         block_production_method: BlockProductionMethod,
         transaction_struct: TransactionStructure,
-        cluster_info: &impl LikeClusterInfo,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         transaction_recorder: TransactionRecorder,
         non_vote_receiver: BankingPacketReceiver,
@@ -381,7 +380,6 @@ impl BankingStage {
         Self::new_num_threads(
             block_production_method,
             transaction_struct,
-            cluster_info,
             poh_recorder,
             transaction_recorder,
             non_vote_receiver,
@@ -400,7 +398,6 @@ impl BankingStage {
     pub fn new_num_threads(
         block_production_method: BlockProductionMethod,
         transaction_struct: TransactionStructure,
-        cluster_info: &impl LikeClusterInfo,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         transaction_recorder: TransactionRecorder,
         non_vote_receiver: BankingPacketReceiver,
@@ -420,7 +417,6 @@ impl BankingStage {
         Self::new_central_scheduler(
             transaction_struct,
             use_greedy_scheduler,
-            cluster_info,
             poh_recorder,
             transaction_recorder,
             non_vote_receiver,
@@ -439,7 +435,6 @@ impl BankingStage {
     pub fn new_central_scheduler(
         transaction_struct: TransactionStructure,
         use_greedy_scheduler: bool,
-        cluster_info: &impl LikeClusterInfo,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         transaction_recorder: TransactionRecorder,
         non_vote_receiver: BankingPacketReceiver,
@@ -458,7 +453,7 @@ impl BankingStage {
             VoteStorage::new(&bank)
         };
 
-        let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
+        let decision_maker = DecisionMaker::new(poh_recorder.clone());
         let committer = Committer::new(
             transaction_status_sender.clone(),
             replay_vote_sender.clone(),
@@ -688,7 +683,6 @@ mod tests {
         crossbeam_channel::{unbounded, Receiver},
         itertools::Itertools,
         solana_entry::entry::{self, Entry, EntrySlice},
-        solana_gossip::cluster_info::Node,
         solana_hash::Hash,
         solana_keypair::Keypair,
         solana_ledger::{
@@ -710,7 +704,6 @@ mod tests {
         solana_runtime::{bank::Bank, genesis_utils::bootstrap_validator_stake_lamports},
         solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
         solana_signer::Signer,
-        solana_streamer::socket::SocketAddrSpace,
         solana_system_transaction as system_transaction,
         solana_transaction::{sanitized::SanitizedTransaction, Transaction},
         solana_vote::vote_transaction::new_tower_sync_transaction,
@@ -723,14 +716,6 @@ mod tests {
         test_case::test_case,
     };
 
-    pub(crate) fn new_test_cluster_info(keypair: Option<Arc<Keypair>>) -> (Node, ClusterInfo) {
-        let keypair = keypair.unwrap_or_else(|| Arc::new(Keypair::new()));
-        let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-        let cluster_info =
-            ClusterInfo::new(node.info.clone(), keypair, SocketAddrSpace::Unspecified);
-        (node, cluster_info)
-    }
-
     pub(crate) fn sanitize_transactions(
         txs: Vec<Transaction>,
     ) -> Vec<RuntimeTransaction<SanitizedTransaction>> {
@@ -760,14 +745,11 @@ mod tests {
         );
         let (exit, poh_recorder, transaction_recorder, poh_service, _entry_receiever) =
             create_test_recorder(bank, blockstore, None, None);
-        let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None);
-        let cluster_info = Arc::new(cluster_info);
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
 
         let banking_stage = BankingStage::new(
             BlockProductionMethod::CentralScheduler,
             transaction_struct,
-            &cluster_info,
             &poh_recorder,
             transaction_recorder,
             non_vote_receiver,
@@ -818,14 +800,11 @@ mod tests {
         };
         let (exit, poh_recorder, transaction_recorder, poh_service, entry_receiver) =
             create_test_recorder(bank.clone(), blockstore, Some(poh_config), None);
-        let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None);
-        let cluster_info = Arc::new(cluster_info);
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
 
         let banking_stage = BankingStage::new(
             BlockProductionMethod::CentralScheduler,
             transaction_struct,
-            &cluster_info,
             &poh_recorder,
             transaction_recorder,
             non_vote_receiver,
@@ -885,14 +864,11 @@ mod tests {
         );
         let (exit, poh_recorder, transaction_recorder, poh_service, entry_receiver) =
             create_test_recorder(bank.clone(), blockstore, None, None);
-        let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None);
-        let cluster_info = Arc::new(cluster_info);
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
 
         let banking_stage = BankingStage::new(
             block_production_method,
             transaction_struct,
-            &cluster_info,
             &poh_recorder,
             transaction_recorder,
             non_vote_receiver,
@@ -1042,12 +1018,9 @@ mod tests {
             let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
             let (exit, poh_recorder, transaction_recorder, poh_service, entry_receiver) =
                 create_test_recorder(bank.clone(), blockstore, None, None);
-            let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None);
-            let cluster_info = Arc::new(cluster_info);
             let _banking_stage = BankingStage::new(
                 BlockProductionMethod::CentralScheduler,
                 transaction_struct,
-                &cluster_info,
                 &poh_recorder,
                 transaction_recorder,
                 non_vote_receiver,
@@ -1229,14 +1202,11 @@ mod tests {
         );
         let (exit, poh_recorder, transaction_recorder, poh_service, _entry_receiver) =
             create_test_recorder(bank.clone(), blockstore, None, None);
-        let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None);
-        let cluster_info = Arc::new(cluster_info);
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
 
         let banking_stage = BankingStage::new(
             BlockProductionMethod::CentralScheduler,
             transaction_struct,
-            &cluster_info,
             &poh_recorder,
             transaction_recorder,
             non_vote_receiver,

+ 5 - 55
core/src/banking_stage/decision_maker.rs

@@ -4,7 +4,6 @@ use {
         HOLD_TRANSACTIONS_SLOT_OFFSET,
     },
     solana_poh::poh_recorder::{BankStart, PohRecorder},
-    solana_pubkey::Pubkey,
     solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus},
     std::{
         sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc, RwLock},
@@ -32,7 +31,6 @@ impl BufferedPacketsDecision {
 
 #[derive(Clone, derive_more::Debug)]
 pub struct DecisionMaker {
-    my_pubkey: Pubkey,
     #[debug("{poh_recorder:p}")]
     poh_recorder: Arc<RwLock<PohRecorder>>,
 
@@ -41,9 +39,8 @@ pub struct DecisionMaker {
 }
 
 impl DecisionMaker {
-    pub fn new(my_pubkey: Pubkey, poh_recorder: Arc<RwLock<PohRecorder>>) -> Self {
+    pub fn new(poh_recorder: Arc<RwLock<PohRecorder>>) -> Self {
         Self {
-            my_pubkey,
             poh_recorder,
             cached_decision: None,
             last_decision_time: Instant::now(),
@@ -71,11 +68,9 @@ impl DecisionMaker {
         {
             let poh_recorder = self.poh_recorder.read().unwrap();
             decision = Self::consume_or_forward_packets(
-                &self.my_pubkey,
                 || Self::bank_start(&poh_recorder),
                 || Self::would_be_leader_shortly(&poh_recorder),
                 || Self::would_be_leader(&poh_recorder),
-                || Self::leader_pubkey(&poh_recorder),
             );
         }
 
@@ -83,11 +78,9 @@ impl DecisionMaker {
     }
 
     fn consume_or_forward_packets(
-        my_pubkey: &Pubkey,
         bank_start_fn: impl FnOnce() -> Option<BankStart>,
         would_be_leader_shortly_fn: impl FnOnce() -> bool,
         would_be_leader_fn: impl FnOnce() -> bool,
-        leader_pubkey_fn: impl FnOnce() -> Option<Pubkey>,
     ) -> BufferedPacketsDecision {
         // If has active bank, then immediately process buffered packets
         // otherwise, based on leader schedule to either forward or hold packets
@@ -101,17 +94,9 @@ impl DecisionMaker {
             // Node will be leader within ~20 slots, hold the transactions in
             // case it is the only node which produces an accepted slot.
             BufferedPacketsDecision::ForwardAndHold
-        } else if let Some(x) = leader_pubkey_fn() {
-            if x != *my_pubkey {
-                // If the current node is not the leader, forward the buffered packets
-                BufferedPacketsDecision::Forward
-            } else {
-                // If the current node is the leader, return the buffered packets as is
-                BufferedPacketsDecision::Hold
-            }
         } else {
-            // We don't know the leader. Hold the packets for now
-            BufferedPacketsDecision::Hold
+            // If the current node is not the leader, forward the buffered packets
+            BufferedPacketsDecision::Forward
         }
     }
 
@@ -130,10 +115,6 @@ impl DecisionMaker {
     fn would_be_leader(poh_recorder: &PohRecorder) -> bool {
         poh_recorder.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT)
     }
-
-    fn leader_pubkey(poh_recorder: &PohRecorder) -> Option<Pubkey> {
-        poh_recorder.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
-    }
 }
 
 #[derive(Debug)]
@@ -177,6 +158,7 @@ mod tests {
         solana_clock::NUM_CONSECUTIVE_LEADER_SLOTS,
         solana_ledger::{blockstore::Blockstore, genesis_utils::create_genesis_config},
         solana_poh::poh_recorder::create_test_recorder,
+        solana_pubkey::Pubkey,
         solana_runtime::bank::Bank,
         std::{
             env::temp_dir,
@@ -215,7 +197,7 @@ mod tests {
         poh_service.join().unwrap();
 
         let my_pubkey = Pubkey::new_unique();
-        let decision_maker = DecisionMaker::new(my_pubkey, poh_recorder.clone());
+        let decision_maker = DecisionMaker::new(poh_recorder.clone());
         poh_recorder.write().unwrap().reset(bank.clone(), None);
         let slot = bank.slot() + 1;
         let bank = Arc::new(Bank::new_from_parent(bank, &my_pubkey, slot));
@@ -274,8 +256,6 @@ mod tests {
 
     #[test]
     fn test_should_process_or_forward_packets() {
-        let my_pubkey = solana_pubkey::new_rand();
-        let my_pubkey1 = solana_pubkey::new_rand();
         let bank = Arc::new(Bank::default_for_tests());
         let bank_start = Some(BankStart {
             working_bank: bank,
@@ -284,68 +264,38 @@ mod tests {
         // having active bank allows to consume immediately
         assert_matches!(
             DecisionMaker::consume_or_forward_packets(
-                &my_pubkey,
                 || bank_start.clone(),
                 || panic!("should not be called"),
                 || panic!("should not be called"),
-                || panic!("should not be called")
             ),
             BufferedPacketsDecision::Consume(_)
         );
-        // Unknown leader, hold the packets
-        assert_matches!(
-            DecisionMaker::consume_or_forward_packets(
-                &my_pubkey,
-                || None,
-                || false,
-                || false,
-                || None
-            ),
-            BufferedPacketsDecision::Hold
-        );
         // Leader other than me, forward the packets
         assert_matches!(
             DecisionMaker::consume_or_forward_packets(
-                &my_pubkey,
                 || None,
                 || false,
                 || false,
-                || Some(my_pubkey1),
             ),
             BufferedPacketsDecision::Forward
         );
         // Will be leader shortly, hold the packets
         assert_matches!(
             DecisionMaker::consume_or_forward_packets(
-                &my_pubkey,
                 || None,
                 || true,
                 || panic!("should not be called"),
-                || panic!("should not be called"),
             ),
             BufferedPacketsDecision::Hold
         );
         // Will be leader (not shortly), forward and hold
         assert_matches!(
             DecisionMaker::consume_or_forward_packets(
-                &my_pubkey,
                 || None,
                 || false,
                 || true,
-                || panic!("should not be called"),
             ),
             BufferedPacketsDecision::ForwardAndHold
         );
-        // Current leader matches my pubkey, hold
-        assert_matches!(
-            DecisionMaker::consume_or_forward_packets(
-                &my_pubkey1,
-                || None,
-                || false,
-                || false,
-                || Some(my_pubkey1),
-            ),
-            BufferedPacketsDecision::Hold
-        );
     }
 }

+ 1 - 1
core/src/banking_stage/transaction_scheduler/scheduler_controller.rs

@@ -424,7 +424,7 @@ mod tests {
             Arc::new(AtomicBool::default()),
         );
         let poh_recorder = Arc::new(RwLock::new(poh_recorder));
-        let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone());
+        let decision_maker = DecisionMaker::new(poh_recorder.clone());
 
         let (banking_packet_sender, banking_packet_receiver) = unbounded();
         let receive_and_buffer =

+ 1 - 4
core/src/banking_stage/unified_scheduler.rs

@@ -32,7 +32,6 @@ use {
     super::{
         decision_maker::{BufferedPacketsDecision, DecisionMaker, DecisionMakerWrapper},
         packet_deserializer::PacketDeserializer,
-        LikeClusterInfo,
     },
     crate::banking_trace::Channels,
     agave_banking_stage_ingress_types::BankingPacketBatch,
@@ -48,16 +47,14 @@ pub(crate) fn ensure_banking_stage_setup(
     pool: &DefaultSchedulerPool,
     bank_forks: &Arc<RwLock<BankForks>>,
     channels: &Channels,
-    cluster_info: &impl LikeClusterInfo,
     poh_recorder: &Arc<RwLock<PohRecorder>>,
     transaction_recorder: TransactionRecorder,
     num_threads: u32,
 ) {
     let mut root_bank_cache = RootBankCache::new(bank_forks.clone());
     let unified_receiver = channels.unified_receiver().clone();
-    let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
+    let mut decision_maker = DecisionMaker::new(poh_recorder.clone());
     let banking_stage_monitor = Box::new(DecisionMakerWrapper::new(decision_maker.clone()));
-
     let banking_packet_handler = Box::new(
         move |helper: &BankingStageHelper, batches: BankingPacketBatch| {
             let decision = decision_maker.make_consume_or_forward_decision();

+ 0 - 1
core/src/tpu.rs

@@ -327,7 +327,6 @@ impl Tpu {
         let banking_stage = BankingStage::new(
             block_production_method,
             transaction_struct,
-            cluster_info,
             poh_recorder,
             transaction_recorder,
             non_vote_receiver,

+ 0 - 14
core/tests/unified_scheduler.rs

@@ -19,9 +19,7 @@ use {
         unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
     },
     solana_entry::entry::Entry,
-    solana_gossip::cluster_info::{ClusterInfo, Node},
     solana_hash::Hash,
-    solana_keypair::Keypair,
     solana_ledger::{
         blockstore::Blockstore, create_new_tmp_ledger_auto_delete,
         genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache,
@@ -35,8 +33,6 @@ use {
         prioritization_fee_cache::PrioritizationFeeCache,
     },
     solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
-    solana_signer::Signer,
-    solana_streamer::socket::SocketAddrSpace,
     solana_system_transaction as system_transaction,
     solana_timings::ExecuteTimings,
     solana_transaction_error::TransactionResult as Result,
@@ -237,20 +233,10 @@ fn test_scheduler_producing_blocks() {
         let banking_tracer = BankingTracer::new_disabled();
         banking_tracer.create_channels(true)
     };
-    let cluster_info = {
-        let keypair = Arc::new(Keypair::new());
-        let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-        Arc::new(ClusterInfo::new(
-            node.info,
-            keypair,
-            SocketAddrSpace::Unspecified,
-        ))
-    };
     ensure_banking_stage_setup(
         &pool,
         &bank_forks,
         &channels,
-        &cluster_info,
         &poh_recorder,
         transaction_recorder,
         BankingStage::num_threads(),