Преглед на файлове

feat(lazer): add new formats to protocol (#2414)

* feat(lazer): add new formats to protocol

* chore: remove json format

* chore(lazer): bump protocol version
Pavel Strakhov преди 8 месеца
родител
ревизия
89e2dcdba6

+ 3 - 2
lazer/Cargo.lock

@@ -3767,11 +3767,12 @@ dependencies = [
  "base64 0.22.1",
  "bincode",
  "bs58 0.5.1",
+ "derive_more",
  "ed25519-dalek 2.1.1",
  "futures-util",
  "hex",
  "libsecp256k1 0.7.1",
- "pyth-lazer-protocol 0.5.0",
+ "pyth-lazer-protocol 0.6.0",
  "serde",
  "serde_json",
  "tokio",
@@ -3798,7 +3799,7 @@ dependencies = [
 
 [[package]]
 name = "pyth-lazer-protocol"
-version = "0.5.1"
+version = "0.6.0"
 dependencies = [
  "alloy-primitives",
  "anyhow",

+ 2 - 1
lazer/sdk/rust/client/Cargo.toml

@@ -6,7 +6,7 @@ description = "A Rust client for Pyth Lazer"
 license = "Apache-2.0"
 
 [dependencies]
-pyth-lazer-protocol = "0.5.0"
+pyth-lazer-protocol = { path = "../protocol", version = "0.6.0" }
 tokio = { version = "1", features = ["full"] }
 tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
 futures-util = "0.3"
@@ -16,6 +16,7 @@ base64 = "0.22.1"
 anyhow = "1.0"
 tracing = "0.1"
 url = "2.4"
+derive_more = { version = "1.0.0", features = ["from"] }
 
 [dev-dependencies]
 bincode = "1.3.3"

+ 119 - 36
lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

@@ -1,13 +1,16 @@
 use base64::Engine;
 use futures_util::StreamExt;
-use pyth_lazer_client::LazerClient;
-use pyth_lazer_protocol::message::{EvmMessage, SolanaMessage};
+use pyth_lazer_client::{AnyResponse, LazerClient};
+use pyth_lazer_protocol::message::{
+    EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage,
+};
 use pyth_lazer_protocol::payload::PayloadData;
 use pyth_lazer_protocol::router::{
-    Chain, Channel, DeliveryFormat, FixedRate, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
+    Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
     SubscriptionParams, SubscriptionParamsRepr,
 };
 use pyth_lazer_protocol::subscription::{Request, Response, SubscribeRequest, SubscriptionId};
+use tokio::pin;
 
 fn get_lazer_access_token() -> String {
     // Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
@@ -22,7 +25,8 @@ async fn main() -> anyhow::Result<()> {
         "wss://pyth-lazer.dourolabs.app/v1/stream",
         &get_lazer_access_token(),
     )?;
-    let mut stream = client.start().await?;
+    let stream = client.start().await?;
+    pin!(stream);
 
     let subscription_requests = vec![
         // Example subscription: Parsed JSON feed targeting Solana
@@ -36,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
                     PriceFeedProperty::BestAskPrice,
                     PriceFeedProperty::BestBidPrice,
                 ],
-                chains: vec![Chain::Solana],
+                formats: vec![Format::Solana],
                 delivery_format: DeliveryFormat::Json,
                 json_binary_encoding: JsonBinaryEncoding::Base64,
                 parsed: true,
@@ -57,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
                     PriceFeedProperty::BestAskPrice,
                     PriceFeedProperty::BestBidPrice,
                 ],
-                chains: vec![Chain::Evm, Chain::Solana],
+                formats: vec![Format::Evm, Format::Solana],
                 delivery_format: DeliveryFormat::Binary,
                 json_binary_encoding: JsonBinaryEncoding::Base64,
                 parsed: false,
@@ -80,41 +84,100 @@ async fn main() -> anyhow::Result<()> {
     while let Some(msg) = stream.next().await {
         // The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
         match msg? {
-            Response::StreamUpdated(update) => {
-                if let Some(evm_data) = update.payload.evm {
-                    // Decode binary data
-                    let binary_data =
-                        base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
-                    let evm_message = EvmMessage::deserialize_slice(&binary_data)?;
-
-                    // Parse and verify the EVM message
-                    let payload = parse_and_verify_evm_message(&evm_message);
-                    println!("EVM payload: {payload:?}\n");
-                }
+            AnyResponse::Json(msg) => match msg {
+                Response::StreamUpdated(update) => {
+                    println!("Received a JSON update for {:?}", update.subscription_id);
+                    if let Some(evm_data) = update.payload.evm {
+                        // Decode binary data
+                        let binary_data =
+                            base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
+                        let evm_message = EvmMessage::deserialize_slice(&binary_data)?;
+
+                        // Parse and verify the EVM message
+                        let payload = parse_and_verify_evm_message(&evm_message);
+                        println!("EVM payload: {payload:?}");
+                    }
 
-                if let Some(solana_data) = update.payload.solana {
-                    // Decode binary data
-                    let binary_data =
-                        base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
-                    let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;
+                    if let Some(solana_data) = update.payload.solana {
+                        // Decode binary data
+                        let binary_data =
+                            base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
+                        let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;
 
-                    // Parse and verify the Solana message
-                    let payload = parse_and_verify_solana_message(&solana_message);
-                    println!("Solana payload: {payload:?}\n");
-                }
+                        // Parse and verify the Solana message
+                        let payload = parse_and_verify_solana_message(&solana_message);
+                        println!("Solana payload: {payload:?}");
+                    }
 
-                if let Some(parsed) = update.payload.parsed {
-                    // Parsed payloads (`parsed: true`) are already decoded and ready to use
-                    for feed in parsed.price_feeds {
-                        println!(
-                            "Parsed payload: {:?}: {:?} at {:?}\n",
-                            feed.price_feed_id, feed, parsed.timestamp_us
-                        );
+                    if let Some(data) = update.payload.le_ecdsa {
+                        // Decode binary data
+                        let binary_data =
+                            base64::engine::general_purpose::STANDARD.decode(&data.data)?;
+                        let message = LeEcdsaMessage::deserialize_slice(&binary_data)?;
+
+                        // Parse and verify the message
+                        let payload = parse_and_verify_le_ecdsa_message(&message);
+                        println!("LeEcdsa payload: {payload:?}");
+                    }
+
+                    if let Some(data) = update.payload.le_unsigned {
+                        // Decode binary data
+                        let binary_data =
+                            base64::engine::general_purpose::STANDARD.decode(&data.data)?;
+                        let message = LeUnsignedMessage::deserialize_slice(&binary_data)?;
+
+                        // Parse the message
+                        let payload = PayloadData::deserialize_slice_le(&message.payload)?;
+                        println!("LE unsigned payload: {payload:?}");
+                    }
+
+                    if let Some(parsed) = update.payload.parsed {
+                        // Parsed payloads (`parsed: true`) are already decoded and ready to use
+                        for feed in parsed.price_feeds {
+                            println!(
+                                "Parsed payload: {:?}: {:?} at {:?}",
+                                feed.price_feed_id, feed, parsed.timestamp_us
+                            );
+                        }
+                    }
+                }
+                msg => println!("Received non-update message: {msg:?}"),
+            },
+            AnyResponse::Binary(msg) => {
+                println!("Received a binary update for {:?}", msg.subscription_id);
+                for message in msg.messages {
+                    match message {
+                        Message::Evm(message) => {
+                            // Parse and verify the EVM message
+                            let payload = parse_and_verify_evm_message(&message);
+                            println!("EVM payload: {payload:?}");
+                        }
+                        Message::Solana(message) => {
+                            // Parse and verify the Solana message
+                            let payload = parse_and_verify_solana_message(&message);
+                            println!("Solana payload: {payload:?}");
+                        }
+                        Message::LeEcdsa(message) => {
+                            let payload = parse_and_verify_le_ecdsa_message(&message);
+                            println!("LeEcdsa payload: {payload:?}");
+                        }
+                        Message::LeUnsigned(message) => {
+                            let payload = PayloadData::deserialize_slice_le(&message.payload)?;
+                            println!("LeUnsigned payload: {payload:?}");
+                        }
+                        Message::Json(message) => {
+                            for feed in message.price_feeds {
+                                println!(
+                                    "JSON payload: {:?}: {:?} at {:?}",
+                                    feed.price_feed_id, feed, message.timestamp_us
+                                );
+                            }
+                        }
                     }
                 }
             }
-            _ => println!("Received non-update message"),
         }
+        println!();
 
         count += 1;
         if count >= 50 {
@@ -122,7 +185,7 @@ async fn main() -> anyhow::Result<()> {
         }
     }
 
-    // Unsubscribe before exiting
+    // Unsubscribe example
     for sub_id in [SubscriptionId(1), SubscriptionId(2)] {
         client.unsubscribe(sub_id).await?;
         println!("Unsubscribed from {:?}", sub_id);
@@ -147,12 +210,32 @@ fn parse_and_verify_solana_message(solana_message: &SolanaMessage) -> anyhow::Re
 
 fn parse_and_verify_evm_message(evm_message: &EvmMessage) -> anyhow::Result<PayloadData> {
     // Recover pubkey from message
-    libsecp256k1::recover(
+    let public_key = libsecp256k1::recover(
         &libsecp256k1::Message::parse(&alloy_primitives::keccak256(&evm_message.payload)),
         &libsecp256k1::Signature::parse_standard(&evm_message.signature)?,
         &libsecp256k1::RecoveryId::parse(evm_message.recovery_id)?,
     )?;
+    println!(
+        "evm address recovered from signature: {:?}",
+        hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
+    );
 
     let payload = PayloadData::deserialize_slice_be(&evm_message.payload)?;
     Ok(payload)
 }
+
+fn parse_and_verify_le_ecdsa_message(message: &LeEcdsaMessage) -> anyhow::Result<PayloadData> {
+    // Recover pubkey from message
+    let public_key = libsecp256k1::recover(
+        &libsecp256k1::Message::parse(&alloy_primitives::keccak256(&message.payload)),
+        &libsecp256k1::Signature::parse_standard(&message.signature)?,
+        &libsecp256k1::RecoveryId::parse(message.recovery_id)?,
+    )?;
+    println!(
+        "evm address recovered from signature: {:?}",
+        hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
+    );
+
+    let payload = PayloadData::deserialize_slice_le(&message.payload)?;
+    Ok(payload)
+}

+ 34 - 17
lazer/sdk/rust/client/src/lib.rs

@@ -1,10 +1,13 @@
 use anyhow::Result;
-use futures_util::{SinkExt, StreamExt};
-use pyth_lazer_protocol::subscription::{
-    ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest,
+use derive_more::From;
+use futures_util::{SinkExt, StreamExt, TryStreamExt};
+use pyth_lazer_protocol::{
+    binary_update::BinaryWsUpdate,
+    subscription::{ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest},
 };
 use tokio_tungstenite::{connect_async, tungstenite::Message};
 use url::Url;
+
 /// A WebSocket client for consuming Pyth Lazer price feed updates
 ///
 /// This client provides a simple interface to:
@@ -25,6 +28,12 @@ pub struct LazerClient {
     >,
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, From)]
+pub enum AnyResponse {
+    Json(Response),
+    Binary(BinaryWsUpdate),
+}
+
 impl LazerClient {
     /// Creates a new Lazer client instance
     ///
@@ -48,7 +57,7 @@ impl LazerClient {
     ///
     /// # Returns
     /// Returns a stream of responses from the server
-    pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<Response>>> {
+    pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<AnyResponse>>> {
         let url = self.endpoint.clone();
         let mut request =
             tokio_tungstenite::tungstenite::client::IntoClientRequest::into_client_request(url)?;
@@ -62,19 +71,27 @@ impl LazerClient {
         let (ws_sender, ws_receiver) = ws_stream.split();
 
         self.ws_sender = Some(ws_sender);
-        let response_stream = ws_receiver.map(|msg| -> Result<Response> {
-            let msg = msg?;
-            match msg {
-                Message::Text(text) => Ok(serde_json::from_str(&text)?),
-                Message::Binary(data) => Ok(Response::from_binary(&data)?),
-                Message::Close(_) => Ok(Response::Error(ErrorResponse {
-                    error: "WebSocket connection closed".to_string(),
-                })),
-                _ => Ok(Response::Error(ErrorResponse {
-                    error: "Unexpected message type".to_string(),
-                })),
-            }
-        });
+        let response_stream =
+            ws_receiver
+                .map_err(anyhow::Error::from)
+                .try_filter_map(|msg| async {
+                    let r: Result<Option<AnyResponse>> = match msg {
+                        Message::Text(text) => {
+                            Ok(Some(serde_json::from_str::<Response>(&text)?.into()))
+                        }
+                        Message::Binary(data) => {
+                            Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into()))
+                        }
+                        Message::Close(_) => Ok(Some(
+                            Response::Error(ErrorResponse {
+                                error: "WebSocket connection closed".to_string(),
+                            })
+                            .into(),
+                        )),
+                        _ => Ok(None),
+                    };
+                    r
+                });
 
         Ok(response_stream)
     }

+ 1 - 1
lazer/sdk/rust/protocol/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "pyth-lazer-protocol"
-version = "0.5.1"
+version = "0.6.0"
 edition = "2021"
 description = "Pyth Lazer SDK - protocol types."
 license = "Apache-2.0"

+ 8 - 5
lazer/sdk/rust/protocol/examples/parse_and_verify.rs

@@ -1,9 +1,12 @@
 use {
     anyhow::bail,
-    byteorder::{ReadBytesExt, BE},
+    byteorder::{ReadBytesExt, LE},
     pyth_lazer_protocol::{
-        message::{EvmMessage, SolanaMessage},
-        payload::{PayloadData, EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC_BE},
+        message::{
+            format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC},
+            EvmMessage, SolanaMessage,
+        },
+        payload::PayloadData,
     },
     std::io::{stdin, BufRead, Cursor},
 };
@@ -12,8 +15,8 @@ fn main() -> anyhow::Result<()> {
     println!("Reading hex encoded payloads from stdin...");
     for line in stdin().lock().lines() {
         let message = hex::decode(line?.trim())?;
-        let magic = Cursor::new(&message).read_u32::<BE>()?;
-        if magic == SOLANA_FORMAT_MAGIC_BE {
+        let magic = Cursor::new(&message).read_u32::<LE>()?;
+        if magic == SOLANA_FORMAT_MAGIC {
             println!("this is a solana payload");
             let message = SolanaMessage::deserialize_slice(&message)?;
             println!(

+ 4 - 2
lazer/sdk/rust/protocol/src/api.rs

@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};
 
 use crate::router::{
-    Chain, Channel, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
+    Channel, Format, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
 };
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -9,7 +9,9 @@ use crate::router::{
 pub struct LatestPriceRequest {
     pub price_feed_ids: Vec<PriceFeedId>,
     pub properties: Vec<PriceFeedProperty>,
-    pub chains: Vec<Chain>,
+    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
+    #[serde(alias = "chains")]
+    pub formats: Vec<Format>,
     #[serde(default)]
     pub json_binary_encoding: JsonBinaryEncoding,
     /// If `true`, the stream update will contain a JSON object containing

+ 89 - 0
lazer/sdk/rust/protocol/src/binary_update.rs

@@ -0,0 +1,89 @@
+use {
+    crate::{message::Message, subscription::SubscriptionId},
+    anyhow::{bail, Context},
+    byteorder::{WriteBytesExt, BE, LE},
+};
+
+/// First bytes (LE) of a binary Websocket message. A binary message will
+/// contain one or multiple price updates, each with its encoding format magic.
+pub const BINARY_UPDATE_FORMAT_MAGIC: u32 = 461928307;
+
+/// Content of a Websocket update sent to the client when the binary delivery method
+/// is requested.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct BinaryWsUpdate {
+    pub subscription_id: SubscriptionId,
+    pub messages: Vec<Message>,
+}
+
+impl BinaryWsUpdate {
+    pub fn serialize(&self, buf: &mut Vec<u8>) -> anyhow::Result<()> {
+        buf.write_u32::<LE>(BINARY_UPDATE_FORMAT_MAGIC)?;
+        buf.write_u64::<BE>(self.subscription_id.0)?;
+
+        for message in &self.messages {
+            write_with_len_header(buf, |buf| message.serialize(buf))?;
+        }
+        Ok(())
+    }
+
+    pub fn deserialize_slice(data: &[u8]) -> anyhow::Result<Self> {
+        let mut pos = 0;
+        let magic = u32::from_le_bytes(
+            data.get(pos..pos + 4)
+                .context("data too short")?
+                .try_into()?,
+        );
+        pos += 4;
+
+        if magic != BINARY_UPDATE_FORMAT_MAGIC {
+            bail!("binary update format magic mismatch");
+        }
+
+        let subscription_id = SubscriptionId(u64::from_be_bytes(
+            data.get(pos..pos + 8)
+                .context("data too short")?
+                .try_into()?,
+        ));
+        pos += 8;
+
+        let mut messages = Vec::new();
+
+        while pos < data.len() {
+            let len: usize = u16::from_be_bytes(
+                data.get(pos..pos + 2)
+                    .context("data too short")?
+                    .try_into()?,
+            )
+            .into();
+            pos += 2;
+            let message_data = data.get(pos..pos + len).context("data too short")?;
+            pos += len;
+            messages.push(Message::deserialize_slice(message_data)?);
+        }
+
+        Ok(Self {
+            subscription_id,
+            messages,
+        })
+    }
+}
+
+/// Performs write operations specified by `f` and inserts the length header before them.
+/// The length is written as a BE u16.
+fn write_with_len_header(
+    out: &mut Vec<u8>,
+    f: impl FnOnce(&mut Vec<u8>) -> anyhow::Result<()>,
+) -> anyhow::Result<()> {
+    let len_index = out.len();
+    // Make space for size.
+    out.push(0);
+    out.push(0);
+    let data_start_index = out.len();
+    f(out)?;
+    let len = out.len() - data_start_index;
+    let len: u16 = len.try_into()?;
+    out[len_index..data_start_index].copy_from_slice(&len.to_be_bytes());
+
+    Ok(())
+}

+ 38 - 0
lazer/sdk/rust/protocol/src/lib.rs

@@ -1,6 +1,7 @@
 //! Protocol types.
 
 pub mod api;
+pub mod binary_update;
 pub mod message;
 pub mod payload;
 pub mod publisher;
@@ -8,3 +9,40 @@ pub mod router;
 mod serde_price_as_i64;
 mod serde_str;
 pub mod subscription;
+
+#[test]
+fn magics_in_big_endian() {
+    use crate::{
+        binary_update::BINARY_UPDATE_FORMAT_MAGIC,
+        message::format_magics_le::{
+            EVM_FORMAT_MAGIC, JSON_FORMAT_MAGIC, LE_ECDSA_FORMAT_MAGIC, LE_UNSIGNED_FORMAT_MAGIC,
+            SOLANA_FORMAT_MAGIC,
+        },
+        payload::PAYLOAD_FORMAT_MAGIC,
+    };
+
+    // The values listed in this test can be used when reading the magic headers in BE format
+    // (e.g. on EVM).
+
+    assert_eq!(u32::swap_bytes(BINARY_UPDATE_FORMAT_MAGIC), 1937213467);
+    assert_eq!(u32::swap_bytes(PAYLOAD_FORMAT_MAGIC), 1976813459);
+
+    assert_eq!(u32::swap_bytes(SOLANA_FORMAT_MAGIC), 3103857282);
+    assert_eq!(u32::swap_bytes(JSON_FORMAT_MAGIC), 2584795844);
+    assert_eq!(u32::swap_bytes(EVM_FORMAT_MAGIC), 706910618);
+    assert_eq!(u32::swap_bytes(LE_ECDSA_FORMAT_MAGIC), 3837609805);
+    assert_eq!(u32::swap_bytes(LE_UNSIGNED_FORMAT_MAGIC), 206398297);
+
+    for magic in [
+        BINARY_UPDATE_FORMAT_MAGIC,
+        PAYLOAD_FORMAT_MAGIC,
+        SOLANA_FORMAT_MAGIC,
+        JSON_FORMAT_MAGIC,
+        EVM_FORMAT_MAGIC,
+        LE_ECDSA_FORMAT_MAGIC,
+        LE_UNSIGNED_FORMAT_MAGIC,
+    ] {
+        // Required to distinguish between byte orders.
+        assert!(u32::swap_bytes(magic) != magic);
+    }
+}

+ 162 - 7
lazer/sdk/rust/protocol/src/message.rs

@@ -1,10 +1,72 @@
 use {
-    crate::payload::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC_LE},
-    anyhow::bail,
-    byteorder::{ReadBytesExt, WriteBytesExt, BE, LE},
+    self::format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC},
+    crate::router::ParsedPayload,
+    anyhow::{bail, Context},
+    byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
+    derive_more::From,
+    format_magics_le::{JSON_FORMAT_MAGIC, LE_ECDSA_FORMAT_MAGIC, LE_UNSIGNED_FORMAT_MAGIC},
     std::io::{Cursor, Read, Write},
 };
 
+/// Constants containing first bytes (LE) of a price update.
+pub mod format_magics_le {
+    /// First bytes (LE) of a JSON-encoded price update (JSON structure is represented by
+    /// `router::ParsedPayload` type).
+    ///
+    /// Note: this header will only be present if the binary delivery method is requested
+    /// in a Websocket subscription. If the default (JSON) delivery method is used,
+    /// the price update JSON will simply be embedded in the main JSON of the notification.
+    pub const JSON_FORMAT_MAGIC: u32 = 3302625434;
+    /// First bytes (LE) of an EVM-targeted price update (BE-encoded payload with an ECDSA signature).
+    pub const EVM_FORMAT_MAGIC: u32 = 2593727018;
+    /// First bytes (LE) of a Solana-targeted price update with a native Solana signature
+    /// (LE-encoded payload with a Ed25519 signature).
+    pub const SOLANA_FORMAT_MAGIC: u32 = 2182742457;
+    /// First bytes (LE) of a price update with LE-encoded payload and an ECDSA signature
+    /// (suitable for Solana).
+    pub const LE_ECDSA_FORMAT_MAGIC: u32 = 1296547300;
+    /// First bytes (LE) of a price update with LE-encoded payload without a signature
+    /// (suitable for off-chain usage).
+    pub const LE_UNSIGNED_FORMAT_MAGIC: u32 = 1499680012;
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, From)]
+pub enum Message {
+    Evm(EvmMessage),
+    Solana(SolanaMessage),
+    LeEcdsa(LeEcdsaMessage),
+    LeUnsigned(LeUnsignedMessage),
+    Json(ParsedPayload),
+}
+
+impl Message {
+    pub fn serialize(&self, mut writer: impl Write) -> anyhow::Result<()> {
+        match self {
+            Message::Evm(message) => message.serialize(writer),
+            Message::Solana(message) => message.serialize(writer),
+            Message::LeEcdsa(message) => message.serialize(writer),
+            Message::LeUnsigned(message) => message.serialize(writer),
+            Message::Json(message) => {
+                writer.write_u32::<LE>(JSON_FORMAT_MAGIC)?;
+                serde_json::to_writer(writer, message)?;
+                Ok(())
+            }
+        }
+    }
+
+    pub fn deserialize_slice(data: &[u8]) -> anyhow::Result<Self> {
+        let magic = LE::read_u32(data.get(0..4).context("data too short")?);
+        match magic {
+            JSON_FORMAT_MAGIC => Ok(serde_json::from_slice::<ParsedPayload>(&data[4..])?.into()),
+            EVM_FORMAT_MAGIC => Ok(EvmMessage::deserialize_slice(data)?.into()),
+            SOLANA_FORMAT_MAGIC => Ok(SolanaMessage::deserialize_slice(data)?.into()),
+            LE_ECDSA_FORMAT_MAGIC => Ok(LeEcdsaMessage::deserialize_slice(data)?.into()),
+            LE_UNSIGNED_FORMAT_MAGIC => Ok(LeUnsignedMessage::deserialize_slice(data)?.into()),
+            _ => bail!("unrecognized format magic"),
+        }
+    }
+}
+
 /// EVM signature enveope.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct EvmMessage {
@@ -15,7 +77,7 @@ pub struct EvmMessage {
 
 impl EvmMessage {
     pub fn serialize(&self, mut writer: impl Write) -> anyhow::Result<()> {
-        writer.write_u32::<BE>(EVM_FORMAT_MAGIC)?;
+        writer.write_u32::<LE>(EVM_FORMAT_MAGIC)?;
         writer.write_all(&self.signature)?;
         writer.write_u8(self.recovery_id)?;
         writer.write_u16::<BE>(self.payload.len().try_into()?)?;
@@ -28,7 +90,7 @@ impl EvmMessage {
     }
 
     pub fn deserialize(mut reader: impl Read) -> anyhow::Result<Self> {
-        let magic = reader.read_u32::<BE>()?;
+        let magic = reader.read_u32::<LE>()?;
         if magic != EVM_FORMAT_MAGIC {
             bail!("magic mismatch");
         }
@@ -56,7 +118,7 @@ pub struct SolanaMessage {
 
 impl SolanaMessage {
     pub fn serialize(&self, mut writer: impl Write) -> anyhow::Result<()> {
-        writer.write_u32::<LE>(SOLANA_FORMAT_MAGIC_LE)?;
+        writer.write_u32::<LE>(SOLANA_FORMAT_MAGIC)?;
         writer.write_all(&self.signature)?;
         writer.write_all(&self.public_key)?;
         writer.write_u16::<LE>(self.payload.len().try_into()?)?;
@@ -70,7 +132,7 @@ impl SolanaMessage {
 
     pub fn deserialize(mut reader: impl Read) -> anyhow::Result<Self> {
         let magic = reader.read_u32::<LE>()?;
-        if magic != SOLANA_FORMAT_MAGIC_LE {
+        if magic != SOLANA_FORMAT_MAGIC {
             bail!("magic mismatch");
         }
         let mut signature = [0u8; 64];
@@ -88,6 +150,77 @@ impl SolanaMessage {
     }
 }
 
+/// LE-ECDSA format enveope.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct LeEcdsaMessage {
+    pub payload: Vec<u8>,
+    pub signature: [u8; 64],
+    pub recovery_id: u8,
+}
+
+impl LeEcdsaMessage {
+    pub fn serialize(&self, mut writer: impl Write) -> anyhow::Result<()> {
+        writer.write_u32::<LE>(LE_ECDSA_FORMAT_MAGIC)?;
+        writer.write_all(&self.signature)?;
+        writer.write_u8(self.recovery_id)?;
+        writer.write_u16::<LE>(self.payload.len().try_into()?)?;
+        writer.write_all(&self.payload)?;
+        Ok(())
+    }
+
+    pub fn deserialize_slice(data: &[u8]) -> anyhow::Result<Self> {
+        Self::deserialize(Cursor::new(data))
+    }
+
+    pub fn deserialize(mut reader: impl Read) -> anyhow::Result<Self> {
+        let magic = reader.read_u32::<LE>()?;
+        if magic != LE_ECDSA_FORMAT_MAGIC {
+            bail!("magic mismatch");
+        }
+        let mut signature = [0u8; 64];
+        reader.read_exact(&mut signature)?;
+        let recovery_id = reader.read_u8()?;
+        let payload_len: usize = reader.read_u16::<LE>()?.into();
+        let mut payload = vec![0u8; payload_len];
+        reader.read_exact(&mut payload)?;
+        Ok(Self {
+            payload,
+            signature,
+            recovery_id,
+        })
+    }
+}
+
+/// LE-Unsigned format enveope.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct LeUnsignedMessage {
+    pub payload: Vec<u8>,
+}
+
+impl LeUnsignedMessage {
+    pub fn serialize(&self, mut writer: impl Write) -> anyhow::Result<()> {
+        writer.write_u32::<LE>(LE_UNSIGNED_FORMAT_MAGIC)?;
+        writer.write_u16::<LE>(self.payload.len().try_into()?)?;
+        writer.write_all(&self.payload)?;
+        Ok(())
+    }
+
+    pub fn deserialize_slice(data: &[u8]) -> anyhow::Result<Self> {
+        Self::deserialize(Cursor::new(data))
+    }
+
+    pub fn deserialize(mut reader: impl Read) -> anyhow::Result<Self> {
+        let magic = reader.read_u32::<LE>()?;
+        if magic != LE_UNSIGNED_FORMAT_MAGIC {
+            bail!("magic mismatch");
+        }
+        let payload_len: usize = reader.read_u16::<LE>()?.into();
+        let mut payload = vec![0u8; payload_len];
+        reader.read_exact(&mut payload)?;
+        Ok(Self { payload })
+    }
+}
+
 #[test]
 fn test_evm_serde() {
     let m1 = EvmMessage {
@@ -111,3 +244,25 @@ fn test_solana_serde() {
     m1.serialize(&mut buf).unwrap();
     assert_eq!(m1, SolanaMessage::deserialize_slice(&buf).unwrap());
 }
+
+#[test]
+fn test_le_ecdsa_serde() {
+    let m1 = LeEcdsaMessage {
+        payload: vec![1, 2, 4, 3],
+        signature: [5; 64],
+        recovery_id: 1,
+    };
+    let mut buf = Vec::new();
+    m1.serialize(&mut buf).unwrap();
+    assert_eq!(m1, LeEcdsaMessage::deserialize_slice(&buf).unwrap());
+}
+
+#[test]
+fn test_le_unsigned_serde() {
+    let m1 = LeUnsignedMessage {
+        payload: vec![1, 2, 4, 3],
+    };
+    let mut buf = Vec::new();
+    m1.serialize(&mut buf).unwrap();
+    assert_eq!(m1, LeUnsignedMessage::deserialize_slice(&buf).unwrap());
+}

+ 2 - 7
lazer/sdk/rust/protocol/src/payload.rs

@@ -47,6 +47,8 @@ pub struct AggregatedPriceFeedData {
     pub confidence: Option<Price>,
 }
 
+/// First bytes of a payload's encoding
+/// (in LE or BE depending on the byte order used for encoding the rest of the payload)
 pub const PAYLOAD_FORMAT_MAGIC: u32 = 2479346549;
 
 impl PayloadData {
@@ -205,10 +207,3 @@ fn read_option_u16<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Opti
     let value = reader.read_u16::<BO>()?;
     Ok(Some(value))
 }
-
-pub const BINARY_UPDATE_FORMAT_MAGIC: u32 = 1937213467;
-
-pub const PARSED_FORMAT_MAGIC: u32 = 2584795844;
-pub const EVM_FORMAT_MAGIC: u32 = 706910618;
-pub const SOLANA_FORMAT_MAGIC_BE: u32 = 3103857282;
-pub const SOLANA_FORMAT_MAGIC_LE: u32 = u32::swap_bytes(SOLANA_FORMAT_MAGIC_BE);

+ 18 - 5
lazer/sdk/rust/protocol/src/router.rs

@@ -157,9 +157,11 @@ pub enum DeliveryFormat {
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
-pub enum Chain {
+pub enum Format {
     Evm,
     Solana,
+    LeEcdsa,
+    LeUnsigned,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
@@ -291,12 +293,14 @@ fn fixed_rate_values() {
 pub struct SubscriptionParamsRepr {
     pub price_feed_ids: Vec<PriceFeedId>,
     pub properties: Vec<PriceFeedProperty>,
-    pub chains: Vec<Chain>,
+    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
+    #[serde(alias = "chains")]
+    pub formats: Vec<Format>,
     #[serde(default)]
     pub delivery_format: DeliveryFormat,
     #[serde(default)]
     pub json_binary_encoding: JsonBinaryEncoding,
-    /// If `true`, the stream update will contain a JSON object containing
+    /// If `true`, the stream update will contain a `parsed` JSON field containing
     /// all data of the update.
     #[serde(default = "default_parsed")]
     pub parsed: bool,
@@ -325,8 +329,8 @@ impl SubscriptionParams {
         if !value.price_feed_ids.iter().all_unique() {
             return Err("duplicate price feed ids specified");
         }
-        if !value.chains.iter().all_unique() {
-            return Err("duplicate chains specified");
+        if !value.formats.iter().all_unique() {
+            return Err("duplicate formats or chains specified");
         }
         if value.properties.is_empty() {
             return Err("no properties specified");
@@ -365,12 +369,21 @@ pub struct JsonBinaryData {
 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct JsonUpdate {
+    /// Present unless `parsed = false` is specified in subscription params.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub parsed: Option<ParsedPayload>,
+    /// Only present if `Evm` is present in `formats` in subscription params.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub evm: Option<JsonBinaryData>,
+    /// Only present if `Solana` is present in `formats` in subscription params.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub solana: Option<JsonBinaryData>,
+    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub le_ecdsa: Option<JsonBinaryData>,
+    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub le_unsigned: Option<JsonBinaryData>,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]

+ 7 - 72
lazer/sdk/rust/protocol/src/subscription.rs

@@ -2,15 +2,7 @@
 //! used across publishers, agents and routers.
 
 use {
-    crate::{
-        payload::{
-            BINARY_UPDATE_FORMAT_MAGIC, EVM_FORMAT_MAGIC, PARSED_FORMAT_MAGIC,
-            SOLANA_FORMAT_MAGIC_BE,
-        },
-        router::{JsonBinaryData, JsonBinaryEncoding, JsonUpdate, SubscriptionParams},
-    },
-    anyhow::bail,
-    base64::Engine,
+    crate::router::{JsonUpdate, SubscriptionParams},
     derive_more::From,
     serde::{Deserialize, Serialize},
 };
@@ -42,7 +34,7 @@ pub struct UnsubscribeRequest {
 }
 
 /// A JSON response sent from the server to the client.
-#[derive(Debug, Clone, Serialize, Deserialize, From)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)]
 #[serde(tag = "type")]
 #[serde(rename_all = "camelCase")]
 pub enum Response {
@@ -53,71 +45,14 @@ pub enum Response {
     StreamUpdated(StreamUpdatedResponse),
 }
 
-impl Response {
-    /// Parse a binary server message into a Response
-    pub fn from_binary(data: &[u8]) -> anyhow::Result<Self> {
-        let mut pos = 0;
-        let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);
-        pos += 4;
-
-        if magic != BINARY_UPDATE_FORMAT_MAGIC {
-            bail!("binary update format magic mismatch");
-        }
-
-        let subscription_id = SubscriptionId(u64::from_be_bytes(data[pos..pos + 8].try_into()?));
-        pos += 8;
-
-        let mut evm = None;
-        let mut solana = None;
-        let mut parsed = None;
-
-        while pos < data.len() {
-            let len = u16::from_be_bytes(data[pos..pos + 2].try_into()?) as usize;
-            pos += 2;
-            let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);
-
-            match magic {
-                EVM_FORMAT_MAGIC => {
-                    evm = Some(JsonBinaryData {
-                        encoding: JsonBinaryEncoding::Base64,
-                        data: base64::engine::general_purpose::STANDARD
-                            .encode(&data[pos..pos + len]),
-                    });
-                }
-                SOLANA_FORMAT_MAGIC_BE => {
-                    solana = Some(JsonBinaryData {
-                        encoding: JsonBinaryEncoding::Base64,
-                        data: base64::engine::general_purpose::STANDARD
-                            .encode(&data[pos..pos + len]),
-                    });
-                }
-                PARSED_FORMAT_MAGIC => {
-                    parsed = Some(serde_json::from_slice(&data[pos + 4..pos + len])?);
-                }
-                _ => bail!("unknown magic: {}", magic),
-            }
-            pos += len;
-        }
-
-        Ok(Response::StreamUpdated(StreamUpdatedResponse {
-            subscription_id,
-            payload: JsonUpdate {
-                evm,
-                solana,
-                parsed,
-            },
-        }))
-    }
-}
-
 /// Sent from the server after a successul subscription.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SubscribedResponse {
     pub subscription_id: SubscriptionId,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct UnsubscribedResponse {
     pub subscription_id: SubscriptionId,
@@ -125,7 +60,7 @@ pub struct UnsubscribedResponse {
 
 /// Sent from the server if the requested subscription or unsubscription request
 /// could not be fulfilled.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SubscriptionErrorResponse {
     pub subscription_id: SubscriptionId,
@@ -134,7 +69,7 @@ pub struct SubscriptionErrorResponse {
 
 /// Sent from the server if an internal error occured while serving data for an existing subscription,
 /// or a client request sent a bad request.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ErrorResponse {
     pub error: String,
@@ -142,7 +77,7 @@ pub struct ErrorResponse {
 
 /// Sent from the server when new data is available for an existing subscription
 /// (only if `delivery_format == Json`).
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct StreamUpdatedResponse {
     pub subscription_id: SubscriptionId,