فهرست منبع

fix: deduplication logic in agent (#3033)

* feat: impl Ord and Eq for FeedUpdate

* fix: deduplication logic in agent

* update cargo lock

* update cargo lock

* use sort by key

* remove ord and partial ord impls

* roll back

* fix sort issues

* fix dep versions

* increase agent version
Keyvan Khademi 2 ماه پیش
والد
کامیت
f9881cc56e

+ 19 - 18
Cargo.lock

@@ -5639,7 +5639,7 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-agent"
-version = "0.4.1"
+version = "0.4.2"
 dependencies = [
  "anyhow",
  "backoff",
@@ -5657,8 +5657,8 @@ dependencies = [
  "hyper 1.6.0",
  "hyper-util",
  "protobuf",
- "pyth-lazer-protocol 0.10.1",
- "pyth-lazer-publisher-sdk 0.3.0",
+ "pyth-lazer-protocol 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "pyth-lazer-publisher-sdk 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "reqwest 0.12.22",
  "serde",
  "serde_json",
@@ -5701,43 +5701,44 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-protocol"
-version = "0.10.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d321e49be0315d68f07d097d240701a05e003e05eff5ac9f2d0457d4a606dd92"
+version = "0.14.0"
 dependencies = [
+ "alloy-primitives 0.8.25",
  "anyhow",
+ "assert_float_eq",
+ "bincode 1.3.3",
+ "bs58",
  "byteorder",
  "chrono",
  "derive_more 1.0.0",
+ "ed25519-dalek 2.1.1",
  "hex",
  "humantime",
  "humantime-serde",
  "itertools 0.13.0",
+ "libsecp256k1 0.7.2",
+ "mry",
  "protobuf",
  "rust_decimal",
  "serde",
  "serde_json",
+ "thiserror 2.0.12",
 ]
 
 [[package]]
 name = "pyth-lazer-protocol"
 version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "91b3e69c264b2ad80b5943df86c606daae63b13f93062abcc008c09a9e2e621e"
 dependencies = [
- "alloy-primitives 0.8.25",
  "anyhow",
- "assert_float_eq",
- "bincode 1.3.3",
- "bs58",
  "byteorder",
  "chrono",
  "derive_more 1.0.0",
- "ed25519-dalek 2.1.1",
  "hex",
  "humantime",
  "humantime-serde",
  "itertools 0.13.0",
- "libsecp256k1 0.7.2",
- "mry",
  "protobuf",
  "rust_decimal",
  "serde",
@@ -5747,27 +5748,27 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-publisher-sdk"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2bebeacbc58d9e0143e03a397b08becbed1dacf5baad6a245bc00f74ca5cc50d"
+version = "0.10.0"
 dependencies = [
  "anyhow",
  "fs-err",
  "protobuf",
  "protobuf-codegen",
- "pyth-lazer-protocol 0.10.1",
+ "pyth-lazer-protocol 0.14.0",
  "serde_json",
 ]
 
 [[package]]
 name = "pyth-lazer-publisher-sdk"
 version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "98f83b818450d72f6f6db5a9d98e90d2668971da14363820829998290d913f80"
 dependencies = [
  "anyhow",
  "fs-err",
  "protobuf",
  "protobuf-codegen",
- "pyth-lazer-protocol 0.14.0",
+ "pyth-lazer-protocol 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "serde_json",
 ]
 

+ 3 - 3
apps/pyth-lazer-agent/Cargo.toml

@@ -1,14 +1,14 @@
 [package]
 name = "pyth-lazer-agent"
-version = "0.4.1"
+version = "0.4.2"
 edition = "2024"
 description = "Pyth Lazer Agent"
 license = "Apache-2.0"
 repository = "https://github.com/pyth-network/pyth-crosschain"
 
 [dependencies]
-pyth-lazer-publisher-sdk = "0.3.0"
-pyth-lazer-protocol = "0.10.1"
+pyth-lazer-publisher-sdk = "0.10.0"
+pyth-lazer-protocol = "0.14.0"
 
 anyhow = "1.0.98"
 backoff = "0.4.0"

+ 2 - 2
apps/pyth-lazer-agent/src/jrpc_handle.rs

@@ -266,9 +266,9 @@ async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(
 
 #[cfg(test)]
 pub mod tests {
+    use pyth_lazer_protocol::{PriceFeedId, SymbolState, api::Channel, time::FixedRate};
+
     use super::*;
-    use pyth_lazer_protocol::router::{Channel, FixedRate, PriceFeedId};
-    use pyth_lazer_protocol::symbol_state::SymbolState;
     use std::net::SocketAddr;
 
     fn gen_test_symbol(name: String, asset_type: String) -> SymbolMetadata {

+ 55 - 16
apps/pyth-lazer-agent/src/lazer_publisher.rs

@@ -13,6 +13,7 @@ use pyth_lazer_publisher_sdk::transaction::{
     Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
 };
 use solana_keypair::read_keypair_file;
+use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
@@ -132,9 +133,10 @@ impl LazerPublisherTask {
             return Ok(());
         }
 
-        let mut updates = self.pending_updates.drain(..).collect();
+        let mut updates: Vec<FeedUpdate> = self.pending_updates.drain(..).collect();
+        updates.sort_by_key(|u| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos)));
         if self.config.enable_update_deduplication {
-            deduplicate_feed_updates(&mut updates);
+            updates = deduplicate_feed_updates(&updates)?;
         }
 
         let publisher_update = PublisherUpdate {
@@ -178,9 +180,17 @@ impl LazerPublisherTask {
     }
 }
 
-fn deduplicate_feed_updates(feed_updates: &mut Vec<FeedUpdate>) {
-    // assume that feed_updates is already sorted by timestamp for each feed_update.feed_id
-    feed_updates.dedup_by_key(|feed_update| (feed_update.feed_id, feed_update.update.clone()));
+/// For each feed, keep the latest data. Among updates with the same data, keep the one with the earliest timestamp.
+/// Assumes the input is sorted by timestamp ascending.
+fn deduplicate_feed_updates(sorted_feed_updates: &Vec<FeedUpdate>) -> Result<Vec<FeedUpdate>> {
+    let mut deduped_feed_updates = HashMap::new();
+    for update in sorted_feed_updates {
+        let entry = deduped_feed_updates.entry(update.feed_id).or_insert(update);
+        if entry.update != update.update {
+            *entry = update;
+        }
+    }
+    Ok(deduped_feed_updates.into_values().cloned().collect())
 }
 
 #[cfg(test)]
@@ -308,25 +318,27 @@ mod tests {
         //   - (4, 15)
         //   - (5, 15)
         //   - (6, 10)
-        // we should only return (1, 10), (4, 15), (6, 10)
+        //   - (7, 10)
+        // we should only return (6, 10)
 
-        let updates = &mut vec![
+        let updates = &vec![
             test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
             test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
             test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
             test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
             test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15),
             test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 10),
         ];
 
-        let expected_updates = vec![
-            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
-            test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
-            test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
-        ];
+        let expected_updates = vec![test_feed_update(
+            1,
+            TimestampUs::from_millis(6).unwrap(),
+            10,
+        )];
 
-        deduplicate_feed_updates(updates);
-        assert_eq!(updates.to_vec(), expected_updates);
+        let deduped_updates = deduplicate_feed_updates(updates).unwrap();
+        assert_eq!(deduped_updates, expected_updates);
     }
 
     #[test]
@@ -342,11 +354,38 @@ mod tests {
 
         let expected_updates = vec![
             test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
+        deduped_updates.sort_by_key(|u| u.feed_id);
+        assert_eq!(deduped_updates, expected_updates);
+    }
+
+    #[test]
+    fn test_deduplicate_feed_updates_multiple_feeds_random_order() {
+        let updates = &mut vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 20),
+            test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
             test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15),
             test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 20),
+            test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10), // last distinct update for feed 1
+            test_feed_update(1, TimestampUs::from_millis(9).unwrap(), 10),
+            test_feed_update(2, TimestampUs::from_millis(10).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(11).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10), // last distinct update for feed 2
+        ];
+
+        let expected_updates = vec![
+            test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10),
+            test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10),
         ];
 
