Browse Source

Drozdziak1/p2w client sym groups (#199)

* pyth2wormhole-client: Implement and use daemon mode

commit-id:7001bdf7

* pyth2wormhole-client: harden CLI, simplify attestation loop

commit-id:ad5e4857

* pyth2wormhole-client: Refactor symbol config into groups

This commit implements the notion of a symbol group - a collection of
symbols that are attested according to the specified set of
condition. Currently, only an attestation frequency is supported. In
the future, this value will serve as a fallback for symbols that
rarely trip other conditions.

commit-id:cf19cc41

* p2w_autoattest.py: Use symbol groups in config

commit-id:1153c5f7

* Dockerfile.client: bump toolchain version to mitigate compiler error

The previous compiler version encountered a bug when building
pyth2wormhole-client for testing. The error message mentioned serde,
which is tested inside the `attestation_cfg.rs` module.

commit-id:61c12427

* p2w_autoattest.py: Add another test group

commit-id:2e29583c

* p2w-client: Get rid of sent_at, clarify messages, add status setter

commit-id:87653ab1
Stanisław Drozd 3 years ago
parent
commit
a1e50ae636

+ 1 - 1
Dockerfile.client

@@ -5,7 +5,7 @@ RUN apt-get update && apt-get install -yq libssl-dev libudev-dev pkg-config zlib
 RUN curl -fsSL https://deb.nodesource.com/setup_16.x | bash - && apt-get install -y nodejs
 RUN curl -sSfL https://release.solana.com/v1.9.4/install | sh
 
-RUN rustup default nightly-2022-01-02
+RUN rustup default nightly-2022-02-01
 RUN rustup component add rustfmt
 
 RUN --mount=type=cache,target=/root/.cache \

+ 9 - 4
solana/pyth2wormhole/Cargo.lock

@@ -1143,6 +1143,9 @@ name = "hex"
 version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "hidapi"
@@ -1716,6 +1719,7 @@ dependencies = [
 name = "p2w-sdk"
 version = "0.1.1"
 dependencies = [
+ "hex",
  "pyth-sdk-solana",
  "serde",
  "solana-program",
@@ -1918,21 +1922,22 @@ dependencies = [
 
 [[package]]
 name = "pyth-sdk"
-version = "0.3.0"
+version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb06993b8c8ad7f50042e8b6b6ae4ed2a98722495845b12efc9a12f4301b901b"
+checksum = "f262b88557d8f152a247e1be786a8359d63112fac0a6e49fa41082a8ef789e8d"
 dependencies = [
  "borsh",
  "borsh-derive",
+ "hex",
  "schemars",
  "serde",
 ]
 
 [[package]]
 name = "pyth-sdk-solana"
-version = "0.3.0"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b83f33cbdeccc350e021f6b4bc714655aa38352fac80a93b16b1902863aedb62"
+checksum = "2e37614ced8a0a61637111f714a08811fb7a677df3719c0a5b261e1d13d50de6"
 dependencies = [
  "borsh",
  "borsh-derive",

+ 48 - 12
solana/pyth2wormhole/client/src/attestation_cfg.rs

@@ -1,4 +1,7 @@
-use std::str::FromStr;
+use std::{
+    collections::HashMap,
+    str::FromStr,
+};
 
 use serde::{
     de::Error,
@@ -12,11 +15,25 @@ use solana_program::pubkey::Pubkey;
 /// Pyth2wormhole config specific to attestation requests
 #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub struct AttestationConfig {
+    pub symbol_groups: Vec<SymbolGroup>,
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
+pub struct SymbolGroup {
+    pub group_name: String,
+    /// Attestation conditions applied to all symbols in this group
+    pub conditions: AttestationConditions,
     pub symbols: Vec<P2WSymbol>,
 }
 
+#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
+pub struct AttestationConditions {
+    /// How often to attest
+    pub min_freq_secs: u64,
+}
+
 /// Config entry for a Pyth product + price pair
-#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
+#[derive(Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub struct P2WSymbol {
     /// User-defined human-readable name
     pub name: Option<String>,
@@ -59,27 +76,46 @@ mod tests {
 
     #[test]
     fn test_sanity() -> Result<(), ErrBox> {
-        let initial = AttestationConfig {
+        let fastbois = SymbolGroup {
+            group_name: "fast bois".to_owned(),
+            conditions: AttestationConditions { min_freq_secs: 5 },
+            symbols: vec![
+                P2WSymbol {
+                    name: Some("ETHUSD".to_owned()),
+                    ..Default::default()
+                },
+                P2WSymbol {
+                    name: Some("BTCUSD".to_owned()),
+                    ..Default::default()
+                },
+            ],
+        };
+
+        let slowbois = SymbolGroup {
+            group_name: "slow bois".to_owned(),
+            conditions: AttestationConditions { min_freq_secs: 200 },
             symbols: vec![
                 P2WSymbol {
-                    name: Some("ETH/USD".to_owned()),
-                    product_addr: Default::default(),
-                    price_addr: Default::default(),
+                    name: Some("CNYAUD".to_owned()),
+                    ..Default::default()
                 },
                 P2WSymbol {
-                    name: None,
-                    product_addr: Pubkey::new(&[42u8; 32]),
-                    price_addr: Default::default(),
+                    name: Some("INRPLN".to_owned()),
+                    ..Default::default()
                 },
             ],
         };
 
-        let serialized = serde_yaml::to_string(&initial)?;
-        eprintln!("Serialized:\n{}", serialized);
+        let cfg = AttestationConfig {
+            symbol_groups: vec![fastbois, slowbois],
+        };
+
+        let serialized = serde_yaml::to_string(&cfg)?;
 
         let deserialized: AttestationConfig = serde_yaml::from_str(&serialized)?;
 
-        assert_eq!(initial, deserialized);
+        assert_eq!(cfg, deserialized);
+
         Ok(())
     }
 }

+ 0 - 8
solana/pyth2wormhole/client/src/cli.rs

@@ -63,14 +63,6 @@ pub enum Action {
             about = "Do not stop attesting. In this mode, this program will behave more like a daemon and continuously attest the specified symbols.",
         )]
         daemon: bool,
-        #[clap(
-            short = 'b',
-            long = "--batch-interval",
-            about = "How often in seconds to transmit each batch. Only active with --daemon.",
-            default_value = "30",
-            requires_if("true", "daemon"),
-        )]
-        batch_interval_secs: u64,
         #[clap(
             short = 't',
             long = "--timeout",

+ 1 - 3
solana/pyth2wormhole/client/src/lib.rs

@@ -52,9 +52,7 @@ use pyth2wormhole::{
     Pyth2WormholeConfig,
 };
 
-pub use attestation_cfg::P2WSymbol;
-
-pub use attestation_cfg::AttestationConfig;
+pub use attestation_cfg::{AttestationConfig, AttestationConditions, P2WSymbol};
 
 pub fn gen_init_tx(
     payer: Keypair,

+ 105 - 115
solana/pyth2wormhole/client/src/main.rs

@@ -15,6 +15,7 @@ use log::{
     error,
     info,
     warn,
+    trace,
     LevelFilter,
 };
 use solana_client::rpc_client::RpcClient;
@@ -95,7 +96,6 @@ fn main() -> Result<(), ErrBox> {
             ref attestation_cfg,
             n_retries,
             daemon,
-            batch_interval_secs,
             conf_timeout_secs,
             rpc_interval_ms,
         } => {
@@ -110,7 +110,6 @@ fn main() -> Result<(), ErrBox> {
                 &attestation_cfg,
                 n_retries,
                 daemon,
-                Duration::from_secs(batch_interval_secs),
                 Duration::from_secs(conf_timeout_secs),
                 Duration::from_millis(rpc_interval_ms),
             )?;
@@ -121,35 +120,43 @@ fn main() -> Result<(), ErrBox> {
 }
 
 #[derive(Debug)]
-pub enum BatchTxState<'a> {
+pub struct BatchState<'a> {
+    group_name: String,
+    symbols: &'a [P2WSymbol],
+    conditions: AttestationConditions,
+    status: BatchTxStatus,
+    status_changed_at: Instant,
+}
+
+impl BatchState<'_> {
+    /// Helps make state changes one-liners
+    pub fn set_status(&mut self, s: BatchTxStatus) {
+        self.status = s;
+        self.status_changed_at = Instant::now();
+    }
+}
+
+#[derive(Debug)]
+pub enum BatchTxStatus {
     Sending {
-        symbols: &'a [P2WSymbol],
         attempt_no: usize,
     },
     Confirming {
-        symbols: &'a [P2WSymbol],
         attempt_no: usize,
         signature: Signature,
-        sent_at: Instant,
     },
     Success {
-        symbols: &'a [P2WSymbol],
-        occured_at: Instant,
         seqno: String,
     },
     FailedSend {
-        symbols: &'a [P2WSymbol],
-        occured_at: Instant,
         last_err: ErrBox,
     },
     FailedConfirm {
-        symbols: &'a [P2WSymbol],
-        occured_at: Instant,
         last_err: ErrBox,
     },
 }
 
-use BatchTxState::*;
+use BatchTxStatus::*;
 
 /// Send a series of batch attestations for symbols of an attestation config.
 fn handle_attest(
@@ -159,7 +166,6 @@ fn handle_attest(
     attestation_cfg: &AttestationConfig,
     n_retries: usize,
     daemon: bool,
-    batch_interval: Duration,
     conf_timeout: Duration,
     rpc_interval: Duration,
 ) -> Result<(), ErrBox> {
@@ -170,42 +176,45 @@ fn handle_attest(
 
     let config = get_config_account(rpc_client, &p2w_addr)?;
 
-    let batch_count = {
-        let whole_batches = attestation_cfg.symbols.len() / config.max_batch_size as usize;
-
-        // Include  partial batch if there is a remainder
-        if attestation_cfg.symbols.len() % config.max_batch_size as usize > 0 {
-            whole_batches + 1
-        } else {
-            whole_batches
-        }
-    };
-
     debug!("Symbol config:\n{:#?}", attestation_cfg);
 
     info!(
-        "{} symbols read, max batch size {}, dividing into {} batches",
-        attestation_cfg.symbols.len(),
-        config.max_batch_size,
-        batch_count
+        "{} symbol groups read, dividing into batches",
+        attestation_cfg.symbol_groups.len(),
     );
 
     // Reused for failed batch retries
     let mut batches: Vec<_> = attestation_cfg
-        .symbols
-        .as_slice()
-        .chunks(config.max_batch_size as usize)
-        .enumerate()
-        .map(|(idx, symbols)| {
-            (
-                idx + 1,
-                BatchTxState::Sending {
-                    symbols,
-                    attempt_no: 1,
-                },
-            )
+        .symbol_groups
+        .iter()
+        .map(|g| {
+            // FIXME: The forbidden nested closure move technique (a lost art of pleasing the borrow checker)
+            let conditions4closure = g.conditions.clone();
+            let name4closure = g.group_name.clone();
+
+            info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),);
+
+            // Divide group into batches
+            g.symbols
+                .as_slice()
+                .chunks(config.max_batch_size as usize)
+                .enumerate()
+                .map(move |(idx, symbols)| {
+                    (
+                        idx + 1,
+                        BatchState {
+                            conditions: conditions4closure.clone(),
+                            group_name: name4closure.clone(),
+                            symbols,
+                            status: Sending { attempt_no: 1 },
+                            status_changed_at: Instant::now(),
+                        },
+                    )
+                })
         })
+        .flatten()
         .collect();
+    let batch_count = batches.len();
 
     // NOTE(2022-04-26): only increment this if `daemon` is false
     let mut finished_count = 0;
@@ -214,16 +223,15 @@ fn handle_attest(
     while daemon || finished_count < batches.len() {
         finished_count = 0;
         for (batch_no, state) in batches.iter_mut() {
-            match state {
-                BatchTxState::Sending {
-                    symbols,
-                    attempt_no,
-                } => {
+            match state.status {
+                Sending { attempt_no } => {
                     info!(
-                        "Batch {}/{} contents: {:?}",
+                        "Batch {}/{} contents (group {:?}): {:?}",
                         batch_no,
                         batch_count,
-                        symbols
+                        state.group_name,
+                        state
+                            .symbols
                             .iter()
                             .map(|s| s
                                 .name
@@ -241,7 +249,7 @@ fn handle_attest(
                                 p2w_addr,
                                 &config,
                                 &payer,
-                                symbols,
+                                state.symbols,
                                 &Keypair::new(),
                                 latest_blockhash,
                             )?;
@@ -255,57 +263,48 @@ fn handle_attest(
                     match res {
                         Ok(signature) => {
                             info!(
-                                "Batch {}/{} tx send: OK (Attempt {} of {})",
-                                batch_no, batch_count, attempt_no, n_retries
+                                "Batch {}/{} (group {:?}) tx send: OK (Attempt {} of {})",
+                                batch_no, batch_count, state.group_name, attempt_no, n_retries
                             );
 
-                            // Record when we've sent this tx
-
-                            *state = BatchTxState::Confirming {
-                                symbols,
-                                attempt_no: *attempt_no,
+                            state.set_status(Confirming {
+                                attempt_no,
                                 signature,
-                                sent_at: Instant::now(),
-                            }
+                            });
                         }
                         Err(e) => {
                             let msg = format!(
-                                "Batch {}/{} tx send error (attempt {} of {}): {}",
+                                "Batch {}/{} (group {:?}) tx send error (attempt {} of {}): {}",
                                 batch_no,
                                 batch_count,
+                                state.group_name,
                                 attempt_no,
                                 n_retries + 1,
                                 e.to_string()
                             );
                             warn!("{}", &msg);
 
-                            if *attempt_no < n_retries {
-                                *state = BatchTxState::Sending {
-                                    attempt_no: *attempt_no + 1,
-                                    symbols,
-                                }
+                            if attempt_no < n_retries {
+                                state.set_status(Sending {
+                                    attempt_no: attempt_no + 1,
+                                })
                             } else {
                                 // This batch failed all attempts, note the error but do not schedule for retry
                                 error!(
-                                    "Batch {}/{} tx send: All {} attempts failed",
+                                    "Batch {}/{} (group {:?}) tx send: All {} attempts failed",
+                                    state.group_name,
                                     batch_no,
                                     batch_count,
                                     n_retries + 1
                                 );
-                                *state = BatchTxState::FailedSend {
-                                    symbols,
-                                    occured_at: Instant::now(),
-                                    last_err: e,
-                                };
+                                state.set_status(FailedSend { last_err: e });
                             }
                         }
                     }
                 }
-                BatchTxState::Confirming {
-                    symbols,
+                Confirming {
                     attempt_no,
                     signature,
-                    sent_at,
                 } => {
                     let res = rpc_client
                         .get_transaction(&signature, UiTransactionEncoding::Json)
@@ -334,18 +333,15 @@ fn handle_attest(
                             println!("Sequence number: {}", seqno);
                             info!("Batch {}/{}: OK, seqno {}", batch_no, batch_count, seqno);
 
-                            *state = BatchTxState::Success {
-                                symbols,
-                                seqno,
-                                occured_at: Instant::now(),
-                            };
+                            state.set_status(Success { seqno });
                         }
                         Err(e) => {
-                            let elapsed = sent_at.elapsed();
+                            let elapsed = state.status_changed_at.elapsed();
                             let msg = format!(
-                                "Batch {}/{} tx confirmation failed ({}.{}/{}.{}): {}",
+                                "Batch {}/{} (groups {:?}) tx confirmation failed ({}.{}/{}.{}): {}",
                                 batch_no,
                                 batch_count,
+                                state.group_name,
                                 elapsed.as_secs(),
                                 elapsed.subsec_millis(),
                                 conf_timeout.as_secs(),
@@ -359,7 +355,8 @@ fn handle_attest(
                                 // note the error and schedule for a
                                 // fresh send attempt
                                 warn!(
-                                    "Batch {}/{} tx confirm: Took more than {}.{} seconds (attempt {} of {}): {}",
+                                    "Batch {}/{} (group {:?}) tx confirm: Took more than {}.{} seconds (attempt {} of {}): {}",
+                                    state.group_name,
                                     batch_no,
                                     batch_count,
                                     conf_timeout.as_secs(),
@@ -368,52 +365,46 @@ fn handle_attest(
 				    msg
                                 );
 
-                                if *attempt_no < n_retries {
-                                    *state = BatchTxState::Sending {
-                                        symbols,
-                                        attempt_no: *attempt_no + 1,
-                                    };
+                                if attempt_no < n_retries {
+                                    state.set_status(Sending {
+                                        attempt_no: attempt_no + 1,
+                                    });
                                 } else {
                                     error!(
-                                        "Batch {}/{} tx confirm: All {} attempts failed",
+                                        "Batch {}/{} (group {:?}) tx confirm: All {} attempts failed",
+                                        state.group_name,
                                         batch_no,
                                         batch_count,
                                         n_retries + 1
                                     );
-                                    *state = BatchTxState::FailedConfirm {
-                                        symbols,
-                                        occured_at: Instant::now(),
-                                        last_err: e,
-                                    };
+                                    state.set_status(FailedConfirm { last_err: e });
                                 }
                             }
                         }
                     }
                 }
-                Success {
-                    symbols,
-                    occured_at,
-                    ..
-                }
-                | FailedSend {
-                    symbols,
-                    occured_at,
-                    ..
-                }
-                | FailedConfirm {
-                    symbols,
-                    occured_at,
-                    ..
-                } => {
+                Success { .. } | FailedSend { .. } | FailedConfirm { .. } => {
                     // We only try to re-schedule under --daemon
                     if daemon {
-                        if occured_at.elapsed() > batch_interval {
-                            *state = BatchTxState::Sending {
-                                symbols,
-                                attempt_no: 1,
-                            };
+                        if state.status_changed_at.elapsed()
+                            > Duration::from_secs(state.conditions.min_freq_secs)
+                        {
+                            state.set_status(Sending { attempt_no: 1 });
+                        } else {
+                            let elapsed = state.status_changed_at.elapsed();
+                            trace!(
+                                "Batch {}/{} (group {:?}): waiting ({}.{}/{}.{})",
+                                batch_no,
+                                batch_count,
+                                state.group_name,
+                                elapsed.as_secs(),
+                                elapsed.subsec_millis(),
+                                conf_timeout.as_secs(),
+                                conf_timeout.subsec_millis()
+                            )
                         }
-                    } 
+                    }
+
                     // Track the finished batches
                     finished_count += 1;
 
@@ -429,8 +420,7 @@ fn handle_attest(
 
     // Filter out errors
     for (batch_no, state) in batches {
-        use BatchTxState::*;
-        match state {
+        match state.status {
             Success { .. } => {}
             FailedSend { last_err, .. } | FailedConfirm { last_err, .. } => {
                 errors.push(last_err.to_string())

+ 37 - 11
third_party/pyth/p2w_autoattest.py

@@ -177,24 +177,50 @@ if P2W_ATTESTATION_CFG is None:
         logging.error("Bad Content type")
         sys.exit(1)
 
-    cfg_yaml = f"""
----
-symbols:"""
-
     logging.info(
         f"Retrieved {len(pyth_accounts)} Pyth accounts from endpoint: {pyth_accounts}"
     )
 
-    for acc in pyth_accounts:
+    cfg_yaml = """
+---
+symbol_groups:
+  - group_name: things
+    conditions:
+      min_freq_secs: 17
+    symbols:
+"""
+
+    # integer-divide the symbols in ~half for two test
+    # groups. Assumes arr[:idx] is exclusive, and arr[idx:] is
+    # inclusive
+    half_len = len(pyth_accounts) // 2;
+
+    for thing in pyth_accounts[:half_len]:
+        name = thing["name"]
+        price = thing["price"]
+        product = thing["product"]
 
-        name = acc["name"]
-        price = acc["price"]
-        product = acc["product"]
+        cfg_yaml += f"""
+      - name: {name}
+        price_addr: {price}
+        product_addr: {product}"""
+
+    cfg_yaml += f"""
+  - group_name: stuff
+    conditions:
+      min_freq_secs: 19
+    symbols:
+"""
+
+    for stuff in pyth_accounts[half_len:]:
+        name = stuff["name"]
+        price = stuff["price"]
+        product = stuff["product"]
 
         cfg_yaml += f"""
-    - name: {name}
-      price_addr: {price}
-      product_addr: {product}"""
+      - name: {name}
+        price_addr: {price}
+        product_addr: {product}"""
 
     with open(P2W_ATTESTATION_CFG, "w") as f:
         f.write(cfg_yaml)