Quellcode durchsuchen

organize, clean up

Tejas Badadare vor 6 Monaten
Ursprung
Commit
61f00302e5

+ 12 - 0
apps/argus/Cargo.lock

@@ -164,6 +164,7 @@ dependencies = [
  "sha3",
  "thiserror",
  "tokio",
+ "tokio-stream",
  "tower-http",
  "tracing",
  "tracing-subscriber",
@@ -4236,6 +4237,17 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-stream"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-tungstenite"
 version = "0.20.1"

+ 1 - 0
apps/argus/Cargo.toml

@@ -42,6 +42,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 thiserror = "1.0.61"
 futures-locks = "0.7.1"
 async-trait = "0.1.88"
+tokio-stream = "0.1.17"
 
 
 [dev-dependencies]

+ 8 - 13
apps/argus/src/actors/chain_price_listener.rs

@@ -1,7 +1,12 @@
 use {
-    crate::actors::types::*,
+    crate::{
+        actors::types::*,
+        adapters::{
+            contract::GetChainPrices,
+            types::{Price, PriceId},
+        },
+    },
     anyhow::Result,
-    async_trait::async_trait,
     ractor::{Actor, ActorProcessingErr, ActorRef},
     std::{
         collections::{HashMap, HashSet},
@@ -57,6 +62,7 @@ impl Actor for ChainPriceListener {
                     chain_id = chain_id.clone(),
                     "Polling for on-chain price updates"
                 );
+                todo!()
             }
         });
 
@@ -88,14 +94,3 @@ impl ChainPriceListenerState {
         latest_prices.get(feed_id).cloned()
     }
 }
-
-#[async_trait]
-pub trait GetChainPrices {
-    async fn get_price_unsafe(
-        &self,
-        subscription_id: SubscriptionId,
-        feed_id: &PriceId,
-    ) -> Result<Option<Price>>;
-
-    async fn subscribe_to_price_events(&self) -> Result<()>;
-}

+ 17 - 57
apps/argus/src/actors/controller.rs

@@ -1,14 +1,15 @@
 use {
-    crate::actors::types::*,
+    crate::{
+        actors::types::*,
+        adapters::types::{PriceId, Subscription, SubscriptionId},
+    },
     anyhow::Result,
-    ractor::{cast, Actor, ActorProcessingErr, ActorRef},
+    ractor::{Actor, ActorProcessingErr, ActorRef},
     std::{
         collections::{HashMap, HashSet},
         time::Duration,
     },
     tokio::sync::watch,
-    tokio::time,
-    tracing,
 };
 
 #[allow(dead_code)]
@@ -63,70 +64,29 @@ impl Actor for Controller {
             feed_ids: HashSet::new(),
         };
 
