Browse Source

pyth-lazer-agent fixes

Mike Rolish, cách đây 5 tháng
mục cha
commit
44b0b50493

+ 20 - 18
pyth-lazer-agent/src/lazer_publisher.rs

@@ -1,5 +1,5 @@
 use crate::config::{CHANNEL_CAPACITY, Config};
-use crate::relayer_session::RelayerSender;
+use crate::relayer_session::RelayerSessionTask;
 use anyhow::{Context, Result, bail};
 use ed25519_dalek::{Signer, SigningKey};
 use protobuf::well_known_types::timestamp::Timestamp;
@@ -11,6 +11,7 @@ use pyth_lazer_publisher_sdk::transaction::{
     Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
 };
 use solana_keypair::read_keypair_file;
+use tokio::sync::broadcast;
 use tokio::{
     select,
     sync::mpsc::{self, Receiver, Sender},
@@ -25,20 +26,22 @@ pub struct LazerPublisher {
 
 impl LazerPublisher {
     pub async fn new(config: &Config) -> Self {
-        let relayer_senders = futures::future::join_all(
-            config
-                .relayer_urls
-                .iter()
-                .map(async |url| RelayerSender::new(url, &config.authorization_token).await),
-        )
-        .await;
+        let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
+        for url in config.relayer_urls.iter() {
+            let mut task = RelayerSessionTask {
+                url: url.clone(),
+                token: config.authorization_token.to_owned(),
+                receiver: relayer_sender.subscribe(),
+            };
+            tokio::spawn(async move { task.run().await });
+        }
 
         let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
         let mut task = LazerPublisherTask {
             config: config.clone(),
             receiver,
             pending_updates: Vec::new(),
-            relayer_senders,
+            relayer_sender,
         };
         tokio::spawn(async move { task.run().await });
         Self { sender }
@@ -55,7 +58,7 @@ struct LazerPublisherTask {
     config: Config,
     receiver: Receiver<FeedUpdate>,
     pending_updates: Vec<FeedUpdate>,
-    relayer_senders: Vec<RelayerSender>,
+    relayer_sender: broadcast::Sender<SignedLazerTransaction>,
 }
 
 impl LazerPublisherTask {
@@ -108,7 +111,7 @@ impl LazerPublisherTask {
         }
 
         let publisher_update = PublisherUpdate {
-            updates: self.pending_updates.clone(),
+            updates: self.pending_updates.drain(..).collect(),
             publisher_timestamp: MessageField::some(Timestamp::now()),
             special_fields: Default::default(),
         };
@@ -137,14 +140,13 @@ impl LazerPublisherTask {
             payload: Some(buf),
             special_fields: Default::default(),
         };
-        futures::future::join_all(
-            self.relayer_senders
-                .iter_mut()
-                .map(|relayer_sender| relayer_sender.sender.send(signed_lazer_transaction.clone())),
-        )
-        .await;
+        match self.relayer_sender.send(signed_lazer_transaction.clone()) {
+            Ok(_) => (),
+            Err(e) => {
+                tracing::error!("Error sending transaction to relayer receivers: {e}");
+            }
+        }
 
-        self.pending_updates.clear();
         Ok(())
     }
 }

+ 1 - 1
pyth-lazer-agent/src/main.rs

@@ -16,7 +16,7 @@ mod websocket_utils;
 #[derive(Parser)]
 #[command(version)]
 struct Cli {
-    #[clap(short, long, default_value = "config.toml")]
+    #[clap(short, long, default_value = "config/config.toml")]
     config: String,
 }
 

+ 34 - 32
pyth-lazer-agent/src/relayer_session.rs

@@ -1,4 +1,3 @@
-use crate::config::CHANNEL_CAPACITY;
 use anyhow::{Result, bail};
 use backoff::ExponentialBackoffBuilder;
 use backoff::backoff::Backoff;
@@ -7,12 +6,10 @@ use futures_util::{SinkExt, StreamExt};
 use http::HeaderValue;
 use protobuf::Message;
 use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use tokio::net::TcpStream;
-use tokio::{
-    select,
-    sync::mpsc::{self, Receiver, Sender},
-};
+use tokio::select;
+use tokio::sync::broadcast;
 use tokio_tungstenite::tungstenite::client::IntoClientRequest;
 use tokio_tungstenite::{
     MaybeTlsStream, WebSocketStream, connect_async_with_config,
@@ -20,23 +17,6 @@ use tokio_tungstenite::{
 };
 use url::Url;
 
-pub struct RelayerSender {
-    pub(crate) sender: Sender<SignedLazerTransaction>,
-}
-
-impl RelayerSender {
-    pub async fn new(url: &Url, token: &str) -> Self {
-        let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
-        let mut task = RelayerSessionTask {
-            url: url.clone(),
-            token: token.to_owned(),
-            receiver,
-        };
-        tokio::spawn(async move { task.run().await });
-        Self { sender }
-    }
-}
-
 type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>;
 type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
 
@@ -78,11 +58,11 @@ impl RelayerWsSession {
     }
 }
 
-struct RelayerSessionTask {
+pub struct RelayerSessionTask {
     // connection state
-    url: Url,
-    token: String,
-    receiver: Receiver<SignedLazerTransaction>,
+    pub url: Url,
+    pub token: String,
+    pub receiver: broadcast::Receiver<SignedLazerTransaction>,
 }
 
 impl RelayerSessionTask {
@@ -95,6 +75,8 @@ impl RelayerSessionTask {
             .with_max_elapsed_time(None)
             .build();
 
+        const FAILURE_RESET_TIME: Duration = Duration::from_secs(300);
+        let mut first_failure_time = Instant::now();
         let mut failure_count = 0;
 
         loop {
@@ -104,6 +86,12 @@ impl RelayerSessionTask {
                     return;
                 }
                 Err(e) => {
+                    if first_failure_time.elapsed() > FAILURE_RESET_TIME {
+                        failure_count = 0;
+                        first_failure_time = Instant::now();
+                        backoff.reset();
+                    }
+
                     failure_count += 1;
                     let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
                     tracing::error!(
@@ -129,11 +117,25 @@ impl RelayerSessionTask {
 
         loop {
             select! {
-                Some(transaction) = self.receiver.recv() => {
-                    if let Err(e) = relayer_ws_session.send_transaction(transaction).await
-                    {
-                        tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
-                        bail!("Failed to publish transaction to Lazer relayer: {e:?}");
+                recv_result = self.receiver.recv() => {
+                    match recv_result {
+                        Ok(transaction) => {
+                            if let Err(e) = relayer_ws_session.send_transaction(transaction).await {
+                                tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
+                                bail!("Failed to publish transaction to Lazer relayer: {e:?}");
+                            }
+                        },
+                        Err(e) => {
+                            match e {
+                                broadcast::error::RecvError::Closed => {
+                                    tracing::error!("transaction broadcast channel closed");
+                                    bail!("transaction broadcast channel closed");
+                                }
+                                broadcast::error::RecvError::Lagged(skipped_count) => {
+                                    tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages");
+                                }
+                            }
+                        }
                     }
                 }
                 // Handle messages from the relayers, such as errors if we send a bad update