Przeglądaj źródła

Merge pull request #74 from pyth-network/drozdziak1/p2w-client-concurrent-txs

pyth2wormhole-client: Add polling-based concurrent tx confirmation
Stanisław Drozd 3 lat temu
rodzic
commit
63a19c363b

+ 18 - 3
solana/pyth2wormhole/client/src/cli.rs

@@ -46,16 +46,31 @@ pub enum Action {
     #[clap(
         about = "Use an existing pyth2wormhole program to attest product price information to another chain"
     )]
+    // Note: defaults target SOL mainnet-beta conditions at implementation time
     Attest {
         #[clap(short = 'f', long = "--config", about = "Attestation YAML config")]
         attestation_cfg: PathBuf,
         #[clap(
             short = 'n',
-            long = "--retries",
-            about = "How many times to retry each batch on failure",
-            default_value = "3"
+            long = "--n-retries",
+            about = "How many times to retry send_transaction() on each batch on failure",
+            default_value = "5"
         )]
         n_retries: usize,
+        #[clap(
+            short = 't',
+            long = "--timeout",
+            about = "How many seconds to wait before giving up on get_transaction() for each batch",
+            default_value = "40"
+        )]
+        conf_timeout_secs: u64,
+        #[clap(
+            short = 'i',
+            long = "--rpc-interval",
+            about = "How many milliseconds to wait between SOL RPC requests",
+            default_value = "200"
+        )]
+        rpc_interval_ms: u64,
     },
     #[clap(about = "Retrieve a pyth2wormhole program's current settings")]
     GetConfig,

+ 254 - 97
solana/pyth2wormhole/client/src/main.rs

@@ -1,6 +1,13 @@
 pub mod cli;
 
