Преглед изворни кода

feat: implement true SSE streaming client for Hermes

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>
Devin AI пре 6 месеци
родитељ
комит
3c17a89cff

+ 6 - 0
apps/hermes/client/rust/.openapi-generator-ignore

@@ -21,4 +21,10 @@
 #docs/*.md
 # Then explicitly reverse the ignore rule for a single file:
 #!docs/README.md
+
+# Exclude the autogenerated SSE client implementation
+# This will be replaced with a custom implementation that properly handles streaming
+src/apis/rest_api.rs:price_stream_sse_handler
+
+# Exclude examples directory to prevent overwriting custom examples
 examples/

+ 56 - 0
apps/hermes/client/rust/Cargo.lock

@@ -235,6 +235,12 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
 
+[[package]]
+name = "futures-io"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+
 [[package]]
 name = "futures-macro"
 version = "0.3.31"
@@ -246,6 +252,12 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "futures-sink"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
+
 [[package]]
 name = "futures-task"
 version = "0.3.31"
@@ -259,8 +271,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
 dependencies = [
  "futures-core",
+ "futures-io",
  "futures-macro",
+ "futures-sink",
  "futures-task",
+ "memchr",
  "pin-project-lite",
  "pin-utils",
  "slab",
@@ -325,6 +340,7 @@ dependencies = [
  "serde_repr",
  "serde_with",
  "tokio",
+ "tokio-stream",
  "tracing",
  "url",
 ]
@@ -935,11 +951,13 @@ dependencies = [
  "sync_wrapper",
  "tokio",
  "tokio-rustls",
+ "tokio-util",
  "tower",
  "tower-service",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams",
  "web-sys",
  "webpki-roots 0.26.11",
  "windows-registry",
@@ -1321,6 +1339,31 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-stream"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+ "tokio-util",
+]
+
+[[package]]
+name = "tokio-util"
+version = "0.7.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "tower"
 version = "0.5.2"
@@ -1515,6 +1558,19 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "wasm-streams"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "web-sys"
 version = "0.3.77"

+ 11 - 1
apps/hermes/client/rust/Cargo.toml

@@ -5,6 +5,11 @@ authors = ["Pyth Network Contributors"]
 description = "A Rust client for Pyth Hermes price service"
 license = "Apache-2.0"
 edition = "2021"
+repository = "https://github.com/pyth-network/pyth-crosschain"
+documentation = "https://docs.rs/hermes-client"
+readme = "README.md"
+keywords = ["pyth", "price", "oracle", "defi", "finance"]
+categories = ["api-bindings", "finance"]
 
 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
@@ -12,8 +17,9 @@ serde_with = { version = "3.8", default-features = false, features = ["base64",
 serde_json = "1.0"
 serde_repr = "0.1"
 url = "2.5"
-reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls"] }
+reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls", "stream"] }
 tokio = { version = "1.0", features = ["full"] }
+tokio-stream = { version = "0.1", features = ["sync"] }
 anyhow = "1.0"
 futures-util = "0.3"
 derive_more = { version = "1.0.0", features = ["from"] }
@@ -27,3 +33,7 @@ path = "examples/latest_prices.rs"
 [[example]]
 name = "price_stream"
 path = "examples/price_stream.rs"
+
+[[example]]
+name = "sse_price_stream"
+path = "examples/sse_price_stream.rs"

+ 36 - 41
apps/hermes/client/rust/examples/price_stream.rs

@@ -1,9 +1,7 @@
-use hermes_client::apis::configuration::Configuration;
-use hermes_client::apis::rest_api;
-use hermes_client::models::EncodingType;
+use futures_util::stream::StreamExt;
+use hermes_client::Configuration;
+use hermes_client::create_price_update_stream;
 use std::error::Error;
-use std::time::Duration;
-use tokio::time;
 
 const BTC_PRICE_FEED_ID: &str = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43";
 const ETH_PRICE_FEED_ID: &str = "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace";
@@ -18,49 +16,46 @@ async fn main() -> Result<(), Box<dyn Error>> {
         ETH_PRICE_FEED_ID.to_string()
     ];
     
-    println!("Starting price stream for BTC/USD and ETH/USD...");
+    println!("Starting SSE price stream for BTC/USD and ETH/USD...");
     println!("Press Ctrl+C to exit");
     println!("====================");
     
-    let mut interval = time::interval(Duration::from_secs(2));
+    let mut stream = create_price_update_stream(
+        &config,
+        price_feed_ids,
+        None,  // default encoding (base64)
+        None,  // default allow_unordered
+        None,  // default benchmarks_only
+        None   // default ignore_invalid_price_ids
+    ).await?;
     
-    loop {
-        interval.tick().await;
-        
-        match rest_api::latest_price_updates(
-            &config, 
-            price_feed_ids.clone(), 
-            Some(EncodingType::Base64), 
-            Some(true),  // parsed
-            Some(false)  // ignore_invalid_price_ids
-        ).await {
-            Ok(price_update) => {
-                if let Some(Some(parsed_updates)) = price_update.parsed {
-                    for update in parsed_updates {
-                        let price_feed_id = update.id;
-                        let symbol = match price_feed_id.as_str() {
-                            BTC_PRICE_FEED_ID => "BTC/USD",
-                            ETH_PRICE_FEED_ID => "ETH/USD",
-                            _ => "Unknown",
-                        };
-                        
-                        let price = &update.price;
-                        let price_value = price.price.parse::<f64>().unwrap_or(0.0) * 10f64.powi(price.expo);
-                        let conf_value = price.conf.parse::<f64>().unwrap_or(0.0) * 10f64.powi(price.expo);
-                        
-                        println!(
-                            "{}: ${:.2} (conf: ${:.2}, publish_time: {})",
-                            symbol,
-                            price_value,
-                            conf_value,
-                            price.publish_time
-                        );
-                    }
-                }
+    while let Some(result) = stream.next().await {
+        match result {
+            Ok(update) => {
+                let price_feed_id = &update.id;
+                let symbol = match price_feed_id.as_str() {
+                    BTC_PRICE_FEED_ID => "BTC/USD",
+                    ETH_PRICE_FEED_ID => "ETH/USD",
+                    _ => "Unknown",
+                };
+                
+                let price = &update.price;
+                let price_value = price.price.parse::<f64>().unwrap_or(0.0) * 10f64.powi(price.expo);
+                let conf_value = price.conf.parse::<f64>().unwrap_or(0.0) * 10f64.powi(price.expo);
+                
+                println!(
+                    "{}: ${:.2} (conf: ${:.2}, publish_time: {})",
+                    symbol,
+                    price_value,
+                    conf_value,
+                    price.publish_time
+                );
             },
             Err(e) => {
-                eprintln!("Error fetching price update: {}", e);
+                eprintln!("Error: {}", e);
             }
         }
     }
+    
+    Ok(())
 }

+ 4 - 0
apps/hermes/client/rust/src/lib.rs

@@ -9,3 +9,7 @@ extern crate reqwest;
 
 pub mod apis;
 pub mod models;
+pub mod streaming;
+
+pub use crate::apis::configuration::Configuration;
+pub use crate::streaming::create_price_update_stream;

+ 112 - 0
apps/hermes/client/rust/src/streaming.rs

@@ -0,0 +1,112 @@
+use crate::apis::configuration::Configuration;
+use crate::models::{EncodingType, ParsedPriceUpdate, PriceUpdate};
+use futures_util::stream::{Stream, StreamExt};
+use std::error::Error;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+
+pub async fn create_price_update_stream(
+    config: &Configuration,
+    price_feed_ids: Vec<String>,
+    encoding: Option<EncodingType>,
+    allow_unordered: Option<bool>,
+    benchmarks_only: Option<bool>,
+    ignore_invalid_price_ids: Option<bool>,
+) -> Result<impl Stream<Item = Result<ParsedPriceUpdate, Box<dyn Error + Send + Sync>>>, Box<dyn Error>> {
+    let base_url = format!("{}/v2/updates/price/stream", config.base_path);
+    let mut url = reqwest::Url::parse(&base_url)?;
+    
+    let mut query_pairs = url.query_pairs_mut();
+    for id in &price_feed_ids {
+        query_pairs.append_pair("ids[]", id);
+    }
+    
+    if let Some(enc) = encoding {
+        query_pairs.append_pair("encoding", &enc.to_string());
+    } else {
+        query_pairs.append_pair("encoding", "base64");
+    }
+    
+    query_pairs.append_pair("parsed", "true");
+    
+    if let Some(allow) = allow_unordered {
+        query_pairs.append_pair("allow_unordered", &allow.to_string());
+    }
+    
+    if let Some(benchmarks) = benchmarks_only {
+        query_pairs.append_pair("benchmarks_only", &benchmarks.to_string());
+    }
+    
+    if let Some(ignore) = ignore_invalid_price_ids {
+        query_pairs.append_pair("ignore_invalid_price_ids", &ignore.to_string());
+    }
+    
+    drop(query_pairs);
+    
+    let (tx, rx) = mpsc::channel(100);
+    
+    tokio::spawn(async move {
+        let client = reqwest::Client::new();
+        
+        let res = match client.get(url)
+            .header("Accept", "text/event-stream")
+            .send()
+            .await {
+                Ok(res) => res,
+                Err(e) => {
+                    let _ = tx.send(Err(Box::new(e) as Box<dyn Error + Send + Sync>)).await;
+                    return;
+                }
+            };
+        
+        if !res.status().is_success() {
+            let _ = tx.send(Err(format!("Failed to connect to SSE endpoint: {}", res.status()).into())).await;
+            return;
+        }
+        
+        let mut buffer = String::new();
+        let mut stream = res.bytes_stream();
+        
+        while let Some(chunk_result) = stream.next().await {
+            match chunk_result {
+                Ok(chunk) => {
+                    if let Ok(text) = String::from_utf8(chunk.to_vec()) {
+                        buffer.push_str(&text);
+                        
+                        while let Some(pos) = buffer.find("\n\n") {
+                            let event = buffer[..pos].to_string();
+                            buffer = buffer[pos + 2..].to_string();
+                            
+                            if let Some(data_line) = event.lines().find(|line| line.starts_with("data:")) {
+                                let data = data_line.trim_start_matches("data:").trim();
+                                
+                                match serde_json::from_str::<PriceUpdate>(data) {
+                                    Ok(price_update) => {
+                                        if let Some(Some(parsed_updates)) = price_update.parsed {
+                                            for update in parsed_updates {
+                                                if tx.send(Ok(update)).await.is_err() {
+                                                    return;
+                                                }
+                                            }
+                                        }
+                                    },
+                                    Err(e) => {
+                                        let _ = tx.send(Err(Box::new(e) as Box<dyn Error + Send + Sync>)).await;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                },
+                Err(e) => {
+                    let _ = tx.send(Err(Box::new(e) as Box<dyn Error + Send + Sync>)).await;
+                    break;
+                }
+            }
+        }
+    });
+    
+    let stream = ReceiverStream::new(rx);
+    
+    Ok(stream)
+}