router.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. //! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
  2. use {
  3. crate::{
  4. payload::AggregatedPriceFeedData,
  5. time::{DurationUs, TimestampUs},
  6. },
  7. anyhow::{bail, Context},
  8. derive_more::derive::From,
  9. itertools::Itertools,
  10. protobuf::well_known_types::duration::Duration as ProtobufDuration,
  11. rust_decimal::{prelude::FromPrimitive, Decimal},
  12. serde::{de::Error, Deserialize, Serialize},
  13. std::{
  14. fmt::Display,
  15. num::NonZeroI64,
  16. ops::{Add, Deref, DerefMut, Div, Sub},
  17. },
  18. };
/// Numeric identifier of a publisher, wrapped in a newtype over `u16`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PublisherId(pub u16);
/// Numeric identifier of a price feed, wrapped in a newtype over `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PriceFeedId(pub u32);
/// Numeric identifier of a channel, wrapped in a newtype over `u8`.
///
/// The ids currently in use are listed in the `channel_ids` module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ChannelId(pub u8);
/// A rate stored as a raw `i64`.
///
/// The constructors in `impl Rate` produce this raw value by multiplying the
/// input by `10^exponent`; the exponent itself is not stored in the type.
/// Unlike `Price`, zero is a representable value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct Rate(pub i64);
  28. impl Rate {
  29. pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
  30. let value: Decimal = value.parse()?;
  31. let coef = 10i64.checked_pow(exponent).context("overflow")?;
  32. let coef = Decimal::from_i64(coef).context("overflow")?;
  33. let value = value.checked_mul(coef).context("overflow")?;
  34. if !value.is_integer() {
  35. bail!("price value is more precise than available exponent");
  36. }
  37. let value: i64 = value.try_into().context("overflow")?;
  38. Ok(Self(value))
  39. }
  40. pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
  41. let value = Decimal::from_f64(value).context("overflow")?;
  42. let coef = 10i64.checked_pow(exponent).context("overflow")?;
  43. let coef = Decimal::from_i64(coef).context("overflow")?;
  44. let value = value.checked_mul(coef).context("overflow")?;
  45. let value: i64 = value.try_into().context("overflow")?;
  46. Ok(Self(value))
  47. }
  48. pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
  49. let coef = 10i64.checked_pow(exponent).context("overflow")?;
  50. let value = value.checked_mul(coef).context("overflow")?;
  51. Ok(Self(value))
  52. }
  53. }
/// A price stored as a raw, non-zero `i64`.
///
/// The constructors in `impl Price` reject zero ("zero price is unsupported").
/// The exponent used to scale the value is not stored in the type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct Price(pub NonZeroI64);
  57. impl Price {
  58. pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
  59. let coef = 10i64.checked_pow(exponent).context("overflow")?;
  60. let value = value.checked_mul(coef).context("overflow")?;
  61. let value = NonZeroI64::new(value).context("zero price is unsupported")?;
  62. Ok(Self(value))
  63. }
  64. pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
  65. let value: Decimal = value.parse()?;
  66. let coef = 10i64.checked_pow(exponent).context("overflow")?;
  67. let coef = Decimal::from_i64(coef).context("overflow")?;
  68. let value = value.checked_mul(coef).context("overflow")?;
  69. if !value.is_integer() {
  70. bail!("price value is more precise than available exponent");
  71. }
  72. let value: i64 = value.try_into().context("overflow")?;
  73. let value = NonZeroI64::new(value).context("zero price is unsupported")?;
  74. Ok(Self(value))
  75. }
  76. pub fn new(value: i64) -> anyhow::Result<Self> {
  77. let value = NonZeroI64::new(value).context("zero price is unsupported")?;
  78. Ok(Self(value))
  79. }
  80. pub fn into_inner(self) -> NonZeroI64 {
  81. self.0
  82. }
  83. pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
  84. Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
  85. }
  86. pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
  87. let value = (value * 10f64.powi(exponent as i32)) as i64;
  88. let value = NonZeroI64::new(value).context("zero price is unsupported")?;
  89. Ok(Self(value))
  90. }
  91. pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
  92. let left_value = i128::from(self.0.get());
  93. let right_value = i128::from(rhs.0.get());
  94. let value = left_value * right_value / 10i128.pow(rhs_exponent);
  95. let value = value.try_into()?;
  96. NonZeroI64::new(value)
  97. .context("zero price is unsupported")
  98. .map(Self)
  99. }
  100. }
  101. impl Sub<i64> for Price {
  102. type Output = Option<Price>;
  103. fn sub(self, rhs: i64) -> Self::Output {
  104. let value = self.0.get().saturating_sub(rhs);
  105. NonZeroI64::new(value).map(Self)
  106. }
  107. }
  108. impl Add<i64> for Price {
  109. type Output = Option<Price>;
  110. fn add(self, rhs: i64) -> Self::Output {
  111. let value = self.0.get().saturating_add(rhs);
  112. NonZeroI64::new(value).map(Self)
  113. }
  114. }
  115. impl Add<Price> for Price {
  116. type Output = Option<Price>;
  117. fn add(self, rhs: Price) -> Self::Output {
  118. let value = self.0.get().saturating_add(rhs.0.get());
  119. NonZeroI64::new(value).map(Self)
  120. }
  121. }
  122. impl Sub<Price> for Price {
  123. type Output = Option<Price>;
  124. fn sub(self, rhs: Price) -> Self::Output {
  125. let value = self.0.get().saturating_sub(rhs.0.get());
  126. NonZeroI64::new(value).map(Self)
  127. }
  128. }
  129. impl Div<i64> for Price {
  130. type Output = Option<Price>;
  131. fn div(self, rhs: i64) -> Self::Output {
  132. let value = self.0.get().saturating_div(rhs);
  133. NonZeroI64::new(value).map(Self)
  134. }
  135. }