-use std::fs::File;
+use std::{
+    fs::File,
+    thread,
+    time::{
+        Duration,
+        Instant,
+    },
+};
 
 use clap::Clap;
 use log::{
@@ -14,7 +21,10 @@ use solana_client::rpc_client::RpcClient;
 use solana_program::pubkey::Pubkey;
 use solana_sdk::{
     commitment_config::CommitmentConfig,
-    signature::read_keypair_file,
+    signature::{
+        read_keypair_file,
+        Signature,
+    },
 };
 use solana_transaction_status::UiTransactionEncoding;
 use solitaire::{
@@ -84,24 +94,62 @@ fn main() -> Result<(), ErrBox> {
         Action::Attest {
             ref attestation_cfg,
             n_retries,
+            conf_timeout_secs,
+            rpc_interval_ms,
         } => {
             // Load the attestation config yaml
             let attestation_cfg: AttestationConfig =
                 serde_yaml::from_reader(File::open(attestation_cfg)?)?;
 
-            handle_attest(&rpc_client, payer, p2w_addr, &attestation_cfg, n_retries)?;
+            handle_attest(
+                &rpc_client,
+                payer,
+                p2w_addr,
+                &attestation_cfg,
+                n_retries,
+                Duration::from_secs(conf_timeout_secs),
+                Duration::from_millis(rpc_interval_ms),
+            )?;
         }
     }
 
     Ok(())
 }
 
+#[derive(Debug)]
+pub enum BatchTxState<'a> {
+    Sending {
+        symbols: &'a [P2WSymbol],
+        attempt_no: usize,
+    },
+    Confirming {
+        symbols: &'a [P2WSymbol],
+        attempt_no: usize,
+        signature: Signature,
+        sent_at: Instant,
+    },
+    Success {
+        seqno: String,
+    },
+    FailedSend {
+        last_err: ErrBox,
+    },
+    FailedConfirm {
+        last_err: ErrBox,
+    },
+}
+
+use BatchTxState::*;
+
+/// Send a series of batch attestations for symbols of an attestation config.
 fn handle_attest(
-    rpc_client: &RpcClient, // Needed for reading Pyth account data
+    rpc_client: &RpcClient,
     payer: Keypair,
     p2w_addr: Pubkey,
     attestation_cfg: &AttestationConfig,
     n_retries: usize,
+    conf_timeout: Duration,
+    rpc_interval: Duration,
 ) -> Result<(), ErrBox> {
     // Derive seeded accounts
     let emitter_addr = P2WEmitter::key(None, &p2w_addr);
@@ -130,116 +178,225 @@ fn handle_attest(
         batch_count
     );
 
-    let mut errors = Vec::new();
-
     // Reused for failed batch retries
     let mut batches: Vec<_> = attestation_cfg
         .symbols
         .as_slice()
         .chunks(config.max_batch_size as usize)
         .enumerate()
-        .map(|(idx, symbols)| (idx + 1, symbols, 1))
+        .map(|(idx, symbols)| {
+            (
+                idx + 1,
+                BatchTxState::Sending {
+                    symbols,
+                    attempt_no: 1,
+                },
+            )
+        })
         .collect();
 
-    let mut batches4retry = Vec::new();
-
-    // If no batches are scheduled for retry, the vector eventually drains
-    while !batches.is_empty() {
-        for (batch_no, symbols, attempt_no) in batches {
-            info!(
-                "Batch {}/{} contents: {:?}",
-                batch_no,
-                batch_count,
-                symbols
-                    .iter()
-                    .map(|s| s
-                        .name
-                        .clone()
-                        .unwrap_or(format!("unnamed product {:?}", s.product_addr)))
-                    .collect::<Vec<_>>()
-            );
-
-            // Execute the transaction, obtain the resulting sequence
-            // number. The and_then() calls enforce permissible error
-            // handling location near loop end.
-            let res = rpc_client
-                .get_latest_blockhash()
-                .map_err(|e| -> ErrBox { e.into() })
-                .and_then(|latest_blockhash| {
-                    let tx_signed = gen_attest_tx(
-                        p2w_addr,
-                        &config,
-                        &payer,
-                        symbols,
-                        &Keypair::new(),
-                        latest_blockhash,
-                    )?;
-
-                    rpc_client
-                        .send_and_confirm_transaction_with_spinner(&tx_signed)
-                        .map_err(|e| -> ErrBox { e.into() })
-                })
-                .and_then(|sig| {
-                    rpc_client
-                        .get_transaction(&sig, UiTransactionEncoding::Json)
+    let mut finished_count = 0;
+
+    // TODO(2022-03-09): Extract logic into helper functions
+    while finished_count < batches.len() {
+        finished_count = 0;
+        for (batch_no, state) in batches.iter_mut() {
+            match state {
+                BatchTxState::Sending {
+                    symbols,
+                    attempt_no,
+                } => {
+                    info!(
+                        "Batch {}/{} contents: {:?}",
+                        batch_no,
+                        batch_count,
+                        symbols
+                            .iter()
+                            .map(|s| s
+                                .name
+                                .clone()
+                                .unwrap_or(format!("unnamed product {:?}", s.product_addr)))
+                            .collect::<Vec<_>>()
+                    );
+
+                    // Send the transaction
+                    let res = rpc_client
+                        .get_latest_blockhash()
                         .map_err(|e| -> ErrBox { e.into() })
-                })
-                .and_then(|this_tx| {
-                    this_tx
-                        .transaction
-                        .meta
-                        .and_then(|meta| meta.log_messages)
-                        .and_then(|logs| {
-                            let mut seqno = None;
-                            for log in logs {
-                                if log.starts_with(SEQNO_PREFIX) {
-                                    seqno = Some(log.replace(SEQNO_PREFIX, ""));
-                                    break;
+                        .and_then(|latest_blockhash| {
+                            let tx_signed = gen_attest_tx(
+                                p2w_addr,
+                                &config,
+                                &payer,
+                                symbols,
+                                &Keypair::new(),
+                                latest_blockhash,
+                            )?;
+
+                            rpc_client
+                                .send_transaction(&tx_signed)
+                                .map_err(|e| -> ErrBox { e.into() })
+                        });
+
+                    // Individual batch errors mustn't prevent other batches from being sent.
+                    match res {
+                        Ok(signature) => {
+                            info!(
+                                "Batch {}/{} tx send: OK (Attempt {} of {})",
+                                batch_no, batch_count, attempt_no, n_retries
+                            );
+
+                            // Record when we've sent this tx
+
+                            *state = BatchTxState::Confirming {
+                                symbols,
+                                attempt_no: *attempt_no,
+                                signature,
+                                sent_at: Instant::now(),
+                            }
+                        }
+                        Err(e) => {
+                            let msg = format!(
+                                "Batch {}/{} tx send error (attempt {} of {}): {}",
+                                batch_no,
+                                batch_count,
+                                attempt_no,
+                                n_retries + 1,
+                                e.to_string()
+                            );
+                            warn!("{}", &msg);
+
+                            if *attempt_no < n_retries {
+                                *state = BatchTxState::Sending {
+                                    attempt_no: *attempt_no + 1,
+                                    symbols,
                                 }
+                            } else {
+                                // This batch failed all attempts, note the error but do not schedule for retry
+                                error!(
+                                    "Batch {}/{} tx send: All {} attempts failed",
+                                    batch_no,
+                                    batch_count,
+                                    n_retries + 1
+                                );
+                                *state = BatchTxState::FailedSend { last_err: e };
                             }
-                            seqno
-                        })
-                        .ok_or_else(|| format!("No seqno in program logs").into())
-                });
-
-            // Individual batch errors mustn't prevent other batches from being sent.
-            match res {
-                Ok(seqno) => {
-                    println!("Sequence number: {}", seqno);
-                    info!("Batch {}/{}: OK, seqno {}", batch_no, batch_count, seqno);
+                        }
+                    }
                 }
-                Err(e) => {
-                    let msg = format!(
-                        "Batch {}/{} tx error (attempt {} of {}): {}",
-                        batch_no,
-                        batch_count,
-                        attempt_no,
-                        n_retries + 1,
-                        e.to_string()
-                    );
-                    warn!("{}", &msg);
-
-                    if attempt_no < n_retries + 1 {
-                        // Schedule this batch for a retry if we have retries left
-                        batches4retry.push((batch_no, symbols, attempt_no + 1));
-                    } else {
-                        // This batch failed all attempts, note the error but do not schedule for retry
-                        error!(
-                            "Batch {}/{}: All {} attempts failed",
-                            batch_no,
-                            batch_count,
-                            n_retries + 1
-                        );
-                        errors.push(msg);
+                BatchTxState::Confirming {
+                    symbols,
+                    attempt_no,
+                    signature,
+                    sent_at,
+                } => {
+                    let res = rpc_client
+                        .get_transaction(&signature, UiTransactionEncoding::Json)
+                        .map_err(|e| -> ErrBox { e.into() })
+                        .and_then(|this_tx| {
+                            this_tx
+                                .transaction
+                                .meta
+                                .and_then(|meta| meta.log_messages)
+                                .and_then(|logs| {
+                                    let mut seqno = None;
+                                    for log in logs {
+                                        if log.starts_with(SEQNO_PREFIX) {
+                                            seqno = Some(log.replace(SEQNO_PREFIX, ""));
+                                            break;
+                                        }
+                                    }
+                                    seqno
+                                })
+                                .ok_or_else(|| format!("No seqno in program logs").into())
+                        });
+
+                    match res {
+                        Ok(seqno) => {
+                            // NOTE(2022-03-09): p2w_autoattest.py relies on parsing this println!()
+                            println!("Sequence number: {}", seqno);
+                            info!("Batch {}/{}: OK, seqno {}", batch_no, batch_count, seqno);
+
+                            *state = BatchTxState::Success { seqno };
+                        }
+                        Err(e) => {
+                            let elapsed = sent_at.elapsed();
+                            let msg = format!(
+                                "Batch {}/{} tx confirmation failed ({}.{}/{}.{}): {}",
+                                batch_no,
+                                batch_count,
+                                elapsed.as_secs(),
+                                elapsed.subsec_millis(),
+                                conf_timeout.as_secs(),
+                                conf_timeout.subsec_millis(),
+                                e.to_string()
+                            );
+                            debug!("{}", &msg); // Output volume usually not suitable for warn!()
+
+                            if elapsed > conf_timeout {
+                                // This batch exceeded the timeout,
+                                // note the error and schedule for a
+                                // fresh send attempt
+                                warn!(
+                                    "Batch {}/{} tx confirm: Took more than {}.{} seconds (attempt {} of {}): {}",
+                                    batch_no,
+                                    batch_count,
+                                    conf_timeout.as_secs(),
+                                    conf_timeout.subsec_millis(),
+				    attempt_no, n_retries,
+				    msg
+                                );
+
+                                if *attempt_no < n_retries {
+                                    *state = BatchTxState::Sending {
+                                        symbols,
+                                        attempt_no: *attempt_no + 1,
+                                    };
+                                } else {
+                                    error!(
+                                        "Batch {}/{} tx confirm: All {} attempts failed",
+                                        batch_no,
+                                        batch_count,
+                                        n_retries + 1
+                                    );
+                                    *state = BatchTxState::FailedConfirm { last_err: e };
+                                }
+                            }
+                        }
                     }
                 }
+                Success { .. } | FailedSend { .. } | FailedConfirm { .. } => {
+                    finished_count += 1; // Gather terminal states for top-level loop exit
+                    continue; // No requests were made, skip sleep
+                }
             }
+
+            thread::sleep(rpc_interval);
         }
+    }
+
+    let mut errors = Vec::new();
+
+    // Filter out errors
+    for (batch_no, state) in batches {
+        use BatchTxState::*;
+        match state {
+            Success { .. } => {}
+            FailedSend { last_err } | FailedConfirm { last_err } => {
+                errors.push(last_err.to_string())
+            }
+            other => {
+                // Be loud about non-terminal states left behind
+                let msg = format!(
+                    "INTERNAL: Batch {} left in non-terminal state {:#?}",
+                    batch_no, other
+                );
+
+                error!("{}", msg);
 
-        // Batches scheduled for retry become the list of batches for
-        // next attempt round, clear retry vec for future failed attempts.
-        batches = batches4retry;
-        batches4retry = Vec::new();
+                errors.push(msg);
+            }
+        }
     }
 
     if !errors.is_empty() {