Quellcode durchsuchen

fix(pyth-lazer-agent): Only send one success response per batch update (#3186)

Mike Rolish vor 1 Woche
Ursprung
Commit
5d11cbc1b2
3 geänderte Dateien mit 54 neuen und 39 gelöschten Zeilen
  1. 1 1
      Cargo.lock
  2. 1 1
      apps/pyth-lazer-agent/Cargo.toml
  3. 52 37
      apps/pyth-lazer-agent/src/jrpc_handle.rs

+ 1 - 1
Cargo.lock

@@ -5702,7 +5702,7 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-agent"
-version = "0.8.0"
+version = "0.8.1"
 dependencies = [
  "anyhow",
  "backoff",

+ 1 - 1
apps/pyth-lazer-agent/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "pyth-lazer-agent"
-version = "0.8.0"
+version = "0.8.1"
 edition = "2024"
 description = "Pyth Lazer Agent"
 license = "Apache-2.0"

+ 52 - 37
apps/pyth-lazer-agent/src/jrpc_handle.rs

@@ -103,20 +103,36 @@ async fn handle_jrpc_inner<T: AsyncRead + AsyncWrite + Unpin>(
     match serde_json::from_slice::<PythLazerAgentJrpcV1>(receive_buf.as_slice()) {
         Ok(jrpc_request) => match jrpc_request.params {
             JrpcCall::PushUpdate(request_params) => {
-                handle_push_update(
-                    sender,
-                    lazer_publisher,
-                    request_params,
-                    jrpc_request.id.clone(),
-                )
-                .await
+                match lazer_publisher
+                    .push_feed_update(request_params.clone().into())
+                    .await
+                {
+                    Ok(()) => send_update_success_response(sender, jrpc_request.id).await,
+                    Err(err) => {
+                        send_update_failure_response(sender, request_params, jrpc_request.id, err)
+                            .await
+                    }
+                }
             }
-            JrpcCall::PushUpdates(request_params) => {
-                for feed in request_params {
-                    handle_push_update(sender, lazer_publisher, feed, jrpc_request.id.clone())
-                        .await?;
+            JrpcCall::PushUpdates(request_params_batch) => {
+                for request_params in request_params_batch {
+                    match lazer_publisher
+                        .push_feed_update(request_params.clone().into())
+                        .await
+                    {
+                        Ok(()) => (),
+                        Err(err) => {
+                            return send_update_failure_response(
+                                sender,
+                                request_params,
+                                jrpc_request.id,
+                                err,
+                            )
+                            .await;
+                        }
+                    }
                 }
-                Ok(())
+                send_update_success_response(sender, jrpc_request.id).await
             }
             JrpcCall::GetMetadata(request_params) => match jrpc_request.id {
                 JrpcId::Null => {
@@ -201,37 +217,18 @@ fn filter_symbols(
     res
 }
 
-async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
+async fn send_update_success_response<T: AsyncRead + AsyncWrite + Unpin>(
     sender: &mut Sender<T>,
-    lazer_publisher: &LazerPublisher,
-    request_params: FeedUpdateParams,
     request_id: JrpcId,
 ) -> anyhow::Result<()> {
-    match lazer_publisher
-        .push_feed_update(request_params.clone().into())
-        .await
-    {
-        Ok(_) => match request_id {
-            JrpcId::Null => Ok(()),
-            _ => {
-                send_json(
-                    sender,
-                    &JrpcSuccessResponse::<String> {
-                        jsonrpc: JsonRpcVersion::V2,
-                        result: "success".to_string(),
-                        id: request_id,
-                    },
-                )
-                .await
-            }
-        },
-        Err(err) => {
-            debug!("error while sending updates: {:?}", err);
+    match request_id {
+        JrpcId::Null => Ok(()),
+        _ => {
             send_json(
                 sender,
-                &JrpcErrorResponse {
+                &JrpcSuccessResponse::<String> {
                     jsonrpc: JsonRpcVersion::V2,
-                    error: JrpcError::SendUpdateError(request_params).into(),
+                    result: "success".to_string(),
                     id: request_id,
                 },
             )
@@ -240,6 +237,24 @@ async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
     }
 }
 
+async fn send_update_failure_response<T: AsyncRead + AsyncWrite + Unpin>(
+    sender: &mut Sender<T>,
+    request_params: FeedUpdateParams,
+    request_id: JrpcId,
+    err: Error,
+) -> anyhow::Result<()> {
+    debug!("error while sending updates: {:?}", err);
+    send_json(
+        sender,
+        &JrpcErrorResponse {
+            jsonrpc: JsonRpcVersion::V2,
+            error: JrpcError::SendUpdateError(request_params).into(),
+            id: request_id,
+        },
+    )
+    .await
+}
+
 async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(
     sender: &mut Sender<T>,
     config: &Config,