-        cast!(myself, ControllerMessage::StartUpdateLoop)?;
+        // Start the update loop
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(update_interval);
+            loop {
+                interval.tick().await;
+                let _ = myself.cast(ControllerMessage::PerformUpdate);
+            }
+        });
 
         Ok(state)
     }
 
     async fn handle(
         &self,
-        myself: ActorRef<Self::Msg>,
+        _myself: ActorRef<Self::Msg>,
         message: Self::Msg,
-        state: &mut Self::State,
+        _state: &mut Self::State,
     ) -> Result<(), ActorProcessingErr> {
         match message {
-            ControllerMessage::StartUpdateLoop => {
-                if !state.update_loop_running {
-                    state.update_loop_running = true;
-                    let (tx, mut rx) = watch::channel(false);
-                    state.stop_sender = Some(tx);
-
-                    let update_interval = state.update_interval;
-                    let myself_clone = myself.clone();
-                    let chain_id_clone = state.chain_id.clone();
-
-                    tokio::spawn(async move {
-                        tracing::info!(chain_id = chain_id_clone, "Update loop task started");
-                        let mut interval = time::interval(update_interval);
-
-                        loop {
-                            tokio::select! {
-                                _ = interval.tick() => {
-                                    if let Err(e) = cast!(myself_clone, ControllerMessage::CheckForUpdates) {
-                                        tracing::error!(chain_id = chain_id_clone, error = %e, "Failed to check for updates");
-                                    }
-                                }
-                                _ = rx.changed() => {
-                                    if *rx.borrow() {
-                                        tracing::info!(chain_id = chain_id_clone, "Update loop received stop signal.");
-                                        break;
-                                    }
-                                }
-                            }
-                        }
-                        tracing::info!(chain_id = chain_id_clone, "Update loop task finished");
-                    });
-                }
-            }
-            ControllerMessage::StopUpdateLoop => {
-                if state.update_loop_running {
-                    state.update_loop_running = false;
-                    if let Some(sender) = state.stop_sender.take() {
-                        let _ = sender.send(true);
-                    }
-                    tracing::info!(chain_id = state.chain_id, "Stop signal sent to update loop");
-                } else {
-                    tracing::warn!(
-                        chain_id = state.chain_id,
-                        "StopUpdateLoop called but loop was not running."
-                    );
-                }
-            }
-            ControllerMessage::CheckForUpdates => {
-                tracing::debug!(chain_id = state.chain_id, "Received CheckForUpdates");
+            ControllerMessage::PerformUpdate => {
+                // Main processing logic. Keep active subscriptions up-to-date, check for price updates, and push them to the chain.
                 todo!()
             }
         }
-        Ok(())
     }
 }

+ 9 - 22
apps/argus/src/actors/price_pusher.rs

