Add vortexor stake updater test (#5629)

* Stake updater test

* subscribe to 2 rpc nodes

* short stake updater service sleep time for test

* add timeout checks on test
Lijun Wang 7 months ago
parent
commit
e2f9ec62fc
6 changed files with 123 additions and 10 deletions
  1. Cargo.lock (+1 -0)
  2. streamer/src/streamer.rs (+4 -0)
  3. vortexor/Cargo.toml (+1 -0)
  4. vortexor/src/main.rs (+2 -1)
  5. vortexor/src/stake_updater.rs (+12 -8)
  6. vortexor/tests/vortexor.rs (+103 -1)
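
The main API change is that StakeUpdater::new now takes an explicit refresh sleep duration: the production entry point in vortexor/src/main.rs passes the newly exported STAKE_REFRESH_SLEEP_DURATION, while the new integration test passes a much shorter value so the service polls and exits quickly. A minimal sketch of a call site under the updated signature; the spawn_stake_updater wrapper and its argument list are illustrative, not part of the crate:

use {
    solana_streamer::streamer::StakedNodes,
    solana_vortexor::{
        rpc_load_balancer::RpcLoadBalancer,
        stake_updater::{StakeUpdater, STAKE_REFRESH_SLEEP_DURATION},
    },
    std::{
        sync::{atomic::AtomicBool, Arc, RwLock},
        time::Duration,
    },
    url::Url,
};

// Illustrative wrapper (not part of this change): wires an RpcLoadBalancer,
// a shared StakedNodes map, and the StakeUpdater together, mirroring main.rs
// and the test below. `servers` is a list of (rpc_url, ws_url) pairs.
fn spawn_stake_updater(
    servers: Vec<(Url, Url)>,
    refresh_sleep: Option<Duration>, // None => production default
) -> (Arc<RwLock<StakedNodes>>, StakeUpdater, Arc<AtomicBool>) {
    let exit = Arc::new(AtomicBool::new(false));
    let (rpc_load_balancer, _slot_receiver) = RpcLoadBalancer::new(&servers, &exit);
    let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
    let stake_updater = StakeUpdater::new(
        exit.clone(),
        Arc::new(rpc_load_balancer),
        staked_nodes.clone(),
        // Tests can pass e.g. Duration::from_millis(100) so the loop notices the
        // exit flag quickly; main.rs uses STAKE_REFRESH_SLEEP_DURATION (5s).
        refresh_sleep.unwrap_or(STAKE_REFRESH_SLEEP_DURATION),
    );
    (staked_nodes, stake_updater, exit)
}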

+ 1 - 0
Cargo.lock

@@ -11151,6 +11151,7 @@ dependencies = [
  "solana-clap-utils",
  "solana-client",
  "solana-core",
+ "solana-local-cluster",
  "solana-logger",
  "solana-measure",
  "solana-metrics",

+ 4 - 0
streamer/src/streamer.rs

@@ -361,6 +361,10 @@ impl StakedNodes {
             .filter(|&stake| stake > 0);
         let total_stake = values.clone().sum();
         let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
+        info!(
+            "StakedNodes: total_stake: {}, min_stake: {}, max_stake: {}",
+            total_stake, min_stake, max_stake
+        );
         (total_stake, min_stake, max_stake)
     }
 

+ 1 - 0
vortexor/Cargo.toml

@@ -56,6 +56,7 @@ x509-parser = { workspace = true }
 
 [dev-dependencies]
 assert_matches = { workspace = true }
+solana-local-cluster = { workspace = true }
 solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }
 
 [lib]

+ 2 - 1
vortexor/src/main.rs

@@ -14,7 +14,7 @@ use {
             PacketBatchSender, DEFAULT_BATCH_SIZE, DEFAULT_RECV_TIMEOUT,
             DEFAULT_SENDER_THREADS_COUNT,
         },
-        stake_updater::StakeUpdater,
+        stake_updater::{StakeUpdater, STAKE_REFRESH_SLEEP_DURATION},
         vortexor::Vortexor,
     },
     std::{
@@ -147,6 +147,7 @@ pub fn main() {
         exit.clone(),
         rpc_load_balancer.clone(),
         staked_nodes.clone(),
+        STAKE_REFRESH_SLEEP_DURATION,
     );
 
     info!(

+ 12 - 8
vortexor/src/stake_updater.rs

@@ -3,7 +3,7 @@
 
 use {
     crate::rpc_load_balancer::RpcLoadBalancer,
-    log::warn,
+    log::{info, warn},
     solana_client::client_error,
     solana_sdk::pubkey::Pubkey,
     solana_streamer::streamer::StakedNodes,
@@ -25,7 +25,7 @@ use {
 const STAKE_REFRESH_INTERVAL: Duration = Duration::from_secs(1800);
 
 // The interval to sleep between checks for the exit and/or refresh condition.
-const STAKE_REFRESH_SLEEP_DURATION: Duration = Duration::from_secs(5);
+pub const STAKE_REFRESH_SLEEP_DURATION: Duration = Duration::from_secs(5);
 
 /// This service is responsible for periodically refreshing the stake information
 /// from the network with the assistance of the RpcLoadBalancer.
@@ -38,19 +38,22 @@ impl StakeUpdater {
         exit: Arc<AtomicBool>,
         rpc_load_balancer: Arc<RpcLoadBalancer>,
         shared_staked_nodes: Arc<RwLock<StakedNodes>>,
+        refresh_sleep_duration: Duration,
     ) -> Self {
+        info!("Starting stake updater thread");
         let thread_hdl = Builder::new()
             .name("stkUpdtr".to_string())
             .spawn(move || {
-                let mut last_stakes = Instant::now();
+                let mut last_stakes = None;
                 while !exit.load(Ordering::Relaxed) {
                     if let Err(err) = Self::try_refresh_stake_info(
                         &mut last_stakes,
                         &shared_staked_nodes,
                         &rpc_load_balancer,
+                        &refresh_sleep_duration,
                     ) {
                         warn!("Failed to refresh pubkey to stake map! Error: {:?}", err);
-                        sleep(STAKE_REFRESH_SLEEP_DURATION);
+                        sleep(refresh_sleep_duration);
                     }
                 }
             })
@@ -62,11 +65,12 @@ impl StakeUpdater {
     /// Update the stake info when more than STAKE_REFRESH_INTERVAL has
     /// elapsed since the last refresh.
     fn try_refresh_stake_info(
-        last_refresh: &mut Instant,
+        last_refresh: &mut Option<Instant>,
         shared_staked_nodes: &Arc<RwLock<StakedNodes>>,
         rpc_load_balancer: &Arc<RpcLoadBalancer>,
+        refresh_sleep_duration: &Duration,
     ) -> client_error::Result<()> {
-        if last_refresh.elapsed() > STAKE_REFRESH_INTERVAL {
+        if last_refresh.is_none() || last_refresh.unwrap().elapsed() > STAKE_REFRESH_INTERVAL {
             let client = rpc_load_balancer.rpc_client();
             let vote_accounts = client.get_vote_accounts()?;
 
@@ -84,13 +88,13 @@ impl StakeUpdater {
                     .collect::<HashMap<Pubkey, u64>>(),
             );
 
-            *last_refresh = Instant::now();
+            *last_refresh = Some(Instant::now());
             shared_staked_nodes
                 .write()
                 .unwrap()
                 .update_stake_map(stake_map);
         } else {
-            sleep(STAKE_REFRESH_SLEEP_DURATION);
+            sleep(*refresh_sleep_duration);
         }
         Ok(())
     }
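
One subtle effect of switching last_stakes from Instant::now() to None: the first pass through the loop now refreshes the stake map immediately instead of waiting out a full STAKE_REFRESH_INTERVAL, which is what lets the new test observe stakes within its 10-second timeout. A minimal standalone sketch of that gate (needs_refresh is illustrative, not a function in the crate):

use std::time::{Duration, Instant};

// Mirrors the condition in try_refresh_stake_info: no previous refresh (None)
// always triggers a refresh; otherwise refresh only after the interval elapses.
fn needs_refresh(last_refresh: Option<Instant>, interval: Duration) -> bool {
    last_refresh.map_or(true, |t| t.elapsed() > interval)
}

fn main() {
    let interval = Duration::from_secs(1800);
    assert!(needs_refresh(None, interval)); // first iteration refreshes right away
    assert!(!needs_refresh(Some(Instant::now()), interval)); // just refreshed: wait
}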

+ 103 - 1
vortexor/tests/vortexor.rs

@@ -1,17 +1,28 @@
 use {
     crossbeam_channel::unbounded,
+    log::info,
+    solana_local_cluster::{
+        cluster::ClusterValidatorInfo,
+        local_cluster::{ClusterConfig, LocalCluster},
+    },
     solana_net_utils::VALIDATOR_PORT_RANGE,
-    solana_sdk::{net::DEFAULT_TPU_COALESCE, pubkey::Pubkey, signature::Keypair, signer::Signer},
+    solana_sdk::{
+        native_token::LAMPORTS_PER_SOL, net::DEFAULT_TPU_COALESCE, pubkey::Pubkey,
+        signature::Keypair, signer::Signer,
+    },
     solana_streamer::{
         nonblocking::testing_utilities::check_multiple_streams,
         quic::{
             DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS,
         },
+        socket::SocketAddrSpace,
         streamer::StakedNodes,
     },
     solana_vortexor::{
         cli::{DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER, DEFAULT_NUM_QUIC_ENDPOINTS},
+        rpc_load_balancer,
+        stake_updater::StakeUpdater,
         vortexor::Vortexor,
     },
     std::{
@@ -20,7 +31,9 @@ use {
             atomic::{AtomicBool, Ordering},
             Arc, RwLock,
         },
+        time::Duration,
     },
+    url::Url,
 };
 
 #[tokio::test(flavor = "multi_thread")]
@@ -71,3 +84,92 @@ async fn test_vortexor() {
     exit.store(true, Ordering::Relaxed);
     vortexor.join().unwrap();
 }
+
+fn get_server_urls(validator: &ClusterValidatorInfo) -> (Url, Url) {
+    let rpc_addr = validator.info.contact_info.rpc().unwrap();
+    let rpc_pubsub_addr = validator.info.contact_info.rpc_pubsub().unwrap();
+    let rpc_url = Url::parse(format!("http://{}", rpc_addr).as_str()).unwrap();
+    let ws_url = Url::parse(format!("ws://{}", rpc_pubsub_addr).as_str()).unwrap();
+    (rpc_url, ws_url)
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_stake_update() {
+    solana_logger::setup();
+
+    // Create a local cluster with 3 validators
+    let default_node_stake = 10 * LAMPORTS_PER_SOL; // Define a default value for node stake
+    let mint_lamports = 100 * LAMPORTS_PER_SOL;
+    let mut config = ClusterConfig::new_with_equal_stakes(3, mint_lamports, default_node_stake);
+
+    let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
+    info!(
+        "Cluster created with {} validators",
+        cluster.validators.len()
+    );
+    assert_eq!(cluster.validators.len(), 3);
+
+    let pubkey = cluster.entry_point_info.pubkey();
+    let validator = &cluster.validators[pubkey];
+
+    let mut servers = vec![get_server_urls(validator)];
+    // add one more RPC subscription to another validator
+    for validator in cluster.validators.values() {
+        if validator.info.keypair.pubkey() != *pubkey {
+            servers.push(get_server_urls(validator));
+            break;
+        }
+    }
+    let exit = Arc::new(AtomicBool::new(false));
+
+    let (rpc_load_balancer, slot_receiver) =
+        rpc_load_balancer::RpcLoadBalancer::new(&servers, &exit);
+
+    // receive 2 slot updates
+    let mut i = 0;
+    let slot_receive_timeout = Duration::from_secs(5); // conservative timeout to ensure stable test
+    while i < 2 {
+        let slot = slot_receiver
+            .recv_timeout(slot_receive_timeout)
+            .unwrap_or_else(|_| panic!("Expected a slot within {slot_receive_timeout:?}"));
+        i += 1;
+        info!("Received a slot update: {}", slot);
+    }
+
+    let rpc_load_balancer = Arc::new(rpc_load_balancer);
+    // Now create a stake updater service
+    let shared_staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
+    let staked_nodes_updater_service = StakeUpdater::new(
+        exit.clone(),
+        rpc_load_balancer.clone(),
+        shared_staked_nodes.clone(),
+        Duration::from_millis(100), // short sleep to speed up the test for service exit
+    );
+
+    // Waiting for the stake map to be populated by the stake updater service
+    let start_of_stake_updater = std::time::Instant::now();
+    let stake_updater_timeout = Duration::from_secs(10); // conservative timeout to ensure stable test
+    loop {
+        let stakes = shared_staked_nodes.read().unwrap();
+        if let Some(stake) = stakes.get_node_stake(pubkey) {
+            info!("Stake for {}: {}", pubkey, stake);
+            assert_eq!(stake, default_node_stake);
+            let total_stake = stakes.total_stake();
+            info!("total_stake: {}", total_stake);
+            assert!(total_stake >= default_node_stake);
+            break;
+        }
+        info!("Waiting for stake map to be populated for {pubkey:?}...");
+        drop(stakes); // Drop the read lock before sleeping so the writer side can proceed
+        std::thread::sleep(std::time::Duration::from_millis(100));
+        if start_of_stake_updater.elapsed() > stake_updater_timeout {
+            panic!("Timeout waiting for stake map to be populated");
+        }
+    }
+    info!("Test done, exiting stake updater service");
+    exit.store(true, Ordering::Relaxed);
+    staked_nodes_updater_service.join().unwrap();
+    info!("Stake updater service exited successfully, shutting down cluster");
+    cluster.exit();
+    info!("Cluster exited successfully");
+}