Преглед изворни кода

feat(lazer): improve protocol types and modules (#2934)

* refactor(lazer): add some Price tests and error types

* refactor(lazer): make Price operations more explicit

* refactor(lazer): move Price and Rate out from router module, update Rate
API and add tests

* refactor(lazer): restructure protocol modules

* fix(lazer): use non-inverted exponent values in Price and Rate

* test(lazer): add more Price and Rate tests
Pavel Strakhov пре 3 месеци
родитељ
комит
1101ad2d86

+ 13 - 5
Cargo.lock

@@ -701,6 +701,12 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "assert_float_eq"
+version = "1.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10d2119f741b79fe9907f5396d19bffcb46568cfcc315e78677d731972ac7085"
+
 [[package]]
 name = "async-channel"
 version = "1.9.0"
@@ -5669,7 +5675,7 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-client"
-version = "2.0.1"
+version = "3.0.0"
 dependencies = [
  "alloy-primitives 0.8.25",
  "anyhow",
@@ -5682,7 +5688,7 @@ dependencies = [
  "futures-util",
  "hex",
  "libsecp256k1 0.7.2",
- "pyth-lazer-protocol 0.10.2",
+ "pyth-lazer-protocol 0.11.0",
  "serde",
  "serde_json",
  "tokio",
@@ -5715,10 +5721,11 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-protocol"
-version = "0.10.2"
+version = "0.11.0"
 dependencies = [
  "alloy-primitives 0.8.25",
  "anyhow",
+ "assert_float_eq",
  "bincode 1.3.3",
  "bs58",
  "byteorder",
@@ -5735,6 +5742,7 @@ dependencies = [
  "rust_decimal",
  "serde",
  "serde_json",
+ "thiserror 2.0.12",
 ]
 
 [[package]]
@@ -5753,13 +5761,13 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-publisher-sdk"
-version = "0.5.0"
+version = "0.6.0"
 dependencies = [
  "anyhow",
  "fs-err",
  "protobuf",
  "protobuf-codegen",
- "pyth-lazer-protocol 0.10.2",
+ "pyth-lazer-protocol 0.11.0",
  "serde_json",
 ]
 

+ 2 - 2
lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "pyth-lazer-solana-contract"
-version = "0.5.0"
+version = "0.6.0"
 edition = "2021"
 description = "Pyth Lazer Solana contract and SDK."
 license = "Apache-2.0"
@@ -19,7 +19,7 @@ no-log-ix-name = []
 idl-build = ["anchor-lang/idl-build"]
 
 [dependencies]
-pyth-lazer-protocol = { path = "../../../../sdk/rust/protocol", version = "0.10.1" }
+pyth-lazer-protocol = { path = "../../../../sdk/rust/protocol", version = "0.11.0" }
 
 anchor-lang = "0.31.1"
 bytemuck = { version = "1.20.0", features = ["derive"] }

+ 2 - 2
lazer/publisher_sdk/rust/Cargo.toml

@@ -1,13 +1,13 @@
 [package]
 name = "pyth-lazer-publisher-sdk"
-version = "0.5.0"
+version = "0.6.0"
 edition = "2021"
 description = "Pyth Lazer Publisher SDK types."
 license = "Apache-2.0"
 repository = "https://github.com/pyth-network/pyth-crosschain"
 
 [dependencies]
-pyth-lazer-protocol = { version = "0.10.2", path = "../../sdk/rust/protocol" }
+pyth-lazer-protocol = { version = "0.11.0", path = "../../sdk/rust/protocol" }
 anyhow = "1.0.98"
 protobuf = "3.7.2"
 serde_json = "1.0.140"

+ 6 - 6
lazer/publisher_sdk/rust/src/lib.rs

@@ -3,8 +3,8 @@ use crate::publisher_update::{FeedUpdate, FundingRateUpdate, PriceUpdate};
 use crate::state::FeedState;
 use ::protobuf::MessageField;
 use pyth_lazer_protocol::jrpc::{FeedUpdateParams, UpdateParams};
-use pyth_lazer_protocol::symbol_state::SymbolState;
 use pyth_lazer_protocol::FeedKind;
+use pyth_lazer_protocol::SymbolState;
 
 pub mod transaction_envelope {
     pub use crate::protobuf::transaction_envelope::*;
@@ -56,9 +56,9 @@ impl From<UpdateParams> for Update {
                 best_bid_price,
                 best_ask_price,
             } => Update::PriceUpdate(PriceUpdate {
-                price: Some(price.0.into()),
-                best_bid_price: best_bid_price.map(|p| p.0.into()),
-                best_ask_price: best_ask_price.map(|p| p.0.into()),
+                price: Some(price.mantissa_i64()),
+                best_bid_price: best_bid_price.map(|p| p.mantissa_i64()),
+                best_ask_price: best_ask_price.map(|p| p.mantissa_i64()),
                 special_fields: Default::default(),
             }),
             UpdateParams::FundingRateUpdate {
@@ -66,8 +66,8 @@ impl From<UpdateParams> for Update {
                 rate,
                 funding_rate_interval,
             } => Update::FundingRateUpdate(FundingRateUpdate {
-                price: price.map(|p| p.0.into()),
-                rate: Some(rate.0),
+                price: price.map(|p| p.mantissa_i64()),
+                rate: Some(rate.mantissa()),
                 funding_rate_interval: MessageField::from_option(
                     funding_rate_interval.map(|i| i.into()),
                 ),

+ 2 - 2
lazer/sdk/rust/client/Cargo.toml

@@ -1,12 +1,12 @@
 [package]
 name = "pyth-lazer-client"
-version = "2.0.1"
+version = "3.0.0"
 edition = "2021"
 description = "A Rust client for Pyth Lazer"
 license = "Apache-2.0"
 
 [dependencies]
-pyth-lazer-protocol = { path = "../protocol", version = "0.10.2" }
+pyth-lazer-protocol = { path = "../protocol", version = "0.11.0" }
 tokio = { version = "1", features = ["full"] }
 tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
 futures-util = "0.3"

+ 7 - 6
lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

@@ -4,15 +4,16 @@ use base64::Engine;
 use pyth_lazer_client::backoff::PythLazerExponentialBackoffBuilder;
 use pyth_lazer_client::client::PythLazerClientBuilder;
 use pyth_lazer_client::ws_connection::AnyResponse;
+use pyth_lazer_protocol::api::{
+    Channel, DeliveryFormat, Format, JsonBinaryEncoding, SubscriptionParams, SubscriptionParamsRepr,
+};
+use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId, WsResponse};
 use pyth_lazer_protocol::message::{
     EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage,
 };
 use pyth_lazer_protocol::payload::PayloadData;
-use pyth_lazer_protocol::router::{
-    Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
-    SubscriptionParams, SubscriptionParamsRepr,
-};
-use pyth_lazer_protocol::subscription::{Response, SubscribeRequest, SubscriptionId};
+use pyth_lazer_protocol::time::FixedRate;
+use pyth_lazer_protocol::{PriceFeedId, PriceFeedProperty};
 use tokio::pin;
 use tracing::level_filters::LevelFilter;
 use tracing_subscriber::EnvFilter;
@@ -109,7 +110,7 @@ async fn main() -> anyhow::Result<()> {
         // The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
         match msg {
             AnyResponse::Json(msg) => match msg {
-                Response::StreamUpdated(update) => {
+                WsResponse::StreamUpdated(update) => {
                     println!("Received a JSON update for {:?}", update.subscription_id);
                     if let Some(evm_data) = update.payload.evm {
                         // Decode binary data

+ 1 - 1
lazer/sdk/rust/client/src/client.rs

@@ -51,7 +51,7 @@ use crate::{
 };
 use anyhow::{bail, Result};
 use backoff::ExponentialBackoff;
-use pyth_lazer_protocol::subscription::{SubscribeRequest, SubscriptionId};
+use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId};
 use tokio::sync::mpsc::{self, error::TrySendError};
 use tracing::{error, warn};
 use ttl_cache::TtlCache;

+ 11 - 11
lazer/sdk/rust/client/src/resilient_ws_connection.rs

@@ -2,9 +2,7 @@ use std::time::Duration;
 
 use backoff::{backoff::Backoff, ExponentialBackoff};
 use futures_util::StreamExt;
-use pyth_lazer_protocol::subscription::{
-    Request, SubscribeRequest, SubscriptionId, UnsubscribeRequest,
-};
+use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId, UnsubscribeRequest, WsRequest};
 use tokio::{pin, select, sync::mpsc, time::Instant};
 use tracing::{error, info, warn};
 use url::Url;
@@ -18,7 +16,7 @@ use anyhow::{bail, Context, Result};
 const BACKOFF_RESET_DURATION: Duration = Duration::from_secs(10);
 
 pub struct PythLazerResilientWSConnection {
-    request_sender: mpsc::Sender<Request>,
+    request_sender: mpsc::Sender<WsRequest>,
 }
 
 impl PythLazerResilientWSConnection {
@@ -53,7 +51,7 @@ impl PythLazerResilientWSConnection {
 
     pub async fn subscribe(&mut self, request: SubscribeRequest) -> Result<()> {
         self.request_sender
-            .send(Request::Subscribe(request))
+            .send(WsRequest::Subscribe(request))
             .await
             .context("Failed to send subscribe request")?;
         Ok(())
@@ -61,7 +59,9 @@ impl PythLazerResilientWSConnection {
 
     pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> {
         self.request_sender
-            .send(Request::Unsubscribe(UnsubscribeRequest { subscription_id }))
+            .send(WsRequest::Unsubscribe(UnsubscribeRequest {
+                subscription_id,
+            }))
             .await
             .context("Failed to send unsubscribe request")?;
         Ok(())
@@ -95,7 +95,7 @@ impl PythLazerResilientWSConnectionTask {
     pub async fn run(
         &mut self,
         response_sender: mpsc::Sender<AnyResponse>,
-        request_receiver: &mut mpsc::Receiver<Request>,
+        request_receiver: &mut mpsc::Receiver<WsRequest>,
     ) -> Result<()> {
         loop {
             let start_time = Instant::now();
@@ -128,7 +128,7 @@ impl PythLazerResilientWSConnectionTask {
     pub async fn start(
         &mut self,
         sender: mpsc::Sender<AnyResponse>,
-        request_receiver: &mut mpsc::Receiver<Request>,
+        request_receiver: &mut mpsc::Receiver<WsRequest>,
     ) -> Result<()> {
         let mut ws_connection =
             PythLazerWSConnection::new(self.endpoint.clone(), self.access_token.clone())?;
@@ -137,7 +137,7 @@ impl PythLazerResilientWSConnectionTask {
 
         for subscription in self.subscriptions.clone() {
             ws_connection
-                .send_request(Request::Subscribe(subscription))
+                .send_request(WsRequest::Subscribe(subscription))
                 .await?;
         }
         loop {
@@ -167,10 +167,10 @@ impl PythLazerResilientWSConnectionTask {
                 }
                 Some(request) = request_receiver.recv() => {
                    match request {
-                        Request::Subscribe(request) => {
+                        WsRequest::Subscribe(request) => {
                             self.subscribe(&mut ws_connection, request).await?;
                         }
-                        Request::Unsubscribe(request) => {
+                        WsRequest::Unsubscribe(request) => {
                             self.unsubscribe(&mut ws_connection, request).await?;
                         }
                    }

+ 7 - 7
lazer/sdk/rust/client/src/ws_connection.rs

@@ -4,8 +4,8 @@ use anyhow::Result;
 use derive_more::From;
 use futures_util::{SinkExt, StreamExt, TryStreamExt};
 use pyth_lazer_protocol::{
+    api::{ErrorResponse, SubscribeRequest, UnsubscribeRequest, WsRequest, WsResponse},
     binary_update::BinaryWsUpdate,
-    subscription::{ErrorResponse, Request, Response, SubscribeRequest, UnsubscribeRequest},
 };
 use tokio_tungstenite::{connect_async, tungstenite::Message};
 use url::Url;
@@ -32,7 +32,7 @@ pub struct PythLazerWSConnection {
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash, From)]
 pub enum AnyResponse {
-    Json(Response),
+    Json(WsResponse),
     Binary(BinaryWsUpdate),
 }
 
@@ -84,13 +84,13 @@ impl PythLazerWSConnection {
                 .try_filter_map(|msg| async {
                     let r: Result<Option<AnyResponse>> = match msg {
                         Message::Text(text) => {
-                            Ok(Some(serde_json::from_str::<Response>(&text)?.into()))
+                            Ok(Some(serde_json::from_str::<WsResponse>(&text)?.into()))
                         }
                         Message::Binary(data) => {
                             Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into()))
                         }
                         Message::Close(_) => Ok(Some(
-                            Response::Error(ErrorResponse {
+                            WsResponse::Error(ErrorResponse {
                                 error: "WebSocket connection closed".to_string(),
                             })
                             .into(),
@@ -103,7 +103,7 @@ impl PythLazerWSConnection {
         Ok(response_stream)
     }
 
-    pub async fn send_request(&mut self, request: Request) -> Result<()> {
+    pub async fn send_request(&mut self, request: WsRequest) -> Result<()> {
         if let Some(sender) = &mut self.ws_sender {
             let msg = serde_json::to_string(&request)?;
             sender.send(Message::Text(msg)).await?;
@@ -118,7 +118,7 @@ impl PythLazerWSConnection {
     /// # Arguments
     /// * `request` - A subscription request containing feed IDs and parameters
     pub async fn subscribe(&mut self, request: SubscribeRequest) -> Result<()> {
-        let request = Request::Subscribe(request);
+        let request = WsRequest::Subscribe(request);
         self.send_request(request).await
     }
 
@@ -127,7 +127,7 @@ impl PythLazerWSConnection {
     /// # Arguments
     /// * `subscription_id` - The ID of the subscription to cancel
     pub async fn unsubscribe(&mut self, request: UnsubscribeRequest) -> Result<()> {
-        let request = Request::Unsubscribe(request);
+        let request = WsRequest::Unsubscribe(request);
         self.send_request(request).await
     }
 

+ 3 - 1
lazer/sdk/rust/protocol/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "pyth-lazer-protocol"
-version = "0.10.2"
+version = "0.11.0"
 edition = "2021"
 description = "Pyth Lazer SDK - protocol types."
 license = "Apache-2.0"
@@ -20,6 +20,7 @@ mry = { version = "0.13.0", features = ["serde"], optional = true }
 chrono = "0.4.41"
 humantime = "2.2.0"
 hex = "0.4.3"
+thiserror = "2.0.12"
 
 [dev-dependencies]
 bincode = "1.3.3"
@@ -28,3 +29,4 @@ hex = "0.4.3"
 libsecp256k1 = "0.7.1"
 bs58 = "0.5.1"
 alloy-primitives = "0.8.19"
+assert_float_eq = "1.1.4"

+ 425 - 3
lazer/sdk/rust/protocol/src/api.rs

@@ -1,8 +1,16 @@
-use serde::{Deserialize, Serialize};
+use std::{
+    fmt::Display,
+    ops::{Deref, DerefMut},
+};
+
+use derive_more::derive::From;
+use itertools::Itertools as _;
+use serde::{de::Error, Deserialize, Serialize};
 
 use crate::{
-    router::{Channel, Format, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty},
-    time::TimestampUs,
+    payload::AggregatedPriceFeedData,
+    time::{DurationUs, FixedRate, TimestampUs},
+    ChannelId, Price, PriceFeedId, PriceFeedProperty, Rate,
 };
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -52,3 +60,417 @@ pub type PriceResponse = JsonUpdate;
 pub fn default_parsed() -> bool {
     true
 }
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum DeliveryFormat {
+    /// Deliver stream updates as JSON text messages.
+    #[default]
+    Json,
+    /// Deliver stream updates as binary messages.
+    Binary,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum Format {
+    Evm,
+    Solana,
+    LeEcdsa,
+    LeUnsigned,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum JsonBinaryEncoding {
+    #[default]
+    Base64,
+    Hex,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
+pub enum Channel {
+    FixedRate(FixedRate),
+}
+
+impl Serialize for Channel {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        match self {
+            Channel::FixedRate(fixed_rate) => {
+                if *fixed_rate == FixedRate::MIN {
+                    return serializer.serialize_str("real_time");
+                }
+                serializer.serialize_str(&format!(
+                    "fixed_rate@{}ms",
+                    fixed_rate.duration().as_millis()
+                ))
+            }
+        }
+    }
+}
+
+impl Display for Channel {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Channel::FixedRate(fixed_rate) => match *fixed_rate {
+                FixedRate::MIN => write!(f, "real_time"),
+                rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
+            },
+        }
+    }
+}
+
+impl Channel {
+    pub fn id(&self) -> ChannelId {
+        match self {
+            Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
+                1 => ChannelId::FIXED_RATE_1,
+                50 => ChannelId::FIXED_RATE_50,
+                200 => ChannelId::FIXED_RATE_200,
+                _ => panic!("unknown channel: {self:?}"),
+            },
+        }
+    }
+}
+
+#[test]
+fn id_supports_all_fixed_rates() {
+    for rate in FixedRate::ALL {
+        Channel::FixedRate(rate).id();
+    }
+}
+
+fn parse_channel(value: &str) -> Option<Channel> {
+    if value == "real_time" {
+        Some(Channel::FixedRate(FixedRate::MIN))
+    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
+        let ms_value = rest.strip_suffix("ms")?;
+        Some(Channel::FixedRate(FixedRate::from_millis(
+            ms_value.parse().ok()?,
+        )?))
+    } else {
+        None
+    }
+}
+
+impl<'de> Deserialize<'de> for Channel {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let value = <String>::deserialize(deserializer)?;
+        parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubscriptionParamsRepr {
+    pub price_feed_ids: Vec<PriceFeedId>,
+    pub properties: Vec<PriceFeedProperty>,
+    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
+    #[serde(alias = "chains")]
+    pub formats: Vec<Format>,
+    #[serde(default)]
+    pub delivery_format: DeliveryFormat,
+    #[serde(default)]
+    pub json_binary_encoding: JsonBinaryEncoding,
+    /// If `true`, the stream update will contain a `parsed` JSON field containing
+    /// all data of the update.
+    #[serde(default = "default_parsed")]
+    pub parsed: bool,
+    pub channel: Channel,
+    #[serde(default)]
+    pub ignore_invalid_feed_ids: bool,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubscriptionParams(SubscriptionParamsRepr);
+
+impl<'de> Deserialize<'de> for SubscriptionParams {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
+        Self::new(value).map_err(Error::custom)
+    }
+}
+
+impl SubscriptionParams {
+    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
+        if value.price_feed_ids.is_empty() {
+            return Err("no price feed ids specified");
+        }
+        if !value.price_feed_ids.iter().all_unique() {
+            return Err("duplicate price feed ids specified");
+        }
+        if !value.formats.iter().all_unique() {
+            return Err("duplicate formats or chains specified");
+        }
+        if value.properties.is_empty() {
+            return Err("no properties specified");
+        }
+        if !value.properties.iter().all_unique() {
+            return Err("duplicate properties specified");
+        }
+        Ok(Self(value))
+    }
+}
+
+impl Deref for SubscriptionParams {
+    type Target = SubscriptionParamsRepr;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+impl DerefMut for SubscriptionParams {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct JsonBinaryData {
+    pub encoding: JsonBinaryEncoding,
+    pub data: String,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct JsonUpdate {
+    /// Present unless `parsed = false` is specified in subscription params.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub parsed: Option<ParsedPayload>,
+    /// Only present if `Evm` is present in `formats` in subscription params.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub evm: Option<JsonBinaryData>,
+    /// Only present if `Solana` is present in `formats` in subscription params.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub solana: Option<JsonBinaryData>,
+    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub le_ecdsa: Option<JsonBinaryData>,
+    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub le_unsigned: Option<JsonBinaryData>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ParsedPayload {
+    #[serde(with = "crate::serde_str::timestamp")]
+    pub timestamp_us: TimestampUs,
+    pub price_feeds: Vec<ParsedFeedPayload>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ParsedFeedPayload {
+    pub price_feed_id: PriceFeedId,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(with = "crate::serde_str::option_price")]
+    #[serde(default)]
+    pub price: Option<Price>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(with = "crate::serde_str::option_price")]
+    #[serde(default)]
+    pub best_bid_price: Option<Price>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(with = "crate::serde_str::option_price")]
+    #[serde(default)]
+    pub best_ask_price: Option<Price>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub publisher_count: Option<u16>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub exponent: Option<i16>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub confidence: Option<Price>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub funding_rate: Option<Rate>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub funding_timestamp: Option<TimestampUs>,
+    // More fields may be added later.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub funding_rate_interval: Option<DurationUs>,
+}
+
+impl ParsedFeedPayload {
+    pub fn new(
+        price_feed_id: PriceFeedId,
+        exponent: Option<i16>,
+        data: &AggregatedPriceFeedData,
+        properties: &[PriceFeedProperty],
+    ) -> Self {
+        let mut output = Self {
+            price_feed_id,
+            price: None,
+            best_bid_price: None,
+            best_ask_price: None,
+            publisher_count: None,
+            exponent: None,
+            confidence: None,
+            funding_rate: None,
+            funding_timestamp: None,
+            funding_rate_interval: None,
+        };
+        for &property in properties {
+            match property {
+                PriceFeedProperty::Price => {
+                    output.price = data.price;
+                }
+                PriceFeedProperty::BestBidPrice => {
+                    output.best_bid_price = data.best_bid_price;
+                }
+                PriceFeedProperty::BestAskPrice => {
+                    output.best_ask_price = data.best_ask_price;
+                }
+                PriceFeedProperty::PublisherCount => {
+                    output.publisher_count = Some(data.publisher_count);
+                }
+                PriceFeedProperty::Exponent => {
+                    output.exponent = exponent;
+                }
+                PriceFeedProperty::Confidence => {
+                    output.confidence = data.confidence;
+                }
+                PriceFeedProperty::FundingRate => {
+                    output.funding_rate = data.funding_rate;
+                }
+                PriceFeedProperty::FundingTimestamp => {
+                    output.funding_timestamp = data.funding_timestamp;
+                }
+                PriceFeedProperty::FundingRateInterval => {
+                    output.funding_rate_interval = data.funding_rate_interval;
+                }
+            }
+        }
+        output
+    }
+
+    pub fn new_full(
+        price_feed_id: PriceFeedId,
+        exponent: Option<i16>,
+        data: &AggregatedPriceFeedData,
+    ) -> Self {
+        Self {
+            price_feed_id,
+            price: data.price,
+            best_bid_price: data.best_bid_price,
+            best_ask_price: data.best_ask_price,
+            publisher_count: Some(data.publisher_count),
+            exponent,
+            confidence: data.confidence,
+            funding_rate: data.funding_rate,
+            funding_timestamp: data.funding_timestamp,
+            funding_rate_interval: data.funding_rate_interval,
+        }
+    }
+}
+
+/// A request sent from the client to the server.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(tag = "type")]
+#[serde(rename_all = "camelCase")]
+pub enum WsRequest {
+    Subscribe(SubscribeRequest),
+    Unsubscribe(UnsubscribeRequest),
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
+pub struct SubscriptionId(pub u64);
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubscribeRequest {
+    pub subscription_id: SubscriptionId,
+    #[serde(flatten)]
+    pub params: SubscriptionParams,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UnsubscribeRequest {
+    pub subscription_id: SubscriptionId,
+}
+
+/// A JSON response sent from the server to the client.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)]
+#[serde(tag = "type")]
+#[serde(rename_all = "camelCase")]
+pub enum WsResponse {
+    Error(ErrorResponse),
+    Subscribed(SubscribedResponse),
+    SubscribedWithInvalidFeedIdsIgnored(SubscribedWithInvalidFeedIdsIgnoredResponse),
+    Unsubscribed(UnsubscribedResponse),
+    SubscriptionError(SubscriptionErrorResponse),
+    StreamUpdated(StreamUpdatedResponse),
+}
+
+/// Sent from the server after a successul subscription.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubscribedResponse {
+    pub subscription_id: SubscriptionId,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InvalidFeedSubscriptionDetails {
+    pub unknown_ids: Vec<PriceFeedId>,
+    pub unsupported_channels: Vec<PriceFeedId>,
+    pub unstable: Vec<PriceFeedId>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubscribedWithInvalidFeedIdsIgnoredResponse {
+    pub subscription_id: SubscriptionId,
+    pub subscribed_feed_ids: Vec<PriceFeedId>,
+    pub ignored_invalid_feed_ids: InvalidFeedSubscriptionDetails,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UnsubscribedResponse {
+    pub subscription_id: SubscriptionId,
+}
+
+/// Sent from the server if the requested subscription or unsubscription request
+/// could not be fulfilled.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SubscriptionErrorResponse {
+    pub subscription_id: SubscriptionId,
+    pub error: String,
+}
+
+/// Sent from the server if an internal error occured while serving data for an existing subscription,
+/// or a client request sent a bad request.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ErrorResponse {
+    pub error: String,
+}
+
+/// Sent from the server when new data is available for an existing subscription
+/// (only if `delivery_format == Json`).
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StreamUpdatedResponse {
+    pub subscription_id: SubscriptionId,
+    #[serde(flatten)]
+    pub payload: JsonUpdate,
+}

+ 1 - 1
lazer/sdk/rust/protocol/src/binary_update.rs

@@ -1,5 +1,5 @@
 use {
-    crate::{message::Message, subscription::SubscriptionId},
+    crate::{api::SubscriptionId, message::Message},
     anyhow::{bail, Context},
     byteorder::{WriteBytesExt, BE, LE},
 };

+ 3 - 1
lazer/sdk/rust/protocol/src/jrpc.rs

@@ -1,6 +1,8 @@
-use crate::router::{Channel, Price, PriceFeedId, Rate};
+use crate::rate::Rate;
 use crate::symbol_state::SymbolState;
 use crate::time::TimestampUs;
+use crate::PriceFeedId;
+use crate::{api::Channel, price::Price};
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
 

+ 77 - 5
lazer/sdk/rust/protocol/src/lib.rs

@@ -1,21 +1,93 @@
-//! Protocol types.
+//! Lazer type definitions and utilities.
 
+/// Types describing Lazer HTTP and WebSocket APIs.
 pub mod api;
+/// Binary delivery format for WebSocket.
 pub mod binary_update;
 mod dynamic_value;
 mod feed_kind;
+/// Lazer Agent JSON-RPC API.
 pub mod jrpc;
+/// Types describing Lazer's verifiable messages containing signature and payload.
 pub mod message;
+/// Types describing Lazer's message payload.
 pub mod payload;
+mod price;
+/// Legacy Websocket API for publishers.
 pub mod publisher;
-pub mod router;
+mod rate;
 mod serde_price_as_i64;
 mod serde_str;
-pub mod subscription;
-pub mod symbol_state;
+mod symbol_state;
+/// Lazer's types for time representation.
 pub mod time;
 
-pub use crate::{dynamic_value::DynamicValue, feed_kind::FeedKind};
+use derive_more::derive::{From, Into};
+use serde::{Deserialize, Serialize};
+
+pub use crate::{
+    dynamic_value::DynamicValue,
+    feed_kind::FeedKind,
+    price::{Price, PriceError},
+    rate::{Rate, RateError},
+    symbol_state::SymbolState,
+};
+
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
+)]
+pub struct PublisherId(pub u16);
+
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
+)]
+pub struct PriceFeedId(pub u32);
+
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
+)]
+pub struct ChannelId(pub u8);
+
+impl ChannelId {
+    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
+    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
+    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum PriceFeedProperty {
+    Price,
+    BestBidPrice,
+    BestAskPrice,
+    PublisherCount,
+    Exponent,
+    Confidence,
+    FundingRate,
+    FundingTimestamp,
+    FundingRateInterval,
+    // More fields may be added later.
+}
+
+// Operation and coefficient for converting value to mantissa.
+enum ExponentFactor {
+    // mantissa = value * factor
+    Mul(i64),
+    // mantissa = value / factor
+    Div(i64),
+}
+
+impl ExponentFactor {
+    fn get(exponent: i16) -> Option<Self> {
+        if exponent >= 0 {
+            let exponent: u32 = exponent.try_into().ok()?;
+            Some(ExponentFactor::Div(10_i64.checked_pow(exponent)?))
+        } else {
+            let minus_exponent: u32 = exponent.checked_neg()?.try_into().ok()?;
+            Some(ExponentFactor::Mul(10_i64.checked_pow(minus_exponent)?))
+        }
+    }
+}
 
 #[test]
 fn magics_in_big_endian() {

+ 1 - 1
lazer/sdk/rust/protocol/src/message.rs

@@ -1,6 +1,6 @@
 use {
     self::format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC},
-    crate::router::ParsedPayload,
+    crate::api::ParsedPayload,
     anyhow::{bail, Context},
     byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
     derive_more::From,

+ 10 - 12
lazer/sdk/rust/protocol/src/payload.rs

@@ -1,12 +1,10 @@
-//! Types representing binary encoding of signable payloads and signature envelopes.
-
-use crate::time::DurationUs;
+use crate::{
+    price::Price,
+    rate::Rate,
+    time::{DurationUs, TimestampUs},
+    ChannelId, PriceFeedId, PriceFeedProperty,
+};
 use {
-    super::router::{PriceFeedId, PriceFeedProperty},
-    crate::{
-        router::{ChannelId, Price, Rate},
-        time::TimestampUs,
-    },
     anyhow::bail,
     byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
     serde::{Deserialize, Serialize},
@@ -231,12 +229,12 @@ fn write_option_price<BO: ByteOrder>(
     mut writer: impl Write,
     value: Option<Price>,
 ) -> std::io::Result<()> {
-    writer.write_i64::<BO>(value.map_or(0, |v| v.0.get()))
+    writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
 }
 
 fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
     let value = NonZeroI64::new(reader.read_i64::<BO>()?);
-    Ok(value.map(Price))
+    Ok(value.map(Price::from_nonzero_mantissa))
 }
 
 fn write_option_rate<BO: ByteOrder>(
@@ -246,7 +244,7 @@ fn write_option_rate<BO: ByteOrder>(
     match value {
         Some(value) => {
             writer.write_u8(1)?;
-            writer.write_i64::<BO>(value.0)
+            writer.write_i64::<BO>(value.mantissa())
         }
         None => {
             writer.write_u8(0)?;
@@ -258,7 +256,7 @@ fn write_option_rate<BO: ByteOrder>(
 fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
     let present = reader.read_u8()? != 0;
     if present {
-        Ok(Some(Rate(reader.read_i64::<BO>()?)))
+        Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
     } else {
         Ok(None)
     }

+ 155 - 0
lazer/sdk/rust/protocol/src/price.rs

@@ -0,0 +1,155 @@
+#[cfg(test)]
+mod tests;
+
+use {
+    crate::ExponentFactor,
+    rust_decimal::{prelude::FromPrimitive, Decimal},
+    serde::{Deserialize, Serialize},
+    std::num::NonZeroI64,
+    thiserror::Error,
+};
+
+#[derive(Debug, Error)]
+pub enum PriceError {
+    #[error("decimal parse error: {0}")]
+    DecimalParse(#[from] rust_decimal::Error),
+    #[error("price value is more precise than available exponent")]
+    TooPrecise,
+    #[error("zero price is unsupported")]
+    ZeroPriceUnsupported,
+    #[error("overflow")]
+    Overflow,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
+#[repr(transparent)]
+pub struct Price(NonZeroI64);
+
+impl Price {
+    pub fn from_integer(value: i64, exponent: i16) -> Result<Price, PriceError> {
+        let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? {
+            ExponentFactor::Mul(coef) => value.checked_mul(coef).ok_or(PriceError::Overflow)?,
+            ExponentFactor::Div(coef) => value.checked_div(coef).ok_or(PriceError::Overflow)?,
+        };
+        let value = NonZeroI64::new(value).ok_or(PriceError::ZeroPriceUnsupported)?;
+        Ok(Self(value))
+    }
+
+    pub fn parse_str(value: &str, exponent: i16) -> Result<Price, PriceError> {
+        let value: Decimal = value.parse()?;
+        let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? {
+            ExponentFactor::Mul(coef) => value
+                .checked_mul(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?)
+                .ok_or(PriceError::Overflow)?,
+            ExponentFactor::Div(coef) => value
+                .checked_div(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?)
+                .ok_or(PriceError::Overflow)?,
+        };
+        if !value.is_integer() {
+            return Err(PriceError::TooPrecise);
+        }
+        let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?;
+        let value = NonZeroI64::new(value).ok_or(PriceError::Overflow)?;
+        Ok(Self(value))
+    }
+
+    pub const fn from_nonzero_mantissa(mantissa: NonZeroI64) -> Self {
+        Self(mantissa)
+    }
+
+    pub const fn from_mantissa(mantissa: i64) -> Result<Self, PriceError> {
+        if let Some(value) = NonZeroI64::new(mantissa) {
+            Ok(Self(value))
+        } else {
+            Err(PriceError::ZeroPriceUnsupported)
+        }
+    }
+
+    pub fn mantissa(self) -> NonZeroI64 {
+        self.0
+    }
+
+    pub fn mantissa_i64(self) -> i64 {
+        self.0.get()
+    }
+
+    pub fn to_f64(self, exponent: i16) -> Result<f64, PriceError> {
+        match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? {
+            // Mul/div is reversed for this conversion
+            ExponentFactor::Mul(coef) => Ok(self.0.get() as f64 / coef as f64),
+            ExponentFactor::Div(coef) => Ok(self.0.get() as f64 * coef as f64),
+        }
+    }
+
+    pub fn from_f64(value: f64, exponent: i16) -> Result<Self, PriceError> {
+        let value = Decimal::from_f64(value).ok_or(PriceError::Overflow)?;
+        let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? {
+            ExponentFactor::Mul(coef) => value
+                .checked_mul(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?)
+                .ok_or(PriceError::Overflow)?,
+            ExponentFactor::Div(coef) => value
+                .checked_div(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?)
+                .ok_or(PriceError::Overflow)?,
+        };
+        let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?;
+        Ok(Self(
+            NonZeroI64::new(value).ok_or(PriceError::ZeroPriceUnsupported)?,
+        ))
+    }
+
+    pub fn add_with_same_mantissa(self, other: Price) -> Result<Self, PriceError> {
+        let value = self
+            .0
+            .get()
+            .checked_add(other.0.get())
+            .ok_or(PriceError::Overflow)?;
+        Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported)
+    }
+
+    pub fn sub_with_same_mantissa(self, other: Price) -> Result<Self, PriceError> {
+        let value = self
+            .0
+            .get()
+            .checked_sub(other.0.get())
+            .ok_or(PriceError::Overflow)?;
+        Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported)
+    }
+
+    pub fn mul_integer(self, factor: i64) -> Result<Self, PriceError> {
+        let value = self
+            .0
+            .get()
+            .checked_mul(factor)
+            .ok_or(PriceError::Overflow)?;
+        Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported)
+    }
+
+    pub fn div_integer(self, factor: i64) -> Result<Self, PriceError> {
+        let value = self
+            .0
+            .get()
+            .checked_div(factor)
+            .ok_or(PriceError::Overflow)?;
+        Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported)
+    }
+
+    pub fn mul_decimal(self, mantissa: i64, rhs_exponent: i16) -> Result<Self, PriceError> {
+        let left_value = i128::from(self.0.get());
+        let right_value = i128::from(mantissa);
+
+        let value = left_value
+            .checked_mul(right_value)
+            .ok_or(PriceError::Overflow)?;
+
+        let value = match ExponentFactor::get(rhs_exponent).ok_or(PriceError::Overflow)? {
+            ExponentFactor::Mul(coef) => {
+                value.checked_div(coef.into()).ok_or(PriceError::Overflow)?
+            }
+            ExponentFactor::Div(coef) => {
+                value.checked_mul(coef.into()).ok_or(PriceError::Overflow)?
+            }
+        };
+        let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?;
+        Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported)
+    }
+}

+ 149 - 0
lazer/sdk/rust/protocol/src/price/tests.rs

@@ -0,0 +1,149 @@
+use {super::Price, assert_float_eq::assert_float_absolute_eq};
+
+#[test]
+fn price_constructs() {
+    let price = Price::parse_str("42.68", -8).unwrap();
+    assert_eq!(price.0.get(), 4_268_000_000);
+    assert_float_absolute_eq!(price.to_f64(-8).unwrap(), 42.68);
+
+    let price2 = Price::from_integer(2, -8).unwrap();
+    assert_eq!(price2.0.get(), 200_000_000);
+    assert_float_absolute_eq!(price2.to_f64(-8).unwrap(), 2.);
+
+    let price3 = Price::from_mantissa(123_456).unwrap();
+    assert_eq!(price3.0.get(), 123_456);
+    assert_float_absolute_eq!(price3.to_f64(-8).unwrap(), 0.001_234_56);
+
+    let price4 = Price::from_f64(42.68, -8).unwrap();
+    assert_eq!(price4.0.get(), 4_268_000_000);
+    assert_float_absolute_eq!(price4.to_f64(-8).unwrap(), 42.68);
+}
+
+#[test]
+fn price_constructs_with_negative_mantissa() {
+    let price = Price::parse_str("-42.68", -8).unwrap();
+    assert_eq!(price.0.get(), -4_268_000_000);
+    assert_float_absolute_eq!(price.to_f64(-8).unwrap(), -42.68);
+
+    let price2 = Price::from_integer(-2, -8).unwrap();
+    assert_eq!(price2.0.get(), -200_000_000);
+    assert_float_absolute_eq!(price2.to_f64(-8).unwrap(), -2.);
+
+    let price3 = Price::from_mantissa(-123_456).unwrap();
+    assert_eq!(price3.0.get(), -123_456);
+    assert_float_absolute_eq!(price3.to_f64(-8).unwrap(), -0.001_234_56);
+
+    let price4 = Price::from_f64(-42.68, -8).unwrap();
+    assert_eq!(price4.0.get(), -4_268_000_000);
+    assert_float_absolute_eq!(price4.to_f64(-8).unwrap(), -42.68);
+}
+
+#[test]
+fn price_constructs_with_zero_exponent() {
+    let price = Price::parse_str("42", 0).unwrap();
+    assert_eq!(price.0.get(), 42);
+    assert_float_absolute_eq!(price.to_f64(0).unwrap(), 42.);
+
+    let price2 = Price::from_integer(2, 0).unwrap();
+    assert_eq!(price2.0.get(), 2);
+    assert_float_absolute_eq!(price2.to_f64(0).unwrap(), 2.);
+
+    let price3 = Price::from_mantissa(123_456).unwrap();
+    assert_eq!(price3.0.get(), 123_456);
+    assert_float_absolute_eq!(price3.to_f64(0).unwrap(), 123_456.);
+
+    let price4 = Price::from_f64(42., 0).unwrap();
+    assert_eq!(price4.0.get(), 42);
+    assert_float_absolute_eq!(price4.to_f64(0).unwrap(), 42.);
+}
+
+#[test]
+fn price_constructs_with_positive_exponent() {
+    let price = Price::parse_str("42_680_000", 3).unwrap();
+    assert_eq!(price.0.get(), 42_680);
+    assert_float_absolute_eq!(price.to_f64(3).unwrap(), 42_680_000.);
+
+    let price2 = Price::from_integer(200_000, 3).unwrap();
+    assert_eq!(price2.0.get(), 200);
+    assert_float_absolute_eq!(price2.to_f64(3).unwrap(), 200_000.);
+
+    let price3 = Price::from_mantissa(123_456).unwrap();
+    assert_eq!(price3.0.get(), 123_456);
+    assert_float_absolute_eq!(price3.to_f64(3).unwrap(), 123_456_000.);
+
+    let price4 = Price::from_f64(42_680_000., 3).unwrap();
+    assert_eq!(price4.0.get(), 42_680);
+    assert_float_absolute_eq!(price4.to_f64(3).unwrap(), 42_680_000.);
+}
+
+#[test]
+fn price_rejects_zero_mantissa() {
+    Price::parse_str("0.0", -8).unwrap_err();
+    Price::from_integer(0, -8).unwrap_err();
+    Price::from_mantissa(0).unwrap_err();
+    Price::from_f64(-0.0, -8).unwrap_err();
+
+    Price::parse_str("0.0", 8).unwrap_err();
+    Price::from_integer(0, 8).unwrap_err();
+    Price::from_f64(-0.0, 8).unwrap_err();
+}
+
+#[test]
+fn price_rejects_too_precise() {
+    Price::parse_str("42.68", 0).unwrap_err();
+    Price::parse_str("42.68", -1).unwrap_err();
+    Price::parse_str("42.68", -2).unwrap();
+
+    Price::parse_str("42_680", 3).unwrap_err();
+    Price::parse_str("42_600", 3).unwrap_err();
+    Price::parse_str("42_000", 3).unwrap();
+}
+
+#[test]
+fn price_ops() {
+    let price1 = Price::parse_str("12.34", -8).unwrap();
+    let price2 = Price::parse_str("23.45", -8).unwrap();
+    assert_float_absolute_eq!(
+        price1
+            .add_with_same_mantissa(price2)
+            .unwrap()
+            .to_f64(-8)
+            .unwrap(),
+        12.34 + 23.45
+    );
+    assert_float_absolute_eq!(
+        price1
+            .sub_with_same_mantissa(price2)
+            .unwrap()
+            .to_f64(-8)
+            .unwrap(),
+        12.34 - 23.45
+    );
+    assert_float_absolute_eq!(
+        price1.mul_integer(2).unwrap().to_f64(-8).unwrap(),
+        12.34 * 2.
+    );
+    assert_float_absolute_eq!(
+        price1.div_integer(2).unwrap().to_f64(-8).unwrap(),
+        12.34 / 2.
+    );
+
+    assert_float_absolute_eq!(
+        price1.mul_decimal(3456, -2).unwrap().to_f64(-8).unwrap(),
+        12.34 * 34.56
+    );
+
+    let price2 = Price::parse_str("42_000", 3).unwrap();
+    assert_float_absolute_eq!(
+        price2.mul_integer(2).unwrap().to_f64(3).unwrap(),
+        42_000. * 2.
+    );
+    assert_float_absolute_eq!(
+        price2.div_integer(2).unwrap().to_f64(3).unwrap(),
+        42_000. / 2.
+    );
+    assert_float_absolute_eq!(
+        price2.mul_decimal(3456, -2).unwrap().to_f64(3).unwrap(),
+        (42_000_f64 * 34.56 / 1000.).floor() * 1000.
+    );
+}

+ 14 - 15
lazer/sdk/rust/protocol/src/publisher.rs

@@ -1,10 +1,5 @@
-//! WebSocket JSON protocol types for API the publisher provides to the router.
-//! Publisher data sourcing may also be implemented in the router process,
-//! eliminating WebSocket overhead.
-
 use {
-    super::router::{Price, PriceFeedId, Rate},
-    crate::time::TimestampUs,
+    crate::{price::Price, rate::Rate, time::TimestampUs, PriceFeedId},
     derive_more::derive::From,
     serde::{Deserialize, Serialize},
 };
@@ -104,9 +99,11 @@ fn price_feed_data_v1_serde() {
         price_feed_id: PriceFeedId(1),
         source_timestamp_us: TimestampUs::from_micros(2),
         publisher_timestamp_us: TimestampUs::from_micros(3),
-        price: Some(Price(4.try_into().unwrap())),
-        best_bid_price: Some(Price(5.try_into().unwrap())),
-        best_ask_price: Some(Price((2 * 256 + 6).try_into().unwrap())),
+        price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())),
+        best_bid_price: Some(Price::from_nonzero_mantissa(5.try_into().unwrap())),
+        best_ask_price: Some(Price::from_nonzero_mantissa(
+            (2 * 256 + 6).try_into().unwrap(),
+        )),
     };
     assert_eq!(
         bincode::deserialize::<PriceFeedDataV1>(&data).unwrap(),
@@ -126,7 +123,7 @@ fn price_feed_data_v1_serde() {
         price_feed_id: PriceFeedId(1),
         source_timestamp_us: TimestampUs::from_micros(2),
         publisher_timestamp_us: TimestampUs::from_micros(3),
-        price: Some(Price(4.try_into().unwrap())),
+        price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())),
         best_bid_price: None,
         best_ask_price: None,
     };
@@ -153,9 +150,11 @@ fn price_feed_data_v2_serde() {
         price_feed_id: PriceFeedId(1),
         source_timestamp_us: TimestampUs::from_micros(2),
         publisher_timestamp_us: TimestampUs::from_micros(3),
-        price: Some(Price(4.try_into().unwrap())),
-        best_bid_price: Some(Price(5.try_into().unwrap())),
-        best_ask_price: Some(Price((2 * 256 + 6).try_into().unwrap())),
+        price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())),
+        best_bid_price: Some(Price::from_nonzero_mantissa(5.try_into().unwrap())),
+        best_ask_price: Some(Price::from_nonzero_mantissa(
+            (2 * 256 + 6).try_into().unwrap(),
+        )),
         funding_rate: None,
     };
     assert_eq!(
@@ -177,10 +176,10 @@ fn price_feed_data_v2_serde() {
         price_feed_id: PriceFeedId(1),
         source_timestamp_us: TimestampUs::from_micros(2),
         publisher_timestamp_us: TimestampUs::from_micros(3),
-        price: Some(Price(4.try_into().unwrap())),
+        price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())),
         best_bid_price: None,
         best_ask_price: None,
-        funding_rate: Some(Rate(3 * 256 + 7)),
+        funding_rate: Some(Rate::from_mantissa(3 * 256 + 7)),
     };
     assert_eq!(
         bincode::deserialize::<PriceFeedDataV2>(&data2).unwrap(),

+ 80 - 0
lazer/sdk/rust/protocol/src/rate.rs

@@ -0,0 +1,80 @@
+#[cfg(test)]
+mod tests;
+
+use {
+    crate::ExponentFactor,
+    rust_decimal::{prelude::FromPrimitive, Decimal},
+    serde::{Deserialize, Serialize},
+    thiserror::Error,
+};
+
+#[derive(Debug, Error)]
+pub enum RateError {
+    #[error("decimal parse error: {0}")]
+    DecimalParse(#[from] rust_decimal::Error),
+    #[error("price value is more precise than available exponent")]
+    TooPrecise,
+    #[error("overflow")]
+    Overflow,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
+#[repr(transparent)]
+pub struct Rate(i64);
+
+impl Rate {
+    pub fn from_integer(value: i64, exponent: i16) -> Result<Self, RateError> {
+        let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? {
+            ExponentFactor::Mul(coef) => value.checked_mul(coef).ok_or(RateError::Overflow)?,
+            ExponentFactor::Div(coef) => value.checked_div(coef).ok_or(RateError::Overflow)?,
+        };
+        Ok(Self(value))
+    }
+
+    pub fn parse_str(value: &str, exponent: i16) -> Result<Self, RateError> {
+        let value: Decimal = value.parse()?;
+        let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? {
+            ExponentFactor::Mul(coef) => value
+                .checked_mul(Decimal::from_i64(coef).ok_or(RateError::Overflow)?)
+                .ok_or(RateError::Overflow)?,
+            ExponentFactor::Div(coef) => value
+                .checked_div(Decimal::from_i64(coef).ok_or(RateError::Overflow)?)
+                .ok_or(RateError::Overflow)?,
+        };
+        if !value.is_integer() {
+            return Err(RateError::TooPrecise);
+        }
+        let value: i64 = value.try_into().map_err(|_| RateError::Overflow)?;
+        Ok(Self(value))
+    }
+
+    pub const fn from_mantissa(mantissa: i64) -> Self {
+        Self(mantissa)
+    }
+
+    pub fn from_f64(value: f64, exponent: i16) -> Result<Self, RateError> {
+        let value = Decimal::from_f64(value).ok_or(RateError::Overflow)?;
+        let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? {
+            ExponentFactor::Mul(coef) => value
+                .checked_mul(Decimal::from_i64(coef).ok_or(RateError::Overflow)?)
+                .ok_or(RateError::Overflow)?,
+            ExponentFactor::Div(coef) => value
+                .checked_div(Decimal::from_i64(coef).ok_or(RateError::Overflow)?)
+                .ok_or(RateError::Overflow)?,
+        };
+        let value: i64 = value.try_into().map_err(|_| RateError::Overflow)?;
+        Ok(Self(value))
+    }
+
+    pub fn mantissa(self) -> i64 {
+        self.0
+    }
+
+    pub fn to_f64(self, exponent: i16) -> Result<f64, RateError> {
+        match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? {
+            // Mul/div is reversed for this conversion
+            ExponentFactor::Mul(coef) => Ok(self.0 as f64 / coef as f64),
+            ExponentFactor::Div(coef) => Ok(self.0 as f64 * coef as f64),
+        }
+    }
+}

+ 107 - 0
lazer/sdk/rust/protocol/src/rate/tests.rs

@@ -0,0 +1,107 @@
+use {crate::rate::Rate, assert_float_eq::assert_float_absolute_eq};
+
+#[test]
+fn rate_constructs() {
+    let rate = Rate::parse_str("42.68", -8).unwrap();
+    assert_eq!(rate.0, 4_268_000_000);
+    assert_float_absolute_eq!(rate.to_f64(-8).unwrap(), 42.68);
+
+    let rate2 = Rate::from_integer(2, -8).unwrap();
+    assert_eq!(rate2.0, 200_000_000);
+    assert_float_absolute_eq!(rate2.to_f64(-8).unwrap(), 2.);
+
+    let rate3 = Rate::from_mantissa(123_456);
+    assert_eq!(rate3.0, 123_456);
+    assert_float_absolute_eq!(rate3.to_f64(-8).unwrap(), 0.001_234_56);
+
+    let rate4 = Rate::from_f64(42.68, -8).unwrap();
+    assert_eq!(rate4.0, 4_268_000_000);
+    assert_float_absolute_eq!(rate4.to_f64(-8).unwrap(), 42.68);
+}
+
+#[test]
+fn rate_constructs_with_negative_mantissa() {
+    let rate = Rate::parse_str("-42.68", -8).unwrap();
+    assert_eq!(rate.0, -4_268_000_000);
+    assert_float_absolute_eq!(rate.to_f64(-8).unwrap(), -42.68);
+
+    let rate2 = Rate::from_integer(-2, -8).unwrap();
+    assert_eq!(rate2.0, -200_000_000);
+    assert_float_absolute_eq!(rate2.to_f64(-8).unwrap(), -2.);
+
+    let rate3 = Rate::from_mantissa(-123_456);
+    assert_eq!(rate3.0, -123_456);
+    assert_float_absolute_eq!(rate3.to_f64(-8).unwrap(), -0.001_234_56);
+
+    let rate4 = Rate::from_f64(-42.68, -8).unwrap();
+    assert_eq!(rate4.0, -4_268_000_000);
+    assert_float_absolute_eq!(rate4.to_f64(-8).unwrap(), -42.68);
+}
+
+#[test]
+fn rate_constructs_with_zero_exponent() {
+    let rate = Rate::parse_str("42", 0).unwrap();
+    assert_eq!(rate.0, 42);
+    assert_float_absolute_eq!(rate.to_f64(0).unwrap(), 42.);
+
+    let rate2 = Rate::from_integer(2, 0).unwrap();
+    assert_eq!(rate2.0, 2);
+    assert_float_absolute_eq!(rate2.to_f64(0).unwrap(), 2.);
+
+    let rate3 = Rate::from_mantissa(123_456);
+    assert_eq!(rate3.0, 123_456);
+    assert_float_absolute_eq!(rate3.to_f64(0).unwrap(), 123_456.);
+
+    let rate4 = Rate::from_f64(42., 0).unwrap();
+    assert_eq!(rate4.0, 42);
+    assert_float_absolute_eq!(rate4.to_f64(0).unwrap(), 42.);
+}
+
+#[test]
+fn rate_constructs_with_zero_mantissa() {
+    let rate1 = Rate::parse_str("0.0", -8).unwrap();
+    assert_eq!(rate1.0, 0);
+    let rate2 = Rate::from_integer(0, -8).unwrap();
+    assert_eq!(rate2.0, 0);
+    let rate3 = Rate::from_mantissa(0);
+    assert_eq!(rate3.0, 0);
+    let rate4 = Rate::from_f64(-0.0, -8).unwrap();
+    assert_eq!(rate4.0, 0);
+
+    let rate1 = Rate::parse_str("0.0", 8).unwrap();
+    assert_eq!(rate1.0, 0);
+    let rate2 = Rate::from_integer(0, 8).unwrap();
+    assert_eq!(rate2.0, 0);
+    let rate4 = Rate::from_f64(-0.0, 8).unwrap();
+    assert_eq!(rate4.0, 0);
+}
+
+#[test]
+fn rate_constructs_with_positive_exponent() {
+    let rate = Rate::parse_str("42_680_000", 3).unwrap();
+    assert_eq!(rate.0, 42_680);
+    assert_float_absolute_eq!(rate.to_f64(3).unwrap(), 42_680_000.);
+
+    let rate2 = Rate::from_integer(200_000, 3).unwrap();
+    assert_eq!(rate2.0, 200);
+    assert_float_absolute_eq!(rate2.to_f64(3).unwrap(), 200_000.);
+
+    let rate3 = Rate::from_mantissa(123_456);
+    assert_eq!(rate3.0, 123_456);
+    assert_float_absolute_eq!(rate3.to_f64(3).unwrap(), 123_456_000.);
+
+    let rate4 = Rate::from_f64(42_680_000., 3).unwrap();
+    assert_eq!(rate4.0, 42_680);
+    assert_float_absolute_eq!(rate4.to_f64(3).unwrap(), 42_680_000.);
+}
+
+#[test]
+fn rate_rejects_too_precise() {
+    Rate::parse_str("42.68", 0).unwrap_err();
+    Rate::parse_str("42.68", -1).unwrap_err();
+    Rate::parse_str("42.68", -2).unwrap();
+
+    Rate::parse_str("42_680", 3).unwrap_err();
+    Rate::parse_str("42_600", 3).unwrap_err();
+    Rate::parse_str("42_000", 3).unwrap();
+}

+ 0 - 605
lazer/sdk/rust/protocol/src/router.rs

@@ -1,605 +0,0 @@
-//! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
-
-use {
-    crate::{
-        payload::AggregatedPriceFeedData,
-        time::{DurationUs, TimestampUs},
-    },
-    anyhow::{bail, Context},
-    derive_more::derive::{From, Into},
-    itertools::Itertools,
-    protobuf::well_known_types::duration::Duration as ProtobufDuration,
-    rust_decimal::{prelude::FromPrimitive, Decimal},
-    serde::{de::Error, Deserialize, Serialize},
-    std::{
-        fmt::Display,
-        num::NonZeroI64,
-        ops::{Add, Deref, DerefMut, Div, Sub},
-    },
-};
-
-#[derive(
-    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
-)]
-pub struct PublisherId(pub u16);
-
-#[derive(
-    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
-)]
-pub struct PriceFeedId(pub u32);
-
-#[derive(
-    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
-)]
-pub struct ChannelId(pub u8);
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
-#[repr(transparent)]
-pub struct Rate(pub i64);
-
-impl Rate {
-    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
-        let value: Decimal = value.parse()?;
-        let coef = 10i64.checked_pow(exponent).context("overflow")?;
-        let coef = Decimal::from_i64(coef).context("overflow")?;
-        let value = value.checked_mul(coef).context("overflow")?;
-        if !value.is_integer() {
-            bail!("price value is more precise than available exponent");
-        }
-        let value: i64 = value.try_into().context("overflow")?;
-        Ok(Self(value))
-    }
-
-    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
-        let value = Decimal::from_f64(value).context("overflow")?;
-        let coef = 10i64.checked_pow(exponent).context("overflow")?;
-        let coef = Decimal::from_i64(coef).context("overflow")?;
-        let value = value.checked_mul(coef).context("overflow")?;
-        let value: i64 = value.try_into().context("overflow")?;
-        Ok(Self(value))
-    }
-
-    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
-        let coef = 10i64.checked_pow(exponent).context("overflow")?;
-        let value = value.checked_mul(coef).context("overflow")?;
-        Ok(Self(value))
-    }
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
-#[repr(transparent)]
-pub struct Price(pub NonZeroI64);
-
-impl Price {
-    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
-        let coef = 10i64.checked_pow(exponent).context("overflow")?;
-        let value = value.checked_mul(coef).context("overflow")?;
-        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
-        Ok(Self(value))
-    }
-
-    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
-        let value: Decimal = value.parse()?;
-        let coef = 10i64.checked_pow(exponent).context("overflow")?;
-        let coef = Decimal::from_i64(coef).context("overflow")?;
-        let value = value.checked_mul(coef).context("overflow")?;
-        if !value.is_integer() {
-            bail!("price value is more precise than available exponent");
-        }
-        let value: i64 = value.try_into().context("overflow")?;
-        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
-        Ok(Self(value))
-    }
-
-    pub fn new(value: i64) -> anyhow::Result<Self> {
-        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
-        Ok(Self(value))
-    }
-
-    pub fn into_inner(self) -> NonZeroI64 {
-        self.0
-    }
-
-    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
-        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
-    }
-
-    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
-        let value = (value * 10f64.powi(exponent as i32)) as i64;
-        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
-        Ok(Self(value))
-    }
-
-    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
-        let left_value = i128::from(self.0.get());
-        let right_value = i128::from(rhs.0.get());
-
-        let value = left_value * right_value / 10i128.pow(rhs_exponent);
-        let value = value.try_into()?;
-        NonZeroI64::new(value)
-            .context("zero price is unsupported")
-            .map(Self)
-    }
-}
-
-impl Sub<i64> for Price {
-    type Output = Option<Price>;
-
-    fn sub(self, rhs: i64) -> Self::Output {
-        let value = self.0.get().saturating_sub(rhs);
-        NonZeroI64::new(value).map(Self)
-    }
-}
-
-impl Add<i64> for Price {
-    type Output = Option<Price>;
-
-    fn add(self, rhs: i64) -> Self::Output {
-        let value = self.0.get().saturating_add(rhs);
-        NonZeroI64::new(value).map(Self)
-    }
-}
-
-impl Add<Price> for Price {
-    type Output = Option<Price>;
-    fn add(self, rhs: Price) -> Self::Output {
-        let value = self.0.get().saturating_add(rhs.0.get());
-        NonZeroI64::new(value).map(Self)
-    }
-}
-
-impl Sub<Price> for Price {
-    type Output = Option<Price>;
-    fn sub(self, rhs: Price) -> Self::Output {
-        let value = self.0.get().saturating_sub(rhs.0.get());
-        NonZeroI64::new(value).map(Self)
-    }
-}
-
-impl Div<i64> for Price {
-    type Output = Option<Price>;
-    fn div(self, rhs: i64) -> Self::Output {
-        let value = self.0.get().saturating_div(rhs);
-        NonZeroI64::new(value).map(Self)
-    }
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub enum PriceFeedProperty {
-    Price,
-    BestBidPrice,
-    BestAskPrice,
-    PublisherCount,
-    Exponent,
-    Confidence,
-    FundingRate,
-    FundingTimestamp,
-    FundingRateInterval,
-    // More fields may be added later.
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub enum DeliveryFormat {
-    /// Deliver stream updates as JSON text messages.
-    #[default]
-    Json,
-    /// Deliver stream updates as binary messages.
-    Binary,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub enum Format {
-    Evm,
-    Solana,
-    LeEcdsa,
-    LeUnsigned,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub enum JsonBinaryEncoding {
-    #[default]
-    Base64,
-    Hex,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
-pub enum Channel {
-    FixedRate(FixedRate),
-}
-
-impl Serialize for Channel {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        match self {
-            Channel::FixedRate(fixed_rate) => {
-                if *fixed_rate == FixedRate::MIN {
-                    return serializer.serialize_str("real_time");
-                }
-                serializer.serialize_str(&format!(
-                    "fixed_rate@{}ms",
-                    fixed_rate.duration().as_millis()
-                ))
-            }
-        }
-    }
-}
-
-pub mod channel_ids {
-    use super::ChannelId;
-
-    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
-    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
-    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
-}
-
-impl Display for Channel {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Channel::FixedRate(fixed_rate) => match *fixed_rate {
-                FixedRate::MIN => write!(f, "real_time"),
-                rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
-            },
-        }
-    }
-}
-
-impl Channel {
-    pub fn id(&self) -> ChannelId {
-        match self {
-            Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
-                1 => channel_ids::FIXED_RATE_1,
-                50 => channel_ids::FIXED_RATE_50,
-                200 => channel_ids::FIXED_RATE_200,
-                _ => panic!("unknown channel: {self:?}"),
-            },
-        }
-    }
-}
-
-#[test]
-fn id_supports_all_fixed_rates() {
-    for rate in FixedRate::ALL {
-        Channel::FixedRate(rate).id();
-    }
-}
-
-fn parse_channel(value: &str) -> Option<Channel> {
-    if value == "real_time" {
-        Some(Channel::FixedRate(FixedRate::MIN))
-    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
-        let ms_value = rest.strip_suffix("ms")?;
-        Some(Channel::FixedRate(FixedRate::from_millis(
-            ms_value.parse().ok()?,
-        )?))
-    } else {
-        None
-    }
-}
-
-impl<'de> Deserialize<'de> for Channel {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        let value = <String>::deserialize(deserializer)?;
-        parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
-    }
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub struct FixedRate {
-    rate: DurationUs,
-}
-
-impl FixedRate {
-    pub const RATE_1_MS: Self = Self {
-        rate: DurationUs::from_millis_u32(1),
-    };
-    pub const RATE_50_MS: Self = Self {
-        rate: DurationUs::from_millis_u32(50),
-    };
-    pub const RATE_200_MS: Self = Self {
-        rate: DurationUs::from_millis_u32(200),
-    };
-
-    // Assumptions (tested below):
-    // - Values are sorted.
-    // - 1 second contains a whole number of each interval.
-    // - all intervals are divisable by the smallest interval.
-    pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
-    pub const MIN: Self = Self::ALL[0];
-
-    pub fn from_millis(millis: u32) -> Option<Self> {
-        Self::ALL
-            .into_iter()
-            .find(|v| v.rate.as_millis() == u64::from(millis))
-    }
-
-    pub fn duration(self) -> DurationUs {
-        self.rate
-    }
-}
-
-impl TryFrom<DurationUs> for FixedRate {
-    type Error = anyhow::Error;
-
-    fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
-        Self::ALL
-            .into_iter()
-            .find(|v| v.rate == value)
-            .with_context(|| format!("unsupported rate: {value:?}"))
-    }
-}
-
-impl TryFrom<&ProtobufDuration> for FixedRate {
-    type Error = anyhow::Error;
-
-    fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
-        let duration = DurationUs::try_from(value)?;
-        Self::try_from(duration)
-    }
-}
-
-impl TryFrom<ProtobufDuration> for FixedRate {
-    type Error = anyhow::Error;
-
-    fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
-        TryFrom::<&ProtobufDuration>::try_from(&duration)
-    }
-}
-
-impl From<FixedRate> for DurationUs {
-    fn from(value: FixedRate) -> Self {
-        value.rate
-    }
-}
-
-impl From<FixedRate> for ProtobufDuration {
-    fn from(value: FixedRate) -> Self {
-        value.rate.into()
-    }
-}
-
-#[test]
-fn fixed_rate_values() {
-    assert!(
-        FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
-        "values must be unique and sorted"
-    );
-    for value in FixedRate::ALL {
-        assert_eq!(
-            1_000_000 % value.duration().as_micros(),
-            0,
-            "1 s must contain whole number of intervals"
-        );
-        assert_eq!(
-            value.duration().as_micros() % FixedRate::MIN.duration().as_micros(),
-            0,
-            "the interval's borders must be a subset of the minimal interval's borders"
-        );
-    }
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SubscriptionParamsRepr {
-    pub price_feed_ids: Vec<PriceFeedId>,
-    pub properties: Vec<PriceFeedProperty>,
-    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
-    #[serde(alias = "chains")]
-    pub formats: Vec<Format>,
-    #[serde(default)]
-    pub delivery_format: DeliveryFormat,
-    #[serde(default)]
-    pub json_binary_encoding: JsonBinaryEncoding,
-    /// If `true`, the stream update will contain a `parsed` JSON field containing
-    /// all data of the update.
-    #[serde(default = "default_parsed")]
-    pub parsed: bool,
-    pub channel: Channel,
-    #[serde(default)]
-    pub ignore_invalid_feed_ids: bool,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SubscriptionParams(SubscriptionParamsRepr);
-
-impl<'de> Deserialize<'de> for SubscriptionParams {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
-        Self::new(value).map_err(Error::custom)
-    }
-}
-
-impl SubscriptionParams {
-    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
-        if value.price_feed_ids.is_empty() {
-            return Err("no price feed ids specified");
-        }
-        if !value.price_feed_ids.iter().all_unique() {
-            return Err("duplicate price feed ids specified");
-        }
-        if !value.formats.iter().all_unique() {
-            return Err("duplicate formats or chains specified");
-        }
-        if value.properties.is_empty() {
-            return Err("no properties specified");
-        }
-        if !value.properties.iter().all_unique() {
-            return Err("duplicate properties specified");
-        }
-        Ok(Self(value))
-    }
-}
-
-impl Deref for SubscriptionParams {
-    type Target = SubscriptionParamsRepr;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-impl DerefMut for SubscriptionParams {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
-    }
-}
-
-pub fn default_parsed() -> bool {
-    true
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct JsonBinaryData {
-    pub encoding: JsonBinaryEncoding,
-    pub data: String,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct JsonUpdate {
-    /// Present unless `parsed = false` is specified in subscription params.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub parsed: Option<ParsedPayload>,
-    /// Only present if `Evm` is present in `formats` in subscription params.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub evm: Option<JsonBinaryData>,
-    /// Only present if `Solana` is present in `formats` in subscription params.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub solana: Option<JsonBinaryData>,
-    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub le_ecdsa: Option<JsonBinaryData>,
-    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub le_unsigned: Option<JsonBinaryData>,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ParsedPayload {
-    #[serde(with = "crate::serde_str::timestamp")]
-    pub timestamp_us: TimestampUs,
-    pub price_feeds: Vec<ParsedFeedPayload>,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ParsedFeedPayload {
-    pub price_feed_id: PriceFeedId,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(with = "crate::serde_str::option_price")]
-    #[serde(default)]
-    pub price: Option<Price>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(with = "crate::serde_str::option_price")]
-    #[serde(default)]
-    pub best_bid_price: Option<Price>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(with = "crate::serde_str::option_price")]
-    #[serde(default)]
-    pub best_ask_price: Option<Price>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(default)]
-    pub publisher_count: Option<u16>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(default)]
-    pub exponent: Option<i16>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(default)]
-    pub confidence: Option<Price>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(default)]
-    pub funding_rate: Option<Rate>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(default)]
-    pub funding_timestamp: Option<TimestampUs>,
-    // More fields may be added later.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    #[serde(default)]
-    pub funding_rate_interval: Option<DurationUs>,
-}
-
-impl ParsedFeedPayload {
-    pub fn new(
-        price_feed_id: PriceFeedId,
-        exponent: Option<i16>,
-        data: &AggregatedPriceFeedData,
-        properties: &[PriceFeedProperty],
-    ) -> Self {
-        let mut output = Self {
-            price_feed_id,
-            price: None,
-            best_bid_price: None,
-            best_ask_price: None,
-            publisher_count: None,
-            exponent: None,
-            confidence: None,
-            funding_rate: None,
-            funding_timestamp: None,
-            funding_rate_interval: None,
-        };
-        for &property in properties {
-            match property {
-                PriceFeedProperty::Price => {
-                    output.price = data.price;
-                }
-                PriceFeedProperty::BestBidPrice => {
-                    output.best_bid_price = data.best_bid_price;
-                }
-                PriceFeedProperty::BestAskPrice => {
-                    output.best_ask_price = data.best_ask_price;
-                }
-                PriceFeedProperty::PublisherCount => {
-                    output.publisher_count = Some(data.publisher_count);
-                }
-                PriceFeedProperty::Exponent => {
-                    output.exponent = exponent;
-                }
-                PriceFeedProperty::Confidence => {
-                    output.confidence = data.confidence;
-                }
-                PriceFeedProperty::FundingRate => {
-                    output.funding_rate = data.funding_rate;
-                }
-                PriceFeedProperty::FundingTimestamp => {
-                    output.funding_timestamp = data.funding_timestamp;
-                }
-                PriceFeedProperty::FundingRateInterval => {
-                    output.funding_rate_interval = data.funding_rate_interval;
-                }
-            }
-        }
-        output
-    }
-
-    pub fn new_full(
-        price_feed_id: PriceFeedId,
-        exponent: Option<i16>,
-        data: &AggregatedPriceFeedData,
-    ) -> Self {
-        Self {
-            price_feed_id,
-            price: data.price,
-            best_bid_price: data.best_bid_price,
-            best_ask_price: data.best_ask_price,
-            publisher_count: Some(data.publisher_count),
-            exponent,
-            confidence: data.confidence,
-            funding_rate: data.funding_rate,
-            funding_timestamp: data.funding_timestamp,
-            funding_rate_interval: data.funding_rate_interval,
-        }
-    }
-}

+ 3 - 3
lazer/sdk/rust/protocol/src/serde_price_as_i64.rs

@@ -1,5 +1,5 @@
 use {
-    crate::router::Price,
+    crate::price::Price,
     serde::{Deserialize, Deserializer, Serialize, Serializer},
     std::num::NonZeroI64,
 };
@@ -9,7 +9,7 @@ where
     S: Serializer,
 {
     value
-        .map_or(0i64, |price| price.0.get())
+        .map_or(0i64, |price| price.mantissa_i64())
         .serialize(serializer)
 }
 
@@ -18,5 +18,5 @@ where
     D: Deserializer<'de>,
 {
     let value = i64::deserialize(deserializer)?;
-    Ok(NonZeroI64::new(value).map(Price))
+    Ok(NonZeroI64::new(value).map(Price::from_nonzero_mantissa))
 }

+ 3 - 3
lazer/sdk/rust/protocol/src/serde_str.rs

@@ -1,6 +1,6 @@
 pub mod option_price {
     use {
-        crate::router::Price,
+        crate::price::Price,
         serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer},
         std::num::NonZeroI64,
     };
@@ -10,7 +10,7 @@ pub mod option_price {
         S: Serializer,
     {
         value
-            .map(|price| price.0.get().to_string())
+            .map(|price| price.mantissa_i64().to_string())
             .serialize(serializer)
     }
 
@@ -22,7 +22,7 @@ pub mod option_price {
         if let Some(value) = value {
             let value: i64 = value.parse().map_err(D::Error::custom)?;
             let value = NonZeroI64::new(value).ok_or_else(|| D::Error::custom("zero price"))?;
-            Ok(Some(Price(value)))
+            Ok(Some(Price::from_nonzero_mantissa(value)))
         } else {
             Ok(None)
         }

+ 0 - 103
lazer/sdk/rust/protocol/src/subscription.rs

@@ -1,103 +0,0 @@
-//! Types descibing general WebSocket subscription/unsubscription JSON messages
-//! used across publishers, agents and routers.
-
-use {
-    crate::router::{JsonUpdate, PriceFeedId, SubscriptionParams},
-    derive_more::From,
-    serde::{Deserialize, Serialize},
-};
-
-/// A request sent from the client to the server.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(tag = "type")]
-#[serde(rename_all = "camelCase")]
-pub enum Request {
-    Subscribe(SubscribeRequest),
-    Unsubscribe(UnsubscribeRequest),
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
-pub struct SubscriptionId(pub u64);
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SubscribeRequest {
-    pub subscription_id: SubscriptionId,
-    #[serde(flatten)]
-    pub params: SubscriptionParams,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct UnsubscribeRequest {
-    pub subscription_id: SubscriptionId,
-}
-
-/// A JSON response sent from the server to the client.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)]
-#[serde(tag = "type")]
-#[serde(rename_all = "camelCase")]
-pub enum Response {
-    Error(ErrorResponse),
-    Subscribed(SubscribedResponse),
-    SubscribedWithInvalidFeedIdsIgnored(SubscribedWithInvalidFeedIdsIgnoredResponse),
-    Unsubscribed(UnsubscribedResponse),
-    SubscriptionError(SubscriptionErrorResponse),
-    StreamUpdated(StreamUpdatedResponse),
-}
-
-/// Sent from the server after a successul subscription.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SubscribedResponse {
-    pub subscription_id: SubscriptionId,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct InvalidFeedSubscriptionDetails {
-    pub unknown_ids: Vec<PriceFeedId>,
-    pub unsupported_channels: Vec<PriceFeedId>,
-    pub unstable: Vec<PriceFeedId>,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SubscribedWithInvalidFeedIdsIgnoredResponse {
-    pub subscription_id: SubscriptionId,
-    pub subscribed_feed_ids: Vec<PriceFeedId>,
-    pub ignored_invalid_feed_ids: InvalidFeedSubscriptionDetails,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct UnsubscribedResponse {
-    pub subscription_id: SubscriptionId,
-}
-
-/// Sent from the server if the requested subscription or unsubscription request
-/// could not be fulfilled.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SubscriptionErrorResponse {
-    pub subscription_id: SubscriptionId,
-    pub error: String,
-}
-
-/// Sent from the server if an internal error occured while serving data for an existing subscription,
-/// or a client request sent a bad request.
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ErrorResponse {
-    pub error: String,
-}
-
-/// Sent from the server when new data is available for an existing subscription
-/// (only if `delivery_format == Json`).
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct StreamUpdatedResponse {
-    pub subscription_id: SubscriptionId,
-    #[serde(flatten)]
-    pub payload: JsonUpdate,
-}

+ 94 - 0
lazer/sdk/rust/protocol/src/time.rs

@@ -486,3 +486,97 @@ pub mod duration_us_serde_humantime {
         value.into_inner().try_into().map_err(D::Error::custom)
     }
 }
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
+pub struct FixedRate {
+    rate: DurationUs,
+}
+
+impl FixedRate {
+    pub const RATE_1_MS: Self = Self {
+        rate: DurationUs::from_millis_u32(1),
+    };
+    pub const RATE_50_MS: Self = Self {
+        rate: DurationUs::from_millis_u32(50),
+    };
+    pub const RATE_200_MS: Self = Self {
+        rate: DurationUs::from_millis_u32(200),
+    };
+
+    // Assumptions (tested below):
+    // - Values are sorted.
+    // - 1 second contains a whole number of each interval.
+    // - all intervals are divisable by the smallest interval.
+    pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
+    pub const MIN: Self = Self::ALL[0];
+
+    pub fn from_millis(millis: u32) -> Option<Self> {
+        Self::ALL
+            .into_iter()
+            .find(|v| v.rate.as_millis() == u64::from(millis))
+    }
+
+    pub fn duration(self) -> DurationUs {
+        self.rate
+    }
+}
+
+impl TryFrom<DurationUs> for FixedRate {
+    type Error = anyhow::Error;
+
+    fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
+        Self::ALL
+            .into_iter()
+            .find(|v| v.rate == value)
+            .with_context(|| format!("unsupported rate: {value:?}"))
+    }
+}
+
+impl TryFrom<&ProtobufDuration> for FixedRate {
+    type Error = anyhow::Error;
+
+    fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
+        let duration = DurationUs::try_from(value)?;
+        Self::try_from(duration)
+    }
+}
+
+impl TryFrom<ProtobufDuration> for FixedRate {
+    type Error = anyhow::Error;
+
+    fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
+        TryFrom::<&ProtobufDuration>::try_from(&duration)
+    }
+}
+
+impl From<FixedRate> for DurationUs {
+    fn from(value: FixedRate) -> Self {
+        value.rate
+    }
+}
+
+impl From<FixedRate> for ProtobufDuration {
+    fn from(value: FixedRate) -> Self {
+        value.rate.into()
+    }
+}
+
+#[test]
+fn fixed_rate_values() {
+    assert!(
+        FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
+        "values must be unique and sorted"
+    );
+    for value in FixedRate::ALL {
+        assert_eq!(
+            1_000_000 % value.duration().as_micros(),
+            0,
+            "1 s must contain whole number of intervals"
+        );
+        assert_eq!(
+            value.duration().as_micros() % FixedRate::MIN.duration().as_micros(),
+            0,
+            "the interval's borders must be a subset of the minimal interval's borders"
+        );
+    }
+}