Browse Source

Add RPC bench threads to accounts-cluster-bench (#34520)

sakridge 1 year ago
parent
commit
709e934d4c
1 changed files with 358 additions and 16 deletions
  1. 358 16
      accounts-cluster-bench/src/main.rs

+ 358 - 16
accounts-cluster-bench/src/main.rs

@@ -1,6 +1,6 @@
 #![allow(clippy::arithmetic_side_effects)]
 use {
-    clap::{crate_description, crate_name, value_t, values_t_or_exit, App, Arg},
+    clap::{crate_description, crate_name, value_t, values_t, values_t_or_exit, App, Arg},
     log::*,
     rand::{thread_rng, Rng},
     rayon::prelude::*,
@@ -9,8 +9,9 @@ use {
         hidden_unless_forced, input_parsers::pubkey_of, input_validators::is_url_or_moniker,
     },
     solana_cli_config::{ConfigInput, CONFIG_FILE},
-    solana_client::transaction_executor::TransactionExecutor,
+    solana_client::{rpc_request::TokenAccountsFilter, transaction_executor::TransactionExecutor},
     solana_gossip::gossip_service::discover,
+    solana_measure::measure::Measure,
     solana_rpc_client::rpc_client::RpcClient,
     solana_sdk::{
         commitment_config::CommitmentConfig,
@@ -26,11 +27,12 @@ use {
     std::{
         cmp::min,
         process::exit,
+        str::FromStr,
         sync::{
-            atomic::{AtomicU64, Ordering},
+            atomic::{AtomicBool, AtomicU64, Ordering},
             Arc,
         },
-        thread::sleep,
+        thread::{sleep, Builder, JoinHandle},
         time::{Duration, Instant},
     },
 };
@@ -179,8 +181,8 @@ fn make_create_message(
 fn make_close_message(
     keypair: &Keypair,
     base_keypair: &Keypair,
-    max_created: Arc<AtomicU64>,
-    max_closed: Arc<AtomicU64>,
+    max_created: &AtomicU64,
+    max_closed: &AtomicU64,
     num_instructions: usize,
     balance: u64,
     spl_token: bool,
@@ -227,6 +229,276 @@ fn make_close_message(
     Message::new(&instructions, Some(&keypair.pubkey()))
 }
 
+#[derive(Clone, Copy, Debug)]
+pub enum RpcBench {
+    Version,
+    Slot,
+    MultipleAccounts,
+    ProgramAccounts,
+    TokenAccountsByOwner,
+}
+
+#[derive(Debug)]
+pub enum RpcParseError {
+    InvalidOption,
+}
+
+impl FromStr for RpcBench {
+    type Err = RpcParseError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "slot" => Ok(RpcBench::Slot),
+            "multiple-accounts" => Ok(RpcBench::MultipleAccounts),
+            "token-accounts-by-owner" => Ok(RpcBench::TokenAccountsByOwner),
+            "version" => Ok(RpcBench::Version),
+            _ => Err(RpcParseError::InvalidOption),
+        }
+    }
+}
+
+fn process_get_multiple_accounts(
+    max_closed: &AtomicU64,
+    max_created: &AtomicU64,
+    stats: &mut RpcBenchStats,
+    last_error: &mut Instant,
+    base_keypair_pubkey: &Pubkey,
+    program_id: &Pubkey,
+    client: &RpcClient,
+) {
+    let start = max_closed.load(Ordering::Relaxed);
+    let end = max_created.load(Ordering::Relaxed);
+    let mut chunk_start = start;
+    let chunk_size = 10;
+    while chunk_start < end {
+        let chunk_end = std::cmp::min(chunk_start + chunk_size, end);
+
+        let addresses: Vec<_> = (chunk_start..chunk_end)
+            .map(|seed| {
+                Pubkey::create_with_seed(base_keypair_pubkey, &seed.to_string(), program_id)
+                    .unwrap()
+            })
+            .collect();
+        chunk_start = chunk_end;
+        let mut rpc_time = Measure::start("rpc-get-multiple-accounts");
+        match client.get_multiple_accounts(&addresses) {
+            Ok(accounts) => {
+                rpc_time.stop();
+                for account in accounts.into_iter().flatten() {
+                    if thread_rng().gen_ratio(1, 10_000) {
+                        info!(
+                            "account: lamports {:?} size: {} owner: {:?}",
+                            account.lamports,
+                            account.data.len(),
+                            account.owner
+                        );
+                    }
+                }
+                stats.total_success_time_us += rpc_time.as_us();
+                stats.success += 1;
+            }
+            Err(e) => {
+                rpc_time.stop();
+                stats.total_errors_time_us += rpc_time.as_us();
+                stats.errors += 1;
+                if last_error.elapsed().as_secs() > 2 {
+                    info!("error: {:?}", e);
+                    *last_error = Instant::now();
+                }
+                debug!("error: {:?}", e);
+            }
+        }
+    }
+}
+
+#[derive(Default)]
+struct RpcBenchStats {
+    errors: u64,
+    success: u64,
+    total_errors_time_us: u64,
+    total_success_time_us: u64,
+}
+
+fn run_rpc_bench_loop(
+    rpc_bench: RpcBench,
+    thread: usize,
+    client: &RpcClient,
+    base_keypair_pubkey: &Pubkey,
+    exit: &AtomicBool,
+    program_id: &Pubkey,
+    max_closed: &AtomicU64,
+    max_created: &AtomicU64,
+    mint: &Option<Pubkey>,
+) {
+    let mut stats = RpcBenchStats::default();
+    let mut iters = 0;
+    let mut last_error = Instant::now();
+    let mut last_print = Instant::now();
+    loop {
+        if exit.load(Ordering::Relaxed) {
+            break;
+        }
+        match rpc_bench {
+            RpcBench::Slot => {
+                let mut rpc_time = Measure::start("rpc-get-slot");
+                match client.get_slot() {
+                    Ok(_slot) => {
+                        rpc_time.stop();
+                        stats.success += 1;
+                        stats.total_success_time_us += rpc_time.as_us();
+                    }
+                    Err(e) => {
+                        rpc_time.stop();
+                        stats.total_errors_time_us += rpc_time.as_us();
+                        stats.errors += 1;
+                        if last_error.elapsed().as_secs() > 2 {
+                            info!("get_slot error: {:?}", e);
+                            last_error = Instant::now();
+                        }
+                    }
+                }
+            }
+            RpcBench::MultipleAccounts => {
+                process_get_multiple_accounts(
+                    max_closed,
+                    max_created,
+                    &mut stats,
+                    &mut last_error,
+                    base_keypair_pubkey,
+                    program_id,
+                    client,
+                );
+            }
+            RpcBench::ProgramAccounts => {
+                let mut rpc_time = Measure::start("rpc-get-program-accounts");
+                match client.get_program_accounts(program_id) {
+                    Ok(accounts) => {
+                        rpc_time.stop();
+                        stats.success += 1;
+                        stats.total_success_time_us += rpc_time.as_us();
+                        if thread_rng().gen_ratio(1, 100) {
+                            info!("accounts: {} first: {:?}", accounts.len(), accounts.first());
+                        }
+                    }
+                    Err(e) => {
+                        rpc_time.stop();
+                        stats.errors += 1;
+                        stats.total_errors_time_us += rpc_time.as_us();
+                        if last_error.elapsed().as_secs() > 2 {
+                            info!("get-program-accounts error: {:?}", e);
+                            last_error = Instant::now();
+                        }
+                    }
+                }
+            }
+            RpcBench::TokenAccountsByOwner => {
+                let mut rpc_time = Measure::start("rpc-get-token-accounts-by-owner");
+                let filter = TokenAccountsFilter::Mint(*mint.as_ref().unwrap());
+                match client.get_token_accounts_by_owner(program_id, filter) {
+                    Ok(_accounts) => {
+                        rpc_time.stop();
+                        stats.success += 1;
+                        stats.total_success_time_us += rpc_time.as_us();
+                    }
+                    Err(e) => {
+                        rpc_time.stop();
+                        stats.errors += 1;
+                        stats.total_errors_time_us += rpc_time.as_us();
+                        if last_error.elapsed().as_secs() > 2 {
+                            info!("get-token-accounts error: {:?}", e);
+                            last_error = Instant::now();
+                        }
+                    }
+                }
+            }
+            RpcBench::Version => {
+                let mut rpc_time = Measure::start("rpc-get-version");
+                match client.get_version() {
+                    Ok(_r) => {
+                        rpc_time.stop();
+                        stats.success += 1;
+                        stats.total_success_time_us += rpc_time.as_us();
+                    }
+                    Err(_e) => {
+                        rpc_time.stop();
+                        stats.errors += 1;
+                        stats.total_errors_time_us += rpc_time.as_us();
+                    }
+                }
+            }
+        }
+
+        if last_print.elapsed().as_secs() > 3 {
+            info!(
+                "t({}) rpc({:?}) iters: {} success: {} errors: {}",
+                thread, rpc_bench, iters, stats.success, stats.errors
+            );
+            if stats.success > 0 {
+                info!(
+                    " t({}) rpc({:?} average success_time: {} us",
+                    thread,
+                    rpc_bench,
+                    stats.total_success_time_us / stats.success
+                );
+            }
+            if stats.errors > 0 {
+                info!(
+                    " rpc average average errors time: {} us",
+                    stats.total_errors_time_us / stats.errors
+                );
+            }
+            last_print = Instant::now();
+            stats = RpcBenchStats::default();
+        }
+
+        iters += 1;
+    }
+}
+
+fn make_rpc_bench_threads(
+    rpc_benches: Vec<RpcBench>,
+    mint: &Option<Pubkey>,
+    exit: &Arc<AtomicBool>,
+    client: &Arc<RpcClient>,
+    seed_tracker: &SeedTracker,
+    base_keypair_pubkey: Pubkey,
+    num_rpc_bench_threads: usize,
+) -> Vec<JoinHandle<()>> {
+    let program_id = if mint.is_some() {
+        inline_spl_token::id()
+    } else {
+        system_program::id()
+    };
+    rpc_benches
+        .into_iter()
+        .flat_map(|rpc_bench| {
+            (0..num_rpc_bench_threads).map(move |thread| {
+                let client = client.clone();
+                let exit = exit.clone();
+                let max_closed = seed_tracker.max_closed.clone();
+                let max_created = seed_tracker.max_created.clone();
+                let mint = *mint;
+                Builder::new()
+                    .name(format!("rpc-bench-{}", thread))
+                    .spawn(move || {
+                        run_rpc_bench_loop(
+                            rpc_bench,
+                            thread,
+                            &client,
+                            &base_keypair_pubkey,
+                            &exit,
+                            &program_id,
+                            &max_closed,
+                            &max_created,
+                            &mint,
+                        )
+                    })
+                    .unwrap()
+            })
+        })
+        .collect()
+}
+
 #[allow(clippy::too_many_arguments)]
 fn run_accounts_bench(
     client: Arc<RpcClient>,
@@ -239,6 +511,8 @@ fn run_accounts_bench(
     num_instructions: usize,
     mint: Option<Pubkey>,
     reclaim_accounts: bool,
+    rpc_benches: Option<Vec<RpcBench>>,
+    num_rpc_bench_threads: usize,
 ) {
     assert!(num_instructions > 0);
     info!("Targeting {}", client.url());
@@ -291,6 +565,22 @@ fn run_accounts_bench(
         None,
     );
 
+    let exit = Arc::new(AtomicBool::new(false));
+    let base_keypair_pubkey = base_keypair.pubkey();
+    let rpc_bench_threads: Vec<_> = if let Some(rpc_benches) = rpc_benches {
+        make_rpc_bench_threads(
+            rpc_benches,
+            &mint,
+            &exit,
+            &client,
+            &seed_tracker,
+            base_keypair_pubkey,
+            num_rpc_bench_threads,
+        )
+    } else {
+        Vec::new()
+    };
+
     loop {
         if latest_blockhash.elapsed().as_millis() > 10_000 {
             blockhash = poll_get_latest_blockhash(&client).expect("blockhash");
@@ -369,8 +659,8 @@ fn run_accounts_bench(
                             let message = make_close_message(
                                 payer_keypairs[0],
                                 &base_keypair,
-                                seed_tracker.max_created.clone(),
-                                seed_tracker.max_closed.clone(),
+                                &seed_tracker.max_created,
+                                &seed_tracker.max_closed,
                                 1,
                                 min_balance,
                                 mint.is_some(),
@@ -440,8 +730,8 @@ fn run_accounts_bench(
                                     let message = make_close_message(
                                         keypair,
                                         &base_keypair,
-                                        seed_tracker.max_created.clone(),
-                                        seed_tracker.max_closed.clone(),
+                                        &seed_tracker.max_created,
+                                        &seed_tracker.max_closed,
                                         num_instructions,
                                         min_balance,
                                         mint.is_some(),
@@ -483,6 +773,11 @@ fn run_accounts_bench(
         }
         executor.close();
     }
+
+    exit.store(true, Ordering::Relaxed);
+    for t in rpc_bench_threads {
+        t.join().unwrap();
+    }
 }
 
 fn main() {
@@ -605,6 +900,19 @@ fn main() {
                 .takes_value(false)
                 .help("Reclaim accounts after session ends; incompatible with --iterations 0"),
         )
+        .arg(
+            Arg::with_name("num_rpc_bench_threads")
+                .long("num-rpc-bench-threads")
+                .takes_value(true)
+                .help("Spawn this many RPC benching threads for each type passed by --rpc-bench"),
+        )
+        .arg(
+            Arg::with_name("rpc_bench")
+                .long("rpc-bench")
+                .takes_value(true)
+                .multiple(true)
+                .help("Spawn a thread which calls a specific RPC method in a loop to benchmark it"),
+        )
         .get_matches();
 
     let skip_gossip = !matches.is_present("check_gossip");
@@ -619,6 +927,19 @@ fn main() {
         eprintln!("bad num_instructions: {num_instructions}");
         exit(1);
     }
+    let rpc_benches = values_t!(matches, "rpc_bench", String)
+        .map(|benches| {
+            benches
+                .into_iter()
+                .map(|bench| RpcBench::from_str(&bench).unwrap())
+                .collect()
+        })
+        .ok();
+    let num_rpc_bench_threads = if rpc_benches.is_none() {
+        0
+    } else {
+        value_t!(matches, "num_rpc_bench_threads", usize).unwrap_or(1)
+    };
 
     let mint = pubkey_of(&matches, "mint");
 
@@ -696,6 +1017,8 @@ fn main() {
         num_instructions,
         mint,
         matches.is_present("reclaim_accounts"),
+        rpc_benches,
+        num_rpc_bench_threads,
     );
 }
 
@@ -703,7 +1026,10 @@ fn main() {
 pub mod test {
     use {
         super::*,
-        solana_accounts_db::inline_spl_token,
+        solana_accounts_db::{
+            accounts_index::{AccountIndex, AccountSecondaryIndexes},
+            inline_spl_token,
+        },
         solana_core::validator::ValidatorConfig,
         solana_faucet::faucet::run_local_faucet,
         solana_local_cluster::{
@@ -719,11 +1045,19 @@ pub mod test {
         },
     };
 
+    fn add_secondary_indexes(indexes: &mut AccountSecondaryIndexes) {
+        indexes.indexes.insert(AccountIndex::SplTokenOwner);
+        indexes.indexes.insert(AccountIndex::SplTokenMint);
+        indexes.indexes.insert(AccountIndex::ProgramId);
+    }
+
     #[test]
     fn test_accounts_cluster_bench() {
         solana_logger::setup();
-        let validator_config = ValidatorConfig::default_for_test();
+        let mut validator_config = ValidatorConfig::default_for_test();
         let num_nodes = 1;
+        add_secondary_indexes(&mut validator_config.account_indexes);
+        add_secondary_indexes(&mut validator_config.rpc_config.account_indexes);
         let mut config = ClusterConfig {
             cluster_lamports: 10_000_000,
             poh_config: PohConfig::new_sleep(Duration::from_millis(50)),
@@ -745,8 +1079,11 @@ pub mod test {
             rpc_addr,
             CommitmentConfig::confirmed(),
         ));
+        let mint = None;
+        let reclaim_accounts = false;
+        let pre_txs = client.get_transaction_count().unwrap();
         run_accounts_bench(
-            client,
+            client.clone(),
             &[&cluster.funding_keypair],
             iterations,
             maybe_space,
@@ -754,11 +1091,14 @@ pub mod test {
             close_nth_batch,
             maybe_lamports,
             num_instructions,
-            None,
-            false,
+            mint,
+            reclaim_accounts,
+            Some(vec![RpcBench::ProgramAccounts]),
+            1,
         );
+        let post_txs = client.get_transaction_count().unwrap();
         start.stop();
-        info!("{}", start);
+        info!("{} pre {} post {}", start, pre_txs, post_txs);
     }
 
     #[test]
@@ -852,6 +1192,8 @@ pub mod test {
             num_instructions,
             Some(spl_mint_keypair.pubkey()),
             true,
+            None,
+            0,
         );
         start.stop();
         info!("{}", start);