/// A data field of a price feed that a subscription can request.
///
/// Variants are (de)serialized in camelCase (e.g. `bestBidPrice`). Each variant
/// corresponds to the like-named optional field of `ParsedFeedPayload`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum PriceFeedProperty {
    Price,
    BestBidPrice,
    BestAskPrice,
    PublisherCount,
    Exponent,
    Confidence,
    FundingRate,
    FundingTimestamp,
    // More fields may be added later.
}
/// How stream updates are delivered over the WebSocket connection.
///
/// (De)serialized in camelCase (`json`, `binary`); defaults to `Json`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum DeliveryFormat {
    /// Deliver stream updates as JSON text messages.
    #[default]
    Json,
    /// Deliver stream updates as binary messages.
    Binary,
}
/// A payload format that a subscription can request through
/// `SubscriptionParamsRepr::formats`.
///
/// Each variant has a matching optional field in `JsonUpdate`.
/// (De)serialized in camelCase (`evm`, `solana`, `leEcdsa`, `leUnsigned`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Format {
    Evm,
    Solana,
    LeEcdsa,
    LeUnsigned,
}
/// Text encoding recorded in `JsonBinaryData::encoding`.
///
/// (De)serialized in camelCase (`base64`, `hex`); defaults to `Base64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum JsonBinaryEncoding {
    #[default]
    Base64,
    Hex,
}
/// Channel of a subscription. The only kind at present is a fixed-rate channel.
///
/// The string form (used by the `Serialize`, `Deserialize` and `Display` impls)
/// is `real_time` for `FixedRate::MIN` and `fixed_rate@<N>ms` for any other rate.
/// `From<FixedRate>` is derived via `derive_more`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
pub enum Channel {
    FixedRate(FixedRate),
}
  177. impl Serialize for Channel {
  178. fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
  179. where
  180. S: serde::Serializer,
  181. {
  182. match self {
  183. Channel::FixedRate(fixed_rate) => {
  184. if *fixed_rate == FixedRate::MIN {
  185. return serializer.serialize_str("real_time");
  186. }
  187. serializer.serialize_str(&format!(
  188. "fixed_rate@{}ms",
  189. fixed_rate.duration().as_millis()
  190. ))
  191. }
  192. }
  193. }
  194. }