-        deduplicate_feed_updates(updates);
-        assert_eq!(updates.to_vec(), expected_updates);
+        let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
+        deduped_updates.sort_by_key(|u| u.feed_id);
+        assert_eq!(deduped_updates, expected_updates);
     }
 }

+ 5 - 5
apps/pyth-lazer-agent/src/publisher_handle.rs

@@ -89,9 +89,9 @@ async fn try_handle_publisher(
                         feed_id: Some(data.price_feed_id.0),
                         source_timestamp: MessageField::some(data.source_timestamp_us.into()),
                         update: Some(Update::PriceUpdate(PriceUpdate {
-                            price: data.price.map(|p| p.0.get()),
-                            best_bid_price: data.best_bid_price.map(|p| p.0.get()),
-                            best_ask_price: data.best_ask_price.map(|p| p.0.get()),
+                            price: data.price.map(|p| p.mantissa_i64()),
+                            best_bid_price: data.best_bid_price.map(|p| p.mantissa_i64()),
+                            best_ask_price: data.best_ask_price.map(|p| p.mantissa_i64()),
                             ..PriceUpdate::default()
                         })),
                         special_fields: Default::default(),
@@ -125,8 +125,8 @@ async fn try_handle_publisher(
                         feed_id: Some(data.price_feed_id.0),
                         source_timestamp: MessageField::some(data.source_timestamp_us.into()),
                         update: Some(Update::FundingRateUpdate(FundingRateUpdate {
-                            price: data.price.map(|p| p.0.get()),
-                            rate: data.funding_rate.map(|r| r.0),
+                            price: data.price.map(|p| p.mantissa_i64()),
+                            rate: data.funding_rate.map(|r| r.mantissa()),
                             ..FundingRateUpdate::default()
                         })),
                         special_fields: Default::default(),