controller_service.rs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. //! Controller Service
  2. //!
  3. //! This service orchestrates the price update process for a given blockchain network.
  4. //! It reads from the SubscriptionState, PythPriceState, and ChainPriceState to determine
  5. //! whether to update the on-chain price for a given subscription. It also triggers the
  6. //! PricePusherService to push the update to the target blockchain network.
  7. use anyhow::Result;
  8. use async_trait::async_trait;
  9. use std::sync::Arc;
  10. use std::time::Duration;
  11. use tokio::sync::watch;
  12. use tokio::time;
  13. use tracing;
  14. use crate::adapters::types::{PriceId, SubscriptionId};
  15. use crate::services::types::PushRequest;
  16. use crate::services::Service;
  17. use crate::state::ChainName;
  18. use crate::state::{ChainPriceState, PythPriceState, SubscriptionState};
  19. pub struct ControllerService {
  20. name: String,
  21. update_interval: Duration,
  22. subscription_state: Arc<SubscriptionState>,
  23. pyth_price_state: Arc<PythPriceState>,
  24. chain_price_state: Arc<ChainPriceState>,
  25. }
  26. impl ControllerService {
  27. pub fn new(
  28. chain_name: ChainName,
  29. update_interval: Duration,
  30. subscription_state: Arc<SubscriptionState>,
  31. pyth_price_state: Arc<PythPriceState>,
  32. chain_price_state: Arc<ChainPriceState>,
  33. ) -> Self {
  34. Self {
  35. name: format!("ControllerService-{chain_name}"),
  36. update_interval,
  37. subscription_state,
  38. pyth_price_state,
  39. chain_price_state,
  40. }
  41. }
  42. async fn perform_update(&self) {
  43. let subscriptions = self.subscription_state.get_subscriptions();
  44. tracing::debug!(
  45. service = self.name,
  46. subscription_count = subscriptions.len(),
  47. "Checking subscriptions for updates"
  48. );
  49. for (sub_id, params) in subscriptions {
  50. let needs_update = false;
  51. let mut feed_ids: Vec<PriceId> = Vec::new();
  52. for feed_id in &params.price_ids {
  53. let feed_id = PriceId::new(*feed_id);
  54. let pyth_price = self.pyth_price_state.get_price(&feed_id);
  55. let chain_price = self.chain_price_state.get_price(&feed_id);
  56. if pyth_price.is_none() || chain_price.is_none() {
  57. continue;
  58. }
  59. feed_ids.push(feed_id);
  60. }
  61. // TODO: this never happens
  62. if needs_update && !feed_ids.is_empty() {
  63. self.trigger_update(sub_id, feed_ids).await;
  64. }
  65. }
  66. }
  67. async fn trigger_update(&self, subscription_id: SubscriptionId, price_ids: Vec<PriceId>) {
  68. tracing::info!(
  69. service = self.name,
  70. subscription_id = subscription_id.to_string(),
  71. feed_count = price_ids.len(),
  72. "Triggering price update"
  73. );
  74. let _request = PushRequest {
  75. subscription_id,
  76. price_ids,
  77. };
  78. tracing::debug!(
  79. service = self.name,
  80. "Would push update for subscription {}",
  81. subscription_id
  82. );
  83. }
  84. }
  85. #[async_trait]
  86. impl Service for ControllerService {
  87. fn name(&self) -> &str {
  88. &self.name
  89. }
  90. async fn start(&self, mut stop_rx: watch::Receiver<bool>) -> Result<()> {
  91. let mut interval = time::interval(self.update_interval);
  92. loop {
  93. tokio::select! {
  94. _ = interval.tick() => {
  95. self.perform_update().await;
  96. }
  97. _ = stop_rx.changed() => {
  98. if *stop_rx.borrow() {
  99. tracing::info!(
  100. service = self.name,
  101. "Stopping controller service"
  102. );
  103. break;
  104. }
  105. }
  106. }
  107. }
  108. Ok(())
  109. }
  110. }