Browse Source

feat(pyth-lazer-agent) Allow deduplicating updates within each batch (#2944)

Bart Platak 3 months ago
parent
commit
2a543d4b87

+ 1 - 1
Cargo.lock

@@ -5633,7 +5633,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "pyth-lazer-agent"
 name = "pyth-lazer-agent"
-version = "0.4.0"
+version = "0.4.1"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "backoff",
  "backoff",

+ 1 - 1
apps/pyth-lazer-agent/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 [package]
 name = "pyth-lazer-agent"
 name = "pyth-lazer-agent"
-version = "0.4.0"
+version = "0.4.1"
 edition = "2024"
 edition = "2024"
 description = "Pyth Lazer Agent"
 description = "Pyth Lazer Agent"
 license = "Apache-2.0"
 license = "Apache-2.0"

+ 2 - 0
apps/pyth-lazer-agent/README.md

@@ -49,6 +49,7 @@ publish_keypair_path = "/path/to/keypair.json"
 authorization_token = "your_token"
 authorization_token = "your_token"
 listen_address = "0.0.0.0:8910"
 listen_address = "0.0.0.0:8910"
 publish_interval_duration = "25ms"
 publish_interval_duration = "25ms"
+enable_update_deduplication = false
 ```
 ```
 
 
 - `relayers_urls`: The Lazer team will provide these.
 - `relayers_urls`: The Lazer team will provide these.
@@ -56,3 +57,4 @@ publish_interval_duration = "25ms"
 - `authorization_token`: The Lazer team will provide this or instruct that it can be omitted.
 - `authorization_token`: The Lazer team will provide this or instruct that it can be omitted.
 - `listen_address`: The local port the agent will be listening on; can be anything you want.
 - `listen_address`: The local port the agent will be listening on; can be anything you want.
 - `publish_interval_duration`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here.
 - `publish_interval_duration`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here.
+- `enable_update_deduplication`: The agent will deduplicate updates within each batch before sending it to Lazer.

+ 2 - 0
apps/pyth-lazer-agent/src/config.rs

@@ -19,6 +19,8 @@ pub struct Config {
     #[serde(with = "humantime_serde", default = "default_publish_interval")]
     #[serde(with = "humantime_serde", default = "default_publish_interval")]
     pub publish_interval_duration: Duration,
     pub publish_interval_duration: Duration,
     pub history_service_url: Option<Url>,
     pub history_service_url: Option<Url>,
+    #[serde(default)]
+    pub enable_update_deduplication: bool,
 }
 }
 
 
 #[derive(Deserialize, Derivative, Clone, PartialEq)]
 #[derive(Deserialize, Derivative, Clone, PartialEq)]

+ 1 - 0
apps/pyth-lazer-agent/src/jrpc_handle.rs

@@ -299,6 +299,7 @@ pub mod tests {
             publish_keypair_path: Default::default(),
             publish_keypair_path: Default::default(),
             publish_interval_duration: Default::default(),
             publish_interval_duration: Default::default(),
             history_service_url: None,
             history_service_url: None,
+            enable_update_deduplication: false,
         };
         };
 
 
         println!("{:?}", get_metadata(config).await.unwrap());
         println!("{:?}", get_metadata(config).await.unwrap());

+ 77 - 2
apps/pyth-lazer-agent/src/lazer_publisher.rs

@@ -132,8 +132,13 @@ impl LazerPublisherTask {
             return Ok(());
             return Ok(());
         }
         }
 
 
+        let mut updates = self.pending_updates.drain(..).collect();
+        if self.config.enable_update_deduplication {
+            deduplicate_feed_updates(&mut updates);
+        }
+
         let publisher_update = PublisherUpdate {
         let publisher_update = PublisherUpdate {
-            updates: self.pending_updates.drain(..).collect(),
+            updates,
             publisher_timestamp: MessageField::some(Timestamp::now()),
             publisher_timestamp: MessageField::some(Timestamp::now()),
             special_fields: Default::default(),
             special_fields: Default::default(),
         };
         };
@@ -173,13 +178,19 @@ impl LazerPublisherTask {
     }
     }
 }
 }
 
 
