Эх сурвалжийг харах

test-validator: Add PubSub args & reduce memory usage (#7269)

* feat: add flags to test-validator to reduce memory footprint

* remove bad requires

* Fix nits and fmt

* remove as_str

* Update validator/src/commands/run/args.rs

Co-authored-by: steviez <steven@anza.xyz>

* Update validator/src/cli.rs

Co-authored-by: steviez <steven@anza.xyz>

* remove temporary

* cleanup format more

* fix tests

* fmt and remove pub

---------

Co-authored-by: steviez <steven@anza.xyz>
Daniel-Aaron-Bloom 3 долоо хоног өмнө
parent
commit
78daf92a4b

+ 3 - 2
rpc/src/rpc_pubsub_service.rs

@@ -35,6 +35,7 @@ pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 1_000_000;
 pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000_000;
 pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100;
 pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 * 1024 * 1024;
+const DEFAULT_TEST_QUEUE_CAPACITY_BYTES: usize = 16 * 1024 * 1024;
 pub const DEFAULT_WORKER_THREADS: usize = 1;
 
 #[derive(Debug, Clone, PartialEq)]
@@ -63,13 +64,13 @@ impl Default for PubSubConfig {
 }
 
 impl PubSubConfig {
-    pub fn default_for_tests() -> Self {
+    pub const fn default_for_tests() -> Self {
         Self {
             enable_block_subscription: false,
             enable_vote_subscription: false,
             max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
             queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
-            queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
+            queue_capacity_bytes: DEFAULT_TEST_QUEUE_CAPACITY_BYTES,
             worker_threads: DEFAULT_WORKER_THREADS,
             notification_threads: NonZeroUsize::new(2),
         }

+ 5 - 9
validator/src/bin/solana-test-validator.rs

@@ -1,8 +1,8 @@
 use {
     agave_logger::redirect_stderr_to_file,
     agave_validator::{
-        admin_rpc_service, cli, dashboard::Dashboard, ledger_lockfile, lock_ledger,
-        println_name_value,
+        admin_rpc_service, cli, commands::FromClapArgMatches, dashboard::Dashboard,
+        ledger_lockfile, lock_ledger, println_name_value,
     },
     clap::{crate_name, value_t, value_t_or_exit, values_t_or_exit},
     crossbeam_channel::unbounded,
@@ -149,8 +149,8 @@ fn main() {
         });
 
     let rpc_port = value_t_or_exit!(matches, "rpc_port", u16);
-    let enable_vote_subscription = matches.is_present("rpc_pubsub_enable_vote_subscription");
-    let enable_block_subscription = matches.is_present("rpc_pubsub_enable_block_subscription");
+    let pub_sub_config =
+        PubSubConfig::from_clap_arg_match(&matches).unwrap_or(PubSubConfig::default_for_tests());
     let faucet_port = value_t_or_exit!(matches, "faucet_port", u16);
     let ticks_per_slot = value_t!(matches, "ticks_per_slot", u64).ok();
     let slots_per_epoch = value_t!(matches, "slots_per_epoch", Slot).ok();
@@ -468,11 +468,7 @@ fn main() {
             faucet_pubkey,
             AccountSharedData::new(faucet_lamports, 0, &system_program::id()),
         )
-        .pubsub_config(PubSubConfig {
-            enable_vote_subscription,
-            enable_block_subscription,
-            ..PubSubConfig::default()
-        })
+        .pubsub_config(pub_sub_config)
         .rpc_port(rpc_port)
         .add_upgradeable_programs_with_path(&upgradeable_programs_to_load)
         .add_accounts_from_json_files(&accounts_to_load)

+ 2 - 13
validator/src/cli.rs

@@ -1,5 +1,5 @@
 use {
-    crate::commands,
+    crate::{commands, commands::run::args::pub_sub_config},
     agave_snapshots::{
         snapshot_config::{
             DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
@@ -556,18 +556,6 @@ pub fn test_app<'a>(version: &'a str, default_args: &'a DefaultTestArgs) -> App<
                 .default_value(solana_storage_bigtable::DEFAULT_APP_PROFILE_ID)
                 .help("Application profile id to use in Bigtable requests"),
         )
-        .arg(
-            Arg::with_name("rpc_pubsub_enable_vote_subscription")
-                .long("rpc-pubsub-enable-vote-subscription")
-                .takes_value(false)
-                .help("Enable the unstable RPC PubSub `voteSubscribe` subscription"),
-        )
-        .arg(
-            Arg::with_name("rpc_pubsub_enable_block_subscription")
-                .long("rpc-pubsub-enable-block-subscription")
-                .takes_value(false)
-                .help("Enable the unstable RPC PubSub `blockSubscribe` subscription"),
-        )
         .arg(
             Arg::with_name("bpf_program")
                 .long("bpf-program")
@@ -921,6 +909,7 @@ pub fn test_app<'a>(version: &'a str, default_args: &'a DefaultTestArgs) -> App<
                      silently ignored",
                 ),
         )
+        .args(&pub_sub_config::args(/*test_validator:*/ true))
 }
 
 pub struct DefaultTestArgs {

+ 1 - 1
validator/src/commands/run/args.rs

@@ -1376,7 +1376,7 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a,
                  set,tpu-client-next is used by default.",
             ),
     )
-    .args(&pub_sub_config::args())
+    .args(&pub_sub_config::args(/*test_validator:*/ false))
     .args(&json_rpc_config::args())
     .args(&rpc_bigtable_config::args())
     .args(&send_transaction_config::args())

+ 58 - 19
validator/src/commands/run/args/pub_sub_config.rs

@@ -11,20 +11,73 @@ use {
 
 static DEFAULT_RPC_PUBSUB_MAX_ACTIVE_SUBSCRIPTIONS: LazyLock<String> =
     LazyLock::new(|| PubSubConfig::default().max_active_subscriptions.to_string());
+static DEFAULT_TEST_RPC_PUBSUB_MAX_ACTIVE_SUBSCRIPTIONS: LazyLock<String> = LazyLock::new(|| {
+    PubSubConfig::default_for_tests()
+        .max_active_subscriptions
+        .to_string()
+});
 
 static DEFAULT_RPC_PUBSUB_QUEUE_CAPACITY_ITEMS: LazyLock<String> =
     LazyLock::new(|| PubSubConfig::default().queue_capacity_items.to_string());
+static DEFAULT_TEST_RPC_PUBSUB_QUEUE_CAPACITY_ITEMS: LazyLock<String> = LazyLock::new(|| {
+    PubSubConfig::default_for_tests()
+        .queue_capacity_items
+        .to_string()
+});
 
 static DEFAULT_RPC_PUBSUB_QUEUE_CAPACITY_BYTES: LazyLock<String> =
     LazyLock::new(|| PubSubConfig::default().queue_capacity_bytes.to_string());
+static DEFAULT_TEST_RPC_PUBSUB_QUEUE_CAPACITY_BYTES: LazyLock<String> = LazyLock::new(|| {
+    PubSubConfig::default_for_tests()
+        .queue_capacity_bytes
+        .to_string()
+});
 
 const DEFAULT_RPC_PUBSUB_WORKER_THREADS: &str = "4";
+static DEFAULT_TEST_RPC_PUBSUB_WORKER_THREADS: LazyLock<String> =
+    LazyLock::new(|| PubSubConfig::default_for_tests().worker_threads.to_string());
 
 #[cfg_attr(test, qualifiers(pub(crate)))]
 static DEFAULT_RPC_PUBSUB_NUM_NOTIFICATION_THREADS: LazyLock<String> =
     LazyLock::new(|| get_thread_count().to_string());
 
-pub(crate) fn args<'a, 'b>() -> Vec<Arg<'a, 'b>> {
+pub(crate) fn args<'a, 'b>(test_validator: bool) -> Vec<Arg<'a, 'b>> {
+    let rpc_pubsub_notification_threads = Arg::with_name("rpc_pubsub_notification_threads")
+        .long("rpc-pubsub-notification-threads")
+        .takes_value(true)
+        .value_name("NUM_THREADS")
+        .validator(is_parsable::<usize>)
+        .help(
+            "The maximum number of threads that RPC PubSub will use for generating notifications. \
+             0 will disable RPC PubSub notifications",
+        );
+    let (
+        rpc_pubsub_notification_threads,
+        default_rpc_pubsub_max_active_subscriptions,
+        default_rpc_pubsub_queue_capacity_items,
+        default_rpc_pubsub_queue_capacity_bytes,
+    ) = if test_validator {
+        (
+            rpc_pubsub_notification_threads.default_value(&DEFAULT_TEST_RPC_PUBSUB_WORKER_THREADS),
+            &DEFAULT_TEST_RPC_PUBSUB_MAX_ACTIVE_SUBSCRIPTIONS,
+            &DEFAULT_TEST_RPC_PUBSUB_QUEUE_CAPACITY_ITEMS,
+            &DEFAULT_RPC_PUBSUB_QUEUE_CAPACITY_BYTES,
+        )
+    } else {
+        (
+            rpc_pubsub_notification_threads
+                .default_value_if(
+                    "full_rpc_api",
+                    None,
+                    &DEFAULT_RPC_PUBSUB_NUM_NOTIFICATION_THREADS,
+                )
+                .requires("full_rpc_api"),
+            &DEFAULT_RPC_PUBSUB_MAX_ACTIVE_SUBSCRIPTIONS,
+            &DEFAULT_RPC_PUBSUB_QUEUE_CAPACITY_ITEMS,
+            &DEFAULT_TEST_RPC_PUBSUB_QUEUE_CAPACITY_BYTES,
+        )
+    };
+
     vec![
         Arg::with_name("rpc_pubsub_enable_block_subscription")
             .long("rpc-pubsub-enable-block-subscription")
@@ -40,7 +93,7 @@ pub(crate) fn args<'a, 'b>() -> Vec<Arg<'a, 'b>> {
             .takes_value(true)
             .value_name("NUMBER")
             .validator(is_parsable::<usize>)
-            .default_value(&DEFAULT_RPC_PUBSUB_MAX_ACTIVE_SUBSCRIPTIONS)
+            .default_value(default_rpc_pubsub_max_active_subscriptions)
             .help(
                 "The maximum number of active subscriptions that RPC PubSub will accept across \
                  all connections.",
@@ -50,7 +103,7 @@ pub(crate) fn args<'a, 'b>() -> Vec<Arg<'a, 'b>> {
             .takes_value(true)
             .value_name("NUMBER")
             .validator(is_parsable::<usize>)
-            .default_value(&DEFAULT_RPC_PUBSUB_QUEUE_CAPACITY_ITEMS)
+            .default_value(default_rpc_pubsub_queue_capacity_items)
             .help(
                 "The maximum number of notifications that RPC PubSub will store across all \
                  connections.",
@@ -60,7 +113,7 @@ pub(crate) fn args<'a, 'b>() -> Vec<Arg<'a, 'b>> {
             .takes_value(true)
             .value_name("BYTES")
             .validator(is_parsable::<usize>)
-            .default_value(&DEFAULT_RPC_PUBSUB_QUEUE_CAPACITY_BYTES)
+            .default_value(default_rpc_pubsub_queue_capacity_bytes)
             .help(
                 "The maximum total size of notifications that RPC PubSub will store across all \
                  connections.",
@@ -72,21 +125,7 @@ pub(crate) fn args<'a, 'b>() -> Vec<Arg<'a, 'b>> {
             .validator(is_parsable::<usize>)
             .default_value(DEFAULT_RPC_PUBSUB_WORKER_THREADS)
             .help("PubSub worker threads"),
-        Arg::with_name("rpc_pubsub_notification_threads")
-            .long("rpc-pubsub-notification-threads")
-            .requires("full_rpc_api")
-            .takes_value(true)
-            .value_name("NUM_THREADS")
-            .validator(is_parsable::<usize>)
-            .default_value_if(
-                "full_rpc_api",
-                None,
-                &DEFAULT_RPC_PUBSUB_NUM_NOTIFICATION_THREADS,
-            )
-            .help(
-                "The maximum number of threads that RPC PubSub will use for generating \
-                 notifications. 0 will disable RPC PubSub notifications",
-            ),
+        rpc_pubsub_notification_threads,
     ]
 }