/// Numeric ids of the supported fixed-rate channels, as returned by `Channel::id`.
pub mod channel_ids {
    use super::ChannelId;
    /// Id of the 1 ms fixed-rate channel.
    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
    /// Id of the 50 ms fixed-rate channel.
    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
    /// Id of the 200 ms fixed-rate channel.
    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
}
  201. impl Display for Channel {
  202. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  203. match self {
  204. Channel::FixedRate(fixed_rate) => match *fixed_rate {
  205. FixedRate::MIN => write!(f, "real_time"),
  206. rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
  207. },
  208. }
  209. }
  210. }
  211. impl Channel {
  212. pub fn id(&self) -> ChannelId {
  213. match self {
  214. Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
  215. 1 => channel_ids::FIXED_RATE_1,
  216. 50 => channel_ids::FIXED_RATE_50,
  217. 200 => channel_ids::FIXED_RATE_200,
  218. _ => panic!("unknown channel: {self:?}"),
  219. },
  220. }
  221. }
  222. }
  223. #[test]
  224. fn id_supports_all_fixed_rates() {
  225. for rate in FixedRate::ALL {
  226. Channel::FixedRate(rate).id();
  227. }
  228. }
  229. fn parse_channel(value: &str) -> Option<Channel> {
  230. if value == "real_time" {
  231. Some(Channel::FixedRate(FixedRate::MIN))
  232. } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
  233. let ms_value = rest.strip_suffix("ms")?;
  234. Some(Channel::FixedRate(FixedRate::from_millis(
  235. ms_value.parse().ok()?,
  236. )?))
  237. } else {
  238. None
  239. }
  240. }
  241. impl<'de> Deserialize<'de> for Channel {
  242. fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
  243. where
  244. D: serde::Deserializer<'de>,
  245. {
  246. let value = <String>::deserialize(deserializer)?;
  247. parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
  248. }
  249. }
/// An update interval restricted to the supported values in `FixedRate::ALL`.
///
/// The field is private: within this file, values are only produced by the
/// associated constants, `FixedRate::from_millis`, and the `TryFrom` impls,
/// all of which select an entry of `FixedRate::ALL`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct FixedRate {
    rate: DurationUs,
}
  254. impl FixedRate {
  255. pub const RATE_1_MS: Self = Self {
  256. rate: DurationUs::from_millis_u32(1),
  257. };
  258. pub const RATE_50_MS: Self = Self {
  259. rate: DurationUs::from_millis_u32(50),
  260. };
  261. pub const RATE_200_MS: Self = Self {
  262. rate: DurationUs::from_millis_u32(200),
  263. };
  264. // Assumptions (tested below):
  265. // - Values are sorted.
  266. // - 1 second contains a whole number of each interval.
  267. // - all intervals are divisable by the smallest interval.
  268. pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
  269. pub const MIN: Self = Self::ALL[0];
  270. pub fn from_millis(millis: u32) -> Option<Self> {
  271. Self::ALL
  272. .into_iter()
  273. .find(|v| v.rate.as_millis() == u64::from(millis))
  274. }
  275. pub fn duration(self) -> DurationUs {
  276. self.rate
  277. }
  278. }
  279. impl TryFrom<DurationUs> for FixedRate {
  280. type Error = anyhow::Error;
  281. fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
  282. Self::ALL
  283. .into_iter()
  284. .find(|v| v.rate == value)
  285. .with_context(|| format!("unsupported rate: {value:?}"))
  286. }
  287. }
  288. impl TryFrom<&ProtobufDuration> for FixedRate {
  289. type Error = anyhow::Error;
  290. fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
  291. let duration = DurationUs::try_from(value)?;
  292. Self::try_from(duration)
  293. }
  294. }
  295. impl TryFrom<ProtobufDuration> for FixedRate {
  296. type Error = anyhow::Error;
  297. fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
  298. TryFrom::<&ProtobufDuration>::try_from(&duration)
  299. }
  300. }
  301. impl From<FixedRate> for DurationUs {
  302. fn from(value: FixedRate) -> Self {
  303. value.rate
  304. }
  305. }
  306. impl From<FixedRate> for ProtobufDuration {
  307. fn from(value: FixedRate) -> Self {
  308. value.rate.into()
  309. }
  310. }
  311. #[test]
  312. fn fixed_rate_values() {
  313. assert!(
  314. FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
  315. "values must be unique and sorted"
  316. );
  317. for value in FixedRate::ALL {
  318. assert_eq!(
  319. 1_000_000 % value.duration().as_micros(),
  320. 0,
  321. "1 s must contain whole number of intervals"
  322. );
  323. assert_eq!(
  324. value.duration().as_micros() % FixedRate::MIN.duration().as_micros(),
  325. 0,
  326. "the interval's borders must be a subset of the minimal interval's borders"
  327. );
  328. }
  329. }