+/// Drops every update whose immediately preceding entry has the same
+/// `feed_id` and an identical `update` payload, keeping the first of each
+/// consecutive run (a price change starts a new run and is kept).
+///
+/// NOTE: only *adjacent* duplicates are removed — the batch is assumed to be
+/// grouped by feed and sorted by timestamp within each feed; interleaved
+/// feeds are not deduplicated across the interleaving.
+fn deduplicate_feed_updates(feed_updates: &mut Vec<FeedUpdate>) {
+    // Compare fields in place rather than building a (feed_id, update) key,
+    // which cloned `update` for every element. `dedup_by` removes `current`
+    // when the predicate is true, so the first occurrence survives — same
+    // semantics as the `dedup_by_key` form, without the per-element clone.
+    feed_updates.dedup_by(|current, previous| {
+        current.feed_id == previous.feed_id && current.update == previous.update
+    });
+}
+
 #[cfg(test)]
 #[cfg(test)]
 mod tests {
 mod tests {
     use crate::config::{CHANNEL_CAPACITY, Config};
     use crate::config::{CHANNEL_CAPACITY, Config};
-    use crate::lazer_publisher::LazerPublisherTask;
+    use crate::lazer_publisher::{LazerPublisherTask, deduplicate_feed_updates};
     use ed25519_dalek::SigningKey;
     use ed25519_dalek::SigningKey;
     use protobuf::well_known_types::timestamp::Timestamp;
     use protobuf::well_known_types::timestamp::Timestamp;
     use protobuf::{Message, MessageField};
     use protobuf::{Message, MessageField};
+    use pyth_lazer_protocol::time::TimestampUs;
     use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update;
     use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update;
     use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate};
     use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate};
     use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction};
     use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction};
@@ -212,6 +223,18 @@ mod tests {
         temp_file
         temp_file
     }
     }
 
 
+    /// Builds a `FeedUpdate` carrying a single price point for `feed_id` at
+    /// `timestamp`; every other `PriceUpdate` field is left at its default.
+    fn test_feed_update(feed_id: u32, timestamp: TimestampUs, price: i64) -> FeedUpdate {
+        let price_update = PriceUpdate {
+            price: Some(price),
+            ..PriceUpdate::default()
+        };
+        FeedUpdate {
+            feed_id: Some(feed_id),
+            source_timestamp: MessageField::some(timestamp.into()),
+            update: Some(Update::PriceUpdate(price_update)),
+            special_fields: Default::default(),
+        }
+    }
+
     #[tokio::test]
     #[tokio::test]
     async fn test_lazer_exporter_task() {
     async fn test_lazer_exporter_task() {
         let signing_key_file = get_private_key_file();
         let signing_key_file = get_private_key_file();
@@ -224,6 +247,7 @@ mod tests {
             publish_keypair_path: PathBuf::from(signing_key_file.path()),
             publish_keypair_path: PathBuf::from(signing_key_file.path()),
             publish_interval_duration: Duration::from_millis(25),
             publish_interval_duration: Duration::from_millis(25),
             history_service_url: None,
             history_service_url: None,
+            enable_update_deduplication: false,
         };
         };
 
 
         let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
         let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
@@ -274,4 +298,55 @@ mod tests {
             _ => panic!("channel should have a transaction waiting"),
             _ => panic!("channel should have a transaction waiting"),
         }
         }
     }
     }
+
+    #[test]
+    fn test_deduplicate_feed_updates() {
+        // Single feed: only the first update of each consecutive run of
+        // equal prices should survive, so
+        // (1,10),(2,10),(3,10),(4,15),(5,15),(6,10)
+        // collapses to (1,10),(4,15),(6,10).
+        let ts = |ms| TimestampUs::from_millis(ms).unwrap();
+
+        let mut updates = vec![
+            test_feed_update(1, ts(1), 10),
+            test_feed_update(1, ts(2), 10),
+            test_feed_update(1, ts(3), 10),
+            test_feed_update(1, ts(4), 15),
+            test_feed_update(1, ts(5), 15),
+            test_feed_update(1, ts(6), 10),
+        ];
+
+        deduplicate_feed_updates(&mut updates);
+
+        let expected = vec![
+            test_feed_update(1, ts(1), 10),
+            test_feed_update(1, ts(4), 15),
+            test_feed_update(1, ts(6), 10),
+        ];
+        assert_eq!(updates, expected);
+    }
+
+    #[test]
+    fn test_deduplicate_feed_updates_multiple_feeds() {
+        // Two feeds back to back: deduplication must restart at the feed
+        // boundary and, within feed 2, on the price change.
+        let ts = |ms| TimestampUs::from_millis(ms).unwrap();
+
+        let mut updates = vec![
+            test_feed_update(1, ts(1), 10),
+            test_feed_update(1, ts(2), 10),
+            test_feed_update(1, ts(3), 10),
+            test_feed_update(2, ts(4), 15),
+            test_feed_update(2, ts(5), 15),
+            test_feed_update(2, ts(6), 10),
+        ];
+
+        deduplicate_feed_updates(&mut updates);
+
+        let expected = vec![
+            test_feed_update(1, ts(1), 10),
+            test_feed_update(2, ts(4), 15),
+            test_feed_update(2, ts(6), 10),
+        ];
+        assert_eq!(updates, expected);
+    }
 }
 }