浏览代码

refactor: implement trait-based state access restrictions

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>
Devin AI 6 月之前
父节点
当前提交
91cf013b2e

+ 150 - 21
apps/argus/src/keeper_shared.rs

@@ -2,7 +2,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use anyhow::Result;
 use backoff::ExponentialBackoff;
-use tokio::sync::watch;
+use tokio::sync::{mpsc, watch};
 use tracing;
 
 use crate::adapters::{
@@ -14,11 +14,15 @@ use crate::api::BlockchainState;
 use crate::config::EthereumConfig;
 use crate::keeper::keeper_metrics::KeeperMetrics;
 use crate::services::{
+    ChainPriceAware,
     ChainPriceService,
     ControllerService,
     PricePusherService,
+    PythPriceAware,
     PythPriceService,
     Service,
+    SubscriptionAware,
+    SystemControlAware,
     SubscriptionService,
 };
 use crate::state::ArgusState;
@@ -77,51 +81,176 @@ pub async fn run_keeper_for_chain(
         ..ExponentialBackoff::default()
     };
 
-    let subscription_service = SubscriptionService::new(
+    let subscription_service = Arc::new(SubscriptionService::new(
         chain_state.name.clone(),
         contract.clone() as Arc<dyn ReadChainSubscriptions + Send + Sync>,
         subscription_poll_interval,
-    );
+    ));
 
-    let pyth_price_service = PythPriceService::new(
+    let pyth_price_service = Arc::new(PythPriceService::new(
         chain_state.name.clone(),
         hermes_client.clone(),
-    );
+    ));
 
-    let chain_price_service = ChainPriceService::new(
+    let chain_price_service = Arc::new(ChainPriceService::new(
         chain_state.name.clone(),
         contract.clone(),
         chain_price_poll_interval,
-    );
+    ));
 
-    let price_pusher_service = PricePusherService::new(
+    let price_pusher_service = Arc::new(PricePusherService::new(
         chain_state.name.clone(),
         contract.clone(),
         hermes_client.clone(),
         backoff_policy,
-    );
+    ));
 
-    let controller_service = ControllerService::new(
+    let mut controller_service = ControllerService::new(
         chain_state.name.clone(),
         controller_update_interval,
     );
-
-    let services: Vec<Arc<dyn Service>> = vec![
-        Arc::new(subscription_service),
-        Arc::new(pyth_price_service),
-        Arc::new(chain_price_service),
-        Arc::new(price_pusher_service),
-        Arc::new(controller_service),
-    ];
+    
+    let price_pusher_tx = price_pusher_service.request_sender();
+    controller_service.set_price_pusher_tx(price_pusher_tx);
+    let controller_service = Arc::new(controller_service);
 
     let mut handles = Vec::new();
-    for service in services {
-        let service_state = state.clone();
+    
+    {
+        let subscription_reader = state.subscription_reader();
+        let subscription_writer = state.subscription_writer();
+        let pyth_price_writer = state.pyth_price_writer();
+        let chain_price_writer = state.chain_price_writer();
+        let service_stop_rx = stop_rx.clone();
+        
+        let service = subscription_service.clone();
+        let handle = tokio::spawn(async move {
+            let service_name = service.name().to_string();
+            match service.start_with_subscription(
+                subscription_reader,
+                subscription_writer,
+                pyth_price_writer,
+                chain_price_writer,
+                service_stop_rx
+            ).await {
+                Ok(_) => {
+                    tracing::info!(service = service_name, "Service stopped gracefully");
+                }
+                Err(e) => {
+                    tracing::error!(
+                        service = service_name,
+                        error = %e,
+                        "Service stopped with error"
+                    );
+                }
+            }
+        });
+        
+        handles.push(handle);
+    }
+    
+    {
+        let pyth_price_reader = state.pyth_price_reader();
+        let pyth_price_writer = state.pyth_price_writer();
+        let service_stop_rx = stop_rx.clone();
+        
+        let service = pyth_price_service.clone();
+        let handle = tokio::spawn(async move {
+            let service_name = service.name().to_string();
+            match service.start_with_pyth_price(
+                pyth_price_reader,
+                pyth_price_writer,
+                service_stop_rx
+            ).await {
+                Ok(_) => {
+                    tracing::info!(service = service_name, "Service stopped gracefully");
+                }
+                Err(e) => {
+                    tracing::error!(
+                        service = service_name,
+                        error = %e,
+                        "Service stopped with error"
+                    );
+                }
+            }
+        });
+        
+        handles.push(handle);
+    }
+    
+    {
+        let chain_price_reader = state.chain_price_reader();
+        let chain_price_writer = state.chain_price_writer();
+        let service_stop_rx = stop_rx.clone();
+        
+        let service = chain_price_service.clone();
+        let handle = tokio::spawn(async move {
+            let service_name = service.name().to_string();
+            match service.start_with_chain_price(
+                chain_price_reader,
+                chain_price_writer,
+                service_stop_rx
+            ).await {
+                Ok(_) => {
+                    tracing::info!(service = service_name, "Service stopped gracefully");
+                }
+                Err(e) => {
+                    tracing::error!(
+                        service = service_name,
+                        error = %e,
+                        "Service stopped with error"
+                    );
+                }
+            }
+        });
+        
+        handles.push(handle);
+    }
+    
+    {
+        let system_control = state.system_control();
+        let service_stop_rx = stop_rx.clone();
+        
+        let service = price_pusher_service.clone();
+        let handle = tokio::spawn(async move {
+            let service_name = service.name().to_string();
+            match service.start_with_system_control(
+                system_control,
+                service_stop_rx
+            ).await {
+                Ok(_) => {
+                    tracing::info!(service = service_name, "Service stopped gracefully");
+                }
+                Err(e) => {
+                    tracing::error!(
+                        service = service_name,
+                        error = %e,
+                        "Service stopped with error"
+                    );
+                }
+            }
+        });
+        
+        handles.push(handle);
+    }
+    
+    {
+        let subscription_reader = state.subscription_reader();
+        let subscription_writer = state.subscription_writer();
+        let pyth_price_writer = state.pyth_price_writer();
+        let chain_price_writer = state.chain_price_writer();
         let service_stop_rx = stop_rx.clone();
         
+        let service = controller_service.clone();
         let handle = tokio::spawn(async move {
             let service_name = service.name().to_string();
-            match service.start(service_state, service_stop_rx).await {
+            match service.start_with_subscription(
+                subscription_reader,
+                subscription_writer,
+                pyth_price_writer,
+                chain_price_writer,
+                service_stop_rx
+            ).await {
                 Ok(_) => {
                     tracing::info!(service = service_name, "Service stopped gracefully");
                 }

+ 24 - 7
apps/argus/src/services/chain_price_service.rs

@@ -7,11 +7,12 @@ use tokio::time;
 use tracing;
 
 use crate::adapters::contract::GetChainPrices;
-use crate::state::ArgusState;
-use crate::services::Service;
+use crate::services::{Service, ChainPriceAware};
+use crate::state::traits::{ReadChainPriceState, WriteChainPriceState};
 
 pub struct ChainPriceService {
     name: String,
+    chain_id: String,
     contract: Arc<dyn GetChainPrices + Send + Sync>,
     poll_interval: Duration,
 }
@@ -24,14 +25,14 @@ impl ChainPriceService {
     ) -> Self {
         Self {
             name: format!("ChainPriceService-{}", chain_id),
+            chain_id: chain_id.clone(),
             contract,
             poll_interval,
         }
     }
     
-    async fn poll_prices(&self, state: Arc<ArgusState>) {
-        let feed_ids = state.chain_price_state.get_feed_ids();
-        
+    async fn poll_prices(&self, chain_price_reader: Arc<dyn ReadChainPriceState>) {
+        let feed_ids = chain_price_reader.get_feed_ids();
         
         tracing::debug!(
             service = self.name,
@@ -47,7 +48,23 @@ impl Service for ChainPriceService {
         &self.name
     }
     
-    async fn start(&self, state: Arc<ArgusState>, mut stop_rx: watch::Receiver<bool>) -> Result<()> {
+    async fn start(&self, stop_rx: watch::Receiver<bool>) -> Result<()> {
+        tracing::error!(
+            service = self.name,
+            "ChainPriceService must be started with chain price state access"
+        );
+        Err(anyhow::anyhow!("ChainPriceService requires chain price state access"))
+    }
+}
+
+#[async_trait]
+impl ChainPriceAware for ChainPriceService {
+    async fn start_with_chain_price(
+        &self,
+        chain_price_reader: Arc<dyn ReadChainPriceState>,
+        chain_price_writer: Arc<dyn WriteChainPriceState>,
+        mut stop_rx: watch::Receiver<bool>
+    ) -> Result<()> {
         if let Err(e) = self.contract.subscribe_to_price_events().await {
             tracing::error!(
                 service = self.name,
@@ -60,7 +77,7 @@ impl Service for ChainPriceService {
         loop {
             tokio::select! {
                 _ = interval.tick() => {
-                    self.poll_prices(state.clone()).await;
+                    self.poll_prices(chain_price_reader.clone()).await;
                 }
                 _ = stop_rx.changed() => {
                     if *stop_rx.borrow() {

+ 93 - 17
apps/argus/src/services/controller_service.rs

@@ -1,19 +1,49 @@
 use std::sync::Arc;
 use std::time::Duration;
+use std::collections::HashSet;
 use anyhow::Result;
 use async_trait::async_trait;
-use tokio::sync::watch;
+use tokio::sync::{mpsc, watch};
 use tokio::time;
 use tracing;
 
-use crate::adapters::types::{PriceId, SubscriptionId};
+use crate::adapters::types::{PriceId, SubscriptionId, Price};
+use crate::services::{Service, SubscriptionAware};
 use crate::services::types::PushRequest;
-use crate::state::ArgusState;
-use crate::services::Service;
+use crate::state::traits::{
+    ReadSubscriptionState, WriteSubscriptionState,
+    ReadPythPriceState, WritePythPriceState,
+    ReadChainPriceState, WriteChainPriceState
+};
+
+struct PythPriceReaderAdapter(Arc<dyn ReadSubscriptionState>);
+struct ChainPriceReaderAdapter(Arc<dyn ReadSubscriptionState>);
+
+impl ReadPythPriceState for PythPriceReaderAdapter {
+    fn get_price(&self, feed_id: &PriceId) -> Option<Price> {
+        None // Placeholder implementation
+    }
+    
+    fn get_feed_ids(&self) -> HashSet<PriceId> {
+        self.0.get_feed_ids()
+    }
+}
+
+impl ReadChainPriceState for ChainPriceReaderAdapter {
+    fn get_price(&self, feed_id: &PriceId) -> Option<Price> {
+        None // Placeholder implementation
+    }
+    
+    fn get_feed_ids(&self) -> HashSet<PriceId> {
+        self.0.get_feed_ids()
+    }
+}
 
 pub struct ControllerService {
     name: String,
+    chain_id: String,
     update_interval: Duration,
+    price_pusher_tx: Option<mpsc::Sender<PushRequest>>,
 }
 
 impl ControllerService {
@@ -23,12 +53,23 @@ impl ControllerService {
     ) -> Self {
         Self {
             name: format!("ControllerService-{}", chain_id),
+            chain_id: chain_id.clone(),
             update_interval,
+            price_pusher_tx: None,
         }
     }
     
-    async fn perform_update(&self, state: Arc<ArgusState>) {
-        let subscriptions = state.subscription_state.get_subscriptions();
+    pub fn set_price_pusher_tx(&mut self, tx: mpsc::Sender<PushRequest>) {
+        self.price_pusher_tx = Some(tx);
+    }
+    
+    async fn perform_update(
+        &self,
+        subscription_reader: Arc<dyn ReadSubscriptionState>,
+        pyth_price_reader: Arc<dyn ReadPythPriceState>,
+        chain_price_reader: Arc<dyn ReadChainPriceState>
+    ) {
+        let subscriptions = subscription_reader.get_subscriptions();
         
         tracing::debug!(
             service = self.name,
@@ -41,8 +82,8 @@ impl ControllerService {
             let mut feed_ids = Vec::new();
             
             for feed_id in &params.price_feed_ids {
-                let pyth_price = state.pyth_price_state.get_price(feed_id);
-                let chain_price = state.chain_price_state.get_price(feed_id);
+                let pyth_price = pyth_price_reader.get_price(feed_id);
+                let chain_price = chain_price_reader.get_price(feed_id);
                 
                 if pyth_price.is_none() || chain_price.is_none() {
                     continue;
@@ -53,12 +94,12 @@ impl ControllerService {
             }
             
             if needs_update && !feed_ids.is_empty() {
-                self.trigger_update(state.clone(), sub_id, feed_ids).await;
+                self.trigger_update(sub_id, feed_ids).await;
             }
         }
     }
     
-    async fn trigger_update(&self, state: Arc<ArgusState>, subscription_id: SubscriptionId, price_ids: Vec<PriceId>) {
+    async fn trigger_update(&self, subscription_id: SubscriptionId, price_ids: Vec<PriceId>) {
         tracing::info!(
             service = self.name,
             subscription_id = subscription_id.to_string(),
@@ -71,11 +112,21 @@ impl ControllerService {
             price_ids,
         };
         
-        tracing::debug!(
-            service = self.name,
-            "Would push update for subscription {}",
-            subscription_id
-        );
+        if let Some(tx) = &self.price_pusher_tx {
+            if let Err(e) = tx.send(request).await {
+                tracing::error!(
+                    service = self.name,
+                    error = %e,
+                    "Failed to send price update request"
+                );
+            }
+        } else {
+            tracing::debug!(
+                service = self.name,
+                "Would push update for subscription {}",
+                subscription_id
+            );
+        }
     }
 }
 
@@ -85,13 +136,38 @@ impl Service for ControllerService {
         &self.name
     }
     
-    async fn start(&self, state: Arc<ArgusState>, mut stop_rx: watch::Receiver<bool>) -> Result<()> {
+    async fn start(&self, stop_rx: watch::Receiver<bool>) -> Result<()> {
+        tracing::error!(
+            service = self.name,
+            "ControllerService must be started with subscription state access"
+        );
+        Err(anyhow::anyhow!("ControllerService requires subscription state access"))
+    }
+}
+
+#[async_trait]
+impl SubscriptionAware for ControllerService {
+    async fn start_with_subscription(
+        &self,
+        subscription_reader: Arc<dyn ReadSubscriptionState>,
+        _subscription_writer: Arc<dyn WriteSubscriptionState>,
+        _pyth_price_writer: Arc<dyn WritePythPriceState>,
+        _chain_price_writer: Arc<dyn WriteChainPriceState>,
+        mut stop_rx: watch::Receiver<bool>
+    ) -> Result<()> {
+        let pyth_price_reader = Arc::new(PythPriceReaderAdapter(subscription_reader.clone()));
+        let chain_price_reader = Arc::new(ChainPriceReaderAdapter(subscription_reader.clone()));
+        
         let mut interval = time::interval(self.update_interval);
         
         loop {
             tokio::select! {
                 _ = interval.tick() => {
-                    self.perform_update(state.clone()).await;
+                    self.perform_update(
+                        subscription_reader.clone(),
+                        pyth_price_reader.clone(),
+                        chain_price_reader.clone()
+                    ).await;
                 }
                 _ = stop_rx.changed() => {
                     if *stop_rx.borrow() {

+ 20 - 3
apps/argus/src/services/price_pusher_service.rs

@@ -6,12 +6,13 @@ use tokio::sync::{mpsc, watch};
 use tracing;
 
 use crate::adapters::types::{ReadPythPrices, UpdateChainPrices};
+use crate::services::{Service, SystemControlAware};
 use crate::services::types::PushRequest;
-use crate::state::ArgusState;
-use crate::services::Service;
+use crate::state::traits::SystemControl;
 
 pub struct PricePusherService {
     name: String,
+    chain_id: String,
     contract: Arc<dyn UpdateChainPrices + Send + Sync>,
     pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
     backoff_policy: ExponentialBackoff,
@@ -30,6 +31,7 @@ impl PricePusherService {
         
         Self {
             name: format!("PricePusherService-{}", chain_id),
+            chain_id: chain_id.clone(),
             contract,
             pyth_price_client,
             backoff_policy,
@@ -88,7 +90,22 @@ impl Service for PricePusherService {
         &self.name
     }
     
-    async fn start(&self, _state: Arc<ArgusState>, mut stop_rx: watch::Receiver<bool>) -> Result<()> {
+    async fn start(&self, stop_rx: watch::Receiver<bool>) -> Result<()> {
+        tracing::error!(
+            service = self.name,
+            "PricePusherService must be started with system control access"
+        );
+        Err(anyhow::anyhow!("PricePusherService requires system control access"))
+    }
+}
+
+#[async_trait]
+impl SystemControlAware for PricePusherService {
+    async fn start_with_system_control(
+        &self,
+        _system_control: Arc<dyn SystemControl>,
+        mut stop_rx: watch::Receiver<bool>
+    ) -> Result<()> {
         let mut receiver = self.request_rx.lock().expect("Mutex poisoned")
             .take()
             .expect("Request receiver already taken");

+ 21 - 5
apps/argus/src/services/pyth_price_service.rs

@@ -5,8 +5,8 @@ use tokio::sync::watch;
 use tracing;
 
 use crate::adapters::types::ReadPythPrices;
-use crate::state::ArgusState;
-use crate::services::Service;
+use crate::services::{Service, PythPriceAware};
+use crate::state::traits::{ReadPythPriceState, WritePythPriceState};
 
 pub struct PythPriceService {
     name: String,
@@ -31,8 +31,24 @@ impl Service for PythPriceService {
         &self.name
     }
     
-    async fn start(&self, state: Arc<ArgusState>, mut stop_rx: watch::Receiver<bool>) -> Result<()> {
-        let mut last_feed_ids = state.pyth_price_state.get_feed_ids();
+    async fn start(&self, stop_rx: watch::Receiver<bool>) -> Result<()> {
+        tracing::error!(
+            service = self.name,
+            "PythPriceService must be started with Pyth price state access"
+        );
+        Err(anyhow::anyhow!("PythPriceService requires Pyth price state access"))
+    }
+}
+
+#[async_trait]
+impl PythPriceAware for PythPriceService {
+    async fn start_with_pyth_price(
+        &self,
+        pyth_price_reader: Arc<dyn ReadPythPriceState>,
+        pyth_price_writer: Arc<dyn WritePythPriceState>,
+        mut stop_rx: watch::Receiver<bool>
+    ) -> Result<()> {
+        let mut last_feed_ids = pyth_price_reader.get_feed_ids();
         if !last_feed_ids.is_empty() {
             let feed_ids_vec: Vec<_> = last_feed_ids.iter().cloned().collect();
             if let Err(e) = self.pyth_price_client.subscribe_to_price_updates(&feed_ids_vec).await {
@@ -45,7 +61,7 @@ impl Service for PythPriceService {
         }
         
         loop {
-            let current_feed_ids = state.pyth_price_state.get_feed_ids();
+            let current_feed_ids = pyth_price_reader.get_feed_ids();
             if current_feed_ids != last_feed_ids {
                 let feed_ids_vec: Vec<_> = current_feed_ids.iter().cloned().collect();
                 if !feed_ids_vec.is_empty() {

+ 46 - 13
apps/argus/src/services/subscription_service.rs

@@ -7,11 +7,12 @@ use tokio::time;
 use tracing;
 
 use crate::adapters::types::ReadChainSubscriptions;
-use crate::state::ArgusState;
-use crate::services::Service;
+use crate::services::{Service, SubscriptionAware};
+use crate::state::traits::{ReadSubscriptionState, WriteSubscriptionState, WritePythPriceState, WriteChainPriceState};
 
 pub struct SubscriptionService {
     name: String,
+    chain_id: String,
     contract: Arc<dyn ReadChainSubscriptions + Send + Sync>,
     poll_interval: Duration,
 }
@@ -24,31 +25,37 @@ impl SubscriptionService {
     ) -> Self {
         Self {
             name: format!("SubscriptionService-{}", chain_id),
+            chain_id: chain_id.clone(),
             contract,
             poll_interval,
         }
     }
 
-    async fn refresh_subscriptions(&self, state: Arc<ArgusState>) -> Result<()> {
+    async fn refresh_subscriptions(
+        &self,
+        subscription_writer: Arc<dyn WriteSubscriptionState>,
+        pyth_price_writer: Arc<dyn WritePythPriceState>,
+        chain_price_writer: Arc<dyn WriteChainPriceState>
+    ) -> Result<()> {
         match self.contract.get_active_subscriptions().await {
             Ok(subscriptions) => {
                 tracing::info!(
-                    chain_name = state.chain_id,
+                    chain_name = self.chain_id,
                     subscription_count = subscriptions.len(),
                     "Retrieved active subscriptions"
                 );
                 
-                state.subscription_state.update_subscriptions(subscriptions);
+                subscription_writer.update_subscriptions(subscriptions);
                 
-                let feed_ids = state.subscription_state.get_feed_ids();
-                state.pyth_price_state.update_feed_ids(feed_ids.clone());
-                state.chain_price_state.update_feed_ids(feed_ids);
+                let feed_ids = subscription_writer.get_feed_ids();
+                pyth_price_writer.update_feed_ids(feed_ids.clone());
+                chain_price_writer.update_feed_ids(feed_ids);
                 
                 Ok(())
             }
             Err(e) => {
                 tracing::error!(
-                    chain_name = state.chain_id,
+                    chain_name = self.chain_id,
                     error = %e,
                     "Failed to load active subscriptions"
                 );
@@ -64,22 +71,48 @@ impl Service for SubscriptionService {
         &self.name
     }
     
-    async fn start(&self, state: Arc<ArgusState>, mut stop_rx: watch::Receiver<bool>) -> Result<()> {
+    async fn start(&self, stop_rx: watch::Receiver<bool>) -> Result<()> {
+        tracing::error!(
+            service = self.name,
+            "SubscriptionService must be started with subscription state access"
+        );
+        Err(anyhow::anyhow!("SubscriptionService requires subscription state access"))
+    }
+}
+
+#[async_trait]
+impl SubscriptionAware for SubscriptionService {
+    async fn start_with_subscription(
+        &self,
+        subscription_reader: Arc<dyn ReadSubscriptionState>,
+        subscription_writer: Arc<dyn WriteSubscriptionState>,
+        pyth_price_writer: Arc<dyn WritePythPriceState>,
+        chain_price_writer: Arc<dyn WriteChainPriceState>,
+        mut stop_rx: watch::Receiver<bool>
+    ) -> Result<()> {
         if let Err(e) = self.contract.subscribe_to_subscription_events().await {
             tracing::error!(
-                chain_name = state.chain_id,
+                chain_name = self.chain_id,
                 error = %e,
                 "Failed to subscribe to contract events"
             );
         }
         
-        let _ = self.refresh_subscriptions(state.clone()).await;
+        let _ = self.refresh_subscriptions(
+            subscription_writer.clone(),
+            pyth_price_writer.clone(),
+            chain_price_writer.clone()
+        ).await;
         
         let mut interval = time::interval(self.poll_interval);
         loop {
             tokio::select! {
                 _ = interval.tick() => {
-                    let _ = self.refresh_subscriptions(state.clone()).await;
+                    let _ = self.refresh_subscriptions(
+                        subscription_writer.clone(),
+                        pyth_price_writer.clone(),
+                        chain_price_writer.clone()
+                    ).await;
                 }
                 _ = stop_rx.changed() => {
                     if *stop_rx.borrow() {

+ 48 - 2
apps/argus/src/services/types.rs

@@ -3,13 +3,59 @@ use anyhow::Result;
 use async_trait::async_trait;
 use tokio::sync::watch;
 
-use crate::state::ArgusState;
+use crate::state::traits::{
+    ReadSubscriptionState, WriteSubscriptionState,
+    ReadPythPriceState, WritePythPriceState,
+    ReadChainPriceState, WriteChainPriceState,
+    SystemControl
+};
 
 #[async_trait]
 pub trait Service: Send + Sync {
     fn name(&self) -> &str;
     
-    async fn start(&self, state: Arc<ArgusState>, stop_rx: watch::Receiver<bool>) -> Result<()>;
+    async fn start(&self, stop_rx: watch::Receiver<bool>) -> Result<()>;
+}
+
+#[async_trait]
+pub trait SubscriptionAware: Service {
+    async fn start_with_subscription(
+        &self,
+        subscription_reader: Arc<dyn ReadSubscriptionState>,
+        subscription_writer: Arc<dyn WriteSubscriptionState>,
+        pyth_price_writer: Arc<dyn WritePythPriceState>,
+        chain_price_writer: Arc<dyn WriteChainPriceState>,
+        stop_rx: watch::Receiver<bool>
+    ) -> Result<()>;
+}
+
+#[async_trait]
+pub trait PythPriceAware: Service {
+    async fn start_with_pyth_price(
+        &self,
+        pyth_price_reader: Arc<dyn ReadPythPriceState>,
+        pyth_price_writer: Arc<dyn WritePythPriceState>,
+        stop_rx: watch::Receiver<bool>
+    ) -> Result<()>;
+}
+
+#[async_trait]
+pub trait ChainPriceAware: Service {
+    async fn start_with_chain_price(
+        &self,
+        chain_price_reader: Arc<dyn ReadChainPriceState>,
+        chain_price_writer: Arc<dyn WriteChainPriceState>,
+        stop_rx: watch::Receiver<bool>
+    ) -> Result<()>;
+}
+
+#[async_trait]
+pub trait SystemControlAware: Service {
+    async fn start_with_system_control(
+        &self,
+        system_control: Arc<dyn SystemControl>,
+        stop_rx: watch::Receiver<bool>
+    ) -> Result<()>;
 }
 
 #[derive(Debug, Clone)]

+ 98 - 28
apps/argus/src/state.rs → apps/argus/src/state/mod.rs

@@ -1,8 +1,12 @@
+pub mod traits;
+
 use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock, Mutex};
+use anyhow::Result;
 use tokio::sync::watch;
 
 use crate::adapters::types::{Price, PriceId, SubscriptionId};
+use self::traits::*;
 
 #[derive(Clone)]
 pub struct SubscriptionParams {
@@ -14,9 +18,9 @@ pub struct SubscriptionParams {
 #[derive(Clone)]
 pub struct ArgusState {
     pub chain_id: String,
-    pub subscription_state: Arc<SubscriptionState>,
-    pub pyth_price_state: Arc<PythPriceState>,
-    pub chain_price_state: Arc<ChainPriceState>,
+    subscription_state: Arc<SubscriptionState>,
+    pyth_price_state: Arc<PythPriceState>,
+    chain_price_state: Arc<ChainPriceState>,
     pub stop_sender: Arc<Mutex<Option<watch::Sender<bool>>>>,
 }
 
@@ -30,6 +34,44 @@ impl ArgusState {
             stop_sender: Arc::new(Mutex::new(None)),
         }
     }
+    
+    
+    pub fn subscription_reader(&self) -> Arc<dyn ReadSubscriptionState> {
+        self.subscription_state.clone()
+    }
+    
+    pub fn subscription_writer(&self) -> Arc<dyn WriteSubscriptionState> {
+        self.subscription_state.clone()
+    }
+    
+    pub fn pyth_price_reader(&self) -> Arc<dyn ReadPythPriceState> {
+        self.pyth_price_state.clone()
+    }
+    
+    pub fn pyth_price_writer(&self) -> Arc<dyn WritePythPriceState> {
+        self.pyth_price_state.clone()
+    }
+    
+    pub fn chain_price_reader(&self) -> Arc<dyn ReadChainPriceState> {
+        self.chain_price_state.clone()
+    }
+    
+    pub fn chain_price_writer(&self) -> Arc<dyn WriteChainPriceState> {
+        self.chain_price_state.clone()
+    }
+    
+    pub fn system_control(&self) -> Arc<dyn SystemControl> {
+        Arc::new(SystemController {
+            stop_sender: self.stop_sender.clone(),
+        })
+    }
+    
+    pub fn setup_stop_channel(&self) -> watch::Receiver<bool> {
+        let (tx, rx) = watch::channel(false);
+        let mut stop_sender = self.stop_sender.lock().expect("Mutex poisoned");
+        *stop_sender = Some(tx);
+        rx
+    }
 }
 
 pub struct SubscriptionState {
@@ -42,12 +84,14 @@ impl SubscriptionState {
             subscriptions: RwLock::new(HashMap::new()),
         }
     }
+}
 
-    pub fn get_subscriptions(&self) -> HashMap<SubscriptionId, SubscriptionParams> {
+impl ReadSubscriptionState for SubscriptionState {
+    fn get_subscriptions(&self) -> HashMap<SubscriptionId, SubscriptionParams> {
         self.subscriptions.read().expect("RwLock poisoned").clone()
     }
 
-    pub fn get_subscription(&self, id: &SubscriptionId) -> Option<SubscriptionParams> {
+    fn get_subscription(&self, id: &SubscriptionId) -> Option<SubscriptionParams> {
         self.subscriptions
             .read()
             .expect("RwLock poisoned")
@@ -55,15 +99,7 @@ impl SubscriptionState {
             .cloned()
     }
 
-    pub fn update_subscriptions(
-        &self,
-        subscriptions: HashMap<SubscriptionId, SubscriptionParams>,
-    ) {
-        let mut lock = self.subscriptions.write().expect("RwLock poisoned");
-        *lock = subscriptions;
-    }
-
-    pub fn get_feed_ids(&self) -> HashSet<PriceId> {
+    fn get_feed_ids(&self) -> HashSet<PriceId> {
         let subscriptions = self.subscriptions.read().expect("RwLock poisoned");
         let mut feed_ids = HashSet::new();
         
@@ -77,6 +113,16 @@ impl SubscriptionState {
     }
 }
 
+impl WriteSubscriptionState for SubscriptionState {
+    fn update_subscriptions(
+        &self,
+        subscriptions: HashMap<SubscriptionId, SubscriptionParams>,
+    ) {
+        let mut lock = self.subscriptions.write().expect("RwLock poisoned");
+        *lock = subscriptions;
+    }
+}
+
 pub struct PythPriceState {
     prices: RwLock<HashMap<PriceId, Price>>,
     feed_ids: RwLock<HashSet<PriceId>>,
@@ -89,8 +135,10 @@ impl PythPriceState {
             feed_ids: RwLock::new(HashSet::new()),
         }
     }
+}
 
-    pub fn get_price(&self, feed_id: &PriceId) -> Option<Price> {
+impl ReadPythPriceState for PythPriceState {
+    fn get_price(&self, feed_id: &PriceId) -> Option<Price> {
         self.prices
             .read()
             .expect("RwLock poisoned")
@@ -98,24 +146,26 @@ impl PythPriceState {
             .cloned()
     }
 
-    pub fn update_price(&self, feed_id: PriceId, price: Price) {
+    fn get_feed_ids(&self) -> HashSet<PriceId> {
+        self.feed_ids.read().expect("RwLock poisoned").clone()
+    }
+}
+
+impl WritePythPriceState for PythPriceState {
+    fn update_price(&self, feed_id: PriceId, price: Price) {
         let mut prices = self.prices.write().expect("RwLock poisoned");
         prices.insert(feed_id, price);
     }
 
-    pub fn update_prices(&self, prices: HashMap<PriceId, Price>) {
+    fn update_prices(&self, prices: HashMap<PriceId, Price>) {
         let mut lock = self.prices.write().expect("RwLock poisoned");
         lock.extend(prices);
     }
 
-    pub fn update_feed_ids(&self, feed_ids: HashSet<PriceId>) {
+    fn update_feed_ids(&self, feed_ids: HashSet<PriceId>) {
         let mut lock = self.feed_ids.write().expect("RwLock poisoned");
         *lock = feed_ids;
     }
-
-    pub fn get_feed_ids(&self) -> HashSet<PriceId> {
-        self.feed_ids.read().expect("RwLock poisoned").clone()
-    }
 }
 
 pub struct ChainPriceState {
@@ -130,8 +180,10 @@ impl ChainPriceState {
             feed_ids: RwLock::new(HashSet::new()),
         }
     }
+}
 
-    pub fn get_price(&self, feed_id: &PriceId) -> Option<Price> {
+impl ReadChainPriceState for ChainPriceState {
+    fn get_price(&self, feed_id: &PriceId) -> Option<Price> {
         self.prices
             .read()
             .expect("RwLock poisoned")
@@ -139,22 +191,40 @@ impl ChainPriceState {
             .cloned()
     }
 
-    pub fn update_price(&self, feed_id: PriceId, price: Price) {
+    fn get_feed_ids(&self) -> HashSet<PriceId> {
+        self.feed_ids.read().expect("RwLock poisoned").clone()
+    }
+}
+
+impl WriteChainPriceState for ChainPriceState {
+    fn update_price(&self, feed_id: PriceId, price: Price) {
         let mut prices = self.prices.write().expect("RwLock poisoned");
         prices.insert(feed_id, price);
     }
 
-    pub fn update_prices(&self, prices: HashMap<PriceId, Price>) {
+    fn update_prices(&self, prices: HashMap<PriceId, Price>) {
         let mut lock = self.prices.write().expect("RwLock poisoned");
         lock.extend(prices);
     }
 
-    pub fn update_feed_ids(&self, feed_ids: HashSet<PriceId>) {
+    fn update_feed_ids(&self, feed_ids: HashSet<PriceId>) {
         let mut lock = self.feed_ids.write().expect("RwLock poisoned");
         *lock = feed_ids;
     }
+}
 
-    pub fn get_feed_ids(&self) -> HashSet<PriceId> {
-        self.feed_ids.read().expect("RwLock poisoned").clone()
+struct SystemController {
+    pub stop_sender: Arc<Mutex<Option<watch::Sender<bool>>>>,
+}
+
+impl SystemControl for SystemController {
+    fn signal_shutdown(&self) -> Result<()> {
+        let sender = self.stop_sender.lock().expect("Mutex poisoned");
+        if let Some(sender) = &*sender {
+            sender.send(true)?;
+            Ok(())
+        } else {
+            Err(anyhow::anyhow!("Stop sender not initialized"))
+        }
     }
 }

+ 50 - 0
apps/argus/src/state/traits.rs

@@ -0,0 +1,50 @@
+use std::collections::{HashMap, HashSet};
+use anyhow::Result;
+use async_trait::async_trait;
+
+use crate::adapters::types::{Price, PriceId, SubscriptionId};
+use crate::state::SubscriptionParams;
+
+pub trait ReadSubscriptionState: Send + Sync {
+    fn get_subscriptions(&self) -> HashMap<SubscriptionId, SubscriptionParams>;
+    
+    fn get_subscription(&self, id: &SubscriptionId) -> Option<SubscriptionParams>;
+    
+    fn get_feed_ids(&self) -> HashSet<PriceId>;
+}
+
+pub trait WriteSubscriptionState: ReadSubscriptionState {
+    fn update_subscriptions(&self, subscriptions: HashMap<SubscriptionId, SubscriptionParams>);
+}
+
+pub trait ReadPythPriceState: Send + Sync {
+    fn get_price(&self, feed_id: &PriceId) -> Option<Price>;
+    
+    fn get_feed_ids(&self) -> HashSet<PriceId>;
+}
+
+pub trait WritePythPriceState: ReadPythPriceState {
+    fn update_price(&self, feed_id: PriceId, price: Price);
+    
+    fn update_prices(&self, prices: HashMap<PriceId, Price>);
+    
+    fn update_feed_ids(&self, feed_ids: HashSet<PriceId>);
+}
+
+pub trait ReadChainPriceState: Send + Sync {
+    fn get_price(&self, feed_id: &PriceId) -> Option<Price>;
+    
+    fn get_feed_ids(&self) -> HashSet<PriceId>;
+}
+
+pub trait WriteChainPriceState: ReadChainPriceState {
+    fn update_price(&self, feed_id: PriceId, price: Price);
+    
+    fn update_prices(&self, prices: HashMap<PriceId, Price>);
+    
+    fn update_feed_ids(&self, feed_ids: HashSet<PriceId>);
+}
+
+pub trait SystemControl: Send + Sync {
+    fn signal_shutdown(&self) -> Result<()>;
+}