price_pusher_service.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. //! Price Pusher Service
  2. //!
  3. //! This service is responsible for pushing price updates to the target blockchain network
  4. //! via the Pulse contract's updatePriceFeeds function.
  5. //! It is used by the Controller service to update the on-chain price when the update criteria
  6. //! is met for a given subscription.
  7. //! The service handles retries and gas escalation to ensure the price update is successful.
  8. use anyhow::{anyhow, Context as _, Result};
  9. use async_trait::async_trait;
  10. use backoff::ExponentialBackoff;
  11. use std::sync::{Arc, Mutex};
  12. use tokio::sync::{mpsc, watch};
  13. use tracing;
  14. use crate::adapters::contract::UpdateChainPrices;
  15. use crate::adapters::hermes::ReadPythPrices;
  16. use crate::services::types::PushRequest;
  17. use crate::services::Service;
  18. use crate::state::ChainName;
  19. pub struct PricePusherService {
  20. #[allow(dead_code, reason = "unknown")]
  21. chain_name: ChainName,
  22. name: String,
  23. contract: Arc<dyn UpdateChainPrices + Send + Sync>,
  24. pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
  25. #[allow(dead_code, reason = "unknown")]
  26. backoff_policy: ExponentialBackoff,
  27. request_rx: Mutex<Option<mpsc::Receiver<PushRequest>>>,
  28. request_tx: mpsc::Sender<PushRequest>,
  29. }
  30. impl PricePusherService {
  31. pub fn new(
  32. chain_name: ChainName,
  33. contract: Arc<dyn UpdateChainPrices + Send + Sync>,
  34. pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
  35. backoff_policy: ExponentialBackoff,
  36. ) -> Self {
  37. let (request_tx, request_rx) = mpsc::channel(100);
  38. Self {
  39. chain_name: chain_name.clone(),
  40. name: format!("PricePusherService-{chain_name}"),
  41. contract,
  42. pyth_price_client,
  43. backoff_policy,
  44. request_rx: Mutex::new(Some(request_rx)),
  45. request_tx,
  46. }
  47. }
  48. pub fn request_sender(&self) -> mpsc::Sender<PushRequest> {
  49. self.request_tx.clone()
  50. }
  51. #[tracing::instrument(
  52. skip(self),
  53. fields(
  54. name = "handle_request",
  55. task = self.name,
  56. subscription_id = request.subscription_id.to_string()
  57. )
  58. )]
  59. async fn handle_request(&self, request: PushRequest) {
  60. let price_ids = request.price_ids.clone();
  61. match self.pyth_price_client.get_latest_prices(&price_ids).await {
  62. Ok(update_data) => {
  63. match self
  64. .contract
  65. .update_price_feeds(request.subscription_id, &price_ids, &update_data)
  66. .await
  67. {
  68. Ok(tx_hash) => {
  69. tracing::info!(
  70. service = self.name,
  71. subscription_id = request.subscription_id.to_string(),
  72. tx_hash = tx_hash.to_string(),
  73. "Successfully pushed price updates"
  74. );
  75. }
  76. Err(e) => {
  77. tracing::error!(
  78. service = self.name,
  79. subscription_id = request.subscription_id.to_string(),
  80. error = %e,
  81. "Failed to push price updates"
  82. );
  83. }
  84. }
  85. }
  86. Err(e) => {
  87. tracing::error!(
  88. service = self.name,
  89. subscription_id = request.subscription_id.to_string(),
  90. error = %e,
  91. "Failed to get Pyth price update data"
  92. );
  93. }
  94. }
  95. }
  96. }
  97. #[async_trait]
  98. impl Service for PricePusherService {
  99. fn name(&self) -> &str {
  100. &self.name
  101. }
  102. async fn start(&self, mut exit_rx: watch::Receiver<bool>) -> Result<()> {
  103. let mut receiver = self
  104. .request_rx
  105. .lock()
  106. .map_err(|_| anyhow!("Mutex poisoned"))?
  107. .take()
  108. .context("Request receiver already taken")?;
  109. loop {
  110. tokio::select! {
  111. Some(request) = receiver.recv() => {
  112. self.handle_request(request).await;
  113. }
  114. _ = exit_rx.changed() => {
  115. if *exit_rx.borrow() {
  116. tracing::info!(
  117. service = self.name,
  118. "Stopping price pusher service"
  119. );
  120. break;
  121. }
  122. }
  123. }
  124. }
  125. Ok(())
  126. }
  127. }