/// Raw subscription parameters exactly as (de)serialized, in camelCase.
///
/// This type performs no validation on its own; `SubscriptionParams::new`
/// checks it and wraps it in `SubscriptionParams`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionParamsRepr {
    /// Requested feeds. `SubscriptionParams::new` requires this to be non-empty
    /// and free of duplicates.
    pub price_feed_ids: Vec<PriceFeedId>,
    /// Requested properties. `SubscriptionParams::new` requires this to be
    /// non-empty and free of duplicates.
    pub properties: Vec<PriceFeedProperty>,
    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
    /// Requested payload formats. `SubscriptionParams::new` rejects duplicates;
    /// an empty list is accepted.
    #[serde(alias = "chains")]
    pub formats: Vec<Format>,
    /// Defaults to `DeliveryFormat::Json` when omitted.
    #[serde(default)]
    pub delivery_format: DeliveryFormat,
    /// Defaults to `JsonBinaryEncoding::Base64` when omitted.
    #[serde(default)]
    pub json_binary_encoding: JsonBinaryEncoding,
    /// If `true`, the stream update will contain a `parsed` JSON field containing
    /// all data of the update.
    #[serde(default = "default_parsed")]
    pub parsed: bool,
    /// Channel to subscribe to; there is no default, so it must be present.
    pub channel: Channel,
    /// Defaults to `false` when omitted. How invalid ids are treated is decided
    /// by the consumer of this struct and is not visible in this file.
    #[serde(default)]
    pub ignore_invalid_feed_ids: bool,
}
/// Validated subscription parameters.
///
/// Wraps a `SubscriptionParamsRepr` that has passed `SubscriptionParams::new`.
/// The custom `Deserialize` impl runs the same validation. Fields are reachable
/// through `Deref`/`DerefMut`; note that mutation through `DerefMut` is not
/// re-validated.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionParams(SubscriptionParamsRepr);
  353. impl<'de> Deserialize<'de> for SubscriptionParams {
  354. fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
  355. where
  356. D: serde::Deserializer<'de>,
  357. {
  358. let value = SubscriptionParamsRepr::deserialize(deserializer)?;
  359. Self::new(value).map_err(Error::custom)
  360. }
  361. }
  362. impl SubscriptionParams {
  363. pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
  364. if value.price_feed_ids.is_empty() {
  365. return Err("no price feed ids specified");
  366. }
  367. if !value.price_feed_ids.iter().all_unique() {
  368. return Err("duplicate price feed ids specified");
  369. }
  370. if !value.formats.iter().all_unique() {
  371. return Err("duplicate formats or chains specified");
  372. }
  373. if value.properties.is_empty() {
  374. return Err("no properties specified");
  375. }
  376. if !value.properties.iter().all_unique() {
  377. return Err("duplicate properties specified");
  378. }
  379. Ok(Self(value))
  380. }
  381. }
  382. impl Deref for SubscriptionParams {
  383. type Target = SubscriptionParamsRepr;
  384. fn deref(&self) -> &Self::Target {
  385. &self.0
  386. }
  387. }
  388. impl DerefMut for SubscriptionParams {
  389. fn deref_mut(&mut self) -> &mut Self::Target {
  390. &mut self.0
  391. }
  392. }
