Prechádzať zdrojové kódy

Update Leader Slot Info Atomically (#7241)

Brennan 3 mesiacov pred
rodič
commit
46d151a191
1 zmenil súbory, kde vykonal 16 pridanie a 11 odobranie
  1. 16 11
      tpu-client/src/nonblocking/tpu_client.rs

+ 16 - 11
tpu-client/src/nonblocking/tpu_client.rs

@@ -72,6 +72,7 @@ struct LeaderTpuCacheUpdateInfo {
     pub(super) maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
     pub(super) maybe_epoch_schedule: Option<ClientResult<EpochSchedule>>,
     pub(super) maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
+    pub(super) first_slot: Slot,
 }
 impl LeaderTpuCacheUpdateInfo {
     pub fn has_some(&self) -> bool {
@@ -212,11 +213,7 @@ impl LeaderTpuCache {
         (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
     }
 
-    pub fn update_all(
-        &mut self,
-        estimated_current_slot: Slot,
-        cache_update_info: LeaderTpuCacheUpdateInfo,
-    ) -> (bool, bool) {
+    pub fn update_all(&mut self, cache_update_info: LeaderTpuCacheUpdateInfo) -> (bool, bool) {
         let mut has_error = false;
         let mut cluster_refreshed = false;
         if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
@@ -234,7 +231,7 @@ impl LeaderTpuCache {
         }
 
         if let Some(Ok(epoch_schedule)) = cache_update_info.maybe_epoch_schedule {
-            let epoch = epoch_schedule.get_epoch(estimated_current_slot);
+            let epoch = epoch_schedule.get_epoch(cache_update_info.first_slot);
             self.slots_in_epoch = epoch_schedule.get_slots_in_epoch(epoch);
             self.last_slot_in_epoch = epoch_schedule.get_last_slot_in_epoch(epoch);
         }
@@ -242,13 +239,14 @@ impl LeaderTpuCache {
         if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
             match slot_leaders {
                 Ok(slot_leaders) => {
-                    self.first_slot = estimated_current_slot;
+                    self.first_slot = cache_update_info.first_slot;
                     self.leaders = slot_leaders;
                 }
                 Err(err) => {
                     warn!(
-                        "Failed to fetch slot leaders (current estimated slot: \
-                         {estimated_current_slot}): {err}"
+                        "Failed to fetch slot leaders (first_slot: \
+                         {}): {err}",
+                        cache_update_info.first_slot
                     );
                     has_error = true;
                 }
@@ -903,8 +901,7 @@ impl LeaderTpuService {
 
             if cache_update_info.has_some() {
                 let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
-                let (has_error, cluster_refreshed) = leader_tpu_cache
-                    .update_all(recent_slots.estimated_current_slot(), cache_update_info);
+                let (has_error, cluster_refreshed) = leader_tpu_cache.update_all(cache_update_info);
                 if has_error {
                     sleep_ms = 100;
                 }
@@ -978,17 +975,24 @@ async fn maybe_fetch_cache_info(
         None
     };
 
+    // Grab information about the slot leaders currently in the cache.
     let estimated_current_slot = recent_slots.estimated_current_slot();
     let (last_slot, last_slot_in_epoch, slots_in_epoch) = {
         let leader_tpu_cache = leader_tpu_cache.read().unwrap();
         leader_tpu_cache.slot_info()
     };
+
+    // If we're crossing into a new epoch, fetch the updated epoch schedule.
     let maybe_epoch_schedule = if estimated_current_slot > last_slot_in_epoch {
         Some(rpc_client.get_epoch_schedule().await)
     } else {
         None
     };
 
+    // If we are within the fanout range of the last slot in the cache, fetch
+    // more slot leaders. We pull down a big batch at a time to amortize the
+    // cost of the RPC call. We don't want to stall transactions on pulling this
+    // down so we fetch it proactively.
     let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
     {
         Some(
@@ -1006,6 +1010,7 @@ async fn maybe_fetch_cache_info(
         maybe_cluster_nodes,
         maybe_epoch_schedule,
         maybe_slot_leaders,
+        first_slot: estimated_current_slot,
     }
 }