|
|
@@ -6,8 +6,8 @@ use futures::{AsyncRead, AsyncWrite};
|
|
|
use futures_util::io::{BufReader, BufWriter};
|
|
|
use hyper_util::rt::TokioIo;
|
|
|
use pyth_lazer_protocol::jrpc::{
|
|
|
- FeedUpdateParams, GetMetadataParams, JrpcCall, JrpcError, JrpcErrorResponse, JrpcResponse,
|
|
|
- JrpcSuccessResponse, JsonRpcVersion, PythLazerAgentJrpcV1, SymbolMetadata,
|
|
|
+ FeedUpdateParams, GetMetadataParams, JrpcCall, JrpcError, JrpcErrorResponse, JrpcId,
|
|
|
+ JrpcResponse, JrpcSuccessResponse, JsonRpcVersion, PythLazerAgentJrpcV1, SymbolMetadata,
|
|
|
};
|
|
|
use soketto::Sender;
|
|
|
use soketto::handshake::http::Server;
|
|
|
@@ -83,7 +83,7 @@ async fn try_handle_jrpc(
|
|
|
JrpcErrorResponse {
|
|
|
jsonrpc: JsonRpcVersion::V2,
|
|
|
error: JrpcError::InternalError(err.to_string()).into(),
|
|
|
- id: None,
|
|
|
+ id: JrpcId::Null,
|
|
|
},
|
|
|
))?
|
|
|
.as_str(),
|
|
|
@@ -103,18 +103,23 @@ async fn handle_jrpc_inner<T: AsyncRead + AsyncWrite + Unpin>(
|
|
|
match serde_json::from_slice::<PythLazerAgentJrpcV1>(receive_buf.as_slice()) {
|
|
|
Ok(jrpc_request) => match jrpc_request.params {
|
|
|
JrpcCall::PushUpdate(request_params) => {
|
|
|
- handle_push_update(sender, lazer_publisher, request_params, jrpc_request.id).await
|
|
|
+ handle_push_update(
|
|
|
+ sender,
|
|
|
+ lazer_publisher,
|
|
|
+ request_params,
|
|
|
+ jrpc_request.id.clone(),
|
|
|
+ )
|
|
|
+ .await
|
|
|
}
|
|
|
JrpcCall::PushUpdates(request_params) => {
|
|
|
for feed in request_params {
|
|
|
- handle_push_update(sender, lazer_publisher, feed, jrpc_request.id).await?;
|
|
|
+ handle_push_update(sender, lazer_publisher, feed, jrpc_request.id.clone())
|
|
|
+ .await?;
|
|
|
}
|
|
|
Ok(())
|
|
|
}
|
|
|
- JrpcCall::GetMetadata(request_params) => {
|
|
|
- if let Some(request_id) = jrpc_request.id {
|
|
|
- handle_get_metadata(sender, config, request_params, request_id).await
|
|
|
- } else {
|
|
|
+ JrpcCall::GetMetadata(request_params) => match jrpc_request.id {
|
|
|
+ JrpcId::Null => {
|
|
|
send_json(
|
|
|
sender,
|
|
|
&JrpcErrorResponse {
|
|
|
@@ -123,12 +128,13 @@ async fn handle_jrpc_inner<T: AsyncRead + AsyncWrite + Unpin>(
|
|
|
"The request to method 'get_metadata' requires an 'id'".to_string(),
|
|
|
)
|
|
|
.into(),
|
|
|
- id: None,
|
|
|
+ id: JrpcId::Null,
|
|
|
},
|
|
|
)
|
|
|
.await
|
|
|
}
|
|
|
- }
|
|
|
+ _ => handle_get_metadata(sender, config, request_params, jrpc_request.id).await,
|
|
|
+ },
|
|
|
},
|
|
|
Err(err) => {
|
|
|
debug!("Error parsing JRPC request: {}", err);
|
|
|
@@ -137,7 +143,7 @@ async fn handle_jrpc_inner<T: AsyncRead + AsyncWrite + Unpin>(
|
|
|
&JrpcErrorResponse {
|
|
|
jsonrpc: JsonRpcVersion::V2,
|
|
|
error: JrpcError::ParseError(err.to_string()).into(),
|
|
|
- id: None,
|
|
|
+ id: JrpcId::Null,
|
|
|
},
|
|
|
)
|
|
|
.await
|
|
|
@@ -199,14 +205,15 @@ async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
|
|
|
sender: &mut Sender<T>,
|
|
|
lazer_publisher: &LazerPublisher,
|
|
|
request_params: FeedUpdateParams,
|
|
|
- request_id: Option<i64>,
|
|
|
+ request_id: JrpcId,
|
|
|
) -> anyhow::Result<()> {
|
|
|
match lazer_publisher
|
|
|
.push_feed_update(request_params.clone().into())
|
|
|
.await
|
|
|
{
|
|
|
- Ok(_) => {
|
|
|
- if let Some(request_id) = request_id {
|
|
|
+ Ok(_) => match request_id {
|
|
|
+ JrpcId::Null => Ok(()),
|
|
|
+ _ => {
|
|
|
send_json(
|
|
|
sender,
|
|
|
&JrpcSuccessResponse::<String> {
|
|
|
@@ -216,10 +223,8 @@ async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
|
|
|
},
|
|
|
)
|
|
|
.await
|
|
|
- } else {
|
|
|
- Ok(())
|
|
|
}
|
|
|
- }
|
|
|
+ },
|
|
|
Err(err) => {
|
|
|
debug!("error while sending updates: {:?}", err);
|
|
|
send_json(
|
|
|
@@ -239,7 +244,7 @@ async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(
|
|
|
sender: &mut Sender<T>,
|
|
|
config: &Config,
|
|
|
request_params: GetMetadataParams,
|
|
|
- request_id: i64,
|
|
|
+ request_id: JrpcId,
|
|
|
) -> anyhow::Result<()> {
|
|
|
match get_metadata(config.clone()).await {
|
|
|
Ok(symbols) => {
|
|
|
@@ -262,7 +267,7 @@ async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(
|
|
|
&JrpcErrorResponse {
|
|
|
jsonrpc: JsonRpcVersion::V2,
|
|
|
error: JrpcError::InternalError(err.to_string()).into(),
|
|
|
- id: Some(request_id),
|
|
|
+ id: request_id,
|
|
|
},
|
|
|
)
|
|
|
.await
|