state.rs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. //! This module contains the global state of the application.
  2. use {
  3. self::{
  4. aggregate::{
  5. AggregateState,
  6. AggregationEvent,
  7. },
  8. benchmarks::BenchmarksState,
  9. cache::CacheState,
  10. metrics::MetricsState,
  11. price_feeds_metadata::PriceFeedMetaState,
  12. wormhole::WormholeState,
  13. },
  14. prometheus_client::registry::Registry,
  15. reqwest::Url,
  16. std::sync::Arc,
  17. tokio::sync::broadcast::Sender,
  18. };
  19. pub mod aggregate;
  20. pub mod benchmarks;
  21. pub mod cache;
  22. pub mod metrics;
  23. pub mod price_feeds_metadata;
  24. pub mod wormhole;
  25. // Expose State interfaces and types for other modules.
  26. pub use {
  27. aggregate::Aggregates,
  28. benchmarks::Benchmarks,
  29. cache::Cache,
  30. metrics::Metrics,
  31. price_feeds_metadata::PriceFeedMeta,
  32. wormhole::Wormhole,
  33. };
  34. /// State contains all relevant shared application state.
  35. ///
  36. /// This type is intentionally not exposed, forcing modules to interface with the
  37. /// various API's using the provided traits. This is done to enforce separation of
  38. /// concerns and to avoid direct manipulation of state.
  39. struct State {
  40. /// State for the `Cache` service for short-lived storage of updates.
  41. pub cache: CacheState,
  42. /// State for the `Benchmarks` service for looking up historical updates.
  43. pub benchmarks: BenchmarksState,
  44. /// State for the `PriceFeedMeta` service for looking up metadata related to Pyth price feeds.
  45. pub price_feed_meta: PriceFeedMetaState,
  46. /// State for accessing/storing Pyth price aggregates.
  47. pub aggregates: AggregateState,
  48. /// State for tracking wormhole state when reading VAAs.
  49. pub wormhole: WormholeState,
  50. /// Metrics registry for tracking process metrics and timings.
  51. pub metrics: MetricsState,
  52. }
  53. pub fn new(
  54. update_tx: Sender<AggregationEvent>,
  55. cache_size: u64,
  56. benchmarks_endpoint: Option<Url>,
  57. ) -> Arc<impl Metrics + Wormhole> {
  58. let mut metrics_registry = Registry::default();
  59. Arc::new(State {
  60. cache: CacheState::new(cache_size),
  61. benchmarks: BenchmarksState::new(benchmarks_endpoint),
  62. price_feed_meta: PriceFeedMetaState::new(),
  63. aggregates: AggregateState::new(update_tx, &mut metrics_registry),
  64. wormhole: WormholeState::new(),
  65. metrics: MetricsState::new(metrics_registry),
  66. })
  67. }
  /// Test-only helpers for constructing application state.
  #[cfg(test)]
  pub mod test {
      use {
          super::{
              aggregate::AggregationEvent,
              Aggregates,
              Wormhole,
          },
          crate::network::wormhole::GuardianSet,
          std::sync::Arc,
          tokio::sync::broadcast::{
              channel,
              Receiver,
          },
      };

      /// Creates a fresh state for tests together with the receiving half of its
      /// aggregation-event broadcast channel.
      ///
      /// The channel is created with a capacity of 1000, no benchmarks endpoint is
      /// configured, and guardian set `0` is pre-populated with a single all-zero
      /// 20-byte key.
      ///
      /// NOTE(review): `super::new` is declared to return `Arc<impl Metrics + Wormhole>`,
      /// while this helper hands the same value back as `Arc<impl Aggregates>` --
      /// confirm that the opaque type is actually usable as `Aggregates` here.
      pub async fn setup_state(
          cache_size: u64,
      ) -> (Arc<impl Aggregates>, Receiver<AggregationEvent>) {
          let (event_tx, event_rx) = channel(1000);
          let shared_state = super::new(event_tx, cache_size, None);

          // Seed an initial guardian set (index 0) holding one zeroed public key.
          let initial_guardians = GuardianSet {
              keys: vec![[0; 20]],
          };
          Wormhole::update_guardian_set(&*shared_state, 0, initial_guardians).await;

          (shared_state, event_rx)
      }
  }