Bladeren bron

p2w-client: Implement additional trigger conditions

From now on, we support price change by a given percentage as well as
publish time changes for triggering attestations.

commit-id:9bd145e1
Stan Drozd 3 jaren geleden
bovenliggende
commit
15cfcc6541

+ 1 - 0
solana/pyth2wormhole/Cargo.lock

@@ -1977,6 +1977,7 @@ dependencies = [
  "log",
  "p2w-sdk",
  "pyth-client 0.5.0",
+ "pyth-sdk-solana",
  "pyth2wormhole",
  "serde",
  "serde_yaml",

+ 1 - 0
solana/pyth2wormhole/client/Cargo.toml

@@ -20,6 +20,7 @@ log = "0.4.14"
 wormhole-bridge-solana = {path = "../../bridge/program"}
 pyth2wormhole = {path = "../program"}
 p2w-sdk = { path = "../../../third_party/pyth/p2w-sdk/rust", features=["solana"] }
+pyth-sdk-solana = "0.4.0"
 serde = "1"
 serde_yaml = "0.8"
 shellexpand = "2.1.0"

+ 33 - 7
solana/pyth2wormhole/client/src/attestation_cfg.rs

@@ -13,12 +13,12 @@ use serde::{
 use solana_program::pubkey::Pubkey;
 
 /// Pyth2wormhole config specific to attestation requests
-#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
 pub struct AttestationConfig {
     pub symbol_groups: Vec<SymbolGroup>,
 }
 
-#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
 pub struct SymbolGroup {
     pub group_name: String,
     /// Attestation conditions applied to all symbols in this group
@@ -26,10 +26,22 @@ pub struct SymbolGroup {
     pub symbols: Vec<P2WSymbol>,
 }
 
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
+/// Spontaneous attestation triggers. Attestation is triggered if any
+/// of the active conditions is met. Option<> fields can be
+/// de-activated with None. All conditions are inactive by default.
+#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq)]
 pub struct AttestationConditions {
-    /// How often to attest
-    pub min_freq_secs: u64,
+    /// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
+    #[serde(default)]
+    pub min_interval_secs: Option<u64>,
+
+    /// Trigger attestation if price changes by the specified percentage.
+    #[serde(default)]
+    pub price_changed_pct: Option<f32>,
+
+    /// Trigger attestation if publish_time changes
+    #[serde(default)]
+    pub publish_time_changed: bool,
 }
 
 /// Config entry for a Pyth product + price pair
@@ -50,6 +62,14 @@ pub struct P2WSymbol {
     pub price_addr: Pubkey,
 }
 
+impl ToString for P2WSymbol {
+    fn to_string(&self) -> String {
+        self.name
+            .clone()
+            .unwrap_or(format!("Unnamed product {}", self.product_addr))
+    }
+}
+
 // Helper methods for strinigified SOL addresses
 
 fn pubkey_string_ser<S>(k: &Pubkey, ser: S) -> Result<S::Ok, S::Error>
@@ -78,7 +98,10 @@ mod tests {
     fn test_sanity() -> Result<(), ErrBox> {
         let fastbois = SymbolGroup {
             group_name: "fast bois".to_owned(),
-            conditions: AttestationConditions { min_freq_secs: 5 },
+            conditions: AttestationConditions {
+                min_interval_secs: Some(5),
+                ..Default::default()
+            },
             symbols: vec![
                 P2WSymbol {
                     name: Some("ETHUSD".to_owned()),
@@ -93,7 +116,10 @@ mod tests {
 
         let slowbois = SymbolGroup {
             group_name: "slow bois".to_owned(),
-            conditions: AttestationConditions { min_freq_secs: 200 },
+            conditions: AttestationConditions {
+                min_interval_secs: Some(200),
+                ..Default::default()
+            },
             symbols: vec![
                 P2WSymbol {
                     name: Some("CNYAUD".to_owned()),

+ 110 - 1
solana/pyth2wormhole/client/src/batch_state.rs

@@ -1,6 +1,16 @@
+use log::{
+    debug,
+    warn,
+};
+use solana_client::rpc_client::RpcClient;
 use solana_sdk::signature::Signature;
 
-use std::time::Instant;
+use pyth_sdk_solana::state::PriceAccount;
+
+use std::time::{
+    Duration,
+    Instant,
+};
 
 use crate::{
     AttestationConditions,
@@ -12,6 +22,7 @@ use crate::{
 pub struct BatchState<'a> {
     pub group_name: String,
     pub symbols: &'a [P2WSymbol],
+    pub last_known_symbol_states: Vec<Option<PriceAccount>>,
     pub conditions: AttestationConditions,
     status: BatchTxStatus,
     status_changed_at: Instant,
@@ -27,6 +38,7 @@ impl<'a> BatchState<'a> {
             group_name,
             symbols,
             conditions,
+            last_known_symbol_states: vec![None; symbols.len()],
             status: BatchTxStatus::Sending { attempt_no: 1 },
             status_changed_at: Instant::now(),
         }
@@ -38,11 +50,108 @@ impl<'a> BatchState<'a> {
     pub fn get_status(&self) -> &BatchTxStatus {
         &self.status
     }
+
     /// Ensure that status changes are accompanied by a timestamp bump
     pub fn set_status(&mut self, s: BatchTxStatus) {
         self.status_changed_at = Instant::now();
         self.status = s;
     }
+
+    /// Evaluate the configured attestation conditions for this
+    /// batch. Returns Some("<reason>") if any trigger condition was
+    /// met. Only the first encountered condition is mentioned.
+    pub fn should_resend(&mut self, c: &RpcClient) -> Option<String> {
+        let sym_count = self.symbols.len();
+        let mut new_symbol_states: Vec<Option<PriceAccount>> = Vec::with_capacity(sym_count);
+        for (idx, sym) in self.symbols.iter().enumerate() {
+            let new_state = match c
+                .get_account_data(&sym.price_addr)
+                .map_err(|e| e.to_string())
+                .and_then(|bytes| {
+                    pyth_sdk_solana::state::load_price_account(&bytes)
+                        .map(|state| state.clone())
+                        .map_err(|e| e.to_string())
+                }) {
+                Ok(state) => Some(state),
+                Err(e) => {
+                    warn!(
+                        "Symbol {} ({}/{}): Could not look up state: {}",
+                        sym.name
+                            .as_ref()
+                            .unwrap_or(&format!("Unnamed product {}", sym.product_addr)),
+                        idx + 1,
+                        sym_count,
+                        e.to_string()
+                    );
+                    None
+                }
+            };
+
+            new_symbol_states.push(new_state);
+        }
+
+        // min interval
+        if let Some(i) = self.conditions.min_interval_secs.as_ref() {
+            if self.get_status_changed_at().elapsed() > Duration::from_secs(*i) {
+                return Some(format!(
+                    "minimum interval of {}s elapsed since last state change",
+                    i
+                ));
+            }
+        }
+
+        let mut ret = None;
+        for (idx, old_new_tup) in self
+            .last_known_symbol_states
+            .iter_mut() // Borrow mutably to make the update easier
+            .zip(new_symbol_states.iter())
+            .enumerate()
+        {
+            //  Only evaluate if a triggering condition is not already met
+            if ret.is_none() {
+                match old_new_tup {
+                    (Some(old), Some(new)) => {
+                        // publish_time_changed
+                        if self.conditions.publish_time_changed && new.timestamp > old.timestamp {
+                            ret = Some(format!(
+                                "publish_time advanced for {:?}",
+                                self.symbols[idx].to_string(),
+                            ))
+
+                        // price_changed_pct
+                        } else if let Some(pct) = self.conditions.price_changed_pct {
+                            let price_pct_diff = (old.agg.price as f32 - new.agg.price as f32)
+                                / old.agg.price as f32
+                                * 100.0;
+
+                            if price_pct_diff > pct {
+                                ret = Some(format!(
+                                    "price moved by at least {}% for {:?}",
+                                    pct,
+                                    self.symbols[idx].to_string()
+                                ))
+                            }
+                        }
+                    }
+                    _ => {
+                        debug!(
+                            "Symbol {:?} {}/{}, old or new state value is None, skipping...",
+                            self.symbols[idx].to_string(),
+                            idx + 1,
+                            sym_count
+                        );
+                    }
+                }
+            }
+
+            // Update with newer state if available
+            if old_new_tup.1.is_some() {
+                *old_new_tup.0 = *old_new_tup.1;
+            }
+        }
+
+        return ret;
+    }
 }
 
 #[derive(Debug)]

+ 11 - 17
solana/pyth2wormhole/client/src/main.rs

@@ -14,8 +14,8 @@ use log::{
     debug,
     error,
     info,
-    warn,
     trace,
+    warn,
     LevelFilter,
 };
 use solana_client::rpc_client::RpcClient;
@@ -161,19 +161,13 @@ fn handle_attest(
             g.symbols
                 .as_slice()
                 .chunks(config.max_batch_size as usize)
-                .enumerate()
-                .map(move |(idx, symbols)| {
-                    (
-                        idx + 1,
-                        BatchState::new(
-                            name4closure.clone(),
-                            symbols,
-                            conditions4closure.clone(),
-                        ),
-                    )
+                .map(move |symbols| {
+                    BatchState::new(name4closure.clone(), symbols, conditions4closure.clone())
                 })
         })
         .flatten()
+        .enumerate()
+        .map(|(idx, batch_state)| (idx + 1, batch_state))
         .collect();
     let batch_count = batches.len();
 
@@ -347,21 +341,21 @@ fn handle_attest(
                 Success { .. } | FailedSend { .. } | FailedConfirm { .. } => {
                     // We only try to re-schedule under --daemon
                     if daemon {
-                        if state.get_status_changed_at().elapsed()
-                            > Duration::from_secs(state.conditions.min_freq_secs)
-                        {
+                        if let Some(reason) = state.should_resend(rpc_client) {
+                            info!(
+                                "Batch {}/{} (group {:?}): resending (reason: {})",
+                                batch_no, batch_count, state.group_name, reason,
+                            );
                             state.set_status(Sending { attempt_no: 1 });
                         } else {
                             let elapsed = state.get_status_changed_at().elapsed();
                             trace!(
-                                "Batch {}/{} (group {:?}): waiting ({}.{}/{}.{})",
+                                "Batch {}/{} (group {:?}): waiting ({}.{}s elapsed)",
                                 batch_no,
                                 batch_count,
                                 state.group_name,
                                 elapsed.as_secs(),
                                 elapsed.subsec_millis(),
-                                conf_timeout.as_secs(),
-                                conf_timeout.subsec_millis()
                             )
                         }
                     }

+ 2 - 2
third_party/pyth/p2w_autoattest.py

@@ -186,7 +186,7 @@ if P2W_ATTESTATION_CFG is None:
 symbol_groups:
   - group_name: things
     conditions:
-      min_freq_secs: 17
+      min_interval_secs: 17
     symbols:
 """
 
@@ -208,7 +208,7 @@ symbol_groups:
     cfg_yaml += f"""
   - group_name: stuff
     conditions:
-      min_freq_secs: 19
+      min_interval_secs: 19
     symbols:
 """