Ver código fonte

Don't construct or notify `RpcSubscriptions` when the RPC is off (#6516)

* Don't construct or notify `RpcSubscriptions` when the RPC is off

* Less dumb Rust

* Keep going? Can't get it to typecheck.

* Revert some places that still need `Arc`s due to threads

* Cleanup tests, avoid a clone in tpu

---------

Co-authored-by: Jon C <me@jonc.dev>
Steven Luscher 4 meses atrás
pai
commit
a6d8924b60

+ 19 - 17
core/src/cluster_info_vote_listener.rs

@@ -193,7 +193,7 @@ impl ClusterInfoVoteListener {
         verified_packets_sender: BankingPacketSender,
         vote_tracker: Arc<VoteTracker>,
         bank_forks: Arc<RwLock<BankForks>>,
-        subscriptions: Arc<RpcSubscriptions>,
+        subscriptions: Option<Arc<RpcSubscriptions>>,
         verified_vote_sender: VerifiedVoteSender,
         gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
         replay_votes_receiver: ReplayVoteReceiver,
@@ -230,7 +230,7 @@ impl ClusterInfoVoteListener {
                     vote_tracker,
                     &mut bank_hash_cache,
                     dumped_slot_subscription,
-                    subscriptions,
+                    subscriptions.as_deref(),
                     gossip_verified_vote_hash_sender,
                     verified_vote_sender,
                     replay_votes_receiver,
@@ -318,7 +318,7 @@ impl ClusterInfoVoteListener {
         vote_tracker: Arc<VoteTracker>,
         bank_hash_cache: &mut BankHashCache,
         dumped_slot_subscription: DumpedSlotSubscription,
-        subscriptions: Arc<RpcSubscriptions>,
+        subscriptions: Option<&RpcSubscriptions>,
         gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
         verified_vote_sender: VerifiedVoteSender,
         replay_votes_receiver: ReplayVoteReceiver,
@@ -355,7 +355,7 @@ impl ClusterInfoVoteListener {
                 &gossip_vote_txs_receiver,
                 &vote_tracker,
                 &root_bank,
-                &subscriptions,
+                subscriptions,
                 &gossip_verified_vote_hash_sender,
                 &verified_vote_sender,
                 &replay_votes_receiver,
@@ -389,7 +389,7 @@ impl ClusterInfoVoteListener {
         gossip_vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
         vote_tracker: &VoteTracker,
         root_bank: &Bank,
-        subscriptions: &RpcSubscriptions,
+        subscriptions: Option<&RpcSubscriptions>,
         gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
         verified_vote_sender: &VerifiedVoteSender,
         replay_votes_receiver: &ReplayVoteReceiver,
@@ -445,7 +445,7 @@ impl ClusterInfoVoteListener {
         vote_transaction_signature: Signature,
         vote_tracker: &VoteTracker,
         root_bank: &Bank,
-        subscriptions: &RpcSubscriptions,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         verified_vote_sender: &VerifiedVoteSender,
         gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
         diff: &mut HashMap<Slot, HashMap<Pubkey, bool>>,
@@ -586,7 +586,9 @@ impl ClusterInfoVoteListener {
         *latest_vote_slot = max(*latest_vote_slot, last_vote_slot);
 
         if is_new_vote {
-            subscriptions.notify_vote(*vote_pubkey, vote, vote_transaction_signature);
+            if let Some(rpc_subscriptions) = rpc_subscriptions {
+                rpc_subscriptions.notify_vote(*vote_pubkey, vote, vote_transaction_signature);
+            }
             let _ = verified_vote_sender.send((*vote_pubkey, vote_slots));
         }
     }
@@ -597,7 +599,7 @@ impl ClusterInfoVoteListener {
         gossip_vote_txs: Vec<Transaction>,
         replayed_votes: Vec<ParsedVote>,
         root_bank: &Bank,
-        subscriptions: &RpcSubscriptions,
+        subscriptions: Option<&RpcSubscriptions>,
         gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
         verified_vote_sender: &VerifiedVoteSender,
         bank_notification_sender: &Option<BankNotificationSender>,
@@ -881,7 +883,7 @@ mod tests {
             &votes_receiver,
             &vote_tracker,
             &bank3,
-            &subscriptions,
+            Some(&subscriptions),
             &gossip_verified_vote_hash_sender,
             &verified_vote_sender,
             &replay_votes_receiver,
@@ -916,7 +918,7 @@ mod tests {
             &votes_receiver,
             &vote_tracker,
             &bank3,
-            &subscriptions,
+            Some(&subscriptions),
             &gossip_verified_vote_hash_sender,
             &verified_vote_sender,
             &replay_votes_receiver,
@@ -1010,7 +1012,7 @@ mod tests {
             &votes_txs_receiver,
             &vote_tracker,
             &bank0,
-            &subscriptions,
+            Some(&subscriptions),
             &gossip_verified_vote_hash_sender,
             &verified_vote_sender,
             &replay_votes_receiver,
@@ -1180,7 +1182,7 @@ mod tests {
             &votes_txs_receiver,
             &vote_tracker,
             &bank0,
-            &subscriptions,
+            Some(&subscriptions),
             &gossip_verified_vote_hash_sender,
             &verified_vote_sender,
             &replay_votes_receiver,
@@ -1293,7 +1295,7 @@ mod tests {
                     &votes_receiver,
                     &vote_tracker,
                     &bank,
-                    &subscriptions,
+                    Some(&subscriptions),
                     &gossip_verified_vote_hash_sender,
                     &verified_vote_sender,
                     &replay_votes_receiver,
@@ -1389,7 +1391,7 @@ mod tests {
                 Signature::default(),
             )],
             &bank,
-            &subscriptions,
+            Some(&subscriptions),
             &gossip_verified_vote_hash_sender,
             &verified_vote_sender,
             &None,
@@ -1438,7 +1440,7 @@ mod tests {
                 Signature::default(),
             )],
             &new_root_bank,
-            &subscriptions,
+            Some(&subscriptions),
             &gossip_verified_vote_hash_sender,
             &verified_vote_sender,
             &None,
@@ -1656,7 +1658,7 @@ mod tests {
             signature,
             &vote_tracker,
             &bank,
-            &subscriptions,
+            Some(&subscriptions),
             &verified_vote_sender,
             &gossip_verified_vote_hash_sender,
             &mut diff,
@@ -1689,7 +1691,7 @@ mod tests {
             signature,
             &vote_tracker,
             &bank,
-            &subscriptions,
+            Some(&subscriptions),
             &verified_vote_sender,
             &gossip_verified_vote_hash_sender,
             &mut diff,

+ 14 - 9
core/src/commitment_service.rs

@@ -67,7 +67,7 @@ impl AggregateCommitmentService {
     pub fn new(
         exit: Arc<AtomicBool>,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
-        subscriptions: Arc<RpcSubscriptions>,
+        subscriptions: Option<Arc<RpcSubscriptions>>,
     ) -> (Sender<CommitmentAggregationData>, Self) {
         let (sender, receiver): (
             Sender<CommitmentAggregationData>,
@@ -83,9 +83,12 @@ impl AggregateCommitmentService {
                             break;
                         }
 
-                        if let Err(RecvTimeoutError::Disconnected) =
-                            Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit)
-                        {
+                        if let Err(RecvTimeoutError::Disconnected) = Self::run(
+                            &receiver,
+                            &block_commitment_cache,
+                            subscriptions.as_deref(),
+                            &exit,
+                        ) {
                             break;
                         }
                     })
@@ -97,7 +100,7 @@ impl AggregateCommitmentService {
     fn run(
         receiver: &Receiver<CommitmentAggregationData>,
         block_commitment_cache: &RwLock<BlockCommitmentCache>,
-        subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         exit: &AtomicBool,
     ) -> Result<(), RecvTimeoutError> {
         loop {
@@ -136,10 +139,12 @@ impl AggregateCommitmentService {
                 ),
             );
 
-            // Triggers rpc_subscription notifications as soon as new commitment data is available,
-            // sending just the commitment cache slot information that the notifications thread
-            // needs
-            subscriptions.notify_subscribers(update_commitment_slots);
+            if let Some(rpc_subscriptions) = rpc_subscriptions {
+                // Triggers rpc_subscription notifications as soon as new commitment data is
+                // available, sending just the commitment cache slot information that the
+                // notifications thread needs
+                rpc_subscriptions.notify_subscribers(update_commitment_slots);
+            }
         }
     }
 

+ 48 - 32
core/src/replay_stage.rs

@@ -280,7 +280,7 @@ pub struct ReplayStageConfig {
 }
 
 pub struct ReplaySenders {
-    pub rpc_subscriptions: Arc<RpcSubscriptions>,
+    pub rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
     pub slot_status_notifier: Option<SlotStatusNotifier>,
     pub transaction_status_sender: Option<TransactionStatusSender>,
     pub entry_notification_sender: Option<EntryNotifierSender>,
@@ -715,7 +715,7 @@ impl ReplayStage {
                     &blockstore,
                     &bank_forks,
                     &leader_schedule_cache,
-                    &rpc_subscriptions,
+                    rpc_subscriptions.as_deref(),
                     &slot_status_notifier,
                     &mut progress,
                     &mut replay_timing,
@@ -741,7 +741,7 @@ impl ReplayStage {
                     &mut heaviest_subtree_fork_choice,
                     &replay_vote_sender,
                     &bank_notification_sender,
-                    &rpc_subscriptions,
+                    rpc_subscriptions.as_deref(),
                     &slot_status_notifier,
                     &mut duplicate_slots_tracker,
                     &duplicate_confirmed_slots,
@@ -1002,7 +1002,7 @@ impl ReplayStage {
                         &leader_schedule_cache,
                         &lockouts_sender,
                         snapshot_controller.as_deref(),
-                        &rpc_subscriptions,
+                        rpc_subscriptions.as_deref(),
                         &block_commitment_cache,
                         &mut heaviest_subtree_fork_choice,
                         &bank_notification_sender,
@@ -1160,7 +1160,7 @@ impl ReplayStage {
                         &bank_forks,
                         &poh_recorder,
                         &leader_schedule_cache,
-                        &rpc_subscriptions,
+                        rpc_subscriptions.as_deref(),
                         &slot_status_notifier,
                         &mut progress,
                         &retransmit_slots_sender,
@@ -2079,7 +2079,7 @@ impl ReplayStage {
         bank_forks: &Arc<RwLock<BankForks>>,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         slot_status_notifier: &Option<SlotStatusNotifier>,
         progress_map: &mut ProgressMap,
         retransmit_slots_sender: &Sender<Slot>,
@@ -2257,7 +2257,7 @@ impl ReplayStage {
         bank: &Bank,
         root: Slot,
         err: &BlockstoreProcessorError,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         slot_status_notifier: &Option<SlotStatusNotifier>,
         duplicate_slots_tracker: &mut DuplicateSlotsTracker,
         duplicate_confirmed_slots: &DuplicateConfirmedSlots,
@@ -2309,11 +2309,13 @@ impl ReplayStage {
                 .notify_slot_dead(slot, parent_slot, err.clone());
         }
 
-        rpc_subscriptions.notify_slot_update(SlotUpdate::Dead {
-            slot,
-            err,
-            timestamp: timestamp(),
-        });
+        if let Some(rpc_subscriptions) = rpc_subscriptions {
+            rpc_subscriptions.notify_slot_update(SlotUpdate::Dead {
+                slot,
+                err,
+                timestamp: timestamp(),
+            });
+        }
 
         let dead_state = DeadState::new_from_state(
             slot,
@@ -2374,7 +2376,7 @@ impl ReplayStage {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         lockouts_sender: &Sender<CommitmentAggregationData>,
         snapshot_controller: Option<&SnapshotController>,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
         heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         bank_notification_sender: &Option<BankNotificationSenderConfig>,
@@ -3051,7 +3053,7 @@ impl ReplayStage {
         transaction_status_sender: Option<&TransactionStatusSender>,
         heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         bank_notification_sender: &Option<BankNotificationSenderConfig>,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         slot_status_notifier: &Option<SlotStatusNotifier>,
         duplicate_slots_tracker: &mut DuplicateSlotsTracker,
         duplicate_confirmed_slots: &DuplicateConfirmedSlots,
@@ -3355,7 +3357,7 @@ impl ReplayStage {
         heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         replay_vote_sender: &ReplayVoteSender,
         bank_notification_sender: &Option<BankNotificationSenderConfig>,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         slot_status_notifier: &Option<SlotStatusNotifier>,
         duplicate_slots_tracker: &mut DuplicateSlotsTracker,
         duplicate_confirmed_slots: &DuplicateConfirmedSlots,
@@ -3980,7 +3982,7 @@ impl ReplayStage {
         blockstore: &Blockstore,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         snapshot_controller: Option<&SnapshotController>,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
         heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         bank_notification_sender: &Option<BankNotificationSenderConfig>,
@@ -4042,7 +4044,9 @@ impl ReplayStage {
             drop_bank_sender,
         )?;
         blockstore.slots_stats.mark_rooted(new_root);
-        rpc_subscriptions.notify_roots(rooted_slots);
+        if let Some(rpc_subscriptions) = rpc_subscriptions {
+            rpc_subscriptions.notify_roots(rooted_slots);
+        }
         if let Some(sender) = bank_notification_sender {
             sender
                 .sender
@@ -4127,7 +4131,7 @@ impl ReplayStage {
         blockstore: &Blockstore,
         bank_forks: &RwLock<BankForks>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         slot_status_notifier: &Option<SlotStatusNotifier>,
         progress: &mut ProgressMap,
         replay_timing: &mut ReplayLoopTiming,
@@ -4222,11 +4226,13 @@ impl ReplayStage {
         slot: u64,
         root_slot: u64,
         leader: &Pubkey,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<&RpcSubscriptions>,
         slot_status_notifier: &Option<SlotStatusNotifier>,
         new_bank_options: NewBankOptions,
     ) -> Bank {
-        rpc_subscriptions.notify_slot(slot, parent.slot(), root_slot);
+        if let Some(rpc_subscriptions) = rpc_subscriptions {
+            rpc_subscriptions.notify_slot(slot, parent.slot(), root_slot);
+        }
         if let Some(slot_status_notifier) = slot_status_notifier {
             slot_status_notifier
                 .read()
@@ -4543,6 +4549,8 @@ pub(crate) mod tests {
         bank1.freeze();
         bank_forks.write().unwrap().insert(bank1);
 
+        let rpc_subscriptions = Some(rpc_subscriptions);
+
         // Insert shreds for slot NUM_CONSECUTIVE_LEADER_SLOTS,
         // chaining to slot 1
         let (shreds, _) = make_slot_entries(
@@ -4562,7 +4570,7 @@ pub(crate) mod tests {
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &mut replay_timing,
@@ -4591,7 +4599,7 @@ pub(crate) mod tests {
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &mut replay_timing,
@@ -5101,13 +5109,15 @@ pub(crate) mod tests {
                 SlotStatusNotifierForTest::new(dead_slots.clone()),
             )));
 
+            let rpc_subscriptions = Some(rpc_subscriptions);
+
             if let Err(err) = &res {
                 ReplayStage::mark_dead_slot(
                     &blockstore,
                     &bank1,
                     0,
                     err,
-                    &rpc_subscriptions,
+                    rpc_subscriptions.as_deref(),
                     &slot_status_notifier,
                     &mut DuplicateSlotsTracker::default(),
                     &DuplicateConfirmedSlots::new(),
@@ -5164,13 +5174,13 @@ pub(crate) mod tests {
         let exit = Arc::new(AtomicBool::new(false));
         let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
         let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
-        let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
+        let rpc_subscriptions = Some(Arc::new(RpcSubscriptions::new_for_tests(
             exit.clone(),
             max_complete_transaction_status_slot,
             bank_forks.clone(),
             block_commitment_cache.clone(),
             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
-        ));
+        )));
         let (lockouts_sender, _) = AggregateCommitmentService::new(
             exit,
             block_commitment_cache.clone(),
@@ -6476,12 +6486,14 @@ pub(crate) mod tests {
         );
         blockstore.insert_shreds(shreds, None, false).unwrap();
 
+        let rpc_subscriptions = Some(rpc_subscriptions);
+
         // 3 should now be an active bank
         ReplayStage::generate_new_bank_forks(
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &mut replay_timing,
@@ -6511,7 +6523,7 @@ pub(crate) mod tests {
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &mut replay_timing,
@@ -6542,7 +6554,7 @@ pub(crate) mod tests {
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &mut replay_timing,
@@ -6572,7 +6584,7 @@ pub(crate) mod tests {
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &mut replay_timing,
@@ -8632,12 +8644,14 @@ pub(crate) mod tests {
         // this test to use true to avoid skipping the leader slot
         let has_new_vote_been_rooted = true;
 
+        let rpc_subscriptions = Some(rpc_subscriptions);
+
         assert!(!ReplayStage::maybe_start_leader(
             my_pubkey,
             bank_forks,
             &poh_recorder,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &retransmit_slots_sender,
@@ -9285,6 +9299,8 @@ pub(crate) mod tests {
         // this test to use true to avoid skipping the leader slot
         let has_new_vote_been_rooted = true;
 
+        let rpc_subscriptions = Some(rpc_subscriptions);
+
         // We should not attempt to start leader for the dummy_slot
         assert_matches!(
             poh_recorder.read().unwrap().reached_leader_slot(&my_pubkey),
@@ -9295,7 +9311,7 @@ pub(crate) mod tests {
             &bank_forks,
             &poh_recorder,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &retransmit_slots_sender,
@@ -9321,7 +9337,7 @@ pub(crate) mod tests {
             &bank_forks,
             &poh_recorder,
             &leader_schedule_cache,
-            &rpc_subscriptions,
+            rpc_subscriptions.as_deref(),
             &None,
             &mut progress,
             &retransmit_slots_sender,

+ 2 - 2
core/src/tpu.rs

@@ -126,7 +126,7 @@ impl Tpu {
         entry_receiver: Receiver<WorkingBankEntry>,
         retransmit_slots_receiver: Receiver<Slot>,
         sockets: TpuSockets,
-        subscriptions: &Arc<RpcSubscriptions>,
+        subscriptions: Option<Arc<RpcSubscriptions>>,
         transaction_status_sender: Option<TransactionStatusSender>,
         entry_notification_sender: Option<EntryNotifierSender>,
         blockstore: Arc<Blockstore>,
@@ -315,7 +315,7 @@ impl Tpu {
             gossip_vote_sender,
             vote_tracker,
             bank_forks.clone(),
-            subscriptions.clone(),
+            subscriptions,
             verified_vote_sender,
             gossip_verified_vote_hash_sender,
             replay_vote_receiver,

+ 5 - 5
core/src/tvu.rs

@@ -134,7 +134,7 @@ impl Tvu {
         sockets: TvuSockets,
         blockstore: Arc<Blockstore>,
         ledger_signal_receiver: Receiver<bool>,
-        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         tower: Tower,
         tower_storage: Arc<dyn TowerStorage>,
@@ -224,7 +224,7 @@ impl Tvu {
             turbine_quic_endpoint_sender,
             retransmit_receiver,
             max_slots.clone(),
-            Some(rpc_subscriptions.clone()),
+            rpc_subscriptions.clone(),
             slot_status_notifier.clone(),
             tvu_config.xdp_sender,
         );
@@ -295,7 +295,7 @@ impl Tvu {
         let (voting_sender, voting_receiver) = unbounded();
 
         let replay_senders = ReplaySenders {
-            rpc_subscriptions: rpc_subscriptions.clone(),
+            rpc_subscriptions,
             slot_status_notifier,
             transaction_status_sender,
             entry_notification_sender,
@@ -557,13 +557,13 @@ pub mod tests {
             },
             blockstore,
             ledger_signal_receiver,
-            &Arc::new(RpcSubscriptions::new_for_tests(
+            Some(Arc::new(RpcSubscriptions::new_for_tests(
                 exit.clone(),
                 max_complete_transaction_status_slot,
                 bank_forks.clone(),
                 block_commitment_cache.clone(),
                 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
-            )),
+            ))),
             &poh_recorder,
             Tower::default(),
             Arc::new(FileTowerStorage::default()),

+ 16 - 16
core/src/validator.rs

@@ -1063,17 +1063,6 @@ impl Validator {
         let optimistically_confirmed_bank =
             OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
 
-        let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
-            exit.clone(),
-            max_complete_transaction_status_slot.clone(),
-            blockstore.clone(),
-            bank_forks.clone(),
-            block_commitment_cache.clone(),
-            optimistically_confirmed_bank.clone(),
-            &config.pubsub_config,
-            None,
-        ));
-
         let max_slots = Arc::new(MaxSlots::default());
 
         let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
@@ -1152,6 +1141,7 @@ impl Validator {
             Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
         let (
             json_rpc_service,
+            rpc_subscriptions,
             pubsub_service,
             completed_data_sets_sender,
             completed_data_sets_service,
@@ -1208,13 +1198,22 @@ impl Validator {
                 send_transaction_service_config: config.send_transaction_service_config.clone(),
                 max_slots: max_slots.clone(),
                 leader_schedule_cache: leader_schedule_cache.clone(),
-                max_complete_transaction_status_slot,
+                max_complete_transaction_status_slot: max_complete_transaction_status_slot.clone(),
                 prioritization_fee_cache: prioritization_fee_cache.clone(),
                 client_option,
             };
             let json_rpc_service =
                 JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
-
+            let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config(
+                exit.clone(),
+                max_complete_transaction_status_slot,
+                blockstore.clone(),
+                bank_forks.clone(),
+                block_commitment_cache.clone(),
+                optimistically_confirmed_bank.clone(),
+                &config.pubsub_config,
+                None,
+            ));
             let pubsub_service = if !config.rpc_config.full_api {
                 None
             } else {
@@ -1283,6 +1282,7 @@ impl Validator {
             });
             (
                 Some(json_rpc_service),
+                Some(rpc_subscriptions),
                 pubsub_service,
                 completed_data_sets_sender,
                 completed_data_sets_service,
@@ -1291,7 +1291,7 @@ impl Validator {
                 bank_notification_sender_config,
             )
         } else {
-            (None, None, None, None, None, None, None)
+            (None, None, None, None, None, None, None, None)
         };
 
         if config.halt_at_slot.is_some() {
@@ -1522,7 +1522,7 @@ impl Validator {
             },
             blockstore.clone(),
             ledger_signal_receiver,
-            &rpc_subscriptions,
+            rpc_subscriptions.clone(),
             &poh_recorder,
             tower,
             config.tower_storage.clone(),
@@ -1627,7 +1627,7 @@ impl Validator {
                 vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
                 vortexor_receivers: node.sockets.vortexor_receivers,
             },
-            &rpc_subscriptions,
+            rpc_subscriptions.clone(),
             transaction_status_sender,
             entry_notification_sender,
             blockstore.clone(),

+ 1 - 1
rpc/src/rpc_pubsub_service.rs

@@ -83,7 +83,7 @@ pub struct PubSubService {
 impl PubSubService {
     pub fn new(
         pubsub_config: PubSubConfig,
-        subscriptions: &Arc<RpcSubscriptions>,
+        subscriptions: &RpcSubscriptions,
         pubsub_addr: SocketAddr,
     ) -> (Trigger, Self) {
         let subscription_control = subscriptions.control().clone();