|
|
@@ -58,42 +58,41 @@ pub async fn create_price_update_stream(
|
|
|
|
|
|
let stream = response.bytes_stream();
|
|
|
|
|
|
- let sse_stream =
|
|
|
- eventsource_stream::EventStream::new(stream)
|
|
|
- .map(move |event_result| match event_result {
|
|
|
- Ok(event) => {
|
|
|
- if event.event != "message" {
|
|
|
- return Err(format!("Unexpected event type: {}", event.event).into());
|
|
|
- }
|
|
|
+ let sse_stream = eventsource_stream::EventStream::new(stream)
|
|
|
+ .map(move |event_result| match event_result {
|
|
|
+ Ok(event) => {
|
|
|
+ if event.event != "message" {
|
|
|
+ return Err(format!("Unexpected event type: {}", event.event).into());
|
|
|
+ }
|
|
|
|
|
|
- let data = &event.data;
|
|
|
+ let data = &event.data;
|
|
|
|
|
|
- println!("Received SSE data: {}", data);
|
|
|
+ println!("Received SSE data: {}", data);
|
|
|
|
|
|
- match serde_json::from_str::<crate::models::SseEvent>(data) {
|
|
|
- Ok(sse_event) => {
|
|
|
- if let Some(parsed_updates) = sse_event.parsed {
|
|
|
- let stream =
|
|
|
+ match serde_json::from_str::<crate::models::SseEvent>(data) {
|
|
|
+ Ok(sse_event) => {
|
|
|
+ if let Some(parsed_updates) = sse_event.parsed {
|
|
|
+ let stream =
|
|
|
parsed_updates
|
|
|
.into_iter()
|
|
|
.map(Ok)
|
|
|
.collect::<Vec<
|
|
|
Result<ParsedPriceUpdate, Box<dyn Error + Send + Sync>>,
|
|
|
>>();
|
|
|
- Ok(futures_util::stream::iter(stream))
|
|
|
- } else {
|
|
|
- Err("No parsed price updates in the response".into())
|
|
|
- }
|
|
|
+ Ok(futures_util::stream::iter(stream))
|
|
|
+ } else {
|
|
|
+ Err("No parsed price updates in the response".into())
|
|
|
}
|
|
|
- Err(e) => Err(format!("Failed to parse price update: {}", e).into()),
|
|
|
}
|
|
|
+ Err(e) => Err(format!("Failed to parse price update: {}", e).into()),
|
|
|
}
|
|
|
- Err(e) => Err(format!("Error in SSE stream: {}", e).into()),
|
|
|
- })
|
|
|
- .flat_map(|result| match result {
|
|
|
- Ok(stream) => stream,
|
|
|
- Err(e) => futures_util::stream::iter(vec![Err(e)]),
|
|
|
- });
|
|
|
+ }
|
|
|
+ Err(e) => Err(format!("Error in SSE stream: {}", e).into()),
|
|
|
+ })
|
|
|
+ .flat_map(|result| match result {
|
|
|
+ Ok(stream) => stream,
|
|
|
+ Err(e) => futures_util::stream::iter(vec![Err(e)]),
|
|
|
+ });
|
|
|
|
|
|
Ok(sse_stream)
|
|
|
}
|