浏览代码

[hermes] add /v2/updates/price/<timestamp> endpoint (#1269)

* add /v2/updates/price/<timestamp> endpoint

* refactor

* refactor imports

* address comments

* address comments
Daniel Chew 1 年之前
父节点
当前提交
2d1324e350

+ 5 - 1
hermes/src/api.rs

@@ -27,7 +27,7 @@ use {
 mod doc_examples;
 mod metrics_middleware;
 mod rest;
-mod types;
+pub mod types;
 mod ws;
 
 #[derive(Clone)]
@@ -154,6 +154,10 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> {
         .route("/api/latest_vaas", get(rest::latest_vaas))
         .route("/api/price_feed_ids", get(rest::price_feed_ids))
         .route("/v2/updates/price/latest", get(rest::latest_price_updates))
+        .route(
+            "/v2/updates/price/:publish_time",
+            get(rest::timestamp_price_updates),
+        )
         .route("/live", get(rest::live))
         .route("/ready", get(rest::ready))
         .route("/ws", get(ws::ws_route_handler))

+ 134 - 0
hermes/src/api/benchmarks.rs

@@ -0,0 +1,134 @@
+//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.
+
+use {
+    crate::{
+        aggregate::{
+            PriceFeedUpdate,
+            PriceFeedsWithUpdateData,
+            UnixTimestamp,
+        },
+        api::types::PriceUpdate,
+    },
+    anyhow::Result,
+    base64::{
+        engine::general_purpose::STANDARD as base64_standard_engine,
+        Engine as _,
+    },
+    pyth_sdk::{
+        Price,
+        PriceFeed,
+        PriceIdentifier,
+    },
+    serde::Deserialize,
+};
+
+const BENCHMARKS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
+#[derive(Deserialize, Debug, Clone)]
+enum BlobEncoding {
+    #[serde(rename = "base64")]
+    Base64,
+    #[serde(rename = "hex")]
+    Hex,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+struct BinaryBlob {
+    pub encoding: BlobEncoding,
+    pub data:     Vec<String>,
+}
+
+impl TryFrom<PriceUpdate> for PriceFeedsWithUpdateData {
+    type Error = anyhow::Error;
+    fn try_from(price_update: PriceUpdate) -> Result<Self> {
+        let price_feeds = match price_update.parsed {
+            Some(parsed_updates) => parsed_updates
+                .into_iter()
+                .map(|parsed_price_update| {
+                    Ok(PriceFeedUpdate {
+                        price_feed:        PriceFeed::new(
+                            parsed_price_update.id,
+                            Price {
+                                price:        parsed_price_update.price.price,
+                                conf:         parsed_price_update.price.conf,
+                                expo:         parsed_price_update.price.expo,
+                                publish_time: parsed_price_update.price.publish_time,
+                            },
+                            Price {
+                                price:        parsed_price_update.ema_price.price,
+                                conf:         parsed_price_update.ema_price.conf,
+                                expo:         parsed_price_update.ema_price.expo,
+                                publish_time: parsed_price_update.ema_price.publish_time,
+                            },
+                        ),
+                        slot:              parsed_price_update.metadata.slot,
+                        received_at:       parsed_price_update.metadata.proof_available_time,
+                        update_data:       None, // This field is not available in ParsedPriceUpdate
+                        prev_publish_time: parsed_price_update.metadata.prev_publish_time,
+                    })
+                })
+                .collect::<Result<Vec<_>>>(),
+            None => Err(anyhow::anyhow!("No parsed price updates available")),
+        }?;
+
+        let update_data = price_update
+            .binary
+            .data
+            .iter()
+            .map(|hex_str| hex::decode(hex_str).unwrap_or_default())
+            .collect::<Vec<Vec<u8>>>();
+
+        Ok(PriceFeedsWithUpdateData {
+            price_feeds,
+            update_data,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+pub trait Benchmarks {
+    async fn get_verified_price_feeds(
+        &self,
+        price_ids: &[PriceIdentifier],
+        publish_time: UnixTimestamp,
+    ) -> Result<PriceFeedsWithUpdateData>;
+}
+
+#[async_trait::async_trait]
+impl Benchmarks for crate::state::State {
+    async fn get_verified_price_feeds(
+        &self,
+        price_ids: &[PriceIdentifier],
+        publish_time: UnixTimestamp,
+    ) -> Result<PriceFeedsWithUpdateData> {
+        let endpoint = self
+            .benchmarks_endpoint
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("Benchmarks endpoint is not set"))?
+            .join(&format!("/v1/updates/price/{}", publish_time))
+            .unwrap();
+
+        let client = reqwest::Client::new();
+        let mut request = client
+            .get(endpoint)
+            .timeout(BENCHMARKS_REQUEST_TIMEOUT)
+            .query(&[("encoding", "hex")])
+            .query(&[("parsed", "true")]);
+
+        for price_id in price_ids {
+            request = request.query(&[("ids", price_id)])
+        }
+
+        let response = request.send().await?;
+
+        if response.status() != reqwest::StatusCode::OK {
+            return Err(anyhow::anyhow!(format!(
+                "Price update for price ids {:?} with publish time {} not found in benchmarks. Status code: {}, message: {}",
+                price_ids, publish_time, response.status(), response.text().await?
+            )));
+        }
+
+        let price_update: PriceUpdate = response.json().await?;
+        price_update.try_into()
+    }
+}

+ 4 - 1
hermes/src/api/rest.rs

@@ -31,7 +31,10 @@ pub use {
     live::*,
     price_feed_ids::*,
     ready::*,
-    v2::latest_price_updates::*,
+    v2::{
+        latest_price_updates::*,
+        timestamp_price_updates::*,
+    },
 };
 
 pub enum RestError {

+ 1 - 0
hermes/src/api/rest/index.rs

@@ -17,5 +17,6 @@ pub async fn index() -> impl IntoResponse {
         "/api/get_vaa?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>",
         "/api/get_vaa_ccip?data=<0x<price_feed_id_32_bytes>+<publish_time_unix_timestamp_be_8_bytes>>",
         "/v2/updates/price/latest?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
+        "/v2/updates/price/<timestamp>?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
     ])
 }

+ 2 - 1
hermes/src/api/rest/v2/latest_price_updates.rs

@@ -25,12 +25,13 @@ use {
         Engine as _,
     },
     pyth_sdk::PriceIdentifier,
+    serde::Deserialize,
     serde_qs::axum::QsQuery,
     utoipa::IntoParams,
 };
 
 
-#[derive(Debug, serde::Deserialize, IntoParams)]
+#[derive(Debug, Deserialize, IntoParams)]
 #[into_params(parameter_in=Query)]
 pub struct LatestPriceUpdatesQueryParams {
     /// Get the most recent price update for this set of price feed ids.

+ 1 - 0
hermes/src/api/rest/v2/mod.rs

@@ -1 +1,2 @@
 pub mod latest_price_updates;
+pub mod timestamp_price_updates;

+ 143 - 0
hermes/src/api/rest/v2/timestamp_price_updates.rs

@@ -0,0 +1,143 @@
+use {
+    crate::{
+        aggregate::{
+            RequestTime,
+            UnixTimestamp,
+        },
+        api::{
+            doc_examples,
+            rest::{
+                verify_price_ids_exist,
+                RestError,
+            },
+            types::{
+                BinaryPriceUpdate,
+                EncodingType,
+                ParsedPriceUpdate,
+                PriceIdInput,
+                PriceUpdate,
+            },
+        },
+    },
+    anyhow::Result,
+    axum::{
+        extract::{
+            Path,
+            State,
+        },
+        Json,
+    },
+    pyth_sdk::PriceIdentifier,
+    serde::Deserialize,
+    serde_qs::axum::QsQuery,
+    utoipa::IntoParams,
+};
+
+#[derive(Debug, Deserialize, IntoParams)]
+#[into_params(parameter_in=Path)]
+pub struct TimestampPriceUpdatesPathParams {
+    /// The unix timestamp in seconds. This endpoint will return the first update whose
+    /// publish_time is >= the provided value.
+    #[param(value_type = i64)]
+    #[param(example = doc_examples::timestamp_example)]
+    publish_time: UnixTimestamp,
+}
+
+#[derive(Debug, Deserialize, IntoParams)]
+#[into_params(parameter_in=Query)]
+pub struct TimestampPriceUpdatesQueryParams {
+    /// Get the most recent price update for this set of price feed ids.
+    ///
+    /// This parameter can be provided multiple times to retrieve multiple price updates,
+    /// for example see the following query string:
+    ///
+    /// ```
+    /// ?ids[]=a12...&ids[]=b4c...
+    /// ```
+    #[param(rename = "ids[]")]
+    #[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")]
+    ids: Vec<PriceIdInput>,
+
+    /// If true, include the parsed price update in the `parsed` field of each returned feed.
+    #[serde(default)]
+    encoding: EncodingType,
+
+    /// If true, include the parsed price update in the `parsed` field of each returned feed.
+    #[serde(default = "default_true")]
+    parsed: bool,
+}
+
+
+fn default_true() -> bool {
+    true
+}
+
+/// Get the latest price updates by price feed id.
+///
+/// Given a collection of price feed ids, retrieve the latest Pyth price for each price feed.
+#[utoipa::path(
+    get,
+    path = "/v2/updates/price/{publish_time}",
+    responses(
+        (status = 200, description = "Price updates retrieved successfully", body = Vec<PriceUpdate>),
+        (status = 404, description = "Price ids not found", body = String)
+    ),
+    params(
+        TimestampPriceUpdatesPathParams,
+        TimestampPriceUpdatesQueryParams
+    )
+)]
+pub async fn timestamp_price_updates(
+    State(state): State<crate::api::ApiState>,
+    Path(path_params): Path<TimestampPriceUpdatesPathParams>,
+    QsQuery(query_params): QsQuery<TimestampPriceUpdatesQueryParams>,
+) -> Result<Json<Vec<PriceUpdate>>, RestError> {
+    let price_ids: Vec<PriceIdentifier> =
+        query_params.ids.into_iter().map(|id| id.into()).collect();
+
+    verify_price_ids_exist(&state, &price_ids).await?;
+
+    let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
+        &*state.state,
+        &price_ids,
+        RequestTime::FirstAfter(path_params.publish_time),
+    )
+    .await
+    .map_err(|e| {
+        tracing::warn!(
+            "Error getting price feeds {:?} with update data: {:?}",
+            price_ids,
+            e
+        );
+        RestError::UpdateDataNotFound
+    })?;
+
+    let price_update_data = price_feeds_with_update_data.update_data;
+    let encoded_data: Vec<String> = price_update_data
+        .into_iter()
+        .map(|data| query_params.encoding.encode_str(&data))
+        .collect();
+    let binary_price_update = BinaryPriceUpdate {
+        encoding: query_params.encoding,
+        data:     encoded_data,
+    };
+    let parsed_price_updates: Option<Vec<ParsedPriceUpdate>> = if query_params.parsed {
+        Some(
+            price_feeds_with_update_data
+                .price_feeds
+                .into_iter()
+                .map(|price_feed| price_feed.into())
+                .collect(),
+        )
+    } else {
+        None
+    };
+
+    let compressed_price_update = PriceUpdate {
+        binary: binary_price_update,
+        parsed: parsed_price_updates,
+    };
+
+
+    Ok(Json(vec![compressed_price_update]))
+}

+ 65 - 3
hermes/src/api/types.rs

@@ -2,9 +2,11 @@ use {
     super::doc_examples,
     crate::aggregate::{
         PriceFeedUpdate,
+        PriceFeedsWithUpdateData,
         Slot,
         UnixTimestamp,
     },
+    anyhow::Result,
     base64::{
         engine::general_purpose::STANDARD as base64_standard_engine,
         Engine as _,
@@ -17,7 +19,11 @@ use {
         Deref,
         DerefMut,
     },
-    pyth_sdk::PriceIdentifier,
+    pyth_sdk::{
+        Price,
+        PriceFeed,
+        PriceIdentifier,
+    },
     serde::{
         Deserialize,
         Serialize,
@@ -199,6 +205,15 @@ pub enum EncodingType {
     Base64,
 }
 
+impl EncodingType {
+    pub fn encode_str(&self, data: &[u8]) -> String {
+        match self {
+            EncodingType::Base64 => base64_standard_engine.encode(data),
+            EncodingType::Hex => hex::encode(data),
+        }
+    }
+}
+
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)]
 pub struct BinaryPriceUpdate {
     pub encoding: EncodingType,
@@ -207,7 +222,7 @@ pub struct BinaryPriceUpdate {
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)]
 pub struct ParsedPriceUpdate {
-    pub id:        String,
+    pub id:        PriceIdentifier,
     pub price:     RpcPrice,
     pub ema_price: RpcPrice,
     pub metadata:  RpcPriceFeedMetadataV2,
@@ -218,7 +233,7 @@ impl From<PriceFeedUpdate> for ParsedPriceUpdate {
         let price_feed = price_feed_update.price_feed;
 
         Self {
-            id:        price_feed.id.to_string(),
+            id:        price_feed.id,
             price:     RpcPrice {
                 price:        price_feed.get_price_unchecked().price,
                 conf:         price_feed.get_price_unchecked().conf,
@@ -246,3 +261,50 @@ pub struct PriceUpdate {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub parsed: Option<Vec<ParsedPriceUpdate>>,
 }
+
+impl TryFrom<PriceUpdate> for PriceFeedsWithUpdateData {
+    type Error = anyhow::Error;
+    fn try_from(price_update: PriceUpdate) -> Result<Self> {
+        let price_feeds = match price_update.parsed {
+            Some(parsed_updates) => parsed_updates
+                .into_iter()
+                .map(|parsed_price_update| {
+                    Ok(PriceFeedUpdate {
+                        price_feed:        PriceFeed::new(
+                            parsed_price_update.id,
+                            Price {
+                                price:        parsed_price_update.price.price,
+                                conf:         parsed_price_update.price.conf,
+                                expo:         parsed_price_update.price.expo,
+                                publish_time: parsed_price_update.price.publish_time,
+                            },
+                            Price {
+                                price:        parsed_price_update.ema_price.price,
+                                conf:         parsed_price_update.ema_price.conf,
+                                expo:         parsed_price_update.ema_price.expo,
+                                publish_time: parsed_price_update.ema_price.publish_time,
+                            },
+                        ),
+                        slot:              parsed_price_update.metadata.slot,
+                        received_at:       parsed_price_update.metadata.proof_available_time,
+                        update_data:       None, // This field is not available in ParsedPriceUpdate
+                        prev_publish_time: parsed_price_update.metadata.prev_publish_time,
+                    })
+                })
+                .collect::<Result<Vec<_>>>(),
+            None => Err(anyhow::anyhow!("No parsed price updates available")),
+        }?;
+
+        let update_data = price_update
+            .binary
+            .data
+            .iter()
+            .map(|hex_str| hex::decode(hex_str).unwrap_or_default())
+            .collect::<Vec<Vec<u8>>>();
+
+        Ok(PriceFeedsWithUpdateData {
+            price_feeds,
+            update_data,
+        })
+    }
+}

+ 12 - 38
hermes/src/state/benchmarks.rs

@@ -1,25 +1,25 @@
 //! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.
 
 use {
-    crate::aggregate::{
-        PriceFeedUpdate,
-        PriceFeedsWithUpdateData,
-        UnixTimestamp,
+    crate::{
+        aggregate::{
+            PriceFeedsWithUpdateData,
+            UnixTimestamp,
+        },
+        api::types::PriceUpdate,
     },
     anyhow::Result,
     base64::{
         engine::general_purpose::STANDARD as base64_standard_engine,
         Engine as _,
     },
-    pyth_sdk::{
-        PriceFeed,
-        PriceIdentifier,
-    },
+    pyth_sdk::PriceIdentifier,
+    serde::Deserialize,
 };
 
 const BENCHMARKS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
 
-#[derive(serde::Deserialize, Debug, Clone)]
+#[derive(Deserialize, Debug, Clone)]
 enum BlobEncoding {
     #[serde(rename = "base64")]
     Base64,
@@ -27,18 +27,12 @@ enum BlobEncoding {
     Hex,
 }
 
-#[derive(serde::Deserialize, Debug, Clone)]
+#[derive(Deserialize, Debug, Clone)]
 struct BinaryBlob {
     pub encoding: BlobEncoding,
     pub data:     Vec<String>,
 }
 
-#[derive(serde::Deserialize, Debug, Clone)]
-struct BenchmarkUpdates {
-    pub parsed: Vec<PriceFeed>,
-    pub binary: BinaryBlob,
-}
-
 impl TryFrom<BinaryBlob> for Vec<Vec<u8>> {
     type Error = anyhow::Error;
 
@@ -56,26 +50,6 @@ impl TryFrom<BinaryBlob> for Vec<Vec<u8>> {
     }
 }
 
-impl TryFrom<BenchmarkUpdates> for PriceFeedsWithUpdateData {
-    type Error = anyhow::Error;
-    fn try_from(benchmark_updates: BenchmarkUpdates) -> Result<Self> {
-        Ok(PriceFeedsWithUpdateData {
-            price_feeds: benchmark_updates
-                .parsed
-                .into_iter()
-                .map(|price_feed| PriceFeedUpdate {
-                    price_feed,
-                    slot: None,
-                    received_at: None,
-                    update_data: None,
-                    prev_publish_time: None, // TODO: Set this field when Benchmarks API supports it.
-                })
-                .collect::<Vec<_>>(),
-            update_data: benchmark_updates.binary.try_into()?,
-        })
-    }
-}
-
 #[async_trait::async_trait]
 pub trait Benchmarks {
     async fn get_verified_price_feeds(
@@ -119,7 +93,7 @@ impl Benchmarks for crate::state::State {
             )));
         }
 
-        let benchmark_updates: BenchmarkUpdates = response.json().await?;
-        benchmark_updates.try_into()
+        let price_update: PriceUpdate = response.json().await?;
+        price_update.try_into()
     }
 }