Pārlūkot izejas kodu

feat(lazer): add state endpoint support to history client (#3175)

* feat(lazer): add state endpoint support to history client

* chore: bump pyth-lazer-client version to 9.0.0
Pavel Strakhov 2 nedēļas atpakaļ
vecāks
revīzija
d884f26eda

+ 81 - 20
Cargo.lock

@@ -762,9 +762,9 @@ dependencies = [
 
 [[package]]
 name = "async-trait"
-version = "0.1.88"
+version = "0.1.89"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
+checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2036,8 +2036,18 @@ version = "0.20.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
 dependencies = [
- "darling_core",
- "darling_macro",
+ "darling_core 0.20.11",
+ "darling_macro 0.20.11",
+]
+
+[[package]]
+name = "darling"
+version = "0.21.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0"
+dependencies = [
+ "darling_core 0.21.3",
+ "darling_macro 0.21.3",
 ]
 
 [[package]]
@@ -2054,13 +2064,38 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "darling_core"
+version = "0.21.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4"
+dependencies = [
+ "fnv",
+ "ident_case",
+ "proc-macro2",
+ "quote",
+ "strsim",
+ "syn 2.0.104",
+]
+
 [[package]]
 name = "darling_macro"
 version = "0.20.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
 dependencies = [
- "darling_core",
+ "darling_core 0.20.11",
+ "quote",
+ "syn 2.0.104",
+]
+
+[[package]]
+name = "darling_macro"
+version = "0.21.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
+dependencies = [
+ "darling_core 0.21.3",
  "quote",
  "syn 2.0.104",
 ]
@@ -4674,7 +4709,7 @@ version = "0.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c7d00273ad77a49702501e11864ccbd018514fcbb6028f54c14fb78a5f08d70a"
 dependencies = [
- "darling",
+ "darling 0.20.11",
  "proc-macro2",
  "quote",
  "syn 2.0.104",
@@ -5584,6 +5619,17 @@ dependencies = [
  "thiserror 1.0.69",
 ]
 
+[[package]]
+name = "protobuf-json-mapping"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0d6e4be637b310d8a5c02fa195243328e2d97fa7df1127a27281ef1187fcb1d"
+dependencies = [
+ "protobuf",
+ "protobuf-support",
+ "thiserror 1.0.69",
+]
+
 [[package]]
 name = "protobuf-parse"
 version = "3.7.2"
@@ -5694,11 +5740,12 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-client"
-version = "8.6.1"
+version = "9.0.0"
 dependencies = [
  "alloy-primitives 0.8.25",
  "anyhow",
  "arc-swap",
+ "async-trait",
  "atomicwrites",
  "backoff",
  "base64 0.22.1",
@@ -5712,11 +5759,15 @@ dependencies = [
  "hex",
  "humantime-serde",
  "libsecp256k1 0.7.2",
+ "protobuf-json-mapping",
  "pyth-lazer-protocol 0.20.2",
+ "pyth-lazer-publisher-sdk 0.20.1",
  "reqwest 0.12.23",
  "serde",
  "serde_json",
+ "serde_with",
  "tokio",
+ "tokio-stream",
  "tokio-tungstenite 0.20.1",
  "tracing",
  "tracing-subscriber",
@@ -7056,10 +7107,11 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
 
 [[package]]
 name = "serde"
-version = "1.0.219"
+version = "1.0.228"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
+checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
 dependencies = [
+ "serde_core",
  "serde_derive",
 ]
 
@@ -7081,11 +7133,20 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_core"
+version = "1.0.228"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
+dependencies = [
+ "serde_derive",
+]
+
 [[package]]
 name = "serde_derive"
-version = "1.0.219"
+version = "1.0.228"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
+checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -7105,14 +7166,15 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.140"
+version = "1.0.145"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
+checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
 dependencies = [
  "itoa",
  "memchr",
  "ryu",
  "serde",
+ "serde_core",
 ]
 
 [[package]]
@@ -7161,9 +7223,9 @@ dependencies = [
 
 [[package]]
 name = "serde_with"
-version = "3.14.0"
+version = "3.15.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2c45cd61fefa9db6f254525d46e392b852e0e61d9a1fd36e5bd183450a556d5"
+checksum = "aa66c845eee442168b2c8134fec70ac50dc20e760769c8ba0ad1319ca1959b04"
 dependencies = [
  "base64 0.22.1",
  "chrono",
@@ -7172,8 +7234,7 @@ dependencies = [
  "indexmap 2.10.0",
  "schemars 0.9.0",
  "schemars 1.0.3",
- "serde",
- "serde_derive",
+ "serde_core",
  "serde_json",
  "serde_with_macros",
  "time",
@@ -7181,11 +7242,11 @@ dependencies = [
 
 [[package]]
 name = "serde_with_macros"
-version = "3.14.0"
+version = "3.15.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f"
+checksum = "b91a903660542fced4e99881aa481bdbaec1634568ee02e0b8bd57c64cb38955"
 dependencies = [
- "darling",
+ "darling 0.21.3",
  "proc-macro2",
  "quote",
  "syn 2.0.104",

+ 7 - 2
lazer/sdk/rust/client/Cargo.toml

@@ -1,13 +1,16 @@
 [package]
 name = "pyth-lazer-client"
-version = "8.6.1"
+version = "9.0.0"
 edition = "2021"
 description = "A Rust client for Pyth Lazer"
 license = "Apache-2.0"
 
 [dependencies]
 pyth-lazer-protocol = { path = "../protocol", version = "0.20.1" }
+pyth-lazer-publisher-sdk = { path = "../../../publisher_sdk/rust", version = "0.20.1" }
+
 tokio = { version = "1", features = ["full"] }
+tokio-stream = "0.1.17"
 tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
 futures-util = "0.3"
 serde = { version = "1.0", features = ["derive"] }
@@ -25,7 +28,9 @@ futures = "0.3.31"
 humantime-serde = "1.1.1"
 fs-err = "3.1.1"
 atomicwrites = "0.4.4"
-
+protobuf-json-mapping = "3.7.2"
+serde_with = "3.15.1"
+async-trait = "0.1.89"
 
 [dev-dependencies]
 bincode = "1.3.3"

+ 45 - 0
lazer/sdk/rust/client/examples/publishers.rs

@@ -0,0 +1,45 @@
+use {
+    pyth_lazer_client::arc_swap::StreamIntoAutoUpdatedHandle,
+    pyth_lazer_client::history_client::GetStateParams,
+    pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig},
+    std::{env, time::Duration},
+    tokio::time::sleep,
+    url::Url,
+};
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    tracing_subscriber::fmt::init();
+    let urls = std::env::args()
+        .skip(1)
+        .map(|s| Url::parse(&s))
+        .collect::<Result<Vec<_>, _>>()?;
+
+    let client = PythLazerHistoryClient::new(PythLazerHistoryClientConfig {
+        urls,
+        update_interval: Duration::from_secs(5),
+        access_token: Some(env::var("ACCESS_TOKEN")?),
+        ..Default::default()
+    });
+    let state = client
+        .state_stream(GetStateParams {
+            publishers: true,
+            ..Default::default()
+        })
+        .await?
+        .into_auto_updated_handle()
+        .await?;
+
+    loop {
+        println!("publishers len: {}", state.load().publishers.len());
+        println!(
+            "publisher 1: {:?}",
+            state
+                .load()
+                .publishers
+                .iter()
+                .find(|p| p.publisher_id == Some(1))
+        );
+        sleep(Duration::from_secs(15)).await;
+    }
+}

+ 14 - 4
lazer/sdk/rust/client/examples/symbols.rs

@@ -1,4 +1,4 @@
-use std::time::Duration;
+use {pyth_lazer_client::arc_swap::StreamIntoAutoUpdatedHandle, std::time::Duration};
 
 use pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig};
 use pyth_lazer_protocol::PriceFeedId;
@@ -18,11 +18,21 @@ async fn main() -> anyhow::Result<()> {
         update_interval: Duration::from_secs(5),
         ..Default::default()
     });
-    let symbols = client.all_symbols_metadata_handle().await?;
+    let symbols = client
+        .all_symbols_metadata_stream()
+        .await?
+        .into_auto_updated_handle()
+        .await?;
 
     loop {
-        println!("symbols len: {}", symbols.symbols().len());
-        println!("symbol 1: {:?}", symbols.symbols().get(&PriceFeedId(1)));
+        println!("symbols len: {}", symbols.load().len());
+        println!(
+            "symbol 1: {:?}",
+            symbols
+                .load()
+                .iter()
+                .find(|feed| feed.pyth_lazer_id == PriceFeedId(1))
+        );
         sleep(Duration::from_secs(15)).await;
     }
 }

+ 2 - 2
lazer/sdk/rust/client/examples/symbols_stream.rs

@@ -1,4 +1,4 @@
-use std::time::Duration;
+use {futures::StreamExt, std::time::Duration};
 
 use pyth_lazer_client::history_client::{PythLazerHistoryClient, PythLazerHistoryClientConfig};
 use pyth_lazer_protocol::PriceFeedId;
@@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
     });
     let mut symbols_stream = client.all_symbols_metadata_stream().await?;
 
-    while let Some(symbols) = symbols_stream.recv().await {
+    while let Some(symbols) = symbols_stream.next().await {
         println!("symbols len: {}", symbols.len());
         println!(
             "symbol 1: {:?}",

+ 39 - 0
lazer/sdk/rust/client/src/arc_swap.rs

@@ -0,0 +1,39 @@
+use {
+    anyhow::Context as _,
+    arc_swap::ArcSwap,
+    futures::Stream,
+    futures_util::StreamExt as _,
+    std::sync::Arc,
+    tracing::{info, Instrument as _},
+};
+
+#[async_trait::async_trait]
+pub trait StreamIntoAutoUpdatedHandle: Stream + Unpin + Sized + 'static
+where
+    Self::Item: Send + Sync,
+{
+    /// Create an `ArcSwap` that provides access to the most recent value produced by the stream.
+    async fn into_auto_updated_handle(mut self) -> anyhow::Result<Arc<ArcSwap<Self::Item>>> {
+        let first_value = self
+            .next()
+            .await
+            .context("cannot create auto updated handle from empty stream")?;
+        let handle = Arc::new(ArcSwap::new(Arc::new(first_value)));
+        let weak_handle = Arc::downgrade(&handle);
+        tokio::spawn(
+            async move {
+                while let Some(value) = self.next().await {
+                    let Some(handle) = weak_handle.upgrade() else {
+                        info!("handle dropped, stopping auto handle update task");
+                        return;
+                    };
+                    handle.store(Arc::new(value));
+                }
+            }
+            .in_current_span(),
+        );
+        Ok(handle)
+    }
+}
+
+impl<T: Stream + Unpin + 'static> StreamIntoAutoUpdatedHandle for T where T::Item: Send + Sync {}

+ 332 - 193
lazer/sdk/rust/client/src/history_client.rs

@@ -1,22 +1,28 @@
-use std::{
-    collections::HashMap,
-    io::Write,
-    path::{Path, PathBuf},
-    sync::{Arc, Weak},
-    time::Duration,
+use {
+    anyhow::{bail, format_err, Context as _},
+    atomicwrites::replace_atomic,
+    backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock},
+    futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt},
+    pyth_lazer_protocol::jrpc::SymbolMetadata,
+    pyth_lazer_publisher_sdk::state::State,
+    serde::{
+        de::{DeserializeOwned, Error as _},
+        ser::Error as _,
+        Deserialize, Serialize,
+    },
+    std::{
+        future::Future,
+        io::Write,
+        path::{Path, PathBuf},
+        sync::Arc,
+        time::Duration,
+    },
+    tokio::{sync::mpsc, time::sleep},
+    tokio_stream::wrappers::ReceiverStream,
+    tracing::{info, info_span, warn, Instrument},
+    url::Url,
 };
 
-use anyhow::{bail, Context as _};
-use arc_swap::ArcSwap;
-use atomicwrites::replace_atomic;
-use backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock};
-use futures::{stream::FuturesUnordered, StreamExt};
-use pyth_lazer_protocol::{jrpc::SymbolMetadata, PriceFeedId};
-use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use tokio::{sync::mpsc, time::sleep};
-use tracing::{info, warn};
-use url::Url;
-
 /// Configuration for the history client.
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct PythLazerHistoryClientConfig {
@@ -35,6 +41,10 @@ pub struct PythLazerHistoryClientConfig {
     /// Capacity of communication channels created by this client. It must be above zero.
     #[serde(default = "default_channel_capacity")]
     pub channel_capacity: usize,
+    /// Access token for publisher or governance restricted endpoints.
+    ///
+    /// Not needed for consumer facing endpoints.
+    pub access_token: Option<String>,
 }
 
 fn default_urls() -> Vec<Url> {
@@ -61,6 +71,7 @@ impl Default for PythLazerHistoryClientConfig {
             request_timeout: default_request_timeout(),
             cache_dir: None,
             channel_capacity: default_channel_capacity(),
+            access_token: None,
         }
     }
 }
@@ -72,6 +83,21 @@ pub struct PythLazerHistoryClient {
     client: reqwest::Client,
 }
 
+/// Specifies which parts of the state should be present in the output.
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
+pub struct GetStateParams {
+    #[serde(default)]
+    pub all: bool,
+    #[serde(default)]
+    pub publishers: bool,
+    #[serde(default)]
+    pub feeds: bool,
+    #[serde(default)]
+    pub governance_sources: bool,
+    #[serde(default)]
+    pub feature_flags: bool,
+}
+
 impl PythLazerHistoryClient {
     pub fn new(config: PythLazerHistoryClientConfig) -> Self {
         Self {
@@ -90,80 +116,78 @@ impl PythLazerHistoryClient {
             .map(|path| path.join("symbols_v1.json"))
     }
 
+    fn state_cache_file_path(&self, params: &GetStateParams) -> Option<PathBuf> {
+        let GetStateParams {
+            all,
+            publishers,
+            feeds,
+            governance_sources,
+            feature_flags,
+        } = params;
+
+        self.config.cache_dir.as_ref().map(|path| {
+            path.join(format!(
+                "state_{}{}{}{}{}_v1.json",
+                *all as u8,
+                *publishers as u8,
+                *feeds as u8,
+                *governance_sources as u8,
+                *feature_flags as u8,
+            ))
+        })
+    }
+
     /// Fetch current metadata for all symbols.
     pub async fn all_symbols_metadata(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
-        self.fetch_symbols_initial().await
+        self.fetch_from_all_urls_or_file(self.symbols_cache_file_path(), |url| {
+            self.request_symbols(url)
+        })
+        .instrument(info_span!("all_symbols_metadata"))
+        .await
     }
 
-    /// Fetch metadata for all symbols as an auto-updating handle.
+    /// Creates a fault-tolerant stream that requests the list of symbols and yields new items
+    /// when a change of value occurs.
     ///
     /// Returns an error if the initial fetch failed.
-    /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes.
-    pub async fn all_symbols_metadata_handle(&self) -> anyhow::Result<SymbolMetadataHandle> {
-        let symbols = Arc::new(
-            self.fetch_symbols_initial()
-                .await?
-                .into_iter()
-                .map(|f| (f.pyth_lazer_id, f))
-                .collect::<HashMap<_, _>>(),
-        );
-        let previous_symbols = symbols.clone();
-        let handle = Arc::new(ArcSwap::new(symbols));
-        let client = self.clone();
-        let weak_handle = Arc::downgrade(&handle);
-        tokio::spawn(async move {
-            client
-                .update_symbols_handle(weak_handle, previous_symbols)
-                .await;
-        });
-        Ok(SymbolMetadataHandle(handle))
-    }
-
-    /// Fetch metadata for all symbols as an auto-updating handle.
-    ///
-    /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes.
-    /// If the initial fetch failed, the handle will initially contain an empty hashmap.
-    pub async fn all_symbols_metadata_fault_tolerant_handle(&self) -> SymbolMetadataHandle {
-        let initial_result = self.fetch_symbols_initial().await;
-        let symbols = match initial_result {
-            Ok(data) => data
-                .into_iter()
-                .map(|f| (f.pyth_lazer_id, f))
-                .collect::<HashMap<_, _>>(),
-            Err(err) => {
-                warn!(
-                    ?err,
-                    "failed to fetch symbols, proceeding with empty symbol list"
-                );
-                HashMap::new()
-            }
-        };
-        let symbols = Arc::new(symbols);
-        let previous_symbols = symbols.clone();
-        let handle = Arc::new(ArcSwap::new(symbols));
-        let weak_handle = Arc::downgrade(&handle);
-        let client = self.clone();
-        tokio::spawn(async move {
-            client
-                .update_symbols_handle(weak_handle, previous_symbols)
-                .await;
-        });
-        SymbolMetadataHandle(handle)
+    /// On a successful return, the channel will always contain the initial data that can be fetched
+    /// immediately from the returned stream.
+    /// You should continuously poll the stream to receive updates.
+    pub async fn all_symbols_metadata_stream(
+        &self,
+    ) -> anyhow::Result<impl Stream<Item = Vec<SymbolMetadata>> + Unpin> {
+        self.stream(self.symbols_cache_file_path(), |client, url| {
+            Box::pin(client.request_symbols(url))
+        })
+        .instrument(info_span!("all_symbols_metadata_stream"))
+        .await
     }
 
-    /// Fetch metadata for all symbols as a receiver.
+    /// Creates a fault-tolerant stream that requests data using `f` and yields new items
+    /// when a change of value occurs.
     ///
     /// Returns an error if the initial fetch failed.
     /// On a successful return, the channel will always contain the initial data that can be fetched
-    /// immediately from the returned receiver.
-    /// You should continuously poll the receiver to receive updates.
-    pub async fn all_symbols_metadata_stream(
+    /// immediately from the returned stream.
+    /// You should continuously poll the stream to receive updates.
+    async fn stream<F, R>(
         &self,
-    ) -> anyhow::Result<mpsc::Receiver<Vec<SymbolMetadata>>> {
+        cache_file_path: Option<PathBuf>,
+        f: F,
+    ) -> anyhow::Result<impl Stream<Item = R> + Unpin>
+    where
+        for<'a> F: Fn(&'a Self, &'a Url) -> BoxFuture<'a, Result<R, backoff::Error<anyhow::Error>>>
+            + Send
+            + Sync
+            + 'static,
+        R: Clone + Serialize + DeserializeOwned + PartialEq + Send + Sync + 'static,
+    {
         if self.config.channel_capacity == 0 {
             bail!("channel_capacity cannot be 0");
         }
-        let symbols = self.fetch_symbols_initial().await?;
+        let symbols = self
+            .fetch_from_all_urls_or_file(cache_file_path.clone(), |url| f(self, url))
+            .await?;
         let (sender, receiver) = mpsc::channel(self.config.channel_capacity);
 
         let previous_symbols = symbols.clone();
@@ -172,118 +196,129 @@ impl PythLazerHistoryClient {
             .await
             .expect("send to new channel failed");
         let client = self.clone();
-        tokio::spawn(async move {
-            client.update_symbols_stream(sender, previous_symbols).await;
-        });
-        Ok(receiver)
-    }
-
-    async fn update_symbols_handle(
-        &self,
-        handle: Weak<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>,
-        mut previous_symbols: Arc<HashMap<PriceFeedId, SymbolMetadata>>,
-    ) {
-        info!("starting background task for updating symbols");
-        loop {
-            sleep(self.config.update_interval).await;
-            if handle.upgrade().is_none() {
-                info!("symbols handle dropped, stopping background task");
-                return;
-            }
-            match self.fetch_symbols().await {
-                Ok(new_symbols) => {
-                    let new_symbols = new_symbols
-                        .into_iter()
-                        .map(|f| (f.pyth_lazer_id, f))
-                        .collect::<HashMap<_, _>>();
-                    if *previous_symbols != new_symbols {
-                        let Some(handle) = handle.upgrade() else {
-                            info!("symbols handle dropped, stopping background task");
-                            return;
-                        };
-                        info!("symbols changed");
-                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
-                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
-                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
-                            }
-                        }
-                        let new_symbols = Arc::new(new_symbols);
-                        previous_symbols = new_symbols.clone();
-                        handle.store(new_symbols);
-                    }
-                }
-                Err(err) => {
-                    warn!(?err, "failed to fetch symbols");
-                }
+        tokio::spawn(
+            async move {
+                client
+                    .keep_stream_updated(cache_file_path, sender, previous_symbols, |url| {
+                        f(&client, url)
+                    })
+                    .await;
             }
-        }
+            .in_current_span(),
+        );
+        Ok(ReceiverStream::new(receiver))
     }
 
-    async fn update_symbols_stream(
-        &self,
-        handle: mpsc::Sender<Vec<SymbolMetadata>>,
-        mut previous_symbols: Vec<SymbolMetadata>,
-    ) {
-        info!("starting background task for updating symbols");
+    /// Requests new data using `f` repeatedly,
+    /// writes new data to the cache file and sends it using `sender`.
+    async fn keep_stream_updated<'a, F, Fut, R>(
+        &'a self,
+        cache_file_path: Option<PathBuf>,
+        sender: mpsc::Sender<R>,
+        mut previous_data: R,
+        f: F,
+    ) where
+        F: Fn(&'a Url) -> Fut,
+        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
+        R: Serialize + DeserializeOwned + PartialEq + Clone,
+    {
+        info!("starting background task for updating data");
         loop {
             sleep(self.config.update_interval).await;
-            if handle.is_closed() {
-                info!("symbols channel closed, stopping background task");
+            if sender.is_closed() {
+                info!("data handle dropped, stopping background task");
                 return;
             }
-            match self.fetch_symbols().await {
-                Ok(new_symbols) => {
-                    if *previous_symbols != new_symbols {
-                        info!("symbols changed");
-                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
-                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
+            match self.fetch_from_all_urls(true, &f).await {
+                Ok(new_data) => {
+                    if previous_data != new_data {
+                        info!("data changed");
+                        if let Some(cache_file_path) = &cache_file_path {
+                            if let Err(err) = atomic_save_file(cache_file_path, &new_data) {
                                 warn!(?err, ?cache_file_path, "failed to save data to cache file");
                             }
                         }
-                        previous_symbols = new_symbols.clone();
-                        if handle.send(new_symbols).await.is_err() {
-                            info!("symbols channel closed, stopping background task");
+
+                        previous_data = new_data.clone();
+                        if sender.send(new_data.clone()).await.is_err() {
+                            info!("update handle dropped, stopping background task");
                             return;
                         }
                     }
                 }
                 Err(err) => {
-                    warn!(?err, "failed to fetch symbols");
+                    warn!(?err, "failed to fetch data");
                 }
             }
         }
     }
 
-    async fn fetch_symbols_initial(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
-        let result = self.fetch_symbols().await;
+    /// Uses all configured URLs to perform request `f` and handles retrying on error.
+    /// Returns the value once any of the requests succeeds. If all requests fail,
+    /// tries to fetch the data from local cache. If loading from cache also fails,
+    /// it keeps retrying the requests until any of them succeeds.
+    async fn fetch_from_all_urls_or_file<'a, F, Fut, R>(
+        &'a self,
+        cache_file_path: Option<PathBuf>,
+        f: F,
+    ) -> anyhow::Result<R>
+    where
+        F: Fn(&'a Url) -> Fut,
+        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
+        R: Serialize + DeserializeOwned,
+    {
+        let result = self.fetch_from_all_urls(true, &f).await;
         match result {
             Ok(data) => {
-                info!("fetched initial symbols from history service");
-                if let Some(cache_file_path) = self.symbols_cache_file_path() {
-                    if let Err(err) = atomic_save_file(&cache_file_path, &data) {
+                info!("fetched initial data from history service");
+                if let Some(cache_file_path) = cache_file_path {
+                    if let Err(err) = atomic_save_file::<R>(&cache_file_path, &data) {
                         warn!(?err, ?cache_file_path, "failed to save data to cache file");
                     }
                 }
-                Ok(data)
+                return Ok(data);
+            }
+            Err(err) => {
+                warn!(?err, "all requests failed");
+            }
+        }
+
+        if let Some(cache_file_path) = cache_file_path {
+            match load_file::<R>(&cache_file_path) {
+                Ok(Some(data)) => {
+                    info!(
+                        "failed to fetch initial data from history service, \
+                        but fetched last known data from cache"
+                    );
+                    return Ok(data);
+                }
+                Ok(None) => {
+                    info!("no data found in cache");
+                }
+                Err(err) => {
+                    warn!(?err, "failed to fetch data from cache");
+                }
             }
-            Err(err) => match self.symbols_cache_file_path() {
-                Some(cache_file_path) => match load_file::<Vec<SymbolMetadata>>(&cache_file_path) {
-                    Ok(Some(data)) => {
-                        info!(?err, "failed to fetch initial symbols from history service, but fetched last known symbols from cache");
-                        Ok(data)
-                    }
-                    Ok(None) => Err(err),
-                    Err(cache_err) => {
-                        warn!(?cache_err, "failed to fetch data from cache");
-                        Err(err)
-                    }
-                },
-                None => Err(err),
-            },
         }
+
+        self.fetch_from_all_urls(false, f).await
     }
 
-    async fn fetch_symbols(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
+    /// Uses all configured URLs to perform request `f` and handles retrying on error.
+    ///
+    /// Returns the value once any of the requests succeeds.
+    /// If `limit_by_update_interval` is true, the total time spent retrying it limited to
+    /// `self.config.update_interval`. If `limit_by_update_interval` is false, the requests
+    /// will be retried indefinitely.
+    async fn fetch_from_all_urls<'a, F, Fut, R>(
+        &'a self,
+        limit_by_update_interval: bool,
+        f: F,
+    ) -> anyhow::Result<R>
+    where
+        F: Fn(&'a Url) -> Fut,
+        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
+    {
         if self.config.urls.is_empty() {
             bail!("no history urls provided");
         }
@@ -291,7 +326,9 @@ impl PythLazerHistoryClient {
             .config
             .urls
             .iter()
-            .map(|url| Box::pin(self.fetch_symbols_single(url)))
+            .map(|url| {
+                Box::pin(self.fetch_from_single_url_with_retry(limit_by_update_interval, || f(url)))
+            })
             .collect::<FuturesUnordered<_>>();
         while let Some(result) = futures.next().await {
             match result {
@@ -303,49 +340,151 @@ impl PythLazerHistoryClient {
         }
 
         bail!(
-            "failed to fetch symbols from any urls ({:?})",
+            "failed to fetch data from any urls ({:?})",
             self.config.urls
         );
     }
 
-    async fn fetch_symbols_single(&self, url: &Url) -> anyhow::Result<Vec<SymbolMetadata>> {
-        let url = url.join("v1/symbols")?;
-        retry_notify(
-            ExponentialBackoff::<SystemClock> {
-                // We will retry all requests after `update_interval`, so there is
-                // no reason to continue retrying here.
-                max_elapsed_time: Some(self.config.update_interval),
-                ..Default::default()
-            },
-            || async {
-                let response = self
-                    .client
-                    .get(url.clone())
-                    .send()
-                    .await
-                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
-                    .backoff_error_for_status()?;
-                response
-                    .json::<Vec<SymbolMetadata>>()
-                    .await
-                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))
-            },
-            |e, _| warn!("failed to fetch symbols from {} (will retry): {:?}", url, e),
-        )
+    async fn fetch_from_single_url_with_retry<F, Fut, R>(
+        &self,
+        limit_by_update_interval: bool,
+        f: F,
+    ) -> anyhow::Result<R>
+    where
+        F: FnMut() -> Fut,
+        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
+    {
+        let mut backoff = ExponentialBackoff::<SystemClock>::default();
+        if limit_by_update_interval {
+            backoff.max_elapsed_time = Some(self.config.update_interval);
+        }
+        retry_notify(backoff, f, |e, _| warn!(?e, "operation failed, will retry")).await
+    }
+
+    async fn request_symbols(
+        &self,
+        url: &Url,
+    ) -> Result<Vec<SymbolMetadata>, backoff::Error<anyhow::Error>> {
+        let url = url
+            .join("v1/symbols")
+            .map_err(|err| backoff::Error::permanent(anyhow::Error::from(err)))?;
+
+        let response = self
+            .client
+            .get(url.clone())
+            .send()
+            .await
+            .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
+            .backoff_error_for_status()?;
+        let vec = response
+            .json::<Vec<SymbolMetadata>>()
+            .await
+            .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?;
+        Ok(vec)
+    }
+
+    /// Fetch a partial state snapshot containing data specified in `params`.
+    pub async fn state(&self, params: GetStateParams) -> anyhow::Result<State> {
+        self.fetch_from_all_urls_or_file(self.state_cache_file_path(&params), move |url| {
+            self.request_state(url, params.clone())
+        })
+        .instrument(info_span!("state"))
         .await
+        .map(|s| s.0)
+    }
+
+    /// Fetch a part of the current state specified by `params`.
+    ///
+    /// Creates a fault-tolerant stream that requests a partial state snapshot
+    /// containing data specified in `params`. It yields new items
+    /// when a change of value occurs.
+    ///
+    /// Returns an error if the initial fetch failed.
+    /// On a successful return, the stream will always contain the initial data that can be fetched
+    /// immediately from the returned stream.
+    /// You should continuously poll the stream to receive updates.
+    pub async fn state_stream(
+        &self,
+        params: GetStateParams,
+    ) -> anyhow::Result<impl Stream<Item = State> + Unpin> {
+        let stream = self
+            .stream(self.state_cache_file_path(&params), move |client, url| {
+                Box::pin(client.request_state(url, params.clone()))
+            })
+            .instrument(info_span!("state_stream"))
+            .await?;
+        Ok(stream.map(|s| s.0))
+    }
+
+    /// Fetch data from /v1/state endpoint without any timeouts or retries.
+    async fn request_state(
+        &self,
+        url: &Url,
+        params: GetStateParams,
+    ) -> Result<StateWithSerde, backoff::Error<anyhow::Error>> {
+        let url = url
+            .join("v1/state")
+            .map_err(|err| backoff::Error::permanent(anyhow::Error::from(err)))?;
+        let access_token = self.config.access_token.as_ref().ok_or_else(|| {
+            backoff::Error::permanent(format_err!("missing access_token in config"))
+        })?;
+        let response = self
+            .client
+            .get(url.clone())
+            .query(&params)
+            .bearer_auth(access_token)
+            .send()
+            .await
+            .map_err(|err| {
+                backoff::Error::transient(
+                    anyhow::Error::from(err).context(format!("failed to fetch state from {url}")),
+                )
+            })?
+            .backoff_error_for_status()?;
+        let bytes = response.bytes().await.map_err(|err| {
+            backoff::Error::transient(
+                anyhow::Error::from(err).context(format!("failed to fetch state from {url}")),
+            )
+        })?;
+        let json = String::from_utf8(bytes.into()).map_err(|err| {
+            backoff::Error::permanent(
+                anyhow::Error::from(err).context(format!("failed to parse state from {url}")),
+            )
+        })?;
+        let state = protobuf_json_mapping::parse_from_str::<State>(&json).map_err(|err| {
+            backoff::Error::permanent(
+                anyhow::Error::from(err).context(format!("failed to parse state from {url}")),
+            )
+        })?;
+        Ok(StateWithSerde(state))
     }
 }
 
-#[derive(Debug, Clone)]
-pub struct SymbolMetadataHandle(Arc<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>);
+// State wrapper that delegates serialization and deserialization to `protobuf_json_mapping`.
+#[derive(Debug, Clone, PartialEq)]
+struct StateWithSerde(State);
 
-impl SymbolMetadataHandle {
-    pub fn symbols(&self) -> arc_swap::Guard<Arc<HashMap<PriceFeedId, SymbolMetadata>>> {
-        self.0.load()
+impl Serialize for StateWithSerde {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let json = protobuf_json_mapping::print_to_string(&self.0).map_err(S::Error::custom)?;
+        let json_value =
+            serde_json::from_str::<serde_json::Value>(&json).map_err(S::Error::custom)?;
+        json_value.serialize(serializer)
     }
+}
 
-    pub fn new_for_test(data: HashMap<PriceFeedId, SymbolMetadata>) -> Self {
-        Self(Arc::new(ArcSwap::new(Arc::new(data))))
+impl<'de> Deserialize<'de> for StateWithSerde {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let json_value = serde_json::Value::deserialize(deserializer)?;
+        let json = serde_json::to_string(&json_value).map_err(D::Error::custom)?;
+        let value = protobuf_json_mapping::parse_from_str(&json).map_err(D::Error::custom)?;
+        Ok(Self(value))
     }
 }
 
@@ -378,7 +517,7 @@ fn load_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
     Ok(Some(data))
 }
 
-fn atomic_save_file(path: &Path, data: &impl Serialize) -> anyhow::Result<()> {
+fn atomic_save_file<T: Serialize>(path: &Path, data: &T) -> anyhow::Result<()> {
     let parent_path = path.parent().context("invalid file path: no parent")?;
     fs_err::create_dir_all(parent_path)?;
 

+ 1 - 0
lazer/sdk/rust/client/src/lib.rs

@@ -1,5 +1,6 @@
 const CHANNEL_CAPACITY: usize = 1000;
 
+pub mod arc_swap;
 pub mod backoff;
 pub mod history_client;
 pub mod resilient_ws_connection;