batch_vaa.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. use {
  2. crate::store::{
  3. storage::{
  4. Key,
  5. StorageData,
  6. },
  7. RequestTime,
  8. State,
  9. UnixTimestamp,
  10. },
  11. anyhow::{
  12. anyhow,
  13. Result,
  14. },
  15. pyth_sdk::{
  16. Price,
  17. PriceFeed,
  18. PriceIdentifier,
  19. },
  20. pyth_wormhole_attester_sdk::{
  21. BatchPriceAttestation,
  22. PriceAttestation,
  23. PriceStatus,
  24. },
  25. std::{
  26. collections::{
  27. HashMap,
  28. HashSet,
  29. },
  30. time::{
  31. SystemTime,
  32. UNIX_EPOCH,
  33. },
  34. },
  35. wormhole::VAA,
  36. };
  37. // TODO: We need to add more metadata to this struct.
  38. #[derive(Clone, Default, PartialEq, Debug)]
  39. pub struct PriceInfo {
  40. pub price_feed: PriceFeed,
  41. pub vaa_bytes: Vec<u8>,
  42. pub publish_time: UnixTimestamp,
  43. pub emitter_chain: u16,
  44. pub attestation_time: UnixTimestamp,
  45. pub receive_time: UnixTimestamp,
  46. pub sequence_number: u64,
  47. }
  48. #[derive(Clone, Default)]
  49. pub struct PriceInfosWithUpdateData {
  50. pub price_infos: HashMap<PriceIdentifier, PriceInfo>,
  51. pub update_data: Vec<Vec<u8>>,
  52. }
  53. pub fn store_vaa_update(state: State, vaa_bytes: Vec<u8>) -> Result<Vec<PriceIdentifier>> {
  54. // FIXME: Vaa bytes might not be a valid Pyth BatchUpdate message nor originate from Our emitter.
  55. // We should check that.
  56. // FIXME: We receive multiple vaas for the same update (due to different signedVAAs). We need
  57. // to drop them.
  58. let vaa = VAA::from_bytes(&vaa_bytes)?;
  59. let batch_price_attestation = BatchPriceAttestation::deserialize(vaa.payload.as_slice())
  60. .map_err(|_| anyhow!("Failed to deserialize VAA"))?;
  61. let mut updated_price_feed_ids = Vec::new();
  62. for price_attestation in batch_price_attestation.price_attestations {
  63. let price_feed = price_attestation_to_price_feed(price_attestation.clone());
  64. let publish_time = price_feed.get_price_unchecked().publish_time.try_into()?;
  65. let price_info = PriceInfo {
  66. price_feed,
  67. vaa_bytes: vaa_bytes.clone(),
  68. publish_time,
  69. emitter_chain: vaa.emitter_chain.into(),
  70. attestation_time: price_attestation.attestation_time.try_into()?,
  71. receive_time: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
  72. sequence_number: vaa.sequence,
  73. };
  74. let key = Key::BatchVaa(price_feed.id);
  75. state.insert(key, publish_time, StorageData::BatchVaa(price_info))?;
  76. // FIXME: Only add price feed if it's newer
  77. // or include whether it's newer or not in the vector
  78. updated_price_feed_ids.push(price_feed.id);
  79. }
  80. Ok(updated_price_feed_ids)
  81. }
  82. pub fn get_price_infos_with_update_data(
  83. state: State,
  84. price_ids: Vec<PriceIdentifier>,
  85. request_time: RequestTime,
  86. ) -> Result<PriceInfosWithUpdateData> {
  87. let mut price_infos = HashMap::new();
  88. let mut vaas: HashSet<Vec<u8>> = HashSet::new();
  89. for price_id in price_ids {
  90. let key = Key::BatchVaa(price_id);
  91. let maybe_data = state.get(key, request_time.clone())?;
  92. match maybe_data {
  93. Some(StorageData::BatchVaa(price_info)) => {
  94. vaas.insert(price_info.vaa_bytes.clone());
  95. price_infos.insert(price_id, price_info);
  96. }
  97. None => {
  98. return Err(anyhow!("No price feed found for price id: {:?}", price_id));
  99. }
  100. }
  101. }
  102. let update_data: Vec<Vec<u8>> = vaas.into_iter().collect();
  103. Ok(PriceInfosWithUpdateData {
  104. price_infos,
  105. update_data,
  106. })
  107. }
  108. pub fn get_price_feed_ids(state: State) -> Vec<PriceIdentifier> {
  109. // Currently we have only one type and filter map is not necessary.
  110. // But we might have more types in the future.
  111. #[allow(clippy::unnecessary_filter_map)]
  112. state
  113. .keys()
  114. .into_iter()
  115. .filter_map(|key| match key {
  116. Key::BatchVaa(price_id) => Some(price_id),
  117. })
  118. .collect()
  119. }
  120. /// Convert a PriceAttestation to a PriceFeed.
  121. ///
  122. /// We cannot implmenet this function as From/Into trait because none of these types are defined in this crate.
  123. /// Ideally we need to move this method to the wormhole_attester sdk crate or have our own implementation of PriceFeed.
  124. pub fn price_attestation_to_price_feed(price_attestation: PriceAttestation) -> PriceFeed {
  125. if price_attestation.status == PriceStatus::Trading {
  126. PriceFeed::new(
  127. // This conversion is done because the identifier on the wormhole_attester uses sdk v0.5.0 and this crate uses 0.7.0
  128. PriceIdentifier::new(price_attestation.price_id.to_bytes()),
  129. Price {
  130. price: price_attestation.price,
  131. conf: price_attestation.conf,
  132. publish_time: price_attestation.publish_time,
  133. expo: price_attestation.expo,
  134. },
  135. Price {
  136. price: price_attestation.ema_price,
  137. conf: price_attestation.ema_conf,
  138. publish_time: price_attestation.publish_time,
  139. expo: price_attestation.expo,
  140. },
  141. )
  142. } else {
  143. PriceFeed::new(
  144. PriceIdentifier::new(price_attestation.price_id.to_bytes()),
  145. Price {
  146. price: price_attestation.prev_price,
  147. conf: price_attestation.prev_conf,
  148. publish_time: price_attestation.prev_publish_time,
  149. expo: price_attestation.expo,
  150. },
  151. Price {
  152. price: price_attestation.ema_price,
  153. conf: price_attestation.ema_conf,
  154. publish_time: price_attestation.prev_publish_time,
  155. expo: price_attestation.expo,
  156. },
  157. )
  158. }
  159. }