rest.rs 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. use {
  2. super::ApiState,
  3. crate::state::aggregate::Aggregates,
  4. axum::{
  5. http::StatusCode,
  6. response::{IntoResponse, Response},
  7. },
  8. pyth_sdk::PriceIdentifier,
  9. };
  10. mod get_price_feed;
  11. mod get_vaa;
  12. mod get_vaa_ccip;
  13. mod index;
  14. mod latest_price_feeds;
  15. mod latest_vaas;
  16. mod live;
  17. mod price_feed_ids;
  18. mod ready;
  19. mod v2;
  20. pub use {
  21. get_price_feed::*,
  22. get_vaa::*,
  23. get_vaa_ccip::*,
  24. index::*,
  25. latest_price_feeds::*,
  26. latest_vaas::*,
  27. live::*,
  28. price_feed_ids::*,
  29. ready::*,
  30. v2::{
  31. latest_price_updates::*, latest_publisher_stake_caps::*, latest_twaps::*,
  32. price_feeds_metadata::*, sse::*, timestamp_price_updates::*,
  33. },
  34. };
  35. #[derive(Debug)]
  36. pub enum RestError {
  37. BenchmarkPriceNotUnique,
  38. UpdateDataNotFound,
  39. CcipUpdateDataNotFound,
  40. InvalidCCIPInput,
  41. PriceIdsNotFound { missing_ids: Vec<PriceIdentifier> },
  42. RpcConnectionError { message: String },
  43. }
  44. impl IntoResponse for RestError {
  45. fn into_response(self) -> Response {
  46. match self {
  47. RestError::BenchmarkPriceNotUnique => {
  48. (StatusCode::NOT_FOUND, "Benchmark price is not unique").into_response()
  49. }
  50. RestError::UpdateDataNotFound => {
  51. (StatusCode::NOT_FOUND, "Update data not found").into_response()
  52. }
  53. RestError::CcipUpdateDataNotFound => {
  54. // Return "Bad Gateway" error because CCIP expects a 5xx error if it needs to retry
  55. // or try other endpoints. "Bad Gateway" seems the best choice here as this is not
  56. // an internal error and could happen on two scenarios:
  57. //
  58. // 1. DB Api is not responding well (Bad Gateway is appropriate here)
  59. // 2. Publish time is a few seconds before current time and a VAA Will be available
  60. // in a few seconds. So we want the client to retry.
  61. (StatusCode::BAD_GATEWAY, "CCIP update data not found").into_response()
  62. }
  63. RestError::InvalidCCIPInput => {
  64. (StatusCode::BAD_REQUEST, "Invalid CCIP input").into_response()
  65. }
  66. RestError::PriceIdsNotFound { missing_ids } => {
  67. let missing_ids = missing_ids
  68. .into_iter()
  69. .map(|id| id.to_string())
  70. .collect::<Vec<_>>()
  71. .join(", ");
  72. (
  73. StatusCode::NOT_FOUND,
  74. format!("Price ids not found: {}", missing_ids),
  75. )
  76. .into_response()
  77. }
  78. RestError::RpcConnectionError { message } => {
  79. (StatusCode::INTERNAL_SERVER_ERROR, message).into_response()
  80. }
  81. }
  82. }
  83. }
  84. /// Validate that the passed in price_ids exist in the aggregate state. Return a Vec of valid price ids.
  85. /// # Returns
  86. /// If `remove_invalid` is true, invalid price ids are filtered out and only valid price ids are returned.
  87. /// If `remove_invalid` is false and any passed in IDs are invalid, an error is returned.
  88. pub async fn validate_price_ids<S>(
  89. state: &ApiState<S>,
  90. price_ids: &[PriceIdentifier],
  91. remove_invalid: bool,
  92. ) -> Result<Vec<PriceIdentifier>, RestError>
  93. where
  94. S: Aggregates,
  95. {
  96. let state = &*state.state;
  97. let available_ids = Aggregates::get_price_feed_ids(state).await;
  98. // Partition into (valid_ids, invalid_ids)
  99. let (valid_ids, invalid_ids): (Vec<_>, Vec<_>) = price_ids
  100. .iter()
  101. .copied()
  102. .partition(|id| available_ids.contains(id));
  103. if invalid_ids.is_empty() || remove_invalid {
  104. // All IDs are valid
  105. Ok(valid_ids)
  106. } else {
  107. // Return error with list of missing IDs
  108. Err(RestError::PriceIdsNotFound {
  109. missing_ids: invalid_ids,
  110. })
  111. }
  112. }
  113. #[cfg(test)]
  114. mod tests {
  115. use {
  116. super::*,
  117. crate::state::{
  118. aggregate::{
  119. AggregationEvent, PriceFeedsWithUpdateData, PublisherStakeCapsWithUpdateData,
  120. ReadinessMetadata, RequestTime, TwapsWithUpdateData, Update,
  121. },
  122. benchmarks::BenchmarksState,
  123. cache::CacheState,
  124. metrics::MetricsState,
  125. price_feeds_metadata::PriceFeedMetaState,
  126. },
  127. anyhow::Result,
  128. std::{collections::HashSet, sync::Arc},
  129. tokio::sync::broadcast::Receiver,
  130. };
  131. // Simplified mock that only contains what we need
  132. struct MockAggregates {
  133. available_ids: HashSet<PriceIdentifier>,
  134. }
  135. // Implement all required From traits with unimplemented!()
  136. impl<'a> From<&'a MockAggregates> for &'a CacheState {
  137. fn from(_: &'a MockAggregates) -> Self {
  138. unimplemented!("Not needed for this test")
  139. }
  140. }
  141. impl<'a> From<&'a MockAggregates> for &'a BenchmarksState {
  142. fn from(_: &'a MockAggregates) -> Self {
  143. unimplemented!("Not needed for this test")
  144. }
  145. }
  146. impl<'a> From<&'a MockAggregates> for &'a PriceFeedMetaState {
  147. fn from(_: &'a MockAggregates) -> Self {
  148. unimplemented!("Not needed for this test")
  149. }
  150. }
  151. impl<'a> From<&'a MockAggregates> for &'a MetricsState {
  152. fn from(_: &'a MockAggregates) -> Self {
  153. unimplemented!("Not needed for this test")
  154. }
  155. }
  156. #[async_trait::async_trait]
  157. impl Aggregates for MockAggregates {
  158. async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> {
  159. self.available_ids.clone()
  160. }
  161. fn subscribe(&self) -> Receiver<AggregationEvent> {
  162. unimplemented!("Not needed for this test")
  163. }
  164. async fn is_ready(&self) -> (bool, ReadinessMetadata) {
  165. unimplemented!("Not needed for this test")
  166. }
  167. async fn store_update(&self, _update: Update) -> Result<()> {
  168. unimplemented!("Not needed for this test")
  169. }
  170. async fn get_price_feeds_with_update_data(
  171. &self,
  172. _price_ids: &[PriceIdentifier],
  173. _request_time: RequestTime,
  174. ) -> Result<PriceFeedsWithUpdateData> {
  175. unimplemented!("Not needed for this test")
  176. }
  177. async fn get_latest_publisher_stake_caps_with_update_data(
  178. &self,
  179. ) -> Result<PublisherStakeCapsWithUpdateData> {
  180. unimplemented!("Not needed for this test")
  181. }
  182. async fn get_twaps_with_update_data(
  183. &self,
  184. _price_ids: &[PriceIdentifier],
  185. _window_seconds: u64,
  186. _end_time: RequestTime,
  187. ) -> Result<TwapsWithUpdateData> {
  188. unimplemented!("Not needed for this test")
  189. }
  190. }
  191. #[tokio::test]
  192. async fn validate_price_ids_accepts_all_valid_ids() {
  193. let id1 = PriceIdentifier::new([1; 32]);
  194. let id2 = PriceIdentifier::new([2; 32]);
  195. let mut available_ids = HashSet::new();
  196. available_ids.insert(id1);
  197. available_ids.insert(id2);
  198. let mock_state = MockAggregates { available_ids };
  199. let api_state = ApiState::new(Arc::new(mock_state), vec![], String::new());
  200. let input_ids = vec![id1, id2];
  201. let result = validate_price_ids(&api_state, &input_ids, false).await;
  202. assert!(result.is_ok());
  203. assert_eq!(result.unwrap(), input_ids);
  204. }
  205. #[tokio::test]
  206. async fn validate_price_ids_removes_invalid_ids_when_requested() {
  207. let id1 = PriceIdentifier::new([1; 32]);
  208. let id2 = PriceIdentifier::new([2; 32]);
  209. let id3 = PriceIdentifier::new([3; 32]);
  210. let mut available_ids = HashSet::new();
  211. available_ids.insert(id1);
  212. available_ids.insert(id2);
  213. let mock_state = MockAggregates { available_ids };
  214. let api_state = ApiState::new(Arc::new(mock_state), vec![], String::new());
  215. let input_ids = vec![id1, id2, id3];
  216. let result = validate_price_ids(&api_state, &input_ids, true).await;
  217. assert!(result.is_ok());
  218. assert_eq!(result.unwrap(), vec![id1, id2]);
  219. }
  220. #[tokio::test]
  221. async fn validate_price_ids_errors_on_invalid_ids() {
  222. let id1 = PriceIdentifier::new([1; 32]);
  223. let id2 = PriceIdentifier::new([2; 32]);
  224. let id3 = PriceIdentifier::new([3; 32]);
  225. let mut available_ids = HashSet::new();
  226. available_ids.insert(id1);
  227. available_ids.insert(id2);
  228. let mock_state = MockAggregates { available_ids };
  229. let api_state = ApiState::new(Arc::new(mock_state), vec![], String::new());
  230. let input_ids = vec![id1, id2, id3];
  231. let result = validate_price_ids(&api_state, &input_ids, false).await;
  232. assert!(
  233. matches!(result, Err(RestError::PriceIdsNotFound { missing_ids }) if missing_ids == vec![id3])
  234. );
  235. }
  236. }