@@ -24,6 +24,9 @@ use tokio::{
     time::interval,
 };
 use tracing::error;
+use ttl_cache::TtlCache;
+
+const DEDUP_CACHE_SIZE: usize = 100_000;
 
 #[derive(Clone, Debug)]
 pub struct LazerPublisher {
@@ -88,6 +91,7 @@ impl LazerPublisher {
             pending_updates: Vec::new(),
             relayer_sender,
             signing_key,
+            ttl_cache: TtlCache::new(DEDUP_CACHE_SIZE),
         };
         tokio::spawn(async move { task.run().await });
         Self {
@@ -109,6 +113,7 @@ struct LazerPublisherTask {
     pending_updates: Vec<FeedUpdate>,
     relayer_sender: broadcast::Sender<SignedLazerTransaction>,
     signing_key: SigningKey,
+    ttl_cache: TtlCache<u32, FeedUpdate>,
 }
 
 impl LazerPublisherTask {
@@ -136,7 +141,16 @@ impl LazerPublisherTask {
         let mut updates: Vec<FeedUpdate> = self.pending_updates.drain(..).collect();
         updates.sort_by_key(|u| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos)));
         if self.config.enable_update_deduplication {
-            updates = deduplicate_feed_updates(&updates)?;
+            updates = deduplicate_feed_updates_in_tx(&updates)?;
+            deduplicate_feed_updates(
+                &mut updates,
+                &mut self.ttl_cache,
+                self.config.update_deduplication_ttl,
+            );
+        }
+
+        if updates.is_empty() {
+            return Ok(());
         }
 
         let publisher_update = PublisherUpdate {
@@ -182,7 +196,9 @@ impl LazerPublisherTask {
 
 /// For each feed, keep the latest data. Among updates with the same data, keep the one with the earliest timestamp.
 /// Assumes the input is sorted by timestamp ascending.
-fn deduplicate_feed_updates(sorted_feed_updates: &Vec<FeedUpdate>) -> Result<Vec<FeedUpdate>> {
+fn deduplicate_feed_updates_in_tx(
+    sorted_feed_updates: &Vec<FeedUpdate>,
+) -> Result<Vec<FeedUpdate>> {
     let mut deduped_feed_updates = HashMap::new();
     for update in sorted_feed_updates {
         let entry = deduped_feed_updates.entry(update.feed_id).or_insert(update);
@@ -193,10 +209,35 @@ fn deduplicate_feed_updates(sorted_feed_updates: &Vec<FeedUpdate>) -> Result<Vec
     Ok(deduped_feed_updates.into_values().cloned().collect())
 }
 
+fn deduplicate_feed_updates(
+    sorted_feed_updates: &mut Vec<FeedUpdate>,
+    ttl_cache: &mut TtlCache<u32, FeedUpdate>,
+    ttl: std::time::Duration,
+) {
+    sorted_feed_updates.retain(|update| {
+        let feed_id = match update.feed_id {
+            Some(id) => id,
+            None => return false, // drop updates without feed_id
+        };
+
+        if let Some(cached_feed) = ttl_cache.get(&feed_id) {
+            if cached_feed.update == update.update {
+                // drop if the same update is already in the cache
+                return false;
+            }
+        }
+
+        ttl_cache.insert(feed_id, update.clone(), ttl);
+        true
+    });
+}
+
 #[cfg(test)]
 mod tests {
     use crate::config::{CHANNEL_CAPACITY, Config};
-    use crate::lazer_publisher::{LazerPublisherTask, deduplicate_feed_updates};
+    use crate::lazer_publisher::{
+        DEDUP_CACHE_SIZE, LazerPublisherTask, deduplicate_feed_updates_in_tx,
+    };
     use ed25519_dalek::SigningKey;
     use protobuf::well_known_types::timestamp::Timestamp;
     use protobuf::{Message, MessageField};
@@ -210,6 +251,7 @@ mod tests {
     use tempfile::NamedTempFile;
     use tokio::sync::broadcast::error::TryRecvError;
     use tokio::sync::{broadcast, mpsc};
+    use ttl_cache::TtlCache;
     use url::Url;
 
     fn get_private_key() -> SigningKey {
@@ -258,6 +300,7 @@ mod tests {
             publish_interval_duration: Duration::from_millis(25),
             history_service_url: None,
             enable_update_deduplication: false,
+            update_deduplication_ttl: Default::default(),
         };
 
         let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
@@ -268,6 +311,7 @@ mod tests {
             pending_updates: Vec::new(),
             relayer_sender,
             signing_key,
+            ttl_cache: TtlCache::new(DEDUP_CACHE_SIZE),
         };
         tokio::spawn(async move { task.run().await });
 
@@ -337,7 +381,7 @@ mod tests {
             10,
         )];
 
-        let deduped_updates = deduplicate_feed_updates(updates).unwrap();
+        let deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap();
         assert_eq!(deduped_updates, expected_updates);
     }
 
@@ -357,7 +401,7 @@ mod tests {
             test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
         ];
 
-        let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
+        let mut deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap();
         deduped_updates.sort_by_key(|u| u.feed_id);
         assert_eq!(deduped_updates, expected_updates);
     }
@@ -384,7 +428,7 @@ mod tests {
             test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10),
         ];
 
-        let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
+        let mut deduped_updates = deduplicate_feed_updates_in_tx(updates).unwrap();
         deduped_updates.sort_by_key(|u| u.feed_id);
         assert_eq!(deduped_updates, expected_updates);
     }
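
A minimal, self-contained sketch of the cross-transaction pass the diff adds. `Update`, `dedup`, and the `price` field are hypothetical stand-ins for the protobuf `FeedUpdate`, the new `deduplicate_feed_updates`, and its `update` payload field; the `TtlCache` calls (`new`, `get`, `insert`) are the same ones the diff relies on.

use std::time::Duration;

use ttl_cache::TtlCache;

// Hypothetical stand-in for the protobuf FeedUpdate, reduced to the
// fields the dedup logic actually touches.
#[derive(Clone, Debug)]
struct Update {
    feed_id: Option<u32>,
    price: i64,
}

// Same retain-over-TtlCache pattern as deduplicate_feed_updates above:
// keep an update only if its payload differs from the cached one, and
// (re)insert the cache entry whenever an update is kept.
fn dedup(updates: &mut Vec<Update>, cache: &mut TtlCache<u32, Update>, ttl: Duration) {
    updates.retain(|u| {
        let feed_id = match u.feed_id {
            Some(id) => id,
            None => return false, // no feed_id: drop, as in the diff
        };
        if let Some(cached) = cache.get(&feed_id) {
            if cached.price == u.price {
                return false; // identical payload still cached within the TTL
            }
        }
        cache.insert(feed_id, u.clone(), ttl);
        true
    });
}

fn main() {
    let mut cache: TtlCache<u32, Update> = TtlCache::new(16);
    let ttl = Duration::from_secs(1);

    let mut first = vec![Update { feed_id: Some(1), price: 10 }];
    dedup(&mut first, &mut cache, ttl);
    assert_eq!(first.len(), 1); // first sighting is kept

    let mut repeat = vec![Update { feed_id: Some(1), price: 10 }];
    dedup(&mut repeat, &mut cache, ttl);
    assert!(repeat.is_empty()); // unchanged payload is suppressed

    let mut changed = vec![Update { feed_id: Some(1), price: 11 }];
    dedup(&mut changed, &mut cache, ttl);
    assert_eq!(changed.len(), 1); // new data is kept and refreshes the entry
}

Note that duplicates are dropped only while the cached entry is alive: a dropped duplicate does not re-insert the entry, so an unchanged feed is still re-sent once per TTL window.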