/// Serde default for `SubscriptionParamsRepr::parsed`: updates include the
/// `parsed` field unless the subscriber opts out.
pub fn default_parsed() -> bool {
    true
}
/// A string payload together with the encoding it is tagged with, as used by
/// the per-format fields of `JsonUpdate`. (De)serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonBinaryData {
    /// Encoding tag accompanying `data`.
    pub encoding: JsonBinaryEncoding,
    /// The payload as text; presumably binary data encoded per `encoding`
    /// (the producer is not visible in this file; confirm there).
    pub data: String,
}
/// A stream update in JSON form. (De)serialized in camelCase.
///
/// Every field is optional and is left out of the serialized output when `None`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonUpdate {
    /// Present unless `parsed = false` is specified in subscription params.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parsed: Option<ParsedPayload>,
    /// Only present if `Evm` is present in `formats` in subscription params.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub evm: Option<JsonBinaryData>,
    /// Only present if `Solana` is present in `formats` in subscription params.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub solana: Option<JsonBinaryData>,
    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub le_ecdsa: Option<JsonBinaryData>,
    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub le_unsigned: Option<JsonBinaryData>,
}
/// The `parsed` part of a `JsonUpdate`: a timestamp plus one entry per price feed.
/// (De)serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParsedPayload {
    /// Timestamp of the update, (de)serialized through `crate::serde_str::timestamp`.
    #[serde(with = "crate::serde_str::timestamp")]
    pub timestamp_us: TimestampUs,
    /// Per-feed data carried by this update.
    pub price_feeds: Vec<ParsedFeedPayload>,
}
/// Data of a single price feed inside a `ParsedPayload`. (De)serialized in camelCase.
///
/// Apart from the id, every field is optional: it is omitted from the serialized
/// output when `None` and defaults to `None` when missing from the input.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParsedFeedPayload {
    pub price_feed_id: PriceFeedId,
    /// (De)serialized through `crate::serde_str::option_price`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(with = "crate::serde_str::option_price")]
    #[serde(default)]
    pub price: Option<Price>,
    /// (De)serialized through `crate::serde_str::option_price`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(with = "crate::serde_str::option_price")]
    #[serde(default)]
    pub best_bid_price: Option<Price>,
    /// (De)serialized through `crate::serde_str::option_price`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(with = "crate::serde_str::option_price")]
    #[serde(default)]
    pub best_ask_price: Option<Price>,
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub publisher_count: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub exponent: Option<i16>,
    // NOTE(review): unlike the price fields above, this `Option<Price>` uses the
    // derived serde representation rather than `serde_str::option_price`.
    // Confirm this is intentional before changing it; it affects the wire format.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub confidence: Option<Price>,
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub funding_rate: Option<Rate>,
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub funding_timestamp: Option<TimestampUs>,
    // More fields may be added later.
}
  461. impl ParsedFeedPayload {
  462. pub fn new(
  463. price_feed_id: PriceFeedId,
  464. exponent: Option<i16>,
  465. data: &AggregatedPriceFeedData,
  466. properties: &[PriceFeedProperty],
  467. ) -> Self {
  468. let mut output = Self {
  469. price_feed_id,
  470. price: None,
  471. best_bid_price: None,
  472. best_ask_price: None,
  473. publisher_count: None,
  474. exponent: None,
  475. confidence: None,
  476. funding_rate: None,
  477. funding_timestamp: None,
  478. };
  479. for &property in properties {
  480. match property {
  481. PriceFeedProperty::Price => {
  482. output.price = data.price;
  483. }
  484. PriceFeedProperty::BestBidPrice => {
  485. output.best_bid_price = data.best_bid_price;
  486. }
  487. PriceFeedProperty::BestAskPrice => {
  488. output.best_ask_price = data.best_ask_price;
  489. }
  490. PriceFeedProperty::PublisherCount => {
  491. output.publisher_count = Some(data.publisher_count);
  492. }
  493. PriceFeedProperty::Exponent => {
  494. output.exponent = exponent;
  495. }
  496. PriceFeedProperty::Confidence => {
  497. output.confidence = data.confidence;
  498. }
  499. PriceFeedProperty::FundingRate => {
  500. output.funding_rate = data.funding_rate;
  501. }
  502. PriceFeedProperty::FundingTimestamp => {
  503. output.funding_timestamp = data.funding_timestamp;
  504. }
  505. }
  506. }
  507. output
  508. }
  509. pub fn new_full(
  510. price_feed_id: PriceFeedId,
  511. exponent: Option<i16>,
  512. data: &AggregatedPriceFeedData,
  513. ) -> Self {
  514. Self {
  515. price_feed_id,
  516. price: data.price,
  517. best_bid_price: data.best_bid_price,
  518. best_ask_price: data.best_ask_price,
  519. publisher_count: Some(data.publisher_count),
  520. exponent,
  521. confidence: data.confidence,
  522. funding_rate: data.funding_rate,
  523. funding_timestamp: data.funding_timestamp,
  524. }
  525. }
  526. }