Ver código fonte

refactor: update streaming implementation to use eventsource crate

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>
Devin AI 6 meses atrás
pai
commit
b0afa97864

+ 28 - 0
apps/hermes/client/rust/Cargo.lock

@@ -205,6 +205,17 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
 
+[[package]]
+name = "eventsource-stream"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab"
+dependencies = [
+ "futures-core",
+ "nom",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -333,6 +344,7 @@ dependencies = [
  "anyhow",
  "base64",
  "derive_more",
+ "eventsource-stream",
  "futures-util",
  "reqwest",
  "serde",
@@ -685,6 +697,12 @@ dependencies = [
  "unicase",
 ]
 
+[[package]]
+name = "minimal-lexical"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
+
 [[package]]
 name = "miniz_oxide"
 version = "0.8.8"
@@ -705,6 +723,16 @@ dependencies = [
  "windows-sys",
 ]
 
+[[package]]
+name = "nom"
+version = "7.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
+dependencies = [
+ "memchr",
+ "minimal-lexical",
+]
+
 [[package]]
 name = "num-conv"
 version = "0.1.0"

+ 1 - 0
apps/hermes/client/rust/Cargo.toml

@@ -25,6 +25,7 @@ futures-util = "0.3"
 derive_more = { version = "1.0.0", features = ["from"] }
 base64 = "0.22.1"
 tracing = "0.1"
+eventsource-stream = "0.2.3"
 
 [[example]]
 name = "latest_prices"

+ 68 - 53
apps/hermes/client/rust/src/streaming.rs

@@ -1,9 +1,14 @@
 use crate::apis::configuration::Configuration;
 use crate::models::{EncodingType, ParsedPriceUpdate, PriceUpdate};
+use eventsource_stream::EventStream;
 use futures_util::stream::{Stream, StreamExt};
 use std::error::Error;
-use tokio::sync::mpsc;
-use tokio_stream::wrappers::ReceiverStream;
+use std::pin::Pin;
+use futures_util::task::{Context, Poll};
+use std::collections::VecDeque;
+use futures_util::stream::FusedStream;
+use std::sync::Arc;
+use std::sync::Mutex;
 
 pub async fn create_price_update_stream(
     config: &Configuration,
@@ -43,70 +48,80 @@ pub async fn create_price_update_stream(
     
     drop(query_pairs);
     
-    let (tx, rx) = mpsc::channel(100);
+    let client = reqwest::Client::new();
+    
+    let res = client.get(url)
+        .header("Accept", "text/event-stream")
+        .send()
+        .await?;
+    
+    if !res.status().is_success() {
+        return Err(format!("Failed to connect to SSE endpoint: {}", res.status()).into());
+    }
+    
+    let stream = res.bytes_stream();
+    let event_stream = EventStream::new(stream);
+    
+    let buffer = Arc::new(Mutex::new(VecDeque::new()));
+    let buffer_clone = buffer.clone();
     
     tokio::spawn(async move {
-        let client = reqwest::Client::new();
-        
-        let res = match client.get(url)
-            .header("Accept", "text/event-stream")
-            .send()
-            .await {
-                Ok(res) => res,
-                Err(e) => {
-                    let _ = tx.send(Err(Box::new(e) as Box<dyn Error + Send + Sync>)).await;
-                    return;
-                }
-            };
-        
-        if !res.status().is_success() {
-            let _ = tx.send(Err(format!("Failed to connect to SSE endpoint: {}", res.status()).into())).await;
-            return;
-        }
-        
-        let mut buffer = String::new();
-        let mut stream = res.bytes_stream();
+        let mut stream = event_stream;
         
-        while let Some(chunk_result) = stream.next().await {
-            match chunk_result {
-                Ok(chunk) => {
-                    if let Ok(text) = String::from_utf8(chunk.to_vec()) {
-                        buffer.push_str(&text);
-                        
-                        while let Some(pos) = buffer.find("\n\n") {
-                            let event = buffer[..pos].to_string();
-                            buffer = buffer[pos + 2..].to_string();
-                            
-                            if let Some(data_line) = event.lines().find(|line| line.starts_with("data:")) {
-                                let data = data_line.trim_start_matches("data:").trim();
-                                
-                                match serde_json::from_str::<PriceUpdate>(data) {
-                                    Ok(price_update) => {
-                                        if let Some(Some(parsed_updates)) = price_update.parsed {
-                                            for update in parsed_updates {
-                                                if tx.send(Ok(update)).await.is_err() {
-                                                    return;
-                                                }
-                                            }
-                                        }
-                                    },
-                                    Err(e) => {
-                                        let _ = tx.send(Err(Box::new(e) as Box<dyn Error + Send + Sync>)).await;
-                                    }
+        while let Some(event_result) = stream.next().await {
+            match event_result {
+                Ok(event) => {
+                    if !event.event.is_empty() && event.event != "message" {
+                        continue;
+                    }
+                    
+                    match serde_json::from_str::<PriceUpdate>(&event.data) {
+                        Ok(price_update) => {
+                            if let Some(Some(parsed_updates)) = price_update.parsed {
+                                let mut buffer = buffer.lock().unwrap();
+                                for update in parsed_updates {
+                                    buffer.push_back(Ok(update));
                                 }
                             }
+                        },
+                        Err(e) => {
+                            let mut buffer = buffer.lock().unwrap();
+                            buffer.push_back(Err(format!("Failed to parse price update: {}", e).into()));
                         }
                     }
                 },
                 Err(e) => {
-                    let _ = tx.send(Err(Box::new(e) as Box<dyn Error + Send + Sync>)).await;
-                    break;
+                    let mut buffer = buffer.lock().unwrap();
+                    buffer.push_back(Err(format!("Error in SSE stream: {}", e).into()));
                 }
             }
         }
     });
     
-    let stream = ReceiverStream::new(rx);
+    Ok(PriceUpdateStream { buffer: buffer_clone })
+}
+
+struct PriceUpdateStream {
+    buffer: Arc<Mutex<VecDeque<Result<ParsedPriceUpdate, Box<dyn Error + Send + Sync>>>>>,
+}
+
+impl Stream for PriceUpdateStream {
+    type Item = Result<ParsedPriceUpdate, Box<dyn Error + Send + Sync>>;
     
-    Ok(stream)
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut buffer = self.buffer.lock().unwrap();
+        
+        if let Some(item) = buffer.pop_front() {
+            Poll::Ready(Some(item))
+        } else {
+            cx.waker().wake_by_ref();
+            Poll::Pending
+        }
+    }
+}
+
+impl FusedStream for PriceUpdateStream {
+    fn is_terminated(&self) -> bool {
+        false
+    }
 }