@@ -1,7 +1,7 @@
 use {
-    crate::actors::types::*,
+    super::PricePusherMessage,
+    crate::adapters::types::{ReadPythPrices, UpdateChainPrices},
     anyhow::Result,
-    async_trait::async_trait,
     backoff::{backoff::Backoff, ExponentialBackoff},
     ractor::{Actor, ActorProcessingErr, ActorRef},
     std::sync::Arc,
@@ -12,7 +12,7 @@ use {
 pub struct PricePusherState {
     chain_id: String,
     contract: Arc<dyn UpdateChainPrices + Send + Sync>,
-    hermes_client: Arc<dyn GetPythPrices + Send + Sync>,
+    pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
     backoff_policy: ExponentialBackoff,
 }
 
@@ -23,7 +23,7 @@ impl Actor for PricePusher {
     type Arguments = (
         String,
         Arc<dyn UpdateChainPrices + Send + Sync>,
-        Arc<dyn GetPythPrices + Send + Sync>,
+        Arc<dyn ReadPythPrices + Send + Sync>,
         ExponentialBackoff,
     );
 
@@ -35,7 +35,7 @@ impl Actor for PricePusher {
         let state = PricePusherState {
             chain_id,
             contract,
-            hermes_client,
+            pyth_price_client: hermes_client,
             backoff_policy,
         };
 
@@ -51,11 +51,12 @@ impl Actor for PricePusher {
         match message {
             PricePusherMessage::PushPriceUpdates(push_request) => {
                 let price_ids = push_request.price_ids.clone();
-                match state.hermes_client.get_price_update_data(&price_ids).await {
+                match state.pyth_price_client.get_latest_prices(&price_ids).await {
                     Ok(update_data) => {
                         let mut backoff = state.backoff_policy.clone();
                         let mut attempt = 0;
 
+                        // TODO: gas escalation policy
                         loop {
                             attempt += 1;
 
@@ -72,7 +73,7 @@ impl Actor for PricePusher {
                                     tracing::info!(
                                         chain_id = state.chain_id,
                                         subscription_id = push_request.subscription_id,
-                                        tx_hash = tx_hash,
+                                        tx_hash = tx_hash.to_string(),
                                         attempt = attempt,
                                         "Successfully pushed price updates"
                                     );
@@ -108,7 +109,7 @@ impl Actor for PricePusher {
                             chain_id = state.chain_id,
                             subscription_id = push_request.subscription_id,
                             error = %e,
-                            "Failed to get price update data from Hermes"
+                            "Failed to get Pyth price update data"
                         );
                     }
                 }
@@ -117,17 +118,3 @@ impl Actor for PricePusher {
         Ok(())
     }
 }
-
-#[async_trait]
-pub trait GetPythPrices {
-    async fn get_price_update_data(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>>;
-}
-#[async_trait]
-pub trait UpdateChainPrices {
-    async fn update_price_feeds(
-        &self,
-        subscription_id: SubscriptionId,
-        price_ids: &[PriceId],
-        update_data: &[Vec<u8>],
-    ) -> Result<String>;
-}

+ 24 - 39
apps/argus/src/actors/pyth_price_listener.rs

@@ -1,50 +1,49 @@
 use {
-    crate::actors::types::*,
+    crate::{
+        actors::types::*,
+        adapters::types::{Price, PriceId, ReadPythPrices},
+    },
     anyhow::Result,
-    async_trait::async_trait,
     ractor::{Actor, ActorProcessingErr, ActorRef},
-    std::{
-        collections::{HashMap, HashSet},
-        sync::Arc,
-    },
+    std::{collections::HashMap, sync::Arc},
     tokio::sync::RwLock,
-    tracing,
 };
 
-#[allow(dead_code)]
 pub struct PythPriceListenerState {
-    chain_id: String,
-    hermes_client: Arc<dyn StreamPythPrices + Send + Sync>,
-    feed_ids: HashSet<PriceId>,
+    pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
+    feed_ids: Vec<PriceId>,
     latest_prices: Arc<RwLock<HashMap<PriceId, Price>>>,
 }
 
+impl PythPriceListenerState {
+    pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> {
+        let latest_prices = self.latest_prices.read().await;
+        latest_prices.get(feed_id).cloned()
+    }
+
+    pub async fn subscribe_to_price_updates(&self) -> Result<()> {
+        self.pyth_price_client
+            .subscribe_to_price_updates(&self.feed_ids)
+            .await
+    }
+}
+
 pub struct PythPriceListener;
 impl Actor for PythPriceListener {
     type Msg = PythPriceListenerMessage;
     type State = PythPriceListenerState;
-    type Arguments = (String, Arc<dyn StreamPythPrices + Send + Sync>);
+    type Arguments = Arc<dyn ReadPythPrices + Send + Sync>;
 
     async fn pre_start(
         &self,
         _myself: ActorRef<Self::Msg>,
-        (chain_id, hermes_client): Self::Arguments,
+        hermes_client: Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
         let state = PythPriceListenerState {
-            chain_id,
-            hermes_client,
-            feed_ids: HashSet::new(),
+            pyth_price_client: hermes_client,
+            feed_ids: vec![],
             latest_prices: Arc::new(RwLock::new(HashMap::new())),
         };
-
-        if let Err(e) = state.hermes_client.connect().await {
-            tracing::error!(
-                chain_id = state.chain_id,
-                error = %e,
-                "Failed to connect to Hermes"
-            );
-        }
-
         Ok(state)
     }
 
@@ -66,17 +65,3 @@ impl Actor for PythPriceListener {
         Ok(())
     }
 }
-
-impl PythPriceListenerState {
-    pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> {
-        let latest_prices = self.latest_prices.read().await;
-        latest_prices.get(feed_id).cloned()
-    }
-}
-
-#[async_trait]
-pub trait StreamPythPrices {
-    async fn connect(&self) -> Result<()>;
-
-    async fn subscribe_to_price_updates(&self, feed_ids: &HashSet<PriceId>) -> Result<()>;
-}

+ 3 - 10
apps/argus/src/actors/subscription_listener.rs

@@ -1,7 +1,7 @@
 use {
-    super::{Subscription, SubscriptionId, SubscriptionListenerMessage},
+    super::SubscriptionListenerMessage,
+    crate::adapters::types::{ReadChainSubscriptions, Subscription, SubscriptionId},
     anyhow::Result,
-    async_trait::async_trait,
     ractor::{Actor, ActorProcessingErr, ActorRef},
     std::{
         collections::{HashMap, HashSet},
@@ -83,7 +83,7 @@ impl Actor for SubscriptionListener {
             }
         }
 
-        if let Err(e) = listener.contract.subscribe_to_events().await {
+        if let Err(e) = listener.contract.subscribe_to_subscription_events().await {
             tracing::error!(
                 chain_name = listener.chain_name,
                 error = %e,
@@ -126,10 +126,3 @@ impl Actor for SubscriptionListener {
         Ok(())
     }
 }
-
-#[async_trait]
-pub trait ReadChainSubscriptions {
-    async fn get_active_subscriptions(&self) -> Result<HashMap<SubscriptionId, Subscription>>;
-
-    async fn subscribe_to_events(&self) -> Result<()>;
-}

+ 4 - 38
apps/argus/src/actors/types.rs

@@ -1,41 +1,9 @@
+use crate::adapters::types::*;
 use {
-    ethers::types::{Address, U256},
     ractor::RpcReplyPort,
     serde::{Deserialize, Serialize},
-    std::collections::HashSet,
 };
 
-pub type PriceId = [u8; 32];
-
-pub type SubscriptionId = u64;
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Price {
-    pub price: i64,
-    pub conf: u64,
-    pub expo: i32,
-    pub publish_time: u64,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Subscription {
-    pub id: SubscriptionId,
-    pub price_ids: Vec<PriceId>,
-    pub manager: Address,
-    pub is_active: bool,
-    pub update_criteria: UpdateCriteria,
-    pub last_updated_at: u64,
-    pub balance: U256,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct UpdateCriteria {
-    pub update_on_heartbeat: bool,
-    pub heartbeat_seconds: u32,
-    pub update_on_deviation: bool,
-    pub deviation_threshold_bps: u32,
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum SubscriptionListenerMessage {
     GetActiveSubscriptions,
@@ -45,20 +13,18 @@ pub enum SubscriptionListenerMessage {
 #[derive(Debug)]
 pub enum PythPriceListenerMessage {
     GetLatestPrice(PriceId, RpcReplyPort<Option<Price>>),
-    UpdateFeedIdSet(HashSet<PriceId>),
+    UpdateFeedIdSet(Vec<PriceId>),
 }
 
 #[derive(Debug)]
 pub enum ChainPriceListenerMessage {
     GetLatestPrice(PriceId, RpcReplyPort<Option<Price>>),
-    UpdateFeedIdSet(HashSet<PriceId>),
+    UpdateFeedIdSet(Vec<PriceId>),
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum ControllerMessage {
-    StartUpdateLoop,
-    StopUpdateLoop,
-    CheckForUpdates,
+    PerformUpdate,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]

+ 4 - 0
apps/argus/src/adapters.rs

@@ -0,0 +1,4 @@
+pub mod contract;
+pub mod ethereum;
+pub mod hermes;
+pub mod types;

+ 63 - 0
apps/argus/src/adapters/contract.rs

@@ -0,0 +1,63 @@
+use super::{ethereum::PythPulse, types::*};
+use anyhow::Result;
+use async_trait::async_trait;
+use ethers::providers::Middleware;
+use ethers::types::H256;
+use std::collections::HashMap;
+
+// TODO: implement retries & backoff policy
+#[async_trait]
+pub trait GetChainPrices {
+    async fn get_price_unsafe(
+        &self,
+        subscription_id: SubscriptionId,
+        feed_id: &PriceId,
+    ) -> Result<Option<Price>>;
+
+    async fn subscribe_to_price_events(&self) -> Result<()>;
+}
+
+#[async_trait]
+impl<M: Middleware + 'static> GetChainPrices for PythPulse<M> {
+    async fn get_price_unsafe(
+        &self,
+        _subscription_id: SubscriptionId,
+        _feed_id: &PriceId,
+    ) -> Result<Option<Price>> {
+        todo!()
+    }
+
+    async fn subscribe_to_price_events(&self) -> Result<()> {
+        tracing::debug!("Subscribing to price events via PythPulse");
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl<M: Middleware + 'static> UpdateChainPrices for PythPulse<M> {
+    async fn update_price_feeds(
+        &self,
+        subscription_id: SubscriptionId,
+        price_ids: &[PriceId],
+        update_data: &[Vec<u8>],
+    ) -> Result<H256> {
+        tracing::debug!(
+            subscription_id = subscription_id,
+            price_ids_count = price_ids.len(),
+            update_data_count = update_data.len(),
+            "Updating price feeds on-chain via PythPulse"
+        );
+        todo!()
+    }
+}
+
+#[async_trait]
+impl<M: Middleware + 'static> ReadChainSubscriptions for PythPulse<M> {
+    async fn get_active_subscriptions(&self) -> Result<HashMap<SubscriptionId, Subscription>> {
+        Ok(HashMap::new())
+    }
+
+    async fn subscribe_to_subscription_events(&self) -> Result<()> {
+        Ok(())
+    }
+}

+ 28 - 5
apps/argus/src/chain/ethereum.rs → apps/argus/src/adapters/ethereum.rs

@@ -1,9 +1,5 @@
 use {
-    crate::{
-        api::ChainId,
-        chain::reader::{BlockNumber, BlockStatus},
-        config::EthereumConfig,
-    },
+    crate::{api::ChainId, config::EthereumConfig},
     anyhow::{Error, Result},
     ethers::{
         contract::abigen,
@@ -144,3 +140,30 @@ impl<M: Middleware + 'static> PythPulse<M> {
             .as_u64())
     }
 }
+
+// TODO: extract to a SDK
+
+pub type BlockNumber = u64;
+
+#[derive(
+    Copy, Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
+)]
+pub enum BlockStatus {
+    /// Latest block
+    #[default]
+    Latest,
+    /// Finalized block accepted as canonical
+    Finalized,
+    /// Safe head block
+    Safe,
+}
+
+impl From<BlockStatus> for EthersBlockNumber {
+    fn from(val: BlockStatus) -> Self {
+        match val {
+            BlockStatus::Latest => EthersBlockNumber::Latest,
+            BlockStatus::Finalized => EthersBlockNumber::Finalized,
+            BlockStatus::Safe => EthersBlockNumber::Safe,
+        }
+    }
+}

+ 14 - 0
apps/argus/src/adapters/hermes.rs

@@ -0,0 +1,14 @@
+use super::types::*;
+use anyhow::Result;
+
+pub struct HermesClient;
+
+impl ReadPythPrices for HermesClient {
+    async fn get_latest_prices(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>> {
+        todo!()
+    }
+
+    async fn subscribe_to_price_updates(&self, feed_ids: &[PriceId]) -> Result<()> {
+        todo!()
+    }
+}

+ 62 - 0
apps/argus/src/adapters/types.rs

@@ -0,0 +1,62 @@
+use anyhow::Result;
+use async_trait::async_trait;
+use ethers::types::{Address, H256, U256};
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+#[async_trait]
+pub trait UpdateChainPrices {
+    async fn update_price_feeds(
+        &self,
+        subscription_id: SubscriptionId,
+        price_ids: &[PriceId],
+        update_data: &[Vec<u8>],
+    ) -> Result<H256>;
+}
+
+#[async_trait]
+pub trait ReadChainSubscriptions {
+    async fn get_active_subscriptions(&self) -> Result<HashMap<SubscriptionId, Subscription>>;
+    async fn subscribe_to_subscription_events(&self) -> Result<()>; // TODO: return a stream
+}
+
+#[async_trait]
+pub trait ReadPythPrices {
+    async fn get_latest_prices(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>>;
+    async fn subscribe_to_price_updates(&self, feed_ids: &[PriceId]) -> Result<()>; // TODO: return a stream
+}
+
+// TODO: find a different home for these
+
+pub type PriceId = [u8; 32];
+
+pub type SubscriptionId = u64;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Price {
+    pub price: i64,
+    pub conf: u64,
+    pub expo: i32,
+    pub publish_time: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Subscription {
+    pub id: SubscriptionId,
+    pub price_ids: Vec<PriceId>,
+    pub manager: Address,
+    pub is_active: bool,
+    pub update_criteria: UpdateCriteria,
+    pub last_updated_at: u64,
+    pub balance: U256,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateCriteria {
+    pub update_on_heartbeat: bool,
+    pub heartbeat_seconds: u32,
+    pub update_on_deviation: bool,
+    pub deviation_threshold_bps: u32,
+}
+
+pub struct SubscriptionEvent;

+ 1 - 1
apps/argus/src/api.rs

@@ -1,5 +1,5 @@
 use {
-    crate::chain::reader::BlockStatus,
+    crate::adapters::ethereum::BlockStatus,
     axum::{
         body::Body,
         http::StatusCode,

+ 0 - 2
apps/argus/src/chain.rs

@@ -1,2 +0,0 @@
-pub mod ethereum;
-pub mod reader;

+ 0 - 33
apps/argus/src/chain/reader.rs

@@ -1,33 +0,0 @@
-use ethers::types::{Address, BlockNumber as EthersBlockNumber};
-
-pub type BlockNumber = u64;
-
-#[derive(
-    Copy, Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
-)]
-pub enum BlockStatus {
-    /// Latest block
-    #[default]
-    Latest,
-    /// Finalized block accepted as canonical
-    Finalized,
-    /// Safe head block
-    Safe,
-}
-
-impl From<BlockStatus> for EthersBlockNumber {
-    fn from(val: BlockStatus) -> Self {
-        match val {
-            BlockStatus::Latest => EthersBlockNumber::Latest,
-            BlockStatus::Finalized => EthersBlockNumber::Finalized,
-            BlockStatus::Safe => EthersBlockNumber::Safe,
-        }
-    }
-}
-
-#[derive(Clone)]
-pub struct RequestedWithCallbackEvent {
-    pub sequence_number: u64,
-    pub user_random_number: [u8; 32],
-    pub provider_address: Address,
-}

+ 1 - 1
apps/argus/src/config.rs

@@ -1,6 +1,6 @@
 pub use run::RunOptions;
 use {
-    crate::{api::ChainId, chain::reader::BlockStatus},
+    crate::{adapters::ethereum::BlockStatus, api::ChainId},
     anyhow::{anyhow, Result},
     clap::{crate_authors, crate_description, crate_name, crate_version, Args, Parser},
     ethers::types::Address,

+ 42 - 148
apps/argus/src/keeper.rs

@@ -1,29 +1,23 @@
 use {
     crate::{
         actors::{
-            chain_price_listener::{ChainPriceListener, GetChainPrices},
-            controller::Controller,
-            price_pusher::{GetPythPrices, PricePusher, UpdateChainPrices},
-            pyth_price_listener::{PythPriceListener, StreamPythPrices},
-            subscription_listener::{ReadChainSubscriptions, SubscriptionListener},
-            types::*,
+            chain_price_listener::ChainPriceListener, controller::Controller,
+            price_pusher::PricePusher, pyth_price_listener::PythPriceListener,
+            subscription_listener::SubscriptionListener,
+        },
+        adapters::{
+            ethereum::InstrumentedSignablePythContract, hermes::HermesClient,
+            types::ReadChainSubscriptions,
         },
         api::BlockchainState,
-        chain::ethereum::InstrumentedSignablePythContract,
         config::EthereumConfig,
     },
-    anyhow::Result,
-    async_trait::async_trait,
     backoff::ExponentialBackoff,
     ethers::signers::Signer,
     fortuna::eth_utils::traced_client::RpcMetrics,
     keeper_metrics::KeeperMetrics,
     ractor::Actor,
-    std::{
-        collections::{HashMap, HashSet},
-        sync::Arc,
-        time::Duration,
-    },
+    std::{sync::Arc, time::Duration},
     tracing,
 };
 
@@ -37,7 +31,7 @@ pub async fn run_keeper_for_chain(
     _metrics: Arc<KeeperMetrics>,
     rpc_metrics: Arc<RpcMetrics>,
 ) {
-    tracing::info!("starting keeper");
+    tracing::info!("Starting keeper");
 
     let contract = Arc::new(
         InstrumentedSignablePythContract::from_config(
@@ -49,6 +43,7 @@ pub async fn run_keeper_for_chain(
         .await
         .expect("Chain config should be valid"),
     );
+
     let keeper_address = contract.wallet().address();
 
     tracing::info!(
@@ -57,43 +52,26 @@ pub async fn run_keeper_for_chain(
         "Keeper address"
     );
 
-    let subscription_contract = Arc::new(PulseContractAdapter {
-        contract: contract.clone(),
-        chain_id: chain_state.id.clone(),
-    });
-
-    let chain_price_contract = Arc::new(PulseContractAdapter {
-        contract: contract.clone(),
-        chain_id: chain_state.id.clone(),
-    });
-
-    let price_pusher_contract = Arc::new(PulseContractAdapter {
-        contract: contract.clone(),
-        chain_id: chain_state.id.clone(),
-    });
-
-    let hermes_client = Arc::new(HermesClientAdapter {
-        chain_id: chain_state.id.clone(),
-    });
-
-    let subscription_poll_interval = Duration::from_secs(60); // Poll for subscriptions every 60 seconds
-    let chain_price_poll_interval = Duration::from_secs(10); // Poll for on-chain prices every 10 seconds
-    let controller_update_interval = Duration::from_secs(5); // Run the update loop every 5 seconds
+    let hermes_client = Arc::new(HermesClient);
 
+    // TODO: Make these configurable
+    let subscription_poll_interval = Duration::from_secs(60);
+    let chain_price_poll_interval = Duration::from_secs(10);
+    let controller_update_interval = Duration::from_secs(5);
     let backoff_policy = ExponentialBackoff {
         initial_interval: Duration::from_secs(1),
         max_interval: Duration::from_secs(60),
         multiplier: 2.0,
-        max_elapsed_time: Some(Duration::from_secs(300)), // Give up after 5 minutes
+        max_elapsed_time: Some(Duration::from_secs(300)),
         ..ExponentialBackoff::default()
     };
 
     let (subscription_listener, _) = Actor::spawn(
-        Some("SubscriptionListener".to_string()),
+        Some(format!("SubscriptionListener-{}", chain_state.id)),
         SubscriptionListener,
         (
             chain_state.id.clone(),
-            subscription_contract as Arc<dyn ReadChainSubscriptions + Send + Sync>,
+            contract.clone() as Arc<dyn ReadChainSubscriptions + Send + Sync>,
             subscription_poll_interval,
         ),
     )
@@ -101,43 +79,49 @@ pub async fn run_keeper_for_chain(
     .expect("Failed to spawn SubscriptionListener actor");
 
     let (pyth_price_listener, _) = Actor::spawn(
-        None,
+        Some(format!("PythPriceListener-{}", chain_state.id)),
         PythPriceListener,
-        (
-            chain_state.id.clone(),
-            hermes_client.clone() as Arc<dyn StreamPythPrices + Send + Sync>,
-        ),
+        hermes_client.clone(),
     )
     .await
-    .expect("Failed to spawn PythPriceListener actor");
+    .expect(&format!(
+        "Failed to spawn PythPriceListener-{} actor",
+        chain_state.id
+    ));
 
     let (chain_price_listener, _) = Actor::spawn(
-        Some(String::from("ChainPriceListener")),
+        Some(format!("ChainPriceListener-{}", chain_state.id)),
         ChainPriceListener,
         (
             chain_state.id.clone(),
-            chain_price_contract as Arc<dyn GetChainPrices + Send + Sync>,
+            contract.clone(),
             chain_price_poll_interval,
         ),
     )
     .await
-    .expect("Failed to spawn ChainPriceListener actor");
+    .expect(&format!(
+        "Failed to spawn ChainPriceListener-{} actor",
+        chain_state.id
+    ));
 
     let (price_pusher, _) = Actor::spawn(
-        Some(String::from("PricePusher")),
+        Some(format!("PricePusher-{}", chain_state.id)),
         PricePusher,
         (
             chain_state.id.clone(),
-            price_pusher_contract as Arc<dyn UpdateChainPrices + Send + Sync>,
-            hermes_client as Arc<dyn GetPythPrices + Send + Sync>,
+            contract.clone(),
+            hermes_client.clone(),
             backoff_policy,
         ),
     )
     .await
-    .expect("Failed to spawn PricePusher actor");
+    .expect(&format!(
+        "Failed to spawn PricePusher-{} actor",
+        chain_state.id
+    ));
 
     let (_controller, _) = Actor::spawn(
-        Some(String::from("Controller")),
+        Some(format!("Controller-{}", chain_state.id)),
         Controller,
         (
             chain_state.id.clone(),
@@ -149,100 +133,10 @@ pub async fn run_keeper_for_chain(
         ),
     )
     .await
-    .expect("Failed to spawn Controller actor");
+    .expect(&format!(
+        "Failed to spawn Controller-{} actor",
+        chain_state.id
+    ));
 
     tracing::info!(chain_id = chain_state.id, "Keeper actors started");
 }
-
-#[allow(dead_code)]
-struct PulseContractAdapter {
-    contract: Arc<InstrumentedSignablePythContract>,
-    chain_id: String,
-}
-
-#[async_trait]
-impl ReadChainSubscriptions for PulseContractAdapter {
-    async fn get_active_subscriptions(&self) -> Result<HashMap<SubscriptionId, Subscription>> {
-        tracing::debug!(chain_id = self.chain_id, "Getting active subscriptions");
-        Ok(HashMap::new())
-    }
-
-    async fn subscribe_to_events(&self) -> Result<()> {
-        tracing::debug!(chain_id = self.chain_id, "Subscribing to contract events");
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl GetChainPrices for PulseContractAdapter {
-    async fn get_price_unsafe(
-        &self,
-        subscription_id: SubscriptionId,
-        feed_id: &PriceId,
-    ) -> Result<Option<Price>> {
-        tracing::debug!(
-            chain_id = self.chain_id,
-            subscription_id = subscription_id,
-            feed_id = hex::encode(feed_id),
-            "Getting on-chain price"
-        );
-        Ok(None)
-    }
-
-    async fn subscribe_to_price_events(&self) -> Result<()> {
-        tracing::debug!(chain_id = self.chain_id, "Subscribing to price events");
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl UpdateChainPrices for PulseContractAdapter {
-    async fn update_price_feeds(
-        &self,
-        subscription_id: SubscriptionId,
-        price_ids: &[PriceId],
-        update_data: &[Vec<u8>],
-    ) -> Result<String> {
-        tracing::debug!(
-            chain_id = self.chain_id,
-            subscription_id = subscription_id,
-            price_ids_count = price_ids.len(),
-            update_data_count = update_data.len(),
-            "Updating price feeds"
-        );
-        Ok("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef".to_string())
-    }
-}
-
-struct HermesClientAdapter {
-    chain_id: String,
-}
-
-#[async_trait]
-impl StreamPythPrices for HermesClientAdapter {
-    async fn connect(&self) -> Result<()> {
-        tracing::debug!(chain_id = self.chain_id, "Connecting to Hermes");
-        Ok(())
-    }
-
-    async fn subscribe_to_price_updates(&self, feed_ids: &HashSet<PriceId>) -> Result<()> {
-        tracing::debug!(
-            chain_id = self.chain_id,
-            feed_ids_count = feed_ids.len(),
-            "Subscribing to price updates"
-        );
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl GetPythPrices for HermesClientAdapter {
-    async fn get_price_update_data(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>> {
-        tracing::debug!(
-            chain_id = self.chain_id,
-            feed_ids_count = feed_ids.len(),
-            "Getting price update data from Hermes"
-        );
-        Ok(vec![vec![0u8; 32]; feed_ids.len()])
-    }
-}

+ 1 - 1
apps/argus/src/lib.rs

@@ -1,6 +1,6 @@
 pub mod actors;
+pub mod adapters;
 pub mod api;
-pub mod chain;
 pub mod command;
 pub mod config;
 pub mod keeper;