瀏覽代碼

Review comments: parallelize channel send, exponential backoff

Mike Rolish 5 月之前
父節點
當前提交
6f3241ae52

+ 21 - 12
pyth-lazer-agent/Cargo.lock

@@ -122,6 +122,17 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
 
+[[package]]
+name = "backoff"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
+dependencies = [
+ "getrandom 0.2.16",
+ "instant",
+ "rand 0.8.5",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.75"
@@ -1152,6 +1163,15 @@ dependencies = [
  "hashbrown 0.15.3",
 ]
 
+[[package]]
+name = "instant"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
+dependencies = [
+ "cfg-if",
+]
+
 [[package]]
 name = "is_terminal_polyfill"
 version = "1.70.1"
@@ -1639,6 +1659,7 @@ name = "pyth-lazer-agent"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "backoff",
  "bincode",
  "clap",
  "config",
@@ -1659,7 +1680,6 @@ dependencies = [
  "soketto",
  "solana-keypair",
  "tokio",
- "tokio-stream",
  "tokio-tungstenite",
  "tokio-util",
  "tracing",
@@ -2575,17 +2595,6 @@ dependencies = [
  "tokio",
 ]
 
-[[package]]
-name = "tokio-stream"
-version = "0.1.17"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
-dependencies = [
- "futures-core",
- "pin-project-lite",
- "tokio",
-]
-
 [[package]]
 name = "tokio-tungstenite"
 version = "0.26.2"

+ 1 - 1
pyth-lazer-agent/Cargo.toml

@@ -8,6 +8,7 @@ pyth-lazer-publisher-sdk = "0.1.5"
 pyth-lazer-protocol = "0.7.2"
 
 anyhow = "1.0.98"
+backoff = "0.4.0"
 bincode = { version = "2.0.1", features = ["serde"] }
 clap = { version = "4.5.32", features = ["derive"] }
 config = "0.15.11"
@@ -26,7 +27,6 @@ serde_json = "1.0.140"
 soketto = { version = "0.8.1", features = ["http"] }
 solana-keypair = "2.2.1"
 tokio = { version = "1.44.1", features = ["full"] }
-tokio-stream = "0.1.17"
 tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
 tokio-util = { version = "0.7.14", features = ["compat"] }
 tracing = "0.1.41"

+ 1 - 1
pyth-lazer-agent/src/config.rs

@@ -21,7 +21,7 @@ pub struct Config {
 }
 
 fn default_publish_interval() -> Duration {
-    Duration::from_millis(50)
+    Duration::from_micros(500)
 }
 
 pub fn load_config(config_path: String) -> anyhow::Result<Config> {

+ 6 - 9
pyth-lazer-agent/src/lazer_publisher.rs

@@ -137,15 +137,12 @@ impl LazerPublisherTask {
             payload: Some(buf),
             special_fields: Default::default(),
         };
-        for relayer_sender in self.relayer_senders.iter() {
-            if let Err(e) = relayer_sender
-                .sender
-                .send(signed_lazer_transaction.clone())
-                .await
-            {
-                error!("Error sending transaction to Lazer relayer session: {e:?}");
-            }
-        }
+        futures::future::join_all(
+            self.relayer_senders
+                .iter_mut()
+                .map(|relayer_sender| relayer_sender.sender.send(signed_lazer_transaction.clone())),
+        )
+        .await;
 
         self.pending_updates.clear();
         Ok(())

+ 1 - 1
pyth-lazer-agent/src/publisher_handle.rs

@@ -165,7 +165,7 @@ async fn try_handle_publisher(
                         continue;
                     }
                 }
-            } //_ => bail!("Publisher API request set with invalid context"),
+            }
         };
 
         lazer_publisher.push_feed_update(feed_update).await?;

+ 13 - 3
pyth-lazer-agent/src/relayer_session.rs

@@ -1,5 +1,7 @@
 use crate::config::CHANNEL_CAPACITY;
 use anyhow::{Result, bail};
+use backoff::ExponentialBackoffBuilder;
+use backoff::backoff::Backoff;
 use futures_util::stream::{SplitSink, SplitStream};
 use futures_util::{SinkExt, StreamExt};
 use http::HeaderValue;
@@ -85,8 +87,15 @@ struct RelayerSessionTask {
 
 impl RelayerSessionTask {
     pub async fn run(&mut self) {
+        let initial_interval = Duration::from_millis(100);
+        let max_interval = Duration::from_secs(5);
+        let mut backoff = ExponentialBackoffBuilder::new()
+            .with_initial_interval(initial_interval)
+            .with_max_interval(max_interval)
+            .with_max_elapsed_time(None)
+            .build();
+
         let mut failure_count = 0;
-        let retry_duration = Duration::from_secs(1);
 
         loop {
             match self.run_relayer_connection().await {
@@ -96,13 +105,14 @@ impl RelayerSessionTask {
                 }
                 Err(e) => {
                     failure_count += 1;
+                    let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
                     tracing::error!(
                         "relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}",
                         e,
                         failure_count,
-                        retry_duration
+                        next_backoff
                     );
-                    tokio::time::sleep(retry_duration).await;
+                    tokio::time::sleep(next_backoff).await;
                 }
             }
         }