Browse Source

v1.17: rpc-sts: add config options for stake-weighted qos (backport of #197) (#340)

rpc-sts: add config options for stake-weighted qos (#197)

* rpc-sts: plumb options for swqos config

* rpc-sts: send to specific tpu peers when configured

(cherry picked from commit f41fb84e1543d44610988bafd4c4a06afe515ca7)

# Conflicts:
#	send-transaction-service/src/send_transaction_service.rs
#	validator/src/cli.rs
#	validator/src/main.rs

Co-authored-by: Trent Nelson <490004+t-nelson@users.noreply.github.com>
mergify[bot] 1 year ago
parent
commit
adc63fb3c0

+ 18 - 4
send-transaction-service/src/send_transaction_service.rs

@@ -114,6 +114,7 @@ pub struct Config {
     pub batch_size: usize,
     pub batch_size: usize,
     /// How frequently batches are sent
     /// How frequently batches are sent
     pub batch_send_rate_ms: u64,
     pub batch_send_rate_ms: u64,
+    pub tpu_peers: Option<Vec<SocketAddr>>,
 }
 }
 
 
 impl Default for Config {
 impl Default for Config {
@@ -125,6 +126,7 @@ impl Default for Config {
             service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
             service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
             batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
             batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
             batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
             batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
+            tpu_peers: None,
         }
         }
     }
     }
 }
 }
@@ -565,12 +567,18 @@ impl SendTransactionService {
         stats: &SendTransactionServiceStats,
         stats: &SendTransactionServiceStats,
     ) {
     ) {
         // Processing the transactions in batch
         // Processing the transactions in batch
-        let addresses = Self::get_tpu_addresses_with_slots(
+        let mut addresses = config
+            .tpu_peers
+            .as_ref()
+            .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::<Vec<_>>())
+            .unwrap_or_default();
+        let leader_addresses = Self::get_tpu_addresses_with_slots(
             tpu_address,
             tpu_address,
             leader_info,
             leader_info,
             config,
             config,
             connection_cache.protocol(),
             connection_cache.protocol(),
         );
         );
+        addresses.extend(leader_addresses);
 
 
         let wire_transactions = transactions
         let wire_transactions = transactions
             .iter()
             .iter()
@@ -583,8 +591,8 @@ impl SendTransactionService {
             })
             })
             .collect::<Vec<&[u8]>>();
             .collect::<Vec<&[u8]>>();
 
 
-        for address in &addresses {
-            Self::send_transactions(address.0, &wire_transactions, connection_cache, stats);
+        for (address, _) in &addresses {
+            Self::send_transactions(address, &wire_transactions, connection_cache, stats);
         }
         }
     }
     }
 
 
@@ -701,14 +709,20 @@ impl SendTransactionService {
 
 
             let iter = wire_transactions.chunks(config.batch_size);
             let iter = wire_transactions.chunks(config.batch_size);
             for chunk in iter {
             for chunk in iter {
+                let mut addresses = config
+                    .tpu_peers
+                    .as_ref()
+                    .map(|addrs| addrs.iter().collect::<Vec<_>>())
+                    .unwrap_or_default();
                 let mut leader_info_provider = leader_info_provider.lock().unwrap();
                 let mut leader_info_provider = leader_info_provider.lock().unwrap();
                 let leader_info = leader_info_provider.get_leader_info();
                 let leader_info = leader_info_provider.get_leader_info();
-                let addresses = Self::get_tpu_addresses(
+                let leader_addresses = Self::get_tpu_addresses(
                     tpu_address,
                     tpu_address,
                     leader_info,
                     leader_info,
                     config,
                     config,
                     connection_cache.protocol(),
                     connection_cache.protocol(),
                 );
                 );
+                addresses.extend(leader_addresses);
 
 
                 for address in &addresses {
                 for address in &addresses {
                     Self::send_transactions(address, chunk, connection_cache, stats);
                     Self::send_transactions(address, chunk, connection_cache, stats);

+ 16 - 0
validator/src/cli.rs

@@ -1045,6 +1045,22 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
                 .default_value(&default_args.rpc_send_transaction_batch_size)
                 .default_value(&default_args.rpc_send_transaction_batch_size)
                 .help("The size of transactions to be sent in batch."),
                 .help("The size of transactions to be sent in batch."),
         )
         )
+        .arg(
+            Arg::with_name("rpc_send_transaction_tpu_peer")
+                .long("rpc-send-transaction-tpu-peer")
+                .takes_value(true)
+                .number_of_values(1)
+                .multiple(true)
+                .value_name("HOST:PORT")
+                .validator(solana_net_utils::is_host_port)
+                .help("Peer(s) to broadcast transactions to instead of the current leader")
+        )
+        .arg(
+            Arg::with_name("rpc_send_transaction_also_leader")
+                .long("rpc-send-transaction-also-leader")
+                .requires("rpc_send_transaction_tpu_peer")
+                .help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader")
+        )
         .arg(
         .arg(
             Arg::with_name("rpc_scan_and_fix_roots")
             Arg::with_name("rpc_scan_and_fix_roots")
                 .long("rpc-scan-and-fix-roots")
                 .long("rpc-scan-and-fix-roots")

+ 23 - 5
validator/src/main.rs

@@ -1266,6 +1266,27 @@ pub fn main() {
         );
         );
         exit(1);
         exit(1);
     }
     }
+    let rpc_send_transaction_tpu_peers = matches
+        .values_of("rpc_send_transaction_tpu_peer")
+        .map(|values| {
+            values
+                .map(solana_net_utils::parse_host_port)
+                .collect::<Result<Vec<SocketAddr>, String>>()
+        })
+        .transpose()
+        .unwrap_or_else(|e| {
+            eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}");
+            exit(1);
+        });
+    let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
+    let leader_forward_count =
+        if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
+            // rpc-sts is configured to send only to specific tpu peers. disable leader forwards
+            0
+        } else {
+            value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
+        };
+
     let full_api = matches.is_present("full_rpc_api");
     let full_api = matches.is_present("full_rpc_api");
 
 
     let mut validator_config = ValidatorConfig {
     let mut validator_config = ValidatorConfig {
@@ -1359,11 +1380,7 @@ pub fn main() {
         contact_debug_interval,
         contact_debug_interval,
         send_transaction_service_config: send_transaction_service::Config {
         send_transaction_service_config: send_transaction_service::Config {
             retry_rate_ms: rpc_send_retry_rate_ms,
             retry_rate_ms: rpc_send_retry_rate_ms,
-            leader_forward_count: value_t_or_exit!(
-                matches,
-                "rpc_send_transaction_leader_forward_count",
-                u64
-            ),
+            leader_forward_count,
             default_max_retries: value_t!(
             default_max_retries: value_t!(
                 matches,
                 matches,
                 "rpc_send_transaction_default_max_retries",
                 "rpc_send_transaction_default_max_retries",
@@ -1377,6 +1394,7 @@ pub fn main() {
             ),
             ),
             batch_send_rate_ms: rpc_send_batch_send_rate_ms,
             batch_send_rate_ms: rpc_send_batch_send_rate_ms,
             batch_size: rpc_send_batch_size,
             batch_size: rpc_send_batch_size,
+            tpu_peers: rpc_send_transaction_tpu_peers,
         },
         },
         no_poh_speed_test: matches.is_present("no_poh_speed_test"),
         no_poh_speed_test: matches.is_present("no_poh_speed_test"),
         no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
         no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),