Przeglądaj źródła

feat(pyth-lazer) Implement JRPC endpoint for the lazer agent (#2837)

Bart Platak 4 miesięcy temu
rodzic
commit
685d223646

+ 105 - 22
Cargo.lock

@@ -768,6 +768,12 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "atomic-waker"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
+
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -3383,6 +3389,25 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "h2"
+version = "0.4.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785"
+dependencies = [
+ "atomic-waker",
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "http 1.3.1",
+ "indexmap 2.10.0",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
 [[package]]
 name = "hashbrown"
 version = "0.11.2"
@@ -3659,7 +3684,7 @@ dependencies = [
  "futures-channel",
  "futures-core",
  "futures-util",
- "h2",
+ "h2 0.3.26",
  "http 0.2.12",
  "http-body 0.4.6",
  "httparse",
@@ -3682,6 +3707,7 @@ dependencies = [
  "bytes",
  "futures-channel",
  "futures-util",
+ "h2 0.4.11",
  "http 1.3.1",
  "http-body 1.0.1",
  "httparse",
@@ -3737,6 +3763,22 @@ dependencies = [
  "tokio-native-tls",
 ]
 
+[[package]]
+name = "hyper-tls"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
+dependencies = [
+ "bytes",
+ "http-body-util",
+ "hyper 1.6.0",
+ "hyper-util",
+ "native-tls",
+ "tokio",
+ "tokio-native-tls",
+ "tower-service",
+]
+
 [[package]]
 name = "hyper-util"
 version = "0.1.14"
@@ -3756,9 +3798,11 @@ dependencies = [
  "percent-encoding",
  "pin-project-lite",
  "socket2",
+ "system-configuration 0.6.1",
  "tokio",
  "tower-service",
  "tracing",
+ "windows-registry",
 ]
 
 [[package]]
@@ -5540,8 +5584,9 @@ dependencies = [
  "hyper 1.6.0",
  "hyper-util",
  "protobuf",
- "pyth-lazer-protocol 0.7.3",
- "pyth-lazer-publisher-sdk 0.1.6",
+ "pyth-lazer-protocol 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "pyth-lazer-publisher-sdk 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "reqwest 0.12.22",
  "serde",
  "serde_json",
  "soketto",
@@ -5580,15 +5625,19 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-protocol"
-version = "0.7.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6445dc5d2f7fff7c677fb8edc5a080a82ef7583c1bdb39daa95421788c23f695"
+version = "0.8.1"
 dependencies = [
+ "alloy-primitives 0.8.25",
  "anyhow",
- "base64 0.22.1",
+ "bincode 1.3.3",
+ "bs58",
  "byteorder",
  "derive_more 1.0.0",
+ "ed25519-dalek 2.1.1",
+ "hex",
+ "humantime-serde",
  "itertools 0.13.0",
+ "libsecp256k1 0.7.2",
  "protobuf",
  "rust_decimal",
  "serde",
@@ -5598,18 +5647,14 @@ dependencies = [
 [[package]]
 name = "pyth-lazer-protocol"
 version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1258b8770756a82a39b7b02a296c10a91b93aa58c0cded47950defe4d9377644"
 dependencies = [
- "alloy-primitives 0.8.25",
  "anyhow",
- "bincode 1.3.3",
- "bs58",
  "byteorder",
  "derive_more 1.0.0",
- "ed25519-dalek 2.1.1",
- "hex",
  "humantime-serde",
  "itertools 0.13.0",
- "libsecp256k1 0.7.2",
  "protobuf",
  "rust_decimal",
  "serde",
@@ -5618,16 +5663,14 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-publisher-sdk"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "db6ef4052ebf2a7943259b3d52a10b2231ffc346717735c50e44d73fe92019d5"
+version = "0.1.7"
 dependencies = [
  "anyhow",
  "fs-err",
  "humantime",
  "protobuf",
  "protobuf-codegen",
- "pyth-lazer-protocol 0.7.3",
+ "pyth-lazer-protocol 0.8.1",
  "serde-value",
  "tracing",
 ]
@@ -5635,13 +5678,15 @@ dependencies = [
 [[package]]
 name = "pyth-lazer-publisher-sdk"
 version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8d52a515b21b77a89266d584da4363fcd1e121213ac3065ab7ff0dab1172006"
 dependencies = [
  "anyhow",
  "fs-err",
  "humantime",
  "protobuf",
  "protobuf-codegen",
- "pyth-lazer-protocol 0.8.1",
+ "pyth-lazer-protocol 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "serde-value",
  "tracing",
 ]
@@ -6102,12 +6147,12 @@ dependencies = [
  "encoding_rs",
  "futures-core",
  "futures-util",
- "h2",
+ "h2 0.3.26",
  "http 0.2.12",
  "http-body 0.4.6",
  "hyper 0.14.32",
  "hyper-rustls 0.24.2",
- "hyper-tls",
+ "hyper-tls 0.5.0",
  "ipnet",
  "js-sys",
  "log",
@@ -6122,7 +6167,7 @@ dependencies = [
  "serde_json",
  "serde_urlencoded",
  "sync_wrapper 0.1.2",
- "system-configuration",
+ "system-configuration 0.5.1",
  "tokio",
  "tokio-native-tls",
  "tokio-rustls 0.24.1",
@@ -6144,17 +6189,22 @@ dependencies = [
  "async-compression",
  "base64 0.22.1",
  "bytes",
+ "encoding_rs",
  "futures-channel",
  "futures-core",
  "futures-util",
+ "h2 0.4.11",
  "http 1.3.1",
  "http-body 1.0.1",
  "http-body-util",
  "hyper 1.6.0",
  "hyper-rustls 0.27.7",
+ "hyper-tls 0.6.0",
  "hyper-util",
  "js-sys",
  "log",
+ "mime",
+ "native-tls",
  "percent-encoding",
  "pin-project-lite",
  "quinn",
@@ -6165,6 +6215,7 @@ dependencies = [
  "serde_urlencoded",
  "sync_wrapper 1.0.2",
  "tokio",
+ "tokio-native-tls",
  "tokio-rustls 0.26.2",
  "tokio-util",
  "tower 0.5.2",
@@ -9644,7 +9695,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
 dependencies = [
  "bitflags 1.3.2",
  "core-foundation 0.9.4",
- "system-configuration-sys",
+ "system-configuration-sys 0.5.0",
+]
+
+[[package]]
+name = "system-configuration"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
+dependencies = [
+ "bitflags 2.9.1",
+ "core-foundation 0.9.4",
+ "system-configuration-sys 0.6.0",
 ]
 
 [[package]]
@@ -9657,6 +9719,16 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "system-configuration-sys"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
 [[package]]
 name = "tap"
 version = "1.0.1"
@@ -10743,6 +10815,17 @@ version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
 
+[[package]]
+name = "windows-registry"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e"
+dependencies = [
+ "windows-link",
+ "windows-result",
+ "windows-strings",
+]
+
 [[package]]
 name = "windows-result"
 version = "0.3.4"

+ 4 - 3
apps/pyth-lazer-agent/Cargo.toml

@@ -4,8 +4,8 @@ version = "0.1.3"
 edition = "2024"
 
 [dependencies]
-pyth-lazer-publisher-sdk = "0.1.5"
-pyth-lazer-protocol = "0.7.2"
+pyth-lazer-publisher-sdk = "0.1.7"
+pyth-lazer-protocol = "0.8.1"
 
 anyhow = "1.0.98"
 backoff = "0.4.0"
@@ -20,7 +20,7 @@ futures-util = "0.3.31"
 http = "1.3.1"
 http-body-util = "0.1.3"
 humantime-serde = "1.1.1"
-hyper = { version = "1.6.0", features = ["http1", "server"] }
+hyper = { version = "1.6.0", features = ["http1", "server", "client"] }
 hyper-util = { version = "0.1.10", features = ["tokio"] }
 protobuf = "3.7.2"
 serde = { version = "1.0.219", features = ["derive"] }
@@ -33,6 +33,7 @@ tokio-util = { version = "0.7.14", features = ["compat"] }
 tracing = "0.1.41"
 tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
 url = { version = "2.5.4", features = ["serde"] }
+reqwest = "0.12.22"
 
 [dev-dependencies]
 tempfile = "3.20.0"

+ 1 - 0
apps/pyth-lazer-agent/config/config.toml

@@ -2,3 +2,4 @@ relayer_urls = ["ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction", "ws://
 publish_keypair_path = "/path/to/solana/id.json"
 listen_address = "0.0.0.0:8910"
 publish_interval_duration = "25ms"
+authorization_token="token1"

+ 1 - 0
apps/pyth-lazer-agent/src/config.rs

@@ -17,6 +17,7 @@ pub struct Config {
     pub publish_keypair_path: PathBuf,
     #[serde(with = "humantime_serde", default = "default_publish_interval")]
     pub publish_interval_duration: Duration,
+    pub history_service_url: Option<Url>,
 }
 
 fn default_publish_interval() -> Duration {

+ 46 - 17
apps/pyth-lazer-agent/src/http_server.rs

@@ -1,32 +1,39 @@
+use crate::jrpc_handle::{JrpcConnectionContext, handle_jrpc};
+use crate::publisher_handle::handle_publisher;
+use crate::{
+    config::Config, lazer_publisher::LazerPublisher, publisher_handle::PublisherConnectionContext,
+};
 use anyhow::{Context, Result};
+use hyper::body::Incoming;
 use hyper::{Response, StatusCode, body::Bytes, server::conn::http1, service::service_fn};
 use hyper_util::rt::TokioIo;
 use soketto::{
     BoxedError,
     handshake::http::{Server, is_upgrade_request},
 };
+use std::fmt::Debug;
 use std::{io, net::SocketAddr};
 use tokio::net::{TcpListener, TcpStream};
 use tracing::{debug, info, instrument, warn};
 
-use crate::{
-    config::Config,
-    lazer_publisher::LazerPublisher,
-    publisher_handle::{PublisherConnectionContext, handle_publisher},
-};
-
 type FullBody = http_body_util::Full<Bytes>;
 
-#[derive(Debug)]
-pub enum Request {
+#[derive(Debug, Copy, Clone)]
+pub enum PublisherRequest {
     PublisherV1,
     PublisherV2,
 }
 
-pub struct RelayerRequest(pub http::Request<hyper::body::Incoming>);
+pub enum Request {
+    PublisherRequest(PublisherRequest),
+    JrpcV1,
+}
+
+pub struct RelayerRequest(pub http::Request<Incoming>);
 
-const PUBLISHER_WS_URI: &str = "/v1/publisher";
+const PUBLISHER_WS_URI_V1: &str = "/v1/publisher";
 const PUBLISHER_WS_URI_V2: &str = "/v2/publisher";
+const JRPC_WS_URI_V1: &str = "/v1/jprc";
 
 const READINESS_PROBE_PATH: &str = "/ready";
 const LIVENESS_PROBE_PATH: &str = "/live";
@@ -38,8 +45,11 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()>
     loop {
         let stream_addr = listener.accept().await;
         let lazer_publisher_clone = lazer_publisher.clone();
-        tokio::spawn(async {
-            if let Err(err) = try_handle_connection(stream_addr, lazer_publisher_clone).await {
+        let config = config.clone();
+        tokio::spawn(async move {
+            if let Err(err) =
+                try_handle_connection(config, stream_addr, lazer_publisher_clone).await
+            {
                 warn!("error while handling connection: {err:?}");
             }
         });
@@ -47,6 +57,7 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()>
 }
 
 async fn try_handle_connection(
+    config: Config,
     stream_addr: io::Result<(TcpStream, SocketAddr)>,
     lazer_publisher: LazerPublisher,
 ) -> Result<()> {
@@ -58,7 +69,12 @@ async fn try_handle_connection(
             TokioIo::new(stream),
             service_fn(move |r| {
                 let request = RelayerRequest(r);
-                request_handler(request, remote_addr, lazer_publisher.clone())
+                request_handler(
+                    config.clone(),
+                    request,
+                    remote_addr,
+                    lazer_publisher.clone(),
+                )
             }),
         )
         .with_upgrades()
@@ -68,6 +84,7 @@ async fn try_handle_connection(
 
 #[instrument(skip_all, fields(component = "http_server", remote_addr = remote_addr.to_string()))]
 async fn request_handler(
+    config: Config,
     request: RelayerRequest,
     remote_addr: SocketAddr,
     lazer_publisher: LazerPublisher,
@@ -75,8 +92,9 @@ async fn request_handler(
     let path = request.0.uri().path();
 
     let request_type = match path {
-        PUBLISHER_WS_URI => Request::PublisherV1,
-        PUBLISHER_WS_URI_V2 => Request::PublisherV2,
+        PUBLISHER_WS_URI_V1 => Request::PublisherRequest(PublisherRequest::PublisherV1),
+        PUBLISHER_WS_URI_V2 => Request::PublisherRequest(PublisherRequest::PublisherV2),
+        JRPC_WS_URI_V1 => Request::JrpcV1,
         LIVENESS_PROBE_PATH => {
             let response = Response::builder().status(StatusCode::OK);
             return Ok(response.body(FullBody::default())?);
@@ -113,9 +131,9 @@ async fn request_handler(
         Ok(response) => {
             info!("accepted connection from publisher");
             match request_type {
-                Request::PublisherV1 | Request::PublisherV2 => {
+                Request::PublisherRequest(publisher_request_type) => {
                     let publisher_connection_context = PublisherConnectionContext {
-                        request_type,
+                        request_type: publisher_request_type,
                         _remote_addr: remote_addr,
                     };
                     tokio::spawn(handle_publisher(
@@ -126,6 +144,17 @@ async fn request_handler(
                     ));
                     Ok(response.map(|()| FullBody::default()))
                 }
+                Request::JrpcV1 => {
+                    let publisher_connection_context = JrpcConnectionContext {};
+                    tokio::spawn(handle_jrpc(
+                        config.clone(),
+                        server,
+                        request.0,
+                        publisher_connection_context,
+                        lazer_publisher,
+                    ));
+                    Ok(response.map(|()| FullBody::default()))
+                }
             }
         }
         Err(e) => {

+ 328 - 0
apps/pyth-lazer-agent/src/jrpc_handle.rs

@@ -0,0 +1,328 @@
+use crate::config::Config;
+use crate::lazer_publisher::LazerPublisher;
+use crate::websocket_utils::{handle_websocket_error, send_text};
+use anyhow::Error;
+use futures::{AsyncRead, AsyncWrite};
+use futures_util::io::{BufReader, BufWriter};
+use hyper_util::rt::TokioIo;
+use pyth_lazer_protocol::jrpc::{
+    GetMetadataParams, JrpcCall, JrpcError, JrpcErrorResponse, JrpcResponse, JrpcSuccessResponse,
+    JsonRpcVersion, PythLazerAgentJrpcV1, SymbolMetadata,
+};
+use soketto::Sender;
+use soketto::handshake::http::Server;
+use std::str::FromStr;
+use tokio::{pin, select};
+use tokio_util::compat::TokioAsyncReadCompatExt;
+use tracing::{debug, error, instrument};
+use url::Url;
+
+const DEFAULT_HISTORY_SERVICE_URL: &str =
+    "https://history.pyth-lazer.dourolabs.app/history/v1/symbols";
+
+pub struct JrpcConnectionContext {}
+
+#[instrument(
+    skip(server, request, lazer_publisher, context),
+    fields(component = "jrpc_ws")
+)]
+pub async fn handle_jrpc(
+    config: Config,
+    server: Server,
+    request: hyper::Request<hyper::body::Incoming>,
+    context: JrpcConnectionContext,
+    lazer_publisher: LazerPublisher,
+) {
+    if let Err(err) = try_handle_jrpc(config, server, request, context, lazer_publisher).await {
+        handle_websocket_error(err);
+    }
+}
+
+#[instrument(
+    skip(server, request, lazer_publisher, _context),
+    fields(component = "jrpc_ws")
+)]
+async fn try_handle_jrpc(
+    config: Config,
+    server: Server,
+    request: hyper::Request<hyper::body::Incoming>,
+    _context: JrpcConnectionContext,
+    lazer_publisher: LazerPublisher,
+) -> anyhow::Result<()> {
+    let stream = hyper::upgrade::on(request).await?;
+    let io = TokioIo::new(stream);
+    let stream = BufReader::new(BufWriter::new(io.compat()));
+    let (mut ws_sender, mut ws_receiver) = server.into_builder(stream).finish();
+
+    let mut receive_buf = Vec::new();
+
+    loop {
+        receive_buf.clear();
+        {
+            // soketto is not cancel-safe, so we need to store the future and poll it
+            // in the inner loop.
+            let receive = async { ws_receiver.receive(&mut receive_buf).await };
+            pin!(receive);
+            #[allow(clippy::never_loop, reason = "false positive")] // false positive
+            loop {
+                select! {
+                    _result = &mut receive => {
+                        break
+                    }
+                }
+            }
+        }
+
+        match handle_jrpc_inner(&config, &mut ws_sender, &mut receive_buf, &lazer_publisher).await {
+            Ok(_) => {}
+            Err(err) => {
+                debug!("Error handling JRPC request: {}", err);
+                send_text(
+                    &mut ws_sender,
+                    serde_json::to_string::<JrpcResponse<()>>(&JrpcResponse::Error(
+                        JrpcErrorResponse {
+                            jsonrpc: JsonRpcVersion::V2,
+                            error: JrpcError::InternalError.into(),
+                            id: None,
+                        },
+                    ))?
+                    .as_str(),
+                )
+                .await?;
+            }
+        }
+    }
+}
+
+async fn handle_jrpc_inner<T: AsyncRead + AsyncWrite + Unpin>(
+    config: &Config,
+    sender: &mut Sender<T>,
+    receive_buf: &mut Vec<u8>,
+    lazer_publisher: &LazerPublisher,
+) -> anyhow::Result<()> {
+    match serde_json::from_slice::<PythLazerAgentJrpcV1>(receive_buf.as_slice()) {
+        Ok(jrpc_request) => match jrpc_request.params {
+            JrpcCall::PushUpdate(request_params) => {
+                match lazer_publisher
+                    .push_feed_update(request_params.into())
+                    .await
+                {
+                    Ok(_) => {
+                        send_text(
+                            sender,
+                            serde_json::to_string::<JrpcResponse<String>>(&JrpcResponse::Success(
+                                JrpcSuccessResponse::<String> {
+                                    jsonrpc: JsonRpcVersion::V2,
+                                    result: "success".to_string(),
+                                    id: jrpc_request.id,
+                                },
+                            ))?
+                            .as_str(),
+                        )
+                        .await?;
+                    }
+                    Err(err) => {
+                        debug!("error while sending updates: {:?}", err);
+                        send_text(
+                            sender,
+                            serde_json::to_string::<JrpcResponse<()>>(&JrpcResponse::Error(
+                                JrpcErrorResponse {
+                                    jsonrpc: JsonRpcVersion::V2,
+                                    error: JrpcError::InternalError.into(),
+                                    id: Some(jrpc_request.id),
+                                },
+                            ))?
+                            .as_str(),
+                        )
+                        .await?;
+                    }
+                }
+            }
+            JrpcCall::GetMetadata(request_params) => match get_metadata(config.clone()).await {
+                Ok(symbols) => {
+                    let symbols = filter_symbols(symbols.clone(), request_params);
+
+                    send_text(
+                        sender,
+                        serde_json::to_string::<JrpcResponse<Vec<SymbolMetadata>>>(
+                            &JrpcResponse::Success(JrpcSuccessResponse::<Vec<SymbolMetadata>> {
+                                jsonrpc: JsonRpcVersion::V2,
+                                result: symbols,
+                                id: jrpc_request.id,
+                            }),
+                        )?
+                        .as_str(),
+                    )
+                    .await?;
+                }
+                Err(err) => {
+                    error!("error while retrieving metadata: {:?}", err);
+                    send_text(
+                        sender,
+                        serde_json::to_string::<JrpcResponse<()>>(&JrpcResponse::Error(
+                            JrpcErrorResponse {
+                                jsonrpc: JsonRpcVersion::V2,
+                                // note: right now specifying an invalid method results in a parse error
+                                error: JrpcError::InternalError.into(),
+                                id: None,
+                            },
+                        ))?
+                        .as_str(),
+                    )
+                    .await?;
+                }
+            },
+        },
+        Err(err) => {
+            debug!("Error parsing JRPC request: {}", err);
+            send_text(
+                sender,
+                serde_json::to_string::<JrpcResponse<()>>(&JrpcResponse::Error(
+                    JrpcErrorResponse {
+                        jsonrpc: JsonRpcVersion::V2,
+                        error: JrpcError::ParseError(err.to_string()).into(),
+                        id: None,
+                    },
+                ))?
+                .as_str(),
+            )
+            .await?;
+        }
+    }
+    Ok(())
+}
+
+async fn get_metadata(config: Config) -> Result<Vec<SymbolMetadata>, Error> {
+    let result = reqwest::get(
+        config
+            .history_service_url
+            .unwrap_or(Url::from_str(DEFAULT_HISTORY_SERVICE_URL)?),
+    )
+    .await?;
+
+    if result.status().is_success() {
+        Ok(serde_json::from_str::<Vec<SymbolMetadata>>(
+            &result.text().await?,
+        )?)
+    } else {
+        Err(anyhow::anyhow!(
+            "Error getting metadata (status_code={}, body={})",
+            result.status(),
+            result.text().await.unwrap_or("none".to_string())
+        ))
+    }
+}
+
+fn filter_symbols(
+    symbols: Vec<SymbolMetadata>,
+    get_metadata_params: GetMetadataParams,
+) -> Vec<SymbolMetadata> {
+    let names = &get_metadata_params.names.clone();
+    let asset_types = &get_metadata_params.asset_types.clone();
+
+    let res: Vec<SymbolMetadata> = symbols
+        .into_iter()
+        .filter(|symbol| {
+            if let Some(names) = names {
+                if !names.contains(&symbol.name) {
+                    return false;
+                }
+            }
+
+            if let Some(asset_types) = asset_types {
+                if !asset_types.contains(&symbol.asset_type) {
+                    return false;
+                }
+            }
+
+            true
+        })
+        .collect();
+
+    res
+}
+
+#[cfg(test)]
+pub mod tests {
+    use super::*;
+    use pyth_lazer_protocol::router::{Channel, FixedRate, PriceFeedId};
+    use pyth_lazer_protocol::symbol_state::SymbolState;
+    use std::net::SocketAddr;
+
+    fn gen_test_symbol(name: String, asset_type: String) -> SymbolMetadata {
+        SymbolMetadata {
+            pyth_lazer_id: PriceFeedId(1),
+            name,
+            symbol: "".to_string(),
+            description: "".to_string(),
+            asset_type,
+            exponent: 0,
+            cmc_id: None,
+            funding_rate_interval: None,
+            min_publishers: 0,
+            min_channel: Channel::FixedRate(FixedRate::MIN),
+            state: SymbolState::Stable,
+            hermes_id: None,
+            quote_currency: None,
+        }
+    }
+
+    #[tokio::test]
+    #[ignore]
+    async fn test_try_get_metadata() {
+        let config = Config {
+            listen_address: SocketAddr::from(([127, 0, 0, 1], 0)),
+            relayer_urls: vec![],
+            authorization_token: None,
+            publish_keypair_path: Default::default(),
+            publish_interval_duration: Default::default(),
+            history_service_url: None,
+        };
+
+        println!("{:?}", get_metadata(config).await.unwrap());
+    }
+
+    #[test]
+    fn test_filter_symbols() {
+        let symbol1 = gen_test_symbol("BTC".to_string(), "crypto".to_string());
+        let symbol2 = gen_test_symbol("XMR".to_string(), "crypto".to_string());
+        let symbol3 = gen_test_symbol("BTCUSDT".to_string(), "funding-rate".to_string());
+        let symbols = vec![symbol1.clone(), symbol2.clone(), symbol3.clone()];
+
+        // just a name filter
+        assert_eq!(
+            filter_symbols(
+                symbols.clone(),
+                GetMetadataParams {
+                    names: Some(vec!["XMR".to_string()]),
+                    asset_types: None,
+                },
+            ),
+            vec![symbol2.clone()]
+        );
+
+        // just an asset type filter
+        assert_eq!(
+            filter_symbols(
+                symbols.clone(),
+                GetMetadataParams {
+                    names: None,
+                    asset_types: Some(vec!["crypto".to_string()]),
+                },
+            ),
+            vec![symbol1.clone(), symbol2.clone()]
+        );
+
+        // name and asset type
+        assert_eq!(
+            filter_symbols(
+                symbols.clone(),
+                GetMetadataParams {
+                    names: Some(vec!["BTC".to_string()]),
+                    asset_types: Some(vec!["crypto".to_string()]),
+                },
+            ),
+            vec![symbol1.clone()]
+        );
+    }
+}

+ 2 - 1
apps/pyth-lazer-agent/src/lazer_publisher.rs

@@ -24,7 +24,7 @@ use tokio::{
 };
 use tracing::error;
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct LazerPublisher {
     sender: Sender<FeedUpdate>,
     pub(crate) is_ready: Arc<AtomicBool>,
@@ -223,6 +223,7 @@ mod tests {
             authorization_token: None,
             publish_keypair_path: PathBuf::from(signing_key_file.path()),
             publish_interval_duration: Duration::from_millis(25),
+            history_service_url: None,
         };
 
         let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);

+ 1 - 0
apps/pyth-lazer-agent/src/main.rs

@@ -8,6 +8,7 @@ use {
 
 mod config;
 mod http_server;
+mod jrpc_handle;
 mod lazer_publisher;
 mod publisher_handle;
 mod relayer_session;

+ 3 - 3
apps/pyth-lazer-agent/src/publisher_handle.rs

@@ -22,7 +22,7 @@ use crate::{
 };
 
 pub struct PublisherConnectionContext {
-    pub request_type: http_server::Request,
+    pub request_type: http_server::PublisherRequest,
     pub _remote_addr: SocketAddr,
 }
 
@@ -81,7 +81,7 @@ async fn try_handle_publisher(
 
         // reply with an error if we can't parse the binary update
         let feed_update: FeedUpdate = match context.request_type {
-            http_server::Request::PublisherV1 => {
+            http_server::PublisherRequest::PublisherV1 => {
                 match bincode::serde::decode_from_slice::<PriceFeedDataV1, _>(
                     &receive_buf,
                     bincode::config::legacy(),
@@ -132,7 +132,7 @@ async fn try_handle_publisher(
                     }
                 }
             }
-            http_server::Request::PublisherV2 => {
+            http_server::PublisherRequest::PublisherV2 => {
                 match bincode::serde::decode_from_slice::<PriceFeedDataV2, _>(
                     &receive_buf,
                     bincode::config::legacy(),

+ 4 - 2
apps/pyth-lazer-agent/src/relayer_session.rs

@@ -35,6 +35,7 @@ async fn connect_to_relayer(
         HeaderValue::from_str(&format!("Bearer {token}"))?,
     );
     let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
+    tracing::info!("connected to the relayer at {}", url);
     Ok(ws_stream.split())
 }
 
@@ -149,7 +150,7 @@ impl RelayerSessionTask {
                 msg = relayer_ws_receiver.next() => {
                     match msg {
                         Some(Ok(msg)) => {
-                            tracing::debug!("Received message from relayer: {msg:?}");
+                            tracing::debug!("Received a message from relayer: {msg:?}");
                         }
                         Some(Err(e)) => {
                             tracing::error!("Error receiving message from at relayer: {e:?}");
@@ -165,6 +166,7 @@ impl RelayerSessionTask {
     }
 }
 
+//noinspection DuplicatedCode
 #[cfg(test)]
 mod tests {
     use crate::relayer_session::RelayerSessionTask;
@@ -215,7 +217,7 @@ mod tests {
             while let Some(msg) = read.next().await {
                 if let Ok(msg) = msg {
                     if msg.is_binary() {
-                        tracing::info!("Received binary message: {msg:?}");
+                        tracing::info!("Received a binary message: {msg:?}");
                         let transaction =
                             SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref())
                                 .unwrap();