Browse Source

actors work now

Tejas Badadare 6 months ago
parent
commit
b7b97063f0

+ 1 - 1
apps/argus/Cargo.lock

@@ -1640,7 +1640,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "7.5.2"
+version = "7.5.3"
 dependencies = [
  "anyhow",
  "axum",

+ 0 - 0
apps/argus/src/actors/mod.rs → apps/argus/src/actors.rs


+ 38 - 28
apps/argus/src/actors/chain_price_listener.rs

@@ -2,7 +2,7 @@ use {
     crate::actors::types::*,
     anyhow::Result,
     async_trait::async_trait,
-    ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort},
+    ractor::{Actor, ActorProcessingErr, ActorRef},
     std::{
         collections::{HashMap, HashSet},
         sync::Arc,
@@ -12,40 +12,27 @@ use {
     tracing,
 };
 
-pub struct ChainPriceListener {
+#[allow(dead_code)]
+pub struct ChainPriceListenerState {
     chain_id: String,
-    contract: Arc<dyn PulseContractInterface + Send + Sync>,
+    contract: Arc<dyn GetChainPrices + Send + Sync>,
     feed_ids: HashSet<PriceId>,
     latest_prices: Arc<RwLock<HashMap<PriceId, Price>>>,
     poll_interval: Duration,
 }
 
-#[async_trait]
-pub trait PulseContractInterface {
-    async fn get_price_unsafe(
-        &self,
-        subscription_id: SubscriptionId,
-        feed_id: &PriceId,
-    ) -> Result<Option<Price>>;
-
-    async fn subscribe_to_price_events(&self) -> Result<()>;
-}
-
+pub struct ChainPriceListener;
 impl Actor for ChainPriceListener {
     type Msg = ChainPriceListenerMessage;
-    type State = Self;
-    type Arguments = (
-        String,
-        Arc<dyn PulseContractInterface + Send + Sync>,
-        Duration,
-    );
+    type State = ChainPriceListenerState;
+    type Arguments = (String, Arc<dyn GetChainPrices + Send + Sync>, Duration);
 
     async fn pre_start(
         &self,
-        myself: ActorRef<Self::Msg>,
+        _myself: ActorRef<Self::Msg>,
         (chain_id, contract, poll_interval): Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
-        let listener = ChainPriceListener {
+        let state = ChainPriceListenerState {
             chain_id: chain_id.clone(),
             contract,
             feed_ids: HashSet::new(),
@@ -53,15 +40,15 @@ impl Actor for ChainPriceListener {
             poll_interval,
         };
 
-        if let Err(e) = listener.contract.subscribe_to_price_events().await {
+        if let Err(e) = state.contract.subscribe_to_price_events().await {
             tracing::error!(
-                chain_id = listener.chain_id,
+                chain_id = state.chain_id,
                 error = %e,
                 "Failed to subscribe to price events"
             );
         }
 
-        let poll_interval = listener.poll_interval;
+        let poll_interval = state.poll_interval;
         tokio::spawn(async move {
             let mut interval = time::interval(poll_interval);
             loop {
@@ -73,7 +60,7 @@ impl Actor for ChainPriceListener {
             }
         });
 
-        Ok(listener)
+        Ok(state)
     }
 
     async fn handle(
@@ -83,9 +70,32 @@ impl Actor for ChainPriceListener {
         _state: &mut Self::State,
     ) -> Result<(), ActorProcessingErr> {
         match message {
-            ChainPriceListenerMessage::GetLatestPrice(_) => {}
-            ChainPriceListenerMessage::UpdateFeedIdSet(_) => {}
+            ChainPriceListenerMessage::GetLatestPrice(feed_id, reply_port) => {
+                let price = _state.get_latest_price(&feed_id).await;
+                reply_port.send(price)?;
+            }
+            ChainPriceListenerMessage::UpdateFeedIdSet(_) => {
+                todo!()
+            }
         }
         Ok(())
     }
 }
+
+impl ChainPriceListenerState {
+    pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> {
+        let latest_prices = self.latest_prices.read().await;
+        latest_prices.get(feed_id).cloned()
+    }
+}
+
+#[async_trait]
+pub trait GetChainPrices {
+    async fn get_price_unsafe(
+        &self,
+        subscription_id: SubscriptionId,
+        feed_id: &PriceId,
+    ) -> Result<Option<Price>>;
+
+    async fn subscribe_to_price_events(&self) -> Result<()>;
+}

+ 46 - 74
apps/argus/src/actors/controller.rs

@@ -1,17 +1,18 @@
 use {
     crate::actors::types::*,
     anyhow::Result,
-    ractor::{call, call_t, Actor, ActorProcessingErr, ActorRef},
+    ractor::{cast, Actor, ActorProcessingErr, ActorRef},
     std::{
         collections::{HashMap, HashSet},
-        sync::Arc,
         time::Duration,
     },
+    tokio::sync::watch,
     tokio::time,
     tracing,
 };
 
-pub struct Controller {
+#[allow(dead_code)]
+pub struct ControllerState {
     chain_id: String,
     subscription_listener: ActorRef<SubscriptionListenerMessage>,
     pyth_price_listener: ActorRef<PythPriceListenerMessage>,
@@ -19,13 +20,15 @@ pub struct Controller {
     price_pusher: ActorRef<PricePusherMessage>,
     update_interval: Duration,
     update_loop_running: bool,
+    stop_sender: Option<watch::Sender<bool>>,
     active_subscriptions: HashMap<SubscriptionId, Subscription>,
     feed_ids: HashSet<PriceId>,
 }
 
+pub struct Controller;
 impl Actor for Controller {
     type Msg = ControllerMessage;
-    type State = Self;
+    type State = ControllerState;
     type Arguments = (
         String,
         ActorRef<SubscriptionListenerMessage>,
@@ -47,7 +50,7 @@ impl Actor for Controller {
             update_interval,
         ): Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
-        let controller = Controller {
+        let state = ControllerState {
             chain_id,
             subscription_listener,
             pyth_price_listener,
@@ -55,13 +58,14 @@ impl Actor for Controller {
             price_pusher,
             update_interval,
             update_loop_running: false,
+            stop_sender: None,
             active_subscriptions: HashMap::new(),
             feed_ids: HashSet::new(),
         };
 
-        let _ = myself.cast(ControllerMessage::StartUpdateLoop);
+        cast!(myself, ControllerMessage::StartUpdateLoop)?;
 
-        Ok(controller)
+        Ok(state)
     }
 
     async fn handle(
@@ -74,87 +78,55 @@ impl Actor for Controller {
             ControllerMessage::StartUpdateLoop => {
                 if !state.update_loop_running {
                     state.update_loop_running = true;
+                    let (tx, mut rx) = watch::channel(false);
+                    state.stop_sender = Some(tx);
 
                     let update_interval = state.update_interval;
                     let myself_clone = myself.clone();
+                    let chain_id_clone = state.chain_id.clone();
+
                     tokio::spawn(async move {
+                        tracing::info!(chain_id = chain_id_clone, "Update loop task started");
                         let mut interval = time::interval(update_interval);
-                        while let Ok(ControllerResponse::UpdateLoopStarted) =
-                            call_t!(myself_clone, ControllerMessage::StartUpdateLoop).await
-                        {
-                            interval.tick().await;
-                            let _ = myself_clone.cast(ControllerMessage::CheckForUpdates);
+
+                        loop {
+                            tokio::select! {
+                                _ = interval.tick() => {
+                                    if let Err(e) = cast!(myself_clone, ControllerMessage::CheckForUpdates) {
+                                        tracing::error!(chain_id = chain_id_clone, error = %e, "Failed to check for updates");
+                                    }
+                                }
+                                _ = rx.changed() => {
+                                    if *rx.borrow() {
+                                        tracing::info!(chain_id = chain_id_clone, "Update loop received stop signal.");
+                                        break;
+                                    }
+                                }
+                            }
                         }
+                        tracing::info!(chain_id = chain_id_clone, "Update loop task finished");
                     });
-
-                    tracing::info!(chain_id = state.chain_id, "Update loop started");
                 }
             }
             ControllerMessage::StopUpdateLoop => {
-                state.update_loop_running = false;
-                tracing::info!(chain_id = state.chain_id, "Update loop stopped");
-            }
-            ControllerMessage::CheckForUpdates => {
-                match call_t!(
-                    state.subscription_listener,
-                    SubscriptionListenerMessage::GetActiveSubscriptions
-                )
-                .await
-                {
-                    Ok(SubscriptionListenerResponse::ActiveSubscriptions(subscriptions)) => {
-                        state.active_subscriptions = subscriptions;
-
-                        let mut feed_ids = HashSet::new();
-                        for subscription in state.active_subscriptions.values() {
-                            for price_id in &subscription.price_ids {
-                                feed_ids.insert(*price_id);
-                            }
-                        }
-
-                        if feed_ids != state.feed_ids {
-                            state.feed_ids = feed_ids.clone();
-
-                            let msg_builder =
-                                PythPriceListenerMessage::UpdateFeedIdSet(feed_ids.clone());
-                            let _ = state.pyth_price_listener.cast(msg_builder);
-
-                            let _ = call_t!(
-                                state.chain_price_listener,
-                                ChainPriceListenerMessage::UpdateFeedIdSet(feed_ids)
-                            )
-                            .await;
-                        }
-
-                        for (subscription_id, subscription) in &state.active_subscriptions {
-                            if state.should_update_subscription(subscription).await {
-                                let push_request = PushRequest {
-                                    subscription_id: *subscription_id,
-                                    price_ids: subscription.price_ids.clone(),
-                                };
-
-                                let _ = state
-                                    .price_pusher
-                                    .cast(PricePusherMessage::PushPriceUpdates(push_request));
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            chain_id = state.chain_id,
-                            error = %e,
-                            "Failed to get active subscriptions"
-                        );
+                if state.update_loop_running {
+                    state.update_loop_running = false;
+                    if let Some(sender) = state.stop_sender.take() {
+                        let _ = sender.send(true);
                     }
-                    _ => {}
+                    tracing::info!(chain_id = state.chain_id, "Stop signal sent to update loop");
+                } else {
+                    tracing::warn!(
+                        chain_id = state.chain_id,
+                        "StopUpdateLoop called but loop was not running."
+                    );
                 }
             }
+            ControllerMessage::CheckForUpdates => {
+                tracing::debug!(chain_id = state.chain_id, "Received CheckForUpdates");
+                todo!()
+            }
         }
         Ok(())
     }
 }
-
-impl Controller {
-    async fn should_update_subscription(&self, subscription: &Subscription) -> bool {
-        true
-    }
-}

+ 25 - 25
apps/argus/src/actors/price_pusher.rs

@@ -3,41 +3,27 @@ use {
     anyhow::Result,
     async_trait::async_trait,
     backoff::{backoff::Backoff, ExponentialBackoff},
-    ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort},
-    std::{sync::Arc, time::Duration},
+    ractor::{Actor, ActorProcessingErr, ActorRef},
+    std::sync::Arc,
     tokio::time,
     tracing,
 };
 
-pub struct PricePusher {
+pub struct PricePusherState {
     chain_id: String,
-    contract: Arc<dyn PulseContractInterface + Send + Sync>,
-    hermes_client: Arc<dyn HermesClient + Send + Sync>,
+    contract: Arc<dyn UpdateChainPrices + Send + Sync>,
+    hermes_client: Arc<dyn GetPythPrices + Send + Sync>,
     backoff_policy: ExponentialBackoff,
 }
 
-#[async_trait]
-pub trait PulseContractInterface {
-    async fn update_price_feeds(
-        &self,
-        subscription_id: SubscriptionId,
-        price_ids: &[PriceId],
-        update_data: &[Vec<u8>],
-    ) -> Result<String>;
-}
-
-#[async_trait]
-pub trait HermesClient {
-    async fn get_price_update_data(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>>;
-}
-
+pub struct PricePusher;
 impl Actor for PricePusher {
     type Msg = PricePusherMessage;
-    type State = Self;
+    type State = PricePusherState;
     type Arguments = (
         String,
-        Arc<dyn PulseContractInterface + Send + Sync>,
-        Arc<dyn HermesClient + Send + Sync>,
+        Arc<dyn UpdateChainPrices + Send + Sync>,
+        Arc<dyn GetPythPrices + Send + Sync>,
         ExponentialBackoff,
     );
 
@@ -46,14 +32,14 @@ impl Actor for PricePusher {
         _myself: ActorRef<Self::Msg>,
         (chain_id, contract, hermes_client, backoff_policy): Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
-        let pusher = PricePusher {
+        let state = PricePusherState {
             chain_id,
             contract,
             hermes_client,
             backoff_policy,
         };
 
-        Ok(pusher)
+        Ok(state)
     }
 
     async fn handle(
@@ -131,3 +117,17 @@ impl Actor for PricePusher {
         Ok(())
     }
 }
+
+#[async_trait]
+pub trait GetPythPrices {
+    async fn get_price_update_data(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>>;
+}
+#[async_trait]
+pub trait UpdateChainPrices {
+    async fn update_price_feeds(
+        &self,
+        subscription_id: SubscriptionId,
+        price_ids: &[PriceId],
+        update_data: &[Vec<u8>],
+    ) -> Result<String>;
+}

+ 32 - 20
apps/argus/src/actors/pyth_price_listener.rs

@@ -2,7 +2,7 @@ use {
     crate::actors::types::*,
     anyhow::Result,
     async_trait::async_trait,
-    ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort},
+    ractor::{Actor, ActorProcessingErr, ActorRef},
     std::{
         collections::{HashMap, HashSet},
         sync::Arc,
@@ -11,48 +11,41 @@ use {
     tracing,
 };
 
-pub struct PythPriceListener {
+#[allow(dead_code)]
+pub struct PythPriceListenerState {
     chain_id: String,
-    hermes_client: Arc<dyn HermesClient + Send + Sync>,
+    hermes_client: Arc<dyn StreamPythPrices + Send + Sync>,
     feed_ids: HashSet<PriceId>,
     latest_prices: Arc<RwLock<HashMap<PriceId, Price>>>,
 }
 
-#[async_trait]
-pub trait HermesClient {
-    async fn connect(&self) -> Result<()>;
-
-    async fn subscribe_to_price_updates(&self, feed_ids: &HashSet<PriceId>) -> Result<()>;
-
-    async fn get_latest_price(&self, feed_id: &PriceId) -> Result<Option<Price>>;
-}
-
+pub struct PythPriceListener;
 impl Actor for PythPriceListener {
     type Msg = PythPriceListenerMessage;
-    type State = Self;
-    type Arguments = (String, Arc<dyn HermesClient + Send + Sync>);
+    type State = PythPriceListenerState;
+    type Arguments = (String, Arc<dyn StreamPythPrices + Send + Sync>);
 
     async fn pre_start(
         &self,
         _myself: ActorRef<Self::Msg>,
         (chain_id, hermes_client): Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
-        let listener = PythPriceListener {
+        let state = PythPriceListenerState {
             chain_id,
             hermes_client,
             feed_ids: HashSet::new(),
             latest_prices: Arc::new(RwLock::new(HashMap::new())),
         };
 
-        if let Err(e) = listener.hermes_client.connect().await {
+        if let Err(e) = state.hermes_client.connect().await {
             tracing::error!(
-                chain_id = listener.chain_id,
+                chain_id = state.chain_id,
                 error = %e,
                 "Failed to connect to Hermes"
             );
         }
 
-        Ok(listener)
+        Ok(state)
     }
 
     async fn handle(
@@ -62,9 +55,28 @@ impl Actor for PythPriceListener {
         _state: &mut Self::State,
     ) -> Result<(), ActorProcessingErr> {
         match message {
-            PythPriceListenerMessage::GetLatestPrice(_) => {}
-            PythPriceListenerMessage::UpdateFeedIdSet(_) => {}
+            PythPriceListenerMessage::GetLatestPrice(feed_id, reply_port) => {
+                let price = _state.get_latest_price(&feed_id).await;
+                reply_port.send(price)?;
+            }
+            PythPriceListenerMessage::UpdateFeedIdSet(_) => {
+                todo!()
+            }
         }
         Ok(())
     }
 }
+
+impl PythPriceListenerState {
+    pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> {
+        let latest_prices = self.latest_prices.read().await;
+        latest_prices.get(feed_id).cloned()
+    }
+}
+
+#[async_trait]
+pub trait StreamPythPrices {
+    async fn connect(&self) -> Result<()>;
+
+    async fn subscribe_to_price_updates(&self, feed_ids: &HashSet<PriceId>) -> Result<()>;
+}

+ 12 - 13
apps/argus/src/actors/subscription_listener.rs

@@ -1,9 +1,8 @@
 use {
-    crate::{actors::types::*, api::BlockchainState},
+    super::{Subscription, SubscriptionId, SubscriptionListenerMessage},
     anyhow::Result,
     async_trait::async_trait,
-    ethers::providers::Middleware,
-    ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort},
+    ractor::{Actor, ActorProcessingErr, ActorRef},
     std::{
         collections::{HashMap, HashSet},
         sync::Arc,
@@ -15,7 +14,7 @@ use {
 
 pub struct SubscriptionListenerState {
     pub chain_name: String,
-    pub contract: Arc<dyn PulseContractInterface + Send + Sync>,
+    pub contract: Arc<dyn ReadChainSubscriptions + Send + Sync>,
     pub active_subscriptions: HashMap<SubscriptionId, Subscription>,
     pub poll_interval: Duration,
 }
@@ -46,19 +45,12 @@ impl SubscriptionListenerState {
 
 pub struct SubscriptionListener;
 
-#[async_trait]
-pub trait PulseContractInterface {
-    async fn get_active_subscriptions(&self) -> Result<HashMap<SubscriptionId, Subscription>>;
-
-    async fn subscribe_to_events(&self) -> Result<()>;
-}
-
 impl Actor for SubscriptionListener {
     type Msg = SubscriptionListenerMessage;
     type State = SubscriptionListenerState;
     type Arguments = (
         String,
-        Arc<dyn PulseContractInterface + Send + Sync>,
+        Arc<dyn ReadChainSubscriptions + Send + Sync>,
         Duration,
     );
 
@@ -113,7 +105,7 @@ impl Actor for SubscriptionListener {
 
     async fn handle(
         &self,
-        myself: ActorRef<Self::Msg>,
+        _myself: ActorRef<Self::Msg>,
         message: Self::Msg,
         state: &mut Self::State,
     ) -> Result<(), ActorProcessingErr> {
@@ -134,3 +126,10 @@ impl Actor for SubscriptionListener {
         Ok(())
     }
 }
+
+#[async_trait]
+pub trait ReadChainSubscriptions {
+    async fn get_active_subscriptions(&self) -> Result<HashMap<SubscriptionId, Subscription>>;
+
+    async fn subscribe_to_events(&self) -> Result<()>;
+}

+ 6 - 40
apps/argus/src/actors/types.rs

@@ -1,7 +1,8 @@
 use {
     ethers::types::{Address, U256},
+    ractor::RpcReplyPort,
     serde::{Deserialize, Serialize},
-    std::collections::{HashMap, HashSet},
+    std::collections::HashSet,
 };
 
 pub type PriceId = [u8; 32];
@@ -41,15 +42,15 @@ pub enum SubscriptionListenerMessage {
     RefreshSubscriptions,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug)]
 pub enum PythPriceListenerMessage {
-    GetLatestPrice(PriceId),
+    GetLatestPrice(PriceId, RpcReplyPort<Option<Price>>),
     UpdateFeedIdSet(HashSet<PriceId>),
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug)]
 pub enum ChainPriceListenerMessage {
-    GetLatestPrice(PriceId),
+    GetLatestPrice(PriceId, RpcReplyPort<Option<Price>>),
     UpdateFeedIdSet(HashSet<PriceId>),
 }
 
@@ -70,38 +71,3 @@ pub struct PushRequest {
 pub enum PricePusherMessage {
     PushPriceUpdates(PushRequest),
 }
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum SubscriptionListenerResponse {
-    ActiveSubscriptions(HashMap<SubscriptionId, Subscription>),
-    RefreshAcknowledged,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum PythPriceListenerResponse {
-    LatestPrice(Option<Price>),
-    FeedIdSetUpdated,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum ChainPriceListenerResponse {
-    LatestPrice(Option<Price>),
-    FeedIdSetUpdated,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum ControllerResponse {
-    UpdateLoopStarted,
-    UpdateLoopStopped,
-    UpdateCheckResult(Vec<PushRequest>),
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum PricePusherResponse {
-    PushResult {
-        success: bool,
-        subscription_id: SubscriptionId,
-        tx_hash: Option<String>,
-        error: Option<String>,
-    },
-}

+ 1 - 1
apps/argus/src/command/run.rs

@@ -83,7 +83,7 @@ pub async fn run_keeper(
             .expect("All chains should be present in the config file")
             .clone();
         let private_key = private_key.clone();
-        handles.push(spawn(keeper::run_keeper_threads(
+        handles.push(spawn(keeper::run_keeper_for_chain(
             private_key,
             chain_eth_config,
             chain_config.clone(),

+ 20 - 39
apps/argus/src/keeper.rs

@@ -1,18 +1,11 @@
 use {
     crate::{
         actors::{
-            chain_price_listener::{
-                ChainPriceListener, PulseContractInterface as ChainPriceContractInterface,
-            },
+            chain_price_listener::{ChainPriceListener, GetChainPrices},
             controller::Controller,
-            price_pusher::{
-                HermesClient, PricePusher, PulseContractInterface as PricePusherContractInterface,
-            },
-            pyth_price_listener::{HermesClient as PythPriceHermesClient, PythPriceListener},
-            subscription_listener::{
-                PulseContractInterface as SubscriptionContractInterface, SubscriptionListener,
-                SubscriptionListenerState,
-            },
+            price_pusher::{GetPythPrices, PricePusher, UpdateChainPrices},
+            pyth_price_listener::{PythPriceListener, StreamPythPrices},
+            subscription_listener::{ReadChainSubscriptions, SubscriptionListener},
             types::*,
         },
         api::BlockchainState,
@@ -37,7 +30,7 @@ use {
 pub(crate) mod keeper_metrics;
 
 #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
-pub async fn run_keeper_threads(
+pub async fn run_keeper_for_chain(
     private_key: String,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
@@ -56,7 +49,7 @@ pub async fn run_keeper_threads(
         .await
         .expect("Chain config should be valid"),
     );
-    let _keeper_address = contract.wallet().address();
+    let keeper_address = contract.wallet().address();
 
     tracing::info!(
         chain_id = chain_state.id,
@@ -100,7 +93,7 @@ pub async fn run_keeper_threads(
         SubscriptionListener,
         (
             chain_state.id.clone(),
-            subscription_contract as Arc<dyn SubscriptionContractInterface + Send + Sync>,
+            subscription_contract as Arc<dyn ReadChainSubscriptions + Send + Sync>,
             subscription_poll_interval,
         ),
     )
@@ -112,42 +105,39 @@ pub async fn run_keeper_threads(
         PythPriceListener,
         (
             chain_state.id.clone(),
-            hermes_client.clone() as Arc<dyn PythPriceHermesClient + Send + Sync>,
+            hermes_client.clone() as Arc<dyn StreamPythPrices + Send + Sync>,
         ),
-        SupervisionStrategy::Restart(5),
     )
     .await
     .expect("Failed to spawn PythPriceListener actor");
 
     let (chain_price_listener, _) = Actor::spawn(
-        None,
+        Some(String::from("ChainPriceListener")),
         ChainPriceListener,
         (
             chain_state.id.clone(),
-            chain_price_contract as Arc<dyn ChainPriceContractInterface + Send + Sync>,
+            chain_price_contract as Arc<dyn GetChainPrices + Send + Sync>,
             chain_price_poll_interval,
         ),
-        SupervisionStrategy::Restart(5),
     )
     .await
     .expect("Failed to spawn ChainPriceListener actor");
 
     let (price_pusher, _) = Actor::spawn(
-        None,
+        Some(String::from("PricePusher")),
         PricePusher,
         (
             chain_state.id.clone(),
-            price_pusher_contract as Arc<dyn PricePusherContractInterface + Send + Sync>,
-            hermes_client as Arc<dyn HermesClient + Send + Sync>,
+            price_pusher_contract as Arc<dyn UpdateChainPrices + Send + Sync>,
+            hermes_client as Arc<dyn GetPythPrices + Send + Sync>,
             backoff_policy,
         ),
-        SupervisionStrategy::Restart(5),
     )
     .await
     .expect("Failed to spawn PricePusher actor");
 
     let (_controller, _) = Actor::spawn(
-        None,
+        Some(String::from("Controller")),
         Controller,
         (
             chain_state.id.clone(),
@@ -157,7 +147,6 @@ pub async fn run_keeper_threads(
             price_pusher,
             controller_update_interval,
         ),
-        SupervisionStrategy::Restart(5),
     )
     .await
     .expect("Failed to spawn Controller actor");
@@ -165,13 +154,14 @@ pub async fn run_keeper_threads(
     tracing::info!(chain_id = chain_state.id, "Keeper actors started");
 }
 
+#[allow(dead_code)]
 struct PulseContractAdapter {
     contract: Arc<InstrumentedSignablePythContract>,
     chain_id: String,
 }
 
 #[async_trait]
-impl SubscriptionContractInterface for PulseContractAdapter {
+impl ReadChainSubscriptions for PulseContractAdapter {
     async fn get_active_subscriptions(&self) -> Result<HashMap<SubscriptionId, Subscription>> {
         tracing::debug!(chain_id = self.chain_id, "Getting active subscriptions");
         Ok(HashMap::new())
@@ -184,7 +174,7 @@ impl SubscriptionContractInterface for PulseContractAdapter {
 }
 
 #[async_trait]
-impl ChainPriceContractInterface for PulseContractAdapter {
+impl GetChainPrices for PulseContractAdapter {
     async fn get_price_unsafe(
         &self,
         subscription_id: SubscriptionId,
@@ -206,7 +196,7 @@ impl ChainPriceContractInterface for PulseContractAdapter {
 }
 
 #[async_trait]
-impl PricePusherContractInterface for PulseContractAdapter {
+impl UpdateChainPrices for PulseContractAdapter {
     async fn update_price_feeds(
         &self,
         subscription_id: SubscriptionId,
@@ -229,7 +219,7 @@ struct HermesClientAdapter {
 }
 
 #[async_trait]
-impl PythPriceHermesClient for HermesClientAdapter {
+impl StreamPythPrices for HermesClientAdapter {
     async fn connect(&self) -> Result<()> {
         tracing::debug!(chain_id = self.chain_id, "Connecting to Hermes");
         Ok(())
@@ -243,19 +233,10 @@ impl PythPriceHermesClient for HermesClientAdapter {
         );
         Ok(())
     }
-
-    async fn get_latest_price(&self, feed_id: &PriceId) -> Result<Option<Price>> {
-        tracing::debug!(
-            chain_id = self.chain_id,
-            feed_id = hex::encode(feed_id),
-            "Getting latest price from Hermes"
-        );
-        Ok(None)
-    }
 }
 
 #[async_trait]
-impl HermesClient for HermesClientAdapter {
+impl GetPythPrices for HermesClientAdapter {
     async fn get_price_update_data(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>> {
         tracing::debug!(
             chain_id = self.chain_id,