
pyth2wormhole-client: refactor into fully-async futures-based approach (#219)

* pyth2wormhole-client: refactor into fully-async futures-based approach

commit-id:2ed35045

* p2w-client: Change inconsistent rpc constructor

commit-id:cb3b2ff2

* p2w-client: Move job creation to a function, simplify comment

commit-id:35328b38

* pyth2wormhole-client: Use get_multiple_accounts

commit-id:7fc85157

* Implement a rate-limited mutex for RPC client

commit-id:1a243063

* pyth2wormhole-client: only guard beginning new requests in RLMutex

commit-id:d8251474

* pyth2wormhole-client: RLMutex: ensure the inner guard is not dropped

commit-id:c3513f5e

* pyth2wormhole-client: Clarify attestation_sched_futs comment

commit-id:97033670

* pyth2wormhole-client: Use CommitmentConfig's native FromStr parsing

commit-id:835d7125

* pyth2wormhole-client: doc comment typo

commit-id:5ee388de

* pyth2wormhole-client: move closures to their own async functions

This makes the main.rs async attestation routines easier to read.

commit-id:3565a744

* pyth2wormhole-client: fix merge typo

* pyth2wormhole-client: Apply Tom's readability advice

* pyth2wormhole-client reword attestation_sched_job() comment

* pyth2wormhole-client: expand attestation_sched_job() comment

* pyth2wormhole-client:   e  x  p  a  n  d   the comment

* Trigger CI

* p2w-client/main.rs: correct missing awaits after merge
Stanisław Drozd, 3 years ago
parent
commit
f0552e5f1b

+ 2 - 0
solana/pyth2wormhole/Cargo.lock

@@ -2153,6 +2153,7 @@ dependencies = [
  "borsh",
  "clap 3.1.18",
  "env_logger 0.8.4",
+ "futures",
  "log",
  "p2w-sdk",
  "pyth-client 0.5.1",
@@ -2168,6 +2169,7 @@ dependencies = [
  "solana-transaction-status",
  "solitaire",
  "solitaire-client",
+ "tokio",
  "wormhole-bridge-solana",
 ]
 

+ 2 - 0
solana/pyth2wormhole/client/Cargo.toml

@@ -29,6 +29,8 @@ solana-sdk = "=1.10.13"
 solana-transaction-status = "=1.10.13"
 solitaire-client = {path = "../../solitaire/client"}
 solitaire = {path = "../../solitaire/program"}
+tokio = {version = "1", features = ["sync", "rt", "time"]}
+futures = "0.3.21"
 
 [dev-dependencies]
 pyth-client = "0.5.0"

+ 14 - 4
solana/pyth2wormhole/client/src/attestation_cfg.rs

@@ -26,20 +26,30 @@ pub struct SymbolGroup {
     pub symbols: Vec<P2WSymbol>,
 }
 
-pub const fn DEFAULT_MIN_INTERVAL_SECS() -> u64 {
+pub const fn default_min_interval_secs() -> u64 {
     60
 }
 
+pub const fn default_max_batch_jobs() -> usize {
+    20
+}
+
 /// Spontaneous attestation triggers. Attestation is triggered if any
 /// of the active conditions is met. Option<> fields can be
 /// de-activated with None. All conditions are inactive by default,
-/// except for min_interval_secs set to 1 minute.
+/// except for the non-Option ones.
 #[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq)]
 pub struct AttestationConditions {
     /// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
-    #[serde(default = "DEFAULT_MIN_INTERVAL_SECS")]
+    #[serde(default = "default_min_interval_secs")]
     pub min_interval_secs: u64,
 
+    /// Limit concurrent attestation attempts per batch. This setting
+    /// should act only as a failsafe cap on resource consumption and is
+    /// best set well above the expected average number of jobs.
+    #[serde(default = "default_max_batch_jobs")]
+    pub max_batch_jobs: usize,
+
     /// Trigger attestation if price changes by the specified percentage.
     #[serde(default)]
     pub price_changed_pct: Option<f64>,
@@ -51,7 +61,7 @@ pub struct AttestationConditions {
 }
 
 /// Config entry for a Pyth product + price pair
-#[derive(Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
+#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub struct P2WSymbol {
     /// User-defined human-readable name
     pub name: Option<String>,
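The `#[serde(default = ...)]` attributes mean a YAML config may omit both `min_interval_secs` and `max_batch_jobs`. A minimal standalone sketch of how those defaults kick in during deserialization — the struct below is a hypothetical mirror of the fields in this diff, not the crate's actual type:

use serde::Deserialize;

const fn default_min_interval_secs() -> u64 {
    60
}

const fn default_max_batch_jobs() -> usize {
    20
}

// Hypothetical mirror of AttestationConditions, for illustration only.
#[derive(Debug, Deserialize)]
struct Conditions {
    #[serde(default = "default_min_interval_secs")]
    min_interval_secs: u64,
    #[serde(default = "default_max_batch_jobs")]
    max_batch_jobs: usize,
    #[serde(default)]
    price_changed_pct: Option<f64>,
}

fn main() -> Result<(), serde_yaml::Error> {
    // Only min_interval_secs is given; the other fields fall back to defaults.
    let c: Conditions = serde_yaml::from_str("min_interval_secs: 3")?;
    assert_eq!(c.min_interval_secs, 3);
    assert_eq!(c.max_batch_jobs, 20);
    assert_eq!(c.price_changed_pct, None);
    Ok(())
}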

+ 42 - 69
solana/pyth2wormhole/client/src/batch_state.rs

@@ -1,8 +1,10 @@
+use futures::future::TryFutureExt;
 use log::{
     debug,
+    trace,
     warn,
 };
-use solana_client::rpc_client::RpcClient;
+use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_sdk::signature::Signature;
 
 use pyth_sdk_solana::state::PriceAccount;
@@ -16,17 +18,18 @@ use crate::{
     AttestationConditions,
     ErrBox,
     P2WSymbol,
+    RLMutex,
 };
 
+/// Runtime representation of a batch. It refers to the original group
+/// from the config.
 #[derive(Debug)]
 pub struct BatchState<'a> {
     pub group_name: String,
     pub symbols: &'a [P2WSymbol],
     pub last_known_symbol_states: Vec<Option<PriceAccount>>,
     pub conditions: AttestationConditions,
-    status: BatchTxStatus,
-    status_changed_at: Instant,
-    pub last_success_at: Option<Instant>,
+    pub last_job_finished_at: Instant,
 }
 
 impl<'a> BatchState<'a> {
@@ -40,63 +43,51 @@ impl<'a> BatchState<'a> {
             symbols,
             conditions,
             last_known_symbol_states: vec![None; symbols.len()],
-            status: BatchTxStatus::Sending { attempt_no: 1 },
-            status_changed_at: Instant::now(),
-            last_success_at: None,
+            last_job_finished_at: Instant::now(),
         }
     }
-    /// Ensure only set_status() alters the timestamp
-    pub fn get_status_changed_at(&self) -> &Instant {
-        &self.status_changed_at
-    }
-    pub fn get_status(&self) -> &BatchTxStatus {
-        &self.status
-    }
-
-    /// Ensure that status changes are accompanied by a timestamp bump
-    pub fn set_status(&mut self, s: BatchTxStatus) {
-        self.status_changed_at = Instant::now();
-        self.status = s;
-    }
 
     /// Evaluate the configured attestation conditions for this
     /// batch. RPC is used to update last known state. Returns
     /// Some("<reason>") if any trigger condition was met. Only the
     /// first encountered condition is mentioned.
-    pub fn should_resend(&mut self, c: &RpcClient) -> Option<String> {
+    pub async fn should_resend(&mut self, c: &RpcClient) -> Option<String> {
         let mut ret = None;
 
         let sym_count = self.symbols.len();
-        let mut new_symbol_states: Vec<Option<PriceAccount>> = Vec::with_capacity(sym_count);
-        for (idx, sym) in self.symbols.iter().enumerate() {
-            let new_state = match c
-                .get_account_data(&sym.price_addr)
-                .map_err(|e| e.to_string())
-                .and_then(|bytes| {
-                    pyth_sdk_solana::state::load_price_account(&bytes)
-                        .map(|state| state.clone())
-                        .map_err(|e| e.to_string())
-                }) {
-                Ok(state) => Some(state),
-                Err(e) => {
-                    warn!(
-                        "Symbol {} ({}/{}): Could not look up state: {}",
-                        sym.name
-                            .as_ref()
-                            .unwrap_or(&format!("Unnamed product {}", sym.product_addr)),
-                        idx + 1,
-                        sym_count,
-                        e.to_string()
-                    );
-                    None
-                }
-            };
+        let pubkeys: Vec<_> = self.symbols.iter().map(|s| s.price_addr).collect();
 
-            new_symbol_states.push(new_state);
-        }
+        // Always learn the current on-chain state for each symbol, use None values if lookup fails
+        let mut new_symbol_states: Vec<Option<PriceAccount>> = match c
+            .get_multiple_accounts(&pubkeys)
+            .await
+        {
+            Ok(acc_opts) => {
+                acc_opts
+                    .into_iter()
+                    .enumerate()
+                    .map(|(idx, opt)| {
+                        // Take each Some(acc), make it None and log on load_price_account() error
+                        opt.and_then(|acc| {
+                            pyth_sdk_solana::state::load_price_account(&acc.data)
+                                .cloned() // load_price_account() transmutes the data reference into another reference, and owning acc_opts is not enough
+                                .map_err(|e| {
+                                    warn!("Could not parse symbol {}/{}: {}", idx, sym_count, e);
+                                    e
+                                })
+                                .ok() // Err becomes None
+                        })
+                    })
+                    .collect()
+            }
+            Err(e) => {
+                warn!("Could not look up any symbols on-chain: {}", e);
+                vec![None; sym_count]
+            }
+        };
 
         // min interval
-        if self.get_status_changed_at().elapsed()
+        if self.last_job_finished_at.elapsed()
             > Duration::from_secs(self.conditions.min_interval_secs)
         {
             ret = Some(format!(
@@ -154,7 +145,9 @@ impl<'a> BatchState<'a> {
             }
         }
 
-        // Update with newer state if a condition was met
+        // Update with newer state only if a condition was met. We
+        // don't want to shadow changes that may happen over a larger
+        // period between state lookups.
         if ret.is_some() {
             for (old, new) in self
                 .last_known_symbol_states
@@ -170,23 +163,3 @@ impl<'a> BatchState<'a> {
         return ret;
     }
 }
-
-#[derive(Debug)]
-pub enum BatchTxStatus {
-    Sending {
-        attempt_no: usize,
-    },
-    Confirming {
-        attempt_no: usize,
-        signature: Signature,
-    },
-    Success {
-        seqno: String,
-    },
-    FailedSend {
-        last_err: ErrBox,
-    },
-    FailedConfirm {
-        last_err: ErrBox,
-    },
-}
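The switch from per-symbol `get_account_data()` calls to one `get_multiple_accounts()` call replaces N round trips with a single request. A rough sketch of that lookup pattern in isolation — the function and variable names here are illustrative, not from the codebase:

use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;

// One round trip for all symbols, mirroring the new should_resend() body.
async fn fetch_price_accounts(c: &RpcClient, pubkeys: &[Pubkey]) {
    match c.get_multiple_accounts(pubkeys).await {
        // A single RPC call yields Vec<Option<Account>>, None per missing account.
        Ok(acc_opts) => {
            for (idx, opt) in acc_opts.into_iter().enumerate() {
                match opt {
                    Some(acc) => println!("symbol {}: {} bytes of data", idx, acc.data.len()),
                    None => println!("symbol {}: account not found", idx),
                }
            }
        }
        // A transport-level failure loses all symbols at once, as in the diff.
        Err(e) => eprintln!("could not look up any symbols: {}", e),
    }
}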

+ 21 - 11
solana/pyth2wormhole/client/src/cli.rs

@@ -1,6 +1,7 @@
 //! CLI options
 
 use solana_program::pubkey::Pubkey;
+use solana_sdk::commitment_config::CommitmentConfig;
 use std::path::PathBuf;
 
 use clap::{
@@ -29,6 +30,14 @@ pub struct Cli {
     pub payer: String,
     #[clap(short, long, default_value = "http://localhost:8899")]
     pub rpc_url: String,
+    #[clap(
+        long = "rpc-interval",
+        default_value = "150",
+        help = "Rate-limiting minimum delay between RPC requests in milliseconds"
+    )]
+    pub rpc_interval_ms: u64,
+    #[clap(long, default_value = "confirmed")]
+    pub commitment: CommitmentConfig,
     #[clap(long)]
     pub p2w_addr: Pubkey,
     #[clap(subcommand)]
@@ -60,10 +69,18 @@ pub enum Action {
         #[clap(
             short = 'n',
             long = "--n-retries",
-            help = "How many times to retry send_transaction() on each batch before flagging a failure.",
+            help = "How many times to retry send_transaction() on each batch before flagging a failure. Only active outside daemon mode",
             default_value = "5"
         )]
         n_retries: usize,
+        #[clap(
+            short = 'i',
+            long = "--retry-interval",
+            help = "How long to wait between send_transaction
+            retries. Only active outside daemon mode",
+            default_value = "5"
+        )]
+        retry_interval_secs: u64,
         #[clap(
             short = 'd',
             long = "--daemon",
@@ -73,17 +90,10 @@ pub enum Action {
         #[clap(
             short = 't',
             long = "--timeout",
-            help = "How many seconds to wait before giving up on get_transaction() for tx confirmation.",
-            default_value = "40"
-        )]
-        conf_timeout_secs: u64,
-        #[clap(
-            short = 'i',
-            long = "--rpc-interval",
-            help = "How many milliseconds to wait between SOL RPC requests",
-            default_value = "200"
+            help = "How many seconds to wait before giving up on  tx confirmation.",
+            default_value = "20"
         )]
-        rpc_interval_ms: u64,
+        confirmation_timeout_secs: u64,
     },
     #[clap(about = "Retrieve a pyth2wormhole program's current settings")]
     GetConfig,
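The new `--commitment` flag works because `CommitmentConfig` implements `FromStr`, which clap uses to parse the argument (per the commit "Use CommitmentConfig's native FromStr parsing"). A small sketch of the same parsing outside clap:

use std::str::FromStr;
use solana_sdk::commitment_config::CommitmentConfig;

fn main() {
    // clap parses `--commitment confirmed` through this same FromStr impl.
    let c = CommitmentConfig::from_str("confirmed").unwrap();
    assert_eq!(c, CommitmentConfig::confirmed());

    // Unknown commitment levels fail at argument-parsing time.
    assert!(CommitmentConfig::from_str("not-a-level").is_err());
}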

+ 27 - 11
solana/pyth2wormhole/client/src/lib.rs

@@ -1,11 +1,12 @@
 pub mod attestation_cfg;
 pub mod batch_state;
+pub mod util;
 
 use borsh::{
     BorshDeserialize,
     BorshSerialize,
 };
-use solana_client::rpc_client::RpcClient;
+use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_program::{
     hash::Hash,
     instruction::{
@@ -51,19 +52,24 @@ use pyth2wormhole::{
     migrate::MigrateAccounts,
     set_config::SetConfigAccounts,
     AttestData,
-    Pyth2WormholeConfig,
 };
 
+pub use pyth2wormhole::Pyth2WormholeConfig;
+
 pub use attestation_cfg::{
     AttestationConditions,
     AttestationConfig,
     P2WSymbol,
 };
-pub use batch_state::{
-    BatchState,
-    BatchTxStatus,
+pub use batch_state::BatchState;
+pub use util::{
+    RLMutex,
+    RLMutexGuard,
 };
 
+/// Future-friendly version of solitaire::ErrBox
+pub type ErrBoxSend = Box<dyn std::error::Error + Send + Sync>;
+
 pub fn gen_init_tx(
     payer: Keypair,
     p2w_addr: Pubkey,
@@ -159,14 +165,17 @@ pub fn gen_migrate_tx(
 }
 
 /// Get the current config account data for given p2w program address
-pub fn get_config_account(
+pub async fn get_config_account(
     rpc_client: &RpcClient,
     p2w_addr: &Pubkey,
 ) -> Result<Pyth2WormholeConfig, ErrBox> {
     let p2w_config_addr = P2WConfigAccount::<{ AccountState::Initialized }>::key(None, p2w_addr);
 
     let config = Pyth2WormholeConfig::try_from_slice(
-        rpc_client.get_account_data(&p2w_config_addr)?.as_slice(),
+        rpc_client
+            .get_account_data(&p2w_config_addr)
+            .await?
+            .as_slice(),
     )?;
 
     Ok(config)
@@ -181,7 +190,7 @@ pub fn gen_attest_tx(
     symbols: &[P2WSymbol],
     wh_msg: &Keypair,
     latest_blockhash: Hash,
-) -> Result<Transaction, ErrBox> {
+) -> Result<Transaction, ErrBoxSend> {
     let emitter_addr = P2WEmitter::key(None, &p2w_addr);
 
     let seq_addr = Sequence::key(
@@ -193,11 +202,11 @@ pub fn gen_attest_tx(
 
     let p2w_config_addr = P2WConfigAccount::<{ AccountState::Initialized }>::key(None, &p2w_addr);
     if symbols.len() > p2w_config.max_batch_size as usize {
-        return Err(format!(
+        return Err((format!(
             "Expected up to {} symbols for batch, {} were found",
             p2w_config.max_batch_size,
             symbols.len()
-        )
+        ))
         .into());
     }
     // Initial attest() accounts
@@ -267,7 +276,14 @@ pub fn gen_attest_tx(
         },
     );
 
-    let ix = Instruction::new_with_bytes(p2w_addr, ix_data.try_to_vec()?.as_slice(), acc_metas);
+    let ix = Instruction::new_with_bytes(
+        p2w_addr,
+        ix_data
+            .try_to_vec()
+            .map_err(|e| -> ErrBoxSend { Box::new(e) })?
+            .as_slice(),
+        acc_metas,
+    );
 
     let tx_signed = Transaction::new_signed_with_payer::<Vec<&Keypair>>(
         &[ix],

+ 312 - 302
solana/pyth2wormhole/client/src/main.rs

@@ -2,6 +2,8 @@ pub mod cli;
 
 use std::{
     fs::File,
+    pin::Pin,
+    sync::Arc,
     thread,
     time::{
         Duration,
@@ -10,6 +12,12 @@ use std::{
 };
 
 use clap::Parser;
+use futures::future::{
+    Future,
+    FutureExt,
+    TryFuture,
+    TryFutureExt,
+};
 use log::{
     debug,
     error,
@@ -18,7 +26,10 @@ use log::{
     warn,
     LevelFilter,
 };
-use solana_client::rpc_client::RpcClient;
+use solana_client::{
+    client_error::ClientError,
+    nonblocking::rpc_client::RpcClient,
+};
 use solana_program::pubkey::Pubkey;
 use solana_sdk::{
     commitment_config::CommitmentConfig,
@@ -33,6 +44,10 @@ use solitaire::{
     ErrBox,
 };
 use solitaire_client::Keypair;
+use tokio::{
+    sync::Semaphore,
+    task::JoinHandle,
+};
 
 use cli::{
     Action,
@@ -49,16 +64,18 @@ use pyth2wormhole_client::*;
 
 pub const SEQNO_PREFIX: &'static str = "Program log: Sequence: ";
 
-fn main() -> Result<(), ErrBox> {
+#[tokio::main]
+async fn main() -> Result<(), ErrBox> {
     let cli = Cli::parse();
     init_logging(cli.log_level);
 
     let payer = read_keypair_file(&*shellexpand::tilde(&cli.payer))?;
-    let rpc_client = RpcClient::new_with_commitment(cli.rpc_url, CommitmentConfig::confirmed());
+
+    let rpc_client = RpcClient::new_with_commitment(cli.rpc_url.clone(), cli.commitment.clone());
 
     let p2w_addr = cli.p2w_addr;
 
-    let latest_blockhash = rpc_client.get_latest_blockhash()?;
+    let latest_blockhash = rpc_client.get_latest_blockhash().await?;
 
     match cli.action {
         Action::Init {
@@ -79,10 +96,16 @@ fn main() -> Result<(), ErrBox> {
                 },
                 latest_blockhash,
             )?;
-            rpc_client.send_and_confirm_transaction_with_spinner(&tx)?;
+            rpc_client
+                .send_and_confirm_transaction_with_spinner(&tx)
+                .await?;
+            println!(
+                "Initialized with conifg:\n{:?}",
+                get_config_account(&rpc_client, &p2w_addr).await?
+            );
         }
         Action::GetConfig => {
-            println!("{:?}", get_config_account(&rpc_client, &p2w_addr)?);
+            println!("{:?}", get_config_account(&rpc_client, &p2w_addr).await?);
         }
         Action::SetConfig {
             ref owner,
@@ -91,7 +114,7 @@ fn main() -> Result<(), ErrBox> {
             new_pyth_owner_addr,
             is_active,
         } => {
-            let old_config = get_config_account(&rpc_client, &p2w_addr)?;
+            let old_config = get_config_account(&rpc_client, &p2w_addr).await?;
             let tx = gen_set_config_tx(
                 payer,
                 p2w_addr,
@@ -105,10 +128,12 @@ fn main() -> Result<(), ErrBox> {
                 },
                 latest_blockhash,
             )?;
-            rpc_client.send_and_confirm_transaction_with_spinner(&tx)?;
+            rpc_client
+                .send_and_confirm_transaction_with_spinner(&tx)
+                .await?;
             println!(
                 "Applied conifg:\n{:?}",
-                get_config_account(&rpc_client, &p2w_addr)?
+                get_config_account(&rpc_client, &p2w_addr).await?
             );
         }
         Action::Migrate {
@@ -120,58 +145,69 @@ fn main() -> Result<(), ErrBox> {
                 read_keypair_file(&*shellexpand::tilde(&owner))?,
                 latest_blockhash,
             )?;
-            rpc_client.send_and_confirm_transaction_with_spinner(&tx)?;
+            rpc_client.send_and_confirm_transaction_with_spinner(&tx).await?;
             println!(
                 "Applied conifg:\n{:?}",
-                get_config_account(&rpc_client, &p2w_addr)?
+                get_config_account(&rpc_client, &p2w_addr).await?
             );
         }
         Action::Attest {
             ref attestation_cfg,
             n_retries,
+            retry_interval_secs,
+            confirmation_timeout_secs,
             daemon,
-            conf_timeout_secs,
-            rpc_interval_ms,
         } => {
             // Load the attestation config yaml
             let attestation_cfg: AttestationConfig =
                 serde_yaml::from_reader(File::open(attestation_cfg)?)?;
 
             handle_attest(
-                &rpc_client,
+                cli.rpc_url,
+                Duration::from_millis(cli.rpc_interval_ms),
+                cli.commitment,
                 payer,
                 p2w_addr,
-                &attestation_cfg,
+                attestation_cfg,
                 n_retries,
+                Duration::from_secs(retry_interval_secs),
+                Duration::from_secs(confirmation_timeout_secs),
                 daemon,
-                Duration::from_secs(conf_timeout_secs),
-                Duration::from_millis(rpc_interval_ms),
-            )?;
+            )
+            .await?;
         }
     }
 
     Ok(())
 }
 
-use BatchTxStatus::*;
-
 /// Send a series of batch attestations for symbols of an attestation config.
-fn handle_attest(
-    rpc_client: &RpcClient,
+async fn handle_attest(
+    rpc_url: String,
+    rpc_interval: Duration,
+    commitment: CommitmentConfig,
     payer: Keypair,
     p2w_addr: Pubkey,
-    attestation_cfg: &AttestationConfig,
+    attestation_cfg: AttestationConfig,
     n_retries: usize,
+    retry_interval: Duration,
+    confirmation_timeout: Duration,
     daemon: bool,
-    conf_timeout: Duration,
-    rpc_interval: Duration,
 ) -> Result<(), ErrBox> {
     // Derive seeded accounts
     let emitter_addr = P2WEmitter::key(None, &p2w_addr);
 
     info!("Using emitter addr {}", emitter_addr);
 
-    let config = get_config_account(rpc_client, &p2w_addr)?;
+    let config = get_config_account(
+        &RpcClient::new_with_timeout_and_commitment(
+            rpc_url.clone(),
+            confirmation_timeout,
+            commitment.clone(),
+        ),
+        &p2w_addr,
+    )
+    .await?;
 
     debug!("Symbol config:\n{:#?}", attestation_cfg);
 
@@ -185,7 +221,6 @@ fn handle_attest(
         .symbol_groups
         .iter()
         .map(|g| {
-            // FIXME: The forbidden nested closure move technique (a lost art of pleasing the borrow checker)
             let conditions4closure = g.conditions.clone();
             let name4closure = g.group_name.clone();
 
@@ -205,301 +240,276 @@ fn handle_attest(
         .collect();
     let batch_count = batches.len();
 
-    // NOTE(2022-04-26): only increment this if `daemon` is false
-    let mut finished_count = 0;
+    /// Note: For global rate-limiting of RPC requests, we use a
+    /// custom Mutex wrapper which enforces a delay of rpc_interval
+    /// between RPC accesses.
+    let rpc_cfg = Arc::new(RLMutex::new(
+        RpcCfg {
+            url: rpc_url,
+            timeout: confirmation_timeout,
+            commitment: commitment.clone(),
+        },
+        rpc_interval,
+    ));
+
+    // Create attestation scheduling routines; see attestation_sched_job() for details
+    let mut attestation_sched_futs = batches.into_iter().map(|(batch_no, batch)| {
+        attestation_sched_job(
+            batch,
+            batch_no,
+            batch_count,
+            n_retries,
+            retry_interval,
+            daemon,
+            rpc_cfg.clone(),
+            p2w_addr,
+            config.clone(),
+            Keypair::from_bytes(&payer.to_bytes()).unwrap(),
+        )
+    });
 
-    // Stats
-    // TODO(2022-05-12): These should become Prometheus metrics in the future
-    let mut stats_start_time = Instant::now();
-    let mut tx_successes = 0;
-    let mut tx_send_failures = 0;
-    let mut tx_confirm_failures = 0;
+    info!("Spinning up attestation sched jobs");
 
-    let mut batch_wait_times = Duration::ZERO;
+    let results = futures::future::join_all(attestation_sched_futs).await; // May never finish for daemon mode
 
-    // TODO(2021-03-09): Extract logic into helper functions
-    while daemon || finished_count < batches.len() {
-        finished_count = 0;
-        for (batch_no, state) in batches.iter_mut() {
-            match state.get_status().clone() {
-                Sending { attempt_no } => {
-                    info!(
-                        "Batch {}/{} contents (group {:?}): {:?}",
-                        batch_no,
-                        batch_count,
-                        state.group_name,
-                        state
-                            .symbols
-                            .iter()
-                            .map(|s| s
-                                .name
-                                .clone()
-                                .unwrap_or(format!("unnamed product {:?}", s.product_addr)))
-                            .collect::<Vec<_>>()
-                    );
+    info!("Got {} results", results.len());
 
-                    // Send the transaction
-                    let res = rpc_client
-                        .get_latest_blockhash()
-                        .map_err(|e| -> ErrBox { e.into() })
-                        .and_then(|latest_blockhash| {
-                            let tx_signed = gen_attest_tx(
-                                p2w_addr,
-                                &config,
-                                &payer,
-                                state.symbols,
-                                &Keypair::new(),
-                                latest_blockhash,
-                            )?;
-
-                            rpc_client
-                                .send_transaction(&tx_signed)
-                                .map_err(|e| -> ErrBox { e.into() })
-                        });
-
-                    // Individual batch errors mustn't prevent other batches from being sent.
-                    match res {
-                        Ok(signature) => {
-                            info!(
-                                "Batch {}/{} (group {:?}) tx send: OK (Attempt {} of {})",
-                                batch_no, batch_count, state.group_name, attempt_no, n_retries
-                            );
-
-                            state.set_status(Confirming {
-                                attempt_no: *attempt_no,
-                                signature,
-                            });
-                        }
-                        Err(e) => {
-                            let msg = format!(
-                                "Batch {}/{} (group {:?}) tx send error (attempt {} of {}): {}",
-                                batch_no,
-                                batch_count,
-                                state.group_name,
-                                attempt_no,
-                                n_retries + 1,
-                                e.to_string()
-                            );
-                            warn!("{}", &msg);
-
-                            tx_send_failures += 1;
-
-                            if attempt_no < &n_retries {
-                                state.set_status(Sending {
-                                    attempt_no: attempt_no + 1,
-                                })
-                            } else {
-                                // This batch failed all attempts, note the error but do not schedule for retry
-                                error!(
-                                    "Batch {}/{} (group {:?}) tx send: All {} attempts failed",
-                                    state.group_name,
-                                    batch_no,
-                                    batch_count,
-                                    n_retries + 1
-                                );
-                                state.set_status(FailedSend { last_err: e });
-                            }
-                        }
-                    }
-                }
-                Confirming {
-                    attempt_no,
-                    signature,
-                } => {
-                    let res = rpc_client
-                        .get_transaction(&signature, UiTransactionEncoding::Json)
-                        .map_err(|e| -> ErrBox { e.into() })
-                        .and_then(|this_tx| {
-                            this_tx
-                                .transaction
-                                .meta
-                                .and_then(|meta| meta.log_messages)
-                                .and_then(|logs| {
-                                    let mut seqno = None;
-                                    for log in logs {
-                                        if log.starts_with(SEQNO_PREFIX) {
-                                            seqno = Some(log.replace(SEQNO_PREFIX, ""));
-                                            break;
-                                        }
-                                    }
-                                    seqno
-                                })
-                                .ok_or_else(|| format!("No seqno in program logs").into())
-                        });
-
-                    match res {
-                        Ok(seqno) => {
-                            // NOTE(2022-03-09): p2w_autoattest.py relies on parsing this println!()
-                            println!("Sequence number: {}", seqno);
-                            info!("Batch {}/{}: OK, seqno {}", batch_no, batch_count, seqno);
-
-                            tx_successes += 1;
-
-                            // Include delay for average
-                            if let Some(t) = state.last_success_at.as_ref() {
-                                batch_wait_times += t.elapsed();
-                            }
-
-                            state.last_success_at = Some(Instant::now());
-                            state.set_status(Success { seqno });
-                        }
-                        Err(e) => {
-                            let elapsed = state.get_status_changed_at().elapsed();
-                            let msg = format!(
-                                "Batch {}/{} (group {:?}) tx confirmation failed ({}.{}/{}.{}): {}",
-                                batch_no,
-                                batch_count,
-                                state.group_name,
-                                elapsed.as_secs(),
-                                elapsed.subsec_millis(),
-                                conf_timeout.as_secs(),
-                                conf_timeout.subsec_millis(),
-                                e.to_string()
-                            );
-                            debug!("{}", &msg); // Output volume usually not suitable for warn!()
-
-                            if elapsed > conf_timeout {
-                                // This batch exceeded the timeout,
-                                // note the error and schedule for a
-                                // fresh send attempt
-                                warn!(
-                                    "Batch {}/{} (group {:?}) tx confirm: Took more than {}.{} seconds (attempt {} of {}): {}",
-                                    state.group_name,
-                                    batch_no,
-                                    batch_count,
-                                    conf_timeout.as_secs(),
-                                    conf_timeout.subsec_millis(),
-				    attempt_no, n_retries,
-				    msg
-                                );
-
-                                tx_confirm_failures += 1;
-
-                                if attempt_no < &n_retries {
-                                    state.set_status(Sending {
-                                        attempt_no: attempt_no + 1,
-                                    });
-                                } else {
-                                    error!(
-                                        "Batch {}/{} (group {:?}) tx confirm: All {} attempts failed",
-                                        state.group_name,
-                                        batch_no,
-                                        batch_count,
-                                        n_retries + 1
-                                    );
-                                    state.set_status(FailedConfirm { last_err: e });
-                                }
-                            }
-                        }
-                    }
-                }
-                Success { .. } | FailedSend { .. } | FailedConfirm { .. } => {
-                    // We only try to re-schedule under --daemon
-                    if daemon {
-                        if let Some(reason) = state.should_resend(rpc_client) {
-                            info!(
-                                "Batch {}/{} (group {:?}): resending (reason: {})",
-                                batch_no, batch_count, state.group_name, reason,
-                            );
-                            state.set_status(Sending { attempt_no: 1 });
-                        } else {
-                            let elapsed = state.get_status_changed_at().elapsed();
-                            trace!(
-                                "Batch {}/{} (group {:?}): waiting ({}.{}s elapsed)",
-                                batch_no,
-                                batch_count,
-                                state.group_name,
-                                elapsed.as_secs(),
-                                elapsed.subsec_millis(),
-                            )
-                        }
-                    } else {
-                        // Track the finished batches outside daemon mode
-                        finished_count += 1;
+    // With daemon mode off, the sched jobs return from the
+    // join_all. We filter out errors and report them
+    let errors: Vec<_> = results
+        .iter()
+        .filter_map(|r| r.as_ref().err().map(|e| e.to_string()))
+        .collect();
 
-                        // No RPC requests are made on terminal states outside daemon mode, skip sleep
-                        continue;
-                    }
-                }
-            }
+    if !errors.is_empty() {
+        let err_lines = errors.join("\n");
+        let msg = format!(
+            "{} of {} batches failed:\n{}",
+            errors.len(),
+            batch_count,
+            err_lines
+        );
+        error!("{}", msg);
+        return Err(msg.into());
+    }
 
-            thread::sleep(rpc_interval);
-        }
+    Ok(())
+}
 
-        // Print stats on every pass through the batches
-        let stats_seconds_elapsed = stats_start_time.elapsed().as_millis() as f64 / 1000.0;
-        let tps = tx_successes as f64 / stats_seconds_elapsed;
-        let sym_ps = tps * config.max_batch_size as f64;
+#[derive(Clone)]
+pub struct RpcCfg {
+    pub url: String,
+    pub timeout: Duration,
+    pub commitment: CommitmentConfig,
+}
 
-        let mut total = (tx_successes + tx_send_failures + tx_confirm_failures) as f64;
-        // Avoid division by 0
-        if total < 0.0001 {
-            total = 0.0001;
-        }
+/// Helper function for claiming the rate-limited mutex and constructing an RPC instance
+async fn lock_and_make_rpc(rlmtx: &RLMutex<RpcCfg>) -> RpcClient {
+    let RpcCfg {
+        url,
+        timeout,
+        commitment,
+    } = rlmtx.lock().await.clone();
+    RpcClient::new_with_timeout_and_commitment(url, timeout, commitment)
+}
 
-        let successes_pct = tx_successes as f64 / total * 100.0;
-        let send_failures_pct = tx_send_failures as f64 / total * 100.0;
-        let confirm_failures_pct = tx_confirm_failures as f64 / total * 100.0;
-
-        info!(
-            "Stats since start:
-Runtime: {}s
-TPS: {:.4} tx/s ({:.4} symbols/s)
-Total attestation attempts: {}
-Successful txs: {} ({:.2})%
-Sending failures: {} ({:.2})%
-Confirmation failures: {} ({:.2})%
-Average batch resend delay: {:.2}s",
-            stats_start_time.elapsed().as_secs(),
-            tps,
-            sym_ps,
-            total,
-            tx_successes,
-            successes_pct,
-            tx_send_failures,
-            send_failures_pct,
-            tx_confirm_failures,
-            confirm_failures_pct,
-            batch_wait_times.as_secs() as f64 / (tx_successes as f64 + 0.000001),
+/// A future that decides how a batch is sent.
+///
+/// In daemon mode, attestations of the batch are scheduled
+/// continuously using spawn(), which means that a next attestation of
+/// the same batch begins immediately when a condition is met without
+/// waiting for the previous attempt to finish. Subsequent
+/// attestations are started according to the attestation_conditions
+/// field on the batch. Concurrent requests per batch are limited by
+/// the max_batch_jobs field to prevent excess memory usage on network
+/// slowdowns etc.
+///
+/// With daemon_mode off, this future attempts only one blocking
+/// attestation of the batch and returns the result.
+async fn attestation_sched_job(
+    mut batch: BatchState<'_>,
+    batch_no: usize,
+    batch_count: usize,
+    n_retries: usize,
+    retry_interval: Duration,
+    daemon: bool,
+    rpc_cfg: Arc<RLMutex<RpcCfg>>,
+    p2w_addr: Pubkey,
+    config: Pyth2WormholeConfig,
+    payer: Keypair,
+) -> Result<(), ErrBoxSend> {
+    let mut retries_left = n_retries;
+    // Enforces the max batch job count
+    let sema = Arc::new(Semaphore::new(batch.conditions.max_batch_jobs));
+    loop {
+        debug!(
+            "Batch {}/{}, group {:?}: Scheduling attestation job",
+            batch_no, batch_count, batch.group_name
         );
-    }
 
-    let mut errors = Vec::new();
+        let job = attestation_job(
+            rpc_cfg.clone(),
+            batch_no,
+            batch_count,
+            batch.group_name.clone(),
+            p2w_addr,
+            config.clone(),
+            Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
+            batch.symbols.to_vec(),
+            sema.clone(),
+        );
 
-    // Filter out errors
-    for (batch_no, state) in batches {
-        match state.get_status() {
-            Success { .. } => {}
-            FailedSend { last_err, .. } | FailedConfirm { last_err, .. } => {
-                errors.push(last_err.to_string())
+        if daemon {
+            // park this routine until a resend condition is met
+            loop {
+                if let Some(reason) = batch
+                    .should_resend(&lock_and_make_rpc(&rpc_cfg).await)
+                    .await
+                {
+                    info!(
+                        "Batch {}/{}, group {}: Resending (reason: {:?})",
+                        batch_no, batch_count, batch.group_name, reason
+                    );
+                    break;
+                }
             }
-            other => {
-                // Be loud about non-terminal states left behind
-                let msg = format!(
-                    "INTERNAL: Batch {} left in non-terminal state {:#?}",
-                    batch_no, other
-                );
 
-                error!("{}", msg);
+            if sema.available_permits() == 0 {
+                warn!(
+                    "Batch {}/{}, group {:?}: Ran out of job \
+                             permits, some attestation conditions may be \
+                             delayed. For better accuracy, increase \
+                             max_batch_jobs or adjust attestation \
+                             conditions",
+                    batch_no, batch_count, batch.group_name
+                );
+            }
 
-                errors.push(msg);
+            // This short-lived permit prevents scheduling
+            // excess attestation jobs (which could eventually
+            // eat all memory). It is freed as soon as we
+            // leave this code block.
+            let _permit4sched = sema.acquire().await?;
+
+            let batch_no4err_msg = batch_no.clone();
+            let batch_count4err_msg = batch_count.clone();
+            let group_name4err_msg = batch.group_name.clone();
+
+            // We never get to error reporting in daemon mode, attach a map_err
+            let job_with_err_msg = job.map_err(move |e| async move {
+                warn!(
+                    "Batch {}/{}, group {:?} ERR: {}",
+                    batch_no4err_msg,
+                    batch_count4err_msg,
+                    group_name4err_msg,
+                    e.to_string()
+                );
+                e
+            });
+
+            // Spawn the job in background
+            let _detached_job: JoinHandle<_> = tokio::spawn(job_with_err_msg);
+        } else {
+            // Await and return the single result in non-daemon mode, with retries if necessary
+            match job.await {
+                Ok(_) => return Ok(()),
+                Err(e) => {
+                    if retries_left == 0 {
+                        return Err(e);
+                    } else {
+                        retries_left -= 1;
+                        debug!(
+                            "{}/{}, group {:?}: attestation failure: {}",
+                            batch_no,
+                            batch_count,
+                            batch.group_name,
+                            e.to_string()
+                        );
+                        info!(
+                            "Batch {}/{}, group {:?}: retrying in {}.{}s, {} retries left",
+                            batch_no,
+                            batch_count,
+                            batch.group_name,
+                            retry_interval.as_secs(),
+                            retry_interval.subsec_millis(),
+                            retries_left,
+                        );
+
+                        tokio::time::sleep(retry_interval).await;
+                    }
+                }
             }
         }
+
+        batch.last_job_finished_at = Instant::now();
     }
+}
 
-    if !errors.is_empty() {
-        let err_list = errors.join("\n");
+/// A future for a single attempt to attest a batch on Solana.
+async fn attestation_job(
+    rlmtx: Arc<RLMutex<RpcCfg>>,
+    batch_no: usize,
+    batch_count: usize,
+    group_name: String,
+    p2w_addr: Pubkey,
+    config: Pyth2WormholeConfig,
+    payer: Keypair,
+    symbols: Vec<P2WSymbol>,
+    max_jobs_sema: Arc<Semaphore>,
+) -> Result<(), ErrBoxSend> {
+    // Will be dropped after attestation is complete
+    let _permit = max_jobs_sema.acquire().await?;
+
+    debug!(
+        "Batch {}/{}, group {:?}: Starting attestation job",
+        batch_no, batch_count, group_name
+    );
+    let rpc = lock_and_make_rpc(&*rlmtx).await; // Reuse the same lock for the blockhash/tx/get_transaction
+    let latest_blockhash = rpc
+        .get_latest_blockhash()
+        .map_err(|e| -> ErrBoxSend { e.into() })
+        .await?;
+
+    let tx_res: Result<_, ErrBoxSend> = gen_attest_tx(
+        p2w_addr,
+        &config,
+        &payer,
+        symbols.as_slice(),
+        &Keypair::new(),
+        latest_blockhash,
+    );
+    let tx = tx_res?;
+    let sig = rpc
+        .send_and_confirm_transaction(&tx)
+        .map_err(|e| -> ErrBoxSend { e.into() })
+        .await?;
+    let tx_data = rpc
+        .get_transaction(&sig, UiTransactionEncoding::Json)
+        .map_err(|e| -> ErrBoxSend { e.into() })
+        .await?;
+    let seqno = tx_data
+        .transaction
+        .meta
+        .and_then(|meta| meta.log_messages)
+        .and_then(|logs| {
+            let mut seqno = None;
+            for log in logs {
+                if log.starts_with(SEQNO_PREFIX) {
+                    seqno = Some(log.replace(SEQNO_PREFIX, ""));
+                    break;
+                }
+            }
+            seqno
+        })
+        .ok_or_else(|| -> ErrBoxSend { format!("No seqno in program logs").into() })?;
 
-        Err(format!(
-            "{} of {} batches failed:\n{}",
-            errors.len(),
-            batch_count,
-            err_list
-        )
-        .into())
-    } else {
-        Ok(())
-    }
+    info!(
+        "Batch {}/{}, group {:?} OK",
+        batch_no, batch_count, group_name
+    );
+    // NOTE(2022-03-09): p2w_autoattest.py relies on parsing this println!()
+    println!("Sequence number: {}", seqno);
+    Result::<(), ErrBoxSend>::Ok(())
 }
 
 fn init_logging(verbosity: u32) {
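The core of the daemon-mode flow in `attestation_sched_job()` is a loop that takes a short-lived semaphore permit before spawning each job, while the job itself holds a second permit for its full duration. A stripped-down sketch of that pattern, with sleeps standing in for the RPC work and all names and values illustrative:

use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Stand-in for conditions.max_batch_jobs.
    let sema = Arc::new(Semaphore::new(20));

    for attempt in 0..3u32 {
        // Short-lived scheduling permit: the loop stalls here instead of
        // piling up unbounded tasks when all job slots are busy.
        let _permit4sched = sema.acquire().await.unwrap();

        let sema4job = sema.clone();
        tokio::spawn(async move {
            // The job holds its own permit for its whole duration.
            let _permit = sema4job.acquire().await.unwrap();
            tokio::time::sleep(Duration::from_millis(100)).await; // stand-in for RPC work
            println!("attestation attempt {} done", attempt);
        });
        // _permit4sched drops here, freeing the scheduling slot.
    }

    tokio::time::sleep(Duration::from_millis(500)).await; // let the jobs finish
}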

+ 97 - 0
solana/pyth2wormhole/client/src/util.rs

@@ -0,0 +1,97 @@
+use log::trace;
+
+use std::{
+    ops::{
+        Deref,
+        DerefMut,
+    },
+    time::{
+        Duration,
+        Instant,
+    },
+};
+use tokio::sync::{
+    Mutex,
+    MutexGuard,
+};
+
+/// Rate-limited mutex. Ensures there's a period of minimum rl_interval between lock acquisitions
+pub struct RLMutex<T> {
+    mtx: Mutex<RLMutexState<T>>,
+    rl_interval: Duration,
+}
+
+/// Helper to make the last_released writes also guarded by the mutex
+pub struct RLMutexState<T> {
+    /// Helps make sure regular passage of time is subtracted from sleep duration
+    last_released: Instant,
+    val: T,
+}
+
+impl<T> Deref for RLMutexState<T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        &self.val
+    }
+}
+
+impl<T> DerefMut for RLMutexState<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.val
+    }
+}
+
+/// Helper wrapper to record lock release times via Drop
+pub struct RLMutexGuard<'a, T> {
+    guard: MutexGuard<'a, RLMutexState<T>>,
+}
+
+impl<'a, T> Drop for RLMutexGuard<'a, T> {
+    fn drop(&mut self) {
+        let state: &mut RLMutexState<T> =
+            MutexGuard::<'a, RLMutexState<T>>::deref_mut(&mut self.guard);
+        state.last_released = Instant::now();
+    }
+}
+
+impl<'a, T> Deref for RLMutexGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        self.guard.deref()
+    }
+}
+
+impl<'a, T> DerefMut for RLMutexGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        self.guard.deref_mut()
+    }
+}
+
+impl<T> RLMutex<T> {
+    pub fn new(val: T, rl_interval: Duration) -> Self {
+        Self {
+            mtx: Mutex::new(RLMutexState {
+                last_released: Instant::now() - rl_interval,
+                val,
+            }),
+            rl_interval,
+        }
+    }
+
+    pub async fn lock(&self) -> RLMutexGuard<'_, T> {
+        let guard = self.mtx.lock().await;
+        let elapsed = guard.last_released.elapsed();
+        if elapsed < self.rl_interval {
+            let sleep_time = self.rl_interval - elapsed;
+            trace!(
+                "RLMutex: Parking lock future for {}.{}s",
+                sleep_time.as_secs(),
+                sleep_time.subsec_millis()
+            );
+
+            tokio::time::sleep(sleep_time).await;
+        }
+
+        RLMutexGuard { guard }
+    }
+}
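A short usage sketch of `RLMutex`, assuming the type above is in scope: back-to-back `lock()` calls are spaced at least `rl_interval` apart, while the first acquisition is immediate because `last_released` is initialized to `Instant::now() - rl_interval`:

use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let rl = RLMutex::new(0u64, Duration::from_millis(150));
    let start = Instant::now();

    for _ in 0..3 {
        // First lock succeeds immediately; later locks park until 150ms
        // have passed since the previous guard was dropped.
        let mut guard = rl.lock().await;
        *guard += 1;
        println!("acquired after {:?}", start.elapsed());
    } // dropping the guard stamps last_released
}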

+ 1 - 1
solana/pyth2wormhole/client/tests/test_attest.rs

@@ -37,7 +37,7 @@ use fixtures::{
 };
 
 #[tokio::test]
-async fn test_happy_path() -> Result<(), solitaire::ErrBox> {
+async fn test_happy_path() -> Result<(), p2wc::ErrBoxSend> {
     // Programs
     let p2w_program_id = Pubkey::new_unique();
     let wh_fixture_program_id = Pubkey::new_unique();

+ 3 - 6
solana/pyth2wormhole/program/src/migrate.rs

@@ -51,11 +51,7 @@ impl<'b> InstructionContext<'b> for Migrate<'b> {
     }
 }
 
-pub fn migrate(
-    ctx: &ExecutionContext,
-    accs: &mut Migrate,
-    data: (),
-) -> SoliResult<()> {
+pub fn migrate(ctx: &ExecutionContext, accs: &mut Migrate, data: ()) -> SoliResult<()> {
     let old_config: &OldPyth2WormholeConfig = &accs.old_config.1;
 
     if &old_config.owner != accs.current_owner.info().key {
@@ -82,7 +78,8 @@ pub fn migrate(
     **accs.old_config.info().lamports.borrow_mut() = 0;
 
     // Credit payer with saved balance
-    accs.payer.info()
+    accs.payer
+        .info()
         .lamports
         .borrow_mut()
         .checked_add(old_config_balance_val)

+ 11 - 6
third_party/pyth/p2w_autoattest.py

@@ -184,9 +184,9 @@ if P2W_ATTESTATION_CFG is None:
     cfg_yaml = """
 ---
 symbol_groups:
-  - group_name: things
+  - group_name: fast_interval_only
     conditions:
-      min_interval_secs: 17
+      min_interval_secs: 3
     symbols:
 """
 
@@ -206,9 +206,10 @@ symbol_groups:
         product_addr: {product}"""
 
     cfg_yaml += f"""
-  - group_name: stuff
+  - group_name: longer_interval_sensitive_changes
     conditions:
-      min_interval_secs: 19
+      min_interval_secs: 10
+      price_changed_pct: 3 
     symbols:
 """
 
@@ -231,8 +232,10 @@ symbol_groups:
 first_attest_result = run_or_die(
     [
         "pyth2wormhole-client",
+        "--commitment",
+        "finalized",
         "--log-level",
-        "4",
+        "3",
         "--p2w-addr",
         P2W_SOL_ADDRESS,
         "--rpc-url",
@@ -266,8 +269,10 @@ while True:
     p2w_client_process = Popen(
         [
             "pyth2wormhole-client",
+            "--commitment",
+            "finalized",
             "--log-level",
-            "4",
+            "3",
             "--p2w-addr",
             P2W_SOL_ADDRESS,
             "--rpc-url",