relayer_session.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. use anyhow::{Result, bail};
  2. use backoff::ExponentialBackoffBuilder;
  3. use backoff::backoff::Backoff;
  4. use futures_util::stream::{SplitSink, SplitStream};
  5. use futures_util::{SinkExt, StreamExt};
  6. use http::HeaderValue;
  7. use protobuf::Message;
  8. use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
  9. use std::time::{Duration, Instant};
  10. use tokio::net::TcpStream;
  11. use tokio::select;
  12. use tokio::sync::broadcast;
  13. use tokio_tungstenite::tungstenite::client::IntoClientRequest;
  14. use tokio_tungstenite::{
  15. MaybeTlsStream, WebSocketStream, connect_async_with_config,
  16. tungstenite::Message as TungsteniteMessage,
  17. };
  18. use url::Url;
  19. type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>;
  20. type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
  21. async fn connect_to_relayer(
  22. mut url: Url,
  23. token: &str,
  24. ) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
  25. tracing::info!("connecting to the relayer at {}", url);
  26. url.set_path("/v1/transaction");
  27. let mut req = url.clone().into_client_request()?;
  28. let headers = req.headers_mut();
  29. headers.insert(
  30. "Authorization",
  31. HeaderValue::from_str(&format!("Bearer {token}"))?,
  32. );
  33. let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
  34. Ok(ws_stream.split())
  35. }
  36. struct RelayerWsSession {
  37. ws_sender: RelayerWsSender,
  38. }
  39. impl RelayerWsSession {
  40. async fn send_transaction(
  41. &mut self,
  42. signed_lazer_transaction: SignedLazerTransaction,
  43. ) -> Result<()> {
  44. tracing::debug!(
  45. "Sending SignedLazerTransaction: {:?}",
  46. signed_lazer_transaction
  47. );
  48. let buf = signed_lazer_transaction.write_to_bytes()?;
  49. self.ws_sender
  50. .send(TungsteniteMessage::from(buf.clone()))
  51. .await?;
  52. self.ws_sender.flush().await?;
  53. Ok(())
  54. }
  55. }
  56. pub struct RelayerSessionTask {
  57. // connection state
  58. pub url: Url,
  59. pub token: String,
  60. pub receiver: broadcast::Receiver<SignedLazerTransaction>,
  61. }
  62. impl RelayerSessionTask {
  63. pub async fn run(&mut self) {
  64. let initial_interval = Duration::from_millis(100);
  65. let max_interval = Duration::from_secs(5);
  66. let mut backoff = ExponentialBackoffBuilder::new()
  67. .with_initial_interval(initial_interval)
  68. .with_max_interval(max_interval)
  69. .with_max_elapsed_time(None)
  70. .build();
  71. const FAILURE_RESET_TIME: Duration = Duration::from_secs(300);
  72. let mut first_failure_time = Instant::now();
  73. let mut failure_count = 0;
  74. loop {
  75. match self.run_relayer_connection().await {
  76. Ok(()) => {
  77. tracing::info!("relayer session graceful shutdown");
  78. return;
  79. }
  80. Err(e) => {
  81. if first_failure_time.elapsed() > FAILURE_RESET_TIME {
  82. failure_count = 0;
  83. first_failure_time = Instant::now();
  84. backoff.reset();
  85. }
  86. failure_count += 1;
  87. let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
  88. tracing::error!(
  89. "relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}",
  90. e,
  91. failure_count,
  92. next_backoff
  93. );
  94. tokio::time::sleep(next_backoff).await;
  95. }
  96. }
  97. }
  98. }
  99. pub async fn run_relayer_connection(&mut self) -> Result<()> {
  100. // Establish relayer connection
  101. // Relayer will drop the connection if no data received in 5s
  102. let (relayer_ws_sender, mut relayer_ws_receiver) =
  103. connect_to_relayer(self.url.clone(), &self.token).await?;
  104. let mut relayer_ws_session = RelayerWsSession {
  105. ws_sender: relayer_ws_sender,
  106. };
  107. loop {
  108. select! {
  109. recv_result = self.receiver.recv() => {
  110. match recv_result {
  111. Ok(transaction) => {
  112. if let Err(e) = relayer_ws_session.send_transaction(transaction).await {
  113. tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
  114. bail!("Failed to publish transaction to Lazer relayer: {e:?}");
  115. }
  116. },
  117. Err(e) => {
  118. match e {
  119. broadcast::error::RecvError::Closed => {
  120. tracing::error!("transaction broadcast channel closed");
  121. bail!("transaction broadcast channel closed");
  122. }
  123. broadcast::error::RecvError::Lagged(skipped_count) => {
  124. tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages");
  125. }
  126. }
  127. }
  128. }
  129. }
  130. // Handle messages from the relayers, such as errors if we send a bad update
  131. msg = relayer_ws_receiver.next() => {
  132. match msg {
  133. Some(Ok(msg)) => {
  134. tracing::debug!("Received message from relayer: {msg:?}");
  135. }
  136. Some(Err(e)) => {
  137. tracing::error!("Error receiving message from at relayer: {e:?}");
  138. }
  139. None => {
  140. tracing::error!("relayer connection closed");
  141. bail!("relayer connection closed");
  142. }
  143. }
  144. }
  145. }
  146. }
  147. }
  148. }
  149. #[cfg(test)]
  150. mod tests {
  151. use crate::relayer_session::RelayerSessionTask;
  152. use ed25519_dalek::{Signer, SigningKey};
  153. use futures_util::StreamExt;
  154. use protobuf::well_known_types::timestamp::Timestamp;
  155. use protobuf::{Message, MessageField};
  156. use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update;
  157. use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate, PublisherUpdate};
  158. use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload;
  159. use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519;
  160. use pyth_lazer_publisher_sdk::transaction::{
  161. Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
  162. };
  163. use std::net::SocketAddr;
  164. use tokio::net::TcpListener;
  165. use tokio::sync::{broadcast, mpsc};
  166. use url::Url;
  167. pub const RELAYER_CHANNEL_CAPACITY: usize = 1000;
  168. fn get_private_key() -> SigningKey {
  169. SigningKey::from_keypair_bytes(&[
  170. 105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170,
  171. 238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170,
  172. 8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244,
  173. 39, 179, 211, 177, 70, 252, 31,
  174. ])
  175. .unwrap()
  176. }
  177. pub async fn run_mock_relayer(
  178. addr: SocketAddr,
  179. back_sender: mpsc::Sender<SignedLazerTransaction>,
  180. ) {
  181. let listener = TcpListener::bind(addr).await.unwrap();
  182. tokio::spawn(async move {
  183. let Ok((stream, _)) = listener.accept().await else {
  184. panic!("failed to accept mock relayer websocket connection");
  185. };
  186. let ws_stream = tokio_tungstenite::accept_async(stream)
  187. .await
  188. .expect("handshake failed");
  189. let (_, mut read) = ws_stream.split();
  190. while let Some(msg) = read.next().await {
  191. if let Ok(msg) = msg {
  192. if msg.is_binary() {
  193. tracing::info!("Received binary message: {msg:?}");
  194. let transaction =
  195. SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref())
  196. .unwrap();
  197. back_sender.clone().send(transaction).await.unwrap();
  198. }
  199. } else {
  200. tracing::error!("Received a malformed message: {msg:?}");
  201. }
  202. }
  203. });
  204. }
  205. #[tokio::test]
  206. async fn test_relayer_session() {
  207. let (back_sender, mut back_receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY);
  208. let relayer_addr = "127.0.0.1:12346".parse().unwrap();
  209. run_mock_relayer(relayer_addr, back_sender).await;
  210. tokio::time::sleep(std::time::Duration::from_millis(100)).await;
  211. let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
  212. let mut relayer_session_task = RelayerSessionTask {
  213. // connection state
  214. url: Url::parse("ws://127.0.0.1:12346").unwrap(),
  215. token: "token1".to_string(),
  216. receiver: relayer_receiver,
  217. };
  218. tokio::spawn(async move { relayer_session_task.run().await });
  219. tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
  220. let transaction = get_signed_lazer_transaction();
  221. relayer_sender
  222. .send(transaction.clone())
  223. .expect("relayer_sender.send failed");
  224. tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
  225. let received_transaction = back_receiver
  226. .recv()
  227. .await
  228. .expect("back_receiver.recv failed");
  229. assert_eq!(transaction, received_transaction);
  230. }
  231. fn get_signed_lazer_transaction() -> SignedLazerTransaction {
  232. let publisher_update = PublisherUpdate {
  233. updates: vec![FeedUpdate {
  234. feed_id: Some(1),
  235. source_timestamp: MessageField::some(Timestamp::now()),
  236. update: Some(Update::PriceUpdate(PriceUpdate {
  237. price: Some(1_000_000_000i64),
  238. ..PriceUpdate::default()
  239. })),
  240. special_fields: Default::default(),
  241. }],
  242. publisher_timestamp: MessageField::some(Timestamp::now()),
  243. special_fields: Default::default(),
  244. };
  245. let lazer_transaction = LazerTransaction {
  246. payload: Some(Payload::PublisherUpdate(publisher_update)),
  247. special_fields: Default::default(),
  248. };
  249. let buf = lazer_transaction.write_to_bytes().unwrap();
  250. let signing_key = get_private_key();
  251. let signature = signing_key.sign(&buf);
  252. let signature_data = SignatureData {
  253. data: Some(Ed25519(Ed25519SignatureData {
  254. signature: Some(signature.to_bytes().into()),
  255. public_key: Some(signing_key.verifying_key().to_bytes().into()),
  256. special_fields: Default::default(),
  257. })),
  258. special_fields: Default::default(),
  259. };
  260. SignedLazerTransaction {
  261. signature_data: MessageField::some(signature_data),
  262. payload: Some(buf),
  263. special_fields: Default::default(),
  264. }
  265. }
  266. }