Răsfoiți Sursa

fix metrics

Tejas Badadare 6 luni în urmă
părinte
comite
8ecfea8bbf

+ 0 - 13
apps/argus/src/actors.rs

@@ -1,13 +0,0 @@
-pub mod subscription_listener;
-pub mod pyth_price_listener;
-pub mod chain_price_listener;
-pub mod controller;
-pub mod price_pusher;
-pub mod types;
-
-pub use subscription_listener::SubscriptionListener;
-pub use pyth_price_listener::PythPriceListener;
-pub use chain_price_listener::ChainPriceListener;
-pub use controller::Controller;
-pub use price_pusher::PricePusher;
-pub use types::*;

+ 0 - 96
apps/argus/src/actors/chain_price_listener.rs

@@ -1,96 +0,0 @@
-use {
-    crate::{
-        actors::types::*,
-        adapters::{
-            contract::GetChainPrices,
-            types::{Price, PriceId},
-        },
-    },
-    anyhow::Result,
-    ractor::{Actor, ActorProcessingErr, ActorRef},
-    std::{
-        collections::{HashMap, HashSet},
-        sync::Arc,
-        time::Duration,
-    },
-    tokio::{sync::RwLock, time},
-    tracing,
-};
-
-#[allow(dead_code)]
-pub struct ChainPriceListenerState {
-    chain_id: String,
-    contract: Arc<dyn GetChainPrices + Send + Sync>,
-    feed_ids: HashSet<PriceId>,
-    latest_prices: Arc<RwLock<HashMap<PriceId, Price>>>,
-    poll_interval: Duration,
-}
-
-pub struct ChainPriceListener;
-impl Actor for ChainPriceListener {
-    type Msg = ChainPriceListenerMessage;
-    type State = ChainPriceListenerState;
-    type Arguments = (String, Arc<dyn GetChainPrices + Send + Sync>, Duration);
-
-    async fn pre_start(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        (chain_id, contract, poll_interval): Self::Arguments,
-    ) -> Result<Self::State, ActorProcessingErr> {
-        let state = ChainPriceListenerState {
-            chain_id: chain_id.clone(),
-            contract,
-            feed_ids: HashSet::new(),
-            latest_prices: Arc::new(RwLock::new(HashMap::new())),
-            poll_interval,
-        };
-
-        if let Err(e) = state.contract.subscribe_to_price_events().await {
-            tracing::error!(
-                chain_id = state.chain_id,
-                error = %e,
-                "Failed to subscribe to price events"
-            );
-        }
-
-        let poll_interval = state.poll_interval;
-        tokio::spawn(async move {
-            let mut interval = time::interval(poll_interval);
-            loop {
-                interval.tick().await;
-                tracing::debug!(
-                    chain_id = chain_id.clone(),
-                    "Polling for on-chain price updates"
-                );
-                // todo!()
-            }
-        });
-
-        Ok(state)
-    }
-
-    async fn handle(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        message: Self::Msg,
-        _state: &mut Self::State,
-    ) -> Result<(), ActorProcessingErr> {
-        match message {
-            ChainPriceListenerMessage::GetLatestPrice(feed_id, reply_port) => {
-                let price = _state.get_latest_price(&feed_id).await;
-                reply_port.send(price)?;
-            }
-            ChainPriceListenerMessage::UpdateFeedIdSet(_) => {
-                todo!()
-            }
-        }
-        Ok(())
-    }
-}
-
-impl ChainPriceListenerState {
-    pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> {
-        let latest_prices = self.latest_prices.read().await;
-        latest_prices.get(feed_id).cloned()
-    }
-}

+ 0 - 96
apps/argus/src/actors/controller.rs

@@ -1,96 +0,0 @@
-use {
-    crate::{
-        actors::types::*,
-        adapters::{
-            ethereum::SubscriptionParams,
-            types::{PriceId, SubscriptionId},
-        },
-    },
-    anyhow::Result,
-    ractor::{Actor, ActorProcessingErr, ActorRef},
-    std::{
-        collections::{HashMap, HashSet},
-        time::Duration,
-    },
-    tokio::sync::watch,
-};
-
-#[allow(dead_code)]
-pub struct ControllerState {
-    chain_id: String,
-    subscription_listener: ActorRef<SubscriptionListenerMessage>,
-    pyth_price_listener: ActorRef<PythPriceListenerMessage>,
-    chain_price_listener: ActorRef<ChainPriceListenerMessage>,
-    price_pusher: ActorRef<PricePusherMessage>,
-    update_interval: Duration,
-    update_loop_running: bool,
-    stop_sender: Option<watch::Sender<bool>>,
-    active_subscriptions: HashMap<SubscriptionId, SubscriptionParams>,
-    feed_ids: HashSet<PriceId>,
-}
-
-pub struct Controller;
-impl Actor for Controller {
-    type Msg = ControllerMessage;
-    type State = ControllerState;
-    type Arguments = (
-        String,
-        ActorRef<SubscriptionListenerMessage>,
-        ActorRef<PythPriceListenerMessage>,
-        ActorRef<ChainPriceListenerMessage>,
-        ActorRef<PricePusherMessage>,
-        Duration,
-    );
-
-    async fn pre_start(
-        &self,
-        myself: ActorRef<Self::Msg>,
-        (
-            chain_id,
-            subscription_listener,
-            pyth_price_listener,
-            chain_price_listener,
-            price_pusher,
-            update_interval,
-        ): Self::Arguments,
-    ) -> Result<Self::State, ActorProcessingErr> {
-        let state = ControllerState {
-            chain_id,
-            subscription_listener,
-            pyth_price_listener,
-            chain_price_listener,
-            price_pusher,
-            update_interval,
-            update_loop_running: false,
-            stop_sender: None,
-            active_subscriptions: HashMap::new(),
-            feed_ids: HashSet::new(),
-        };
-
-        // Start the update loop
-        tokio::spawn(async move {
-            let mut interval = tokio::time::interval(update_interval);
-            loop {
-                interval.tick().await;
-                let _ = myself.cast(ControllerMessage::PerformUpdate);
-            }
-        });
-
-        Ok(state)
-    }
-
-    async fn handle(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        message: Self::Msg,
-        _state: &mut Self::State,
-    ) -> Result<(), ActorProcessingErr> {
-        match message {
-            ControllerMessage::PerformUpdate => {
-                // Main processing logic. Keep active subscriptions up-to-date, check for price updates, and push them to the chain.
-                tracing::info!("Performing update (todo)");
-            }
-        }
-        Ok(())
-    }
-}

+ 0 - 120
apps/argus/src/actors/price_pusher.rs

@@ -1,120 +0,0 @@
-use {
-    super::PricePusherMessage,
-    crate::adapters::types::{ReadPythPrices, UpdateChainPrices},
-    anyhow::Result,
-    backoff::{backoff::Backoff, ExponentialBackoff},
-    ractor::{Actor, ActorProcessingErr, ActorRef},
-    std::sync::Arc,
-    tokio::time,
-    tracing,
-};
-
-pub struct PricePusherState {
-    chain_name: String,
-    contract: Arc<dyn UpdateChainPrices + Send + Sync>,
-    pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
-    backoff_policy: ExponentialBackoff,
-}
-
-pub struct PricePusher;
-impl Actor for PricePusher {
-    type Msg = PricePusherMessage;
-    type State = PricePusherState;
-    type Arguments = (
-        String,
-        Arc<dyn UpdateChainPrices + Send + Sync>,
-        Arc<dyn ReadPythPrices + Send + Sync>,
-        ExponentialBackoff,
-    );
-
-    async fn pre_start(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        (chain_name, contract, hermes_client, backoff_policy): Self::Arguments,
-    ) -> Result<Self::State, ActorProcessingErr> {
-        let state = PricePusherState {
-            chain_name,
-            contract,
-            pyth_price_client: hermes_client,
-            backoff_policy,
-        };
-
-        Ok(state)
-    }
-
-    async fn handle(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        message: Self::Msg,
-        state: &mut Self::State,
-    ) -> Result<(), ActorProcessingErr> {
-        match message {
-            PricePusherMessage::PushPriceUpdates(push_request) => {
-                let price_ids = push_request.price_ids.clone();
-                match state.pyth_price_client.get_latest_prices(&price_ids).await {
-                    Ok(update_data) => {
-                        let mut backoff = state.backoff_policy.clone();
-                        let mut attempt = 0;
-
-                        // TODO: gas escalation policy
-                        loop {
-                            attempt += 1;
-
-                            match state
-                                .contract
-                                .update_price_feeds(
-                                    push_request.subscription_id,
-                                    &price_ids,
-                                    &update_data,
-                                )
-                                .await
-                            {
-                                Ok(tx_hash) => {
-                                    tracing::info!(
-                                        chain_id = state.chain_name,
-                                        subscription_id = push_request.subscription_id.to_string(),
-                                        tx_hash = tx_hash.to_string(),
-                                        attempt = attempt,
-                                        "Successfully pushed price updates"
-                                    );
-                                    break;
-                                }
-                                Err(e) => {
-                                    if let Some(duration) = backoff.next_backoff() {
-                                        tracing::warn!(
-                                            chain_id = state.chain_name,
-                                            subscription_id = push_request.subscription_id.to_string(),
-                                            error = %e,
-                                            attempt = attempt,
-                                            retry_after_ms = duration.as_millis(),
-                                            "Failed to push price updates, retrying"
-                                        );
-                                        time::sleep(duration).await;
-                                    } else {
-                                        tracing::error!(
-                                            chain_id = state.chain_name,
-                                            subscription_id = push_request.subscription_id.to_string(),
-                                            error = %e,
-                                            attempt = attempt,
-                                            "Failed to push price updates, giving up"
-                                        );
-                                        break;
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            chain_id = state.chain_name,
-                            subscription_id = push_request.subscription_id.to_string(),
-                            error = %e,
-                            "Failed to get Pyth price update data"
-                        );
-                    }
-                }
-            }
-        }
-        Ok(())
-    }
-}

+ 0 - 67
apps/argus/src/actors/pyth_price_listener.rs

@@ -1,67 +0,0 @@
-use {
-    crate::{
-        actors::types::*,
-        adapters::types::{Price, PriceId, ReadPythPrices},
-    },
-    anyhow::Result,
-    ractor::{Actor, ActorProcessingErr, ActorRef},
-    std::{collections::HashMap, sync::Arc},
-    tokio::sync::RwLock,
-};
-
-pub struct PythPriceListenerState {
-    pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
-    feed_ids: Vec<PriceId>,
-    latest_prices: Arc<RwLock<HashMap<PriceId, Price>>>,
-}
-
-impl PythPriceListenerState {
-    pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> {
-        let latest_prices = self.latest_prices.read().await;
-        latest_prices.get(feed_id).cloned()
-    }
-
-    pub async fn subscribe_to_price_updates(&self) -> Result<()> {
-        self.pyth_price_client
-            .subscribe_to_price_updates(&self.feed_ids)
-            .await
-    }
-}
-
-pub struct PythPriceListener;
-impl Actor for PythPriceListener {
-    type Msg = PythPriceListenerMessage;
-    type State = PythPriceListenerState;
-    type Arguments = Arc<dyn ReadPythPrices + Send + Sync>;
-
-    async fn pre_start(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        hermes_client: Self::Arguments,
-    ) -> Result<Self::State, ActorProcessingErr> {
-        let state = PythPriceListenerState {
-            pyth_price_client: hermes_client,
-            feed_ids: vec![],
-            latest_prices: Arc::new(RwLock::new(HashMap::new())),
-        };
-        Ok(state)
-    }
-
-    async fn handle(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        message: Self::Msg,
-        _state: &mut Self::State,
-    ) -> Result<(), ActorProcessingErr> {
-        match message {
-            PythPriceListenerMessage::GetLatestPrice(feed_id, reply_port) => {
-                let price = _state.get_latest_price(&feed_id).await;
-                reply_port.send(price)?;
-            }
-            PythPriceListenerMessage::UpdateFeedIdSet(_) => {
-                todo!()
-            }
-        }
-        Ok(())
-    }
-}

+ 0 - 143
apps/argus/src/actors/subscription_listener.rs

@@ -1,143 +0,0 @@
-use {
-    super::SubscriptionListenerMessage,
-    crate::adapters::{
-        ethereum::SubscriptionParams,
-        types::{ReadChainSubscriptions, SubscriptionId},
-    },
-    anyhow::Result,
-    ractor::{Actor, ActorProcessingErr, ActorRef},
-    std::{
-        collections::{HashMap, HashSet},
-        sync::Arc,
-        time::Duration,
-    },
-    tokio::time,
-    tracing,
-};
-
-pub struct SubscriptionListenerState {
-    pub chain_name: String,
-    pub contract: Arc<dyn ReadChainSubscriptions + Send + Sync>,
-    pub active_subscriptions: HashMap<SubscriptionId, SubscriptionParams>,
-    pub poll_interval: Duration,
-}
-
-impl SubscriptionListenerState {
-    async fn refresh_subscriptions(&mut self) -> Result<()> {
-        let subscriptions = self.contract.get_active_subscriptions().await?;
-
-        tracing::info!(
-            chain_name = self.chain_name,
-            subscription_count = subscriptions.len(),
-            subscription_ids = ?subscriptions.keys().collect::<Vec<_>>(),
-            "Retrieved active subscriptions"
-        );
-
-        let old_ids: HashSet<_> = self.active_subscriptions.keys().cloned().collect();
-        let new_ids: HashSet<_> = subscriptions.keys().cloned().collect();
-
-        let added = new_ids.difference(&old_ids).count();
-        let removed = old_ids.difference(&new_ids).count();
-
-        if added > 0 || removed > 0 {
-            tracing::info!(
-                chain_name = self.chain_name,
-                added = added,
-                removed = removed,
-                "Subscription changes detected"
-            );
-        } else {
-            tracing::debug!(
-                chain_name = self.chain_name,
-                "No subscription changes detected"
-            );
-        }
-
-        self.active_subscriptions = subscriptions;
-        Ok(())
-    }
-}
-
-pub struct SubscriptionListener;
-
-impl Actor for SubscriptionListener {
-    type Msg = SubscriptionListenerMessage;
-    type State = SubscriptionListenerState;
-    type Arguments = (
-        String,
-        Arc<dyn ReadChainSubscriptions + Send + Sync>,
-        Duration,
-    );
-
-    async fn pre_start(
-        &self,
-        myself: ActorRef<Self::Msg>,
-        (chain_name, contract, poll_interval): Self::Arguments,
-    ) -> Result<Self::State, ActorProcessingErr> {
-        let mut state = SubscriptionListenerState {
-            chain_name,
-            contract,
-            active_subscriptions: HashMap::new(),
-            poll_interval,
-        };
-
-        match state.refresh_subscriptions().await {
-            Ok(_) => {
-                tracing::info!(
-                    chain_name = state.chain_name,
-                    "Loaded {} active subscriptions",
-                    state.active_subscriptions.len()
-                );
-            }
-            Err(e) => {
-                tracing::error!(
-                    chain_name = state.chain_name,
-                    error = %e,
-                    "Failed to load active subscriptions"
-                );
-            }
-        }
-
-        if let Err(e) = state.contract.subscribe_to_subscription_events().await {
-            tracing::error!(
-                chain_name = state.chain_name,
-                error = %e,
-                "Failed to subscribe to contract events"
-            );
-        }
-
-        let poll_interval = state.poll_interval;
-        tokio::spawn(async move {
-            let mut interval = time::interval(poll_interval);
-            loop {
-                interval.tick().await;
-                let _ = myself.cast(SubscriptionListenerMessage::RefreshSubscriptions);
-            }
-        });
-
-        Ok(state)
-    }
-
-    async fn handle(
-        &self,
-        _myself: ActorRef<Self::Msg>,
-        message: Self::Msg,
-        state: &mut Self::State,
-    ) -> Result<(), ActorProcessingErr> {
-        match message {
-            SubscriptionListenerMessage::GetActiveSubscriptions => {
-                todo!()
-            }
-            SubscriptionListenerMessage::RefreshSubscriptions => {
-                if let Err(e) = state.refresh_subscriptions().await {
-                    tracing::error!(
-                        chain_name = state.chain_name,
-                        error = %e,
-                        "Failed to refresh subscriptions"
-                    );
-                }
-            }
-        }
-        Ok(())
-    }
-}

+ 0 - 39
apps/argus/src/actors/types.rs

@@ -1,39 +0,0 @@
-use crate::adapters::types::*;
-use {
-    ractor::RpcReplyPort,
-    serde::{Deserialize, Serialize},
-};
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum SubscriptionListenerMessage {
-    GetActiveSubscriptions,
-    RefreshSubscriptions,
-}
-
-#[derive(Debug)]
-pub enum PythPriceListenerMessage {
-    GetLatestPrice(PriceId, RpcReplyPort<Option<Price>>),
-    UpdateFeedIdSet(Vec<PriceId>),
-}
-
-#[derive(Debug)]
-pub enum ChainPriceListenerMessage {
-    GetLatestPrice(PriceId, RpcReplyPort<Option<Price>>),
-    UpdateFeedIdSet(Vec<PriceId>),
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum ControllerMessage {
-    PerformUpdate,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct PushRequest {
-    pub subscription_id: SubscriptionId,
-    pub price_ids: Vec<PriceId>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum PricePusherMessage {
-    PushPriceUpdates(PushRequest),
-}

+ 2 - 7
apps/argus/src/adapters/contract.rs

@@ -1,16 +1,11 @@
-use super::ethereum::{PythPulse, SubscriptionUpdatedFilter};
+use super::ethereum::PythPulse;
 use super::types::*;
 use crate::adapters::ethereum::SubscriptionParams;
 use anyhow::Result;
 use async_trait::async_trait;
-use ethers::contract::stream::EventStream;
-use ethers::contract::{ContractError, EthEvent};
 use ethers::providers::Middleware;
-use ethers::types::{Filter, H256, U256};
-use futures::Stream;
-use futures::TryStreamExt;
+use ethers::types::H256;
 use std::collections::HashMap;
-use std::pin::Pin;
 
 #[async_trait]
 pub trait GetChainPrices {

+ 2 - 9
apps/argus/src/adapters/types.rs

@@ -1,18 +1,11 @@
 use anyhow::Result;
 use async_trait::async_trait;
-use ethers::{
-    contract::ContractError,
-    providers::Middleware,
-    types::{H256, U256},
-};
-use futures::Stream;
+use ethers::types::{H256, U256};
 use serde::{Deserialize, Serialize};
-use std::{collections::HashMap, pin::Pin};
+use std::collections::HashMap;
 
 use crate::adapters::ethereum::SubscriptionParams;
 
-use super::ethereum::SubscriptionUpdatedFilter;
-
 #[async_trait]
 pub trait UpdateChainPrices {
     async fn update_price_feeds(

+ 9 - 88
apps/argus/src/api.rs

@@ -1,106 +1,27 @@
 use {
-    crate::adapters::ethereum::BlockStatus,
-    axum::{
-        body::Body,
-        http::StatusCode,
-        response::{IntoResponse, Response},
-        routing::get,
-        Router,
-    },
-    prometheus_client::{
-        encoding::EncodeLabelSet,
-        metrics::{counter::Counter, family::Family},
-        registry::Registry,
-    },
+    axum::{body::Body, routing::get, Router},
+    index::index,
+    live::live,
+    metrics::metrics,
+    prometheus_client::registry::Registry,
+    ready::ready,
     std::sync::Arc,
     tokio::sync::RwLock,
 };
-pub use {index::*, live::*, metrics::*, ready::*};
-
 mod index;
 mod live;
 mod metrics;
 mod ready;
-
-pub type ChainName = String;
-
-#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
-pub struct RequestLabel {
-    pub value: String,
-}
-
-pub struct ApiMetrics {
-    pub http_requests: Family<RequestLabel, Counter>,
-}
-
 #[derive(Clone)]
 pub struct ApiState {
     pub metrics_registry: Arc<RwLock<Registry>>,
-
-    /// Prometheus metrics
-    pub metrics: Arc<ApiMetrics>,
 }
 
-impl ApiState {
-    pub async fn new(metrics_registry: Arc<RwLock<Registry>>) -> ApiState {
-        let metrics = ApiMetrics {
-            http_requests: Family::default(),
-        };
-
-        let http_requests = metrics.http_requests.clone();
-        metrics_registry.write().await.register(
-            "http_requests",
-            "Number of HTTP requests received",
-            http_requests,
-        );
-
-        ApiState {
-            metrics: Arc::new(metrics),
-            metrics_registry,
-        }
-    }
-}
-
-/// The state of the service for a single blockchain.
-#[derive(Clone)]
-pub struct BlockchainState {
-    /// The human friendly name for this blockchain, useful for logging
-    pub name: ChainName,
-    /// The BlockStatus of the block that is considered to be confirmed on the blockchain.
-    /// For eg., Finalized, Safe
-    pub confirmed_block_status: BlockStatus,
-}
-
-pub enum RestError {
-    /// The server cannot currently communicate with the blockchain, so is not able to verify
-    /// which random values have been requested.
-    TemporarilyUnavailable,
-    /// A catch-all error for all other types of errors that could occur during processing.
-    Unknown,
-}
-
-impl IntoResponse for RestError {
-    fn into_response(self) -> Response {
-        match self {
-            RestError::TemporarilyUnavailable => (
-                StatusCode::SERVICE_UNAVAILABLE,
-                "This service is temporarily unavailable",
-            )
-                .into_response(),
-            RestError::Unknown => (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                "An unknown error occurred processing the request",
-            )
-                .into_response(),
-        }
-    }
-}
-
-pub fn routes(state: ApiState) -> Router<(), Body> {
+pub fn routes(api_state: ApiState) -> Router<(), Body> {
     Router::new()
         .route("/", get(index))
         .route("/live", get(live))
-        .route("/metrics", get(metrics))
         .route("/ready", get(ready))
-        .with_state(state)
+        .route("/metrics", get(metrics))
+        .with_state(api_state)
 }

+ 6 - 3
apps/argus/src/command/run.rs

@@ -2,7 +2,8 @@ use {
     crate::{
         api::{self, BlockchainState, ChainName},
         config::{Config, EthereumConfig, RunOptions},
-        keeper::{self, keeper_metrics::KeeperMetrics},
+        keeper::{self, metrics::KeeperMetrics},
+        state::BlockchainState,
     },
     anyhow::{anyhow, Error, Result},
     axum::Router,
@@ -25,7 +26,9 @@ pub async fn run_api(
     metrics_registry: Arc<RwLock<Registry>>,
     mut rx_exit: watch::Receiver<bool>,
 ) -> Result<()> {
-    let api_state = api::ApiState::new(metrics_registry).await;
+    let api_state = api::ApiState {
+        metrics_registry: metrics_registry.clone(),
+    };
 
     // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
     // `with_state` method which replaces `Body` with `State` in the type signature.
@@ -54,7 +57,7 @@ pub async fn run_api(
 }
 
 pub async fn run_keeper(
-    chains: HashMap<String, api::BlockchainState>,
+    chains: HashMap<String, BlockchainState>,
     config: Config,
     private_key: String,
     metrics_registry: Arc<RwLock<Registry>>,

+ 1 - 1
apps/argus/src/config.rs

@@ -1,6 +1,6 @@
 pub use run::RunOptions;
 use {
-    crate::{adapters::ethereum::BlockStatus, api::ChainName},
+    crate::{adapters::ethereum::BlockStatus, state::ChainName},
     anyhow::{anyhow, Result},
     clap::{crate_authors, crate_description, crate_name, crate_version, Args, Parser},
     ethers::types::Address,

+ 105 - 102
apps/argus/src/keeper.rs

@@ -1,37 +1,31 @@
-use {
-    crate::{
-        actors::{
-            chain_price_listener::ChainPriceListener, controller::Controller,
-            price_pusher::PricePusher, pyth_price_listener::PythPriceListener,
-            subscription_listener::SubscriptionListener,
-        },
-        adapters::{
-            ethereum::InstrumentedSignablePythContract, hermes::HermesClient,
-            types::ReadChainSubscriptions,
-        },
-        api::BlockchainState,
-        config::EthereumConfig,
-    },
-    backoff::ExponentialBackoff,
-    ethers::signers::Signer,
-    fortuna::eth_utils::traced_client::RpcMetrics,
-    keeper_metrics::KeeperMetrics,
-    ractor::Actor,
-    std::{sync::Arc, time::Duration},
-    tracing,
-};
+use anyhow::Result;
+use backoff::ExponentialBackoff;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::watch;
+use tracing;
 
-pub(crate) mod keeper_metrics;
+use crate::adapters::{ethereum::InstrumentedSignablePythContract, hermes::HermesClient};
+use crate::config::EthereumConfig;
+use crate::metrics::KeeperMetrics;
+use crate::services::{
+    ChainPriceService, ControllerService, PricePusherService, PythPriceService, Service,
+    SubscriptionService,
+};
+use crate::state::ArgusState;
+use crate::state::BlockchainState;
+use ethers::signers::Signer;
+use fortuna::eth_utils::traced_client::RpcMetrics;
 
-#[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.name))]
+#[tracing::instrument(name = "keeper_shared", skip_all, fields(chain_name = chain_state.name))]
 pub async fn run_keeper_for_chain(
     private_key: String,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
-    _metrics: Arc<KeeperMetrics>,
+    _metrics: Arc<KeeperMetrics>, // TODO: add metrics
     rpc_metrics: Arc<RpcMetrics>,
-) {
-    tracing::info!("Starting keeper");
+) -> Result<()> {
+    tracing::info!("Starting keeper with shared memory architecture");
 
     let contract = Arc::new(
         InstrumentedSignablePythContract::from_config(
@@ -48,18 +42,24 @@ pub async fn run_keeper_for_chain(
     );
 
     let keeper_address = contract.wallet().address();
-
     tracing::info!(
-        chain_id = chain_state.name,
         keeper_address = %keeper_address,
         "Keeper address"
     );
 
     let hermes_client = Arc::new(HermesClient);
 
-    // TODO: Make these configurable
+    let state = Arc::new(ArgusState::new());
+
+    let (stop_tx, stop_rx) = watch::channel(false);
+    {
+        let mut stop_sender = state.stop_sender.lock().expect("Mutex poisoned");
+        *stop_sender = Some(stop_tx);
+    }
+
     let subscription_poll_interval = Duration::from_secs(60);
     let chain_price_poll_interval = Duration::from_secs(10);
+    let pyth_price_poll_interval = Duration::from_secs(5);
     let controller_update_interval = Duration::from_secs(5);
     let backoff_policy = ExponentialBackoff {
         initial_interval: Duration::from_secs(1),
@@ -69,77 +69,80 @@ pub async fn run_keeper_for_chain(
         ..ExponentialBackoff::default()
     };
 
-    let (subscription_listener, _) = Actor::spawn(
-        Some(format!("SubscriptionListener-{}", chain_state.name)),
-        SubscriptionListener,
-        (
-            chain_state.name.clone(),
-            contract.clone() as Arc<dyn ReadChainSubscriptions + Send + Sync>,
-            subscription_poll_interval,
-        ),
-    )
-    .await
-    .expect("Failed to spawn SubscriptionListener actor");
-
-    let (pyth_price_listener, _) = Actor::spawn(
-        Some(format!("PythPriceListener-{}", chain_state.name)),
-        PythPriceListener,
+    let subscription_service = SubscriptionService::new(
+        chain_state.name.clone(),
+        contract.clone(),
+        subscription_poll_interval,
+        state.subscription_state.clone(),
+        state.pyth_price_state.clone(),
+        state.chain_price_state.clone(),
+    );
+
+    let pyth_price_service = PythPriceService::new(
+        chain_state.name.clone(),
+        pyth_price_poll_interval,
         hermes_client.clone(),
-    )
-    .await
-    .expect(&format!(
-        "Failed to spawn PythPriceListener-{} actor",
-        chain_state.name
-    ));
-
-    let (chain_price_listener, _) = Actor::spawn(
-        Some(format!("ChainPriceListener-{}", chain_state.name)),
-        ChainPriceListener,
-        (
-            chain_state.name.clone(),
-            contract.clone(),
-            chain_price_poll_interval,
-        ),
-    )
-    .await
-    .expect(&format!(
-        "Failed to spawn ChainPriceListener-{} actor",
-        chain_state.name
-    ));
-
-    let (price_pusher, _) = Actor::spawn(
-        Some(format!("PricePusher-{}", chain_state.name)),
-        PricePusher,
-        (
-            chain_state.name.clone(),
-            contract.clone(),
-            hermes_client.clone(),
-            backoff_policy,
-        ),
-    )
-    .await
-    .expect(&format!(
-        "Failed to spawn PricePusher-{} actor",
-        chain_state.name
-    ));
-
-    let (_controller, _) = Actor::spawn(
-        Some(format!("Controller-{}", chain_state.name)),
-        Controller,
-        (
-            chain_state.name.clone(),
-            subscription_listener,
-            pyth_price_listener,
-            chain_price_listener,
-            price_pusher,
-            controller_update_interval,
-        ),
-    )
-    .await
-    .expect(&format!(
-        "Failed to spawn Controller-{} actor",
-        chain_state.name
-    ));
-
-    tracing::info!(chain_id = chain_state.name, "Keeper actors started");
+        state.pyth_price_state.clone(),
+    );
+
+    let chain_price_service = ChainPriceService::new(
+        chain_state.name.clone(),
+        contract.clone(),
+        chain_price_poll_interval,
+        state.chain_price_state.clone(),
+    );
+
+    let price_pusher_service = PricePusherService::new(
+        chain_state.name.clone(),
+        contract.clone(),
+        hermes_client.clone(),
+        backoff_policy,
+    );
+
+    let controller_service = ControllerService::new(
+        chain_state.name.clone(),
+        controller_update_interval,
+        state.subscription_state.clone(),
+        state.pyth_price_state.clone(),
+        state.chain_price_state.clone(),
+    );
+
+    let services: Vec<Arc<dyn Service>> = vec![
+        Arc::new(subscription_service),
+        Arc::new(pyth_price_service),
+        Arc::new(chain_price_service),
+        Arc::new(price_pusher_service),
+        Arc::new(controller_service),
+    ];
+
+    let mut handles = Vec::new();
+    for service in services {
+        let service_stop_rx = stop_rx.clone();
+
+        let handle = tokio::spawn(async move {
+            let service_name = service.name().to_string();
+            match service.start(service_stop_rx).await {
+                Ok(_) => {
+                    tracing::info!(service = service_name, "Service stopped gracefully");
+                }
+                Err(e) => {
+                    tracing::error!(
+                        service = service_name,
+                        error = %e,
+                        "Service stopped with error"
+                    );
+                }
+            }
+        });
+
+        handles.push(handle);
+    }
+
+    tracing::info!("Keeper services started");
+
+    for handle in handles {
+        let _ = handle.await;
+    }
+
+    Ok(())
 }

+ 0 - 153
apps/argus/src/keeper_shared.rs

@@ -1,153 +0,0 @@
-use anyhow::Result;
-use backoff::ExponentialBackoff;
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::watch;
-use tracing;
-
-use crate::adapters;
-use crate::adapters::types::ReadPythPrices;
-use crate::adapters::{
-    ethereum::InstrumentedSignablePythContract, hermes::HermesClient, types::ReadChainSubscriptions,
-};
-use crate::api::BlockchainState;
-use crate::config::EthereumConfig;
-use crate::keeper::keeper_metrics::KeeperMetrics;
-use crate::services::{
-    ChainPriceService, ControllerService, PricePusherService, PythPriceService, Service,
-    SubscriptionService,
-};
-use crate::state::ArgusState;
-use ethers::signers::Signer;
-use fortuna::eth_utils::traced_client::RpcMetrics;
-
-#[tracing::instrument(name = "keeper_shared", skip_all, fields(chain_id = chain_state.name))]
-pub async fn run_keeper_for_chain(
-    private_key: String,
-    chain_eth_config: EthereumConfig,
-    chain_state: BlockchainState,
-    _metrics: Arc<KeeperMetrics>,
-    rpc_metrics: Arc<RpcMetrics>,
-) -> Result<()> {
-    tracing::info!("Starting keeper with shared memory architecture");
-
-    let contract = Arc::new(
-        InstrumentedSignablePythContract::from_config(
-            &chain_eth_config,
-            &private_key,
-            chain_state.name.clone(),
-            rpc_metrics.clone(),
-        )
-        .await
-        .expect(&format!(
-            "Failed to create InstrumentedSignablePythContract from config for chain {}",
-            chain_state.name
-        )),
-    );
-
-    let keeper_address = contract.wallet().address();
-    tracing::info!(
-        chain_id = chain_state.name,
-        keeper_address = %keeper_address,
-        "Keeper address"
-    );
-
-    let hermes_client = Arc::new(HermesClient);
-
-    let state = Arc::new(ArgusState::new(chain_state.name.clone()));
-
-    let (stop_tx, stop_rx) = watch::channel(false);
-    {
-        let mut stop_sender = state.stop_sender.lock().expect("Mutex poisoned");
-        *stop_sender = Some(stop_tx);
-    }
-
-    let subscription_poll_interval = Duration::from_secs(60);
-    let chain_price_poll_interval = Duration::from_secs(10);
-    let pyth_price_poll_interval = Duration::from_secs(5);
-    let controller_update_interval = Duration::from_secs(5);
-    let backoff_policy = ExponentialBackoff {
-        initial_interval: Duration::from_secs(1),
-        max_interval: Duration::from_secs(60),
-        multiplier: 2.0,
-        max_elapsed_time: Some(Duration::from_secs(300)),
-        ..ExponentialBackoff::default()
-    };
-
-    let subscription_service = SubscriptionService::new(
-        chain_state.name.clone(),
-        contract.clone(),
-        subscription_poll_interval,
-        state.subscription_state.clone(),
-        state.pyth_price_state.clone(),
-        state.chain_price_state.clone(),
-    );
-
-    let pyth_price_service = PythPriceService::new(
-        chain_state.name.clone(),
-        pyth_price_poll_interval,
-        hermes_client.clone(),
-        state.pyth_price_state.clone(),
-    );
-
-    let chain_price_service = ChainPriceService::new(
-        chain_state.name.clone(),
-        contract.clone(),
-        chain_price_poll_interval,
-        state.chain_price_state.clone(),
-    );
-
-    let price_pusher_service = PricePusherService::new(
-        chain_state.name.clone(),
-        contract.clone(),
-        hermes_client.clone(),
-        backoff_policy,
-    );
-
-    let controller_service = ControllerService::new(
-        chain_state.name.clone(),
-        controller_update_interval,
-        state.subscription_state.clone(),
-        state.pyth_price_state.clone(),
-        state.chain_price_state.clone(),
-    );
-
-    let services: Vec<Arc<dyn Service>> = vec![
-        Arc::new(subscription_service),
-        Arc::new(pyth_price_service),
-        Arc::new(chain_price_service),
-        Arc::new(price_pusher_service),
-        Arc::new(controller_service),
-    ];
-
-    let mut handles = Vec::new();
-    for service in services {
-        let service_stop_rx = stop_rx.clone();
-
-        let handle = tokio::spawn(async move {
-            let service_name = service.name().to_string();
-            match service.start(service_stop_rx).await {
-                Ok(_) => {
-                    tracing::info!(service = service_name, "Service stopped gracefully");
-                }
-                Err(e) => {
-                    tracing::error!(
-                        service = service_name,
-                        error = %e,
-                        "Service stopped with error"
-                    );
-                }
-            }
-        });
-
-        handles.push(handle);
-    }
-
-    tracing::info!(chain_id = chain_state.name, "Keeper services started");
-
-    for handle in handles {
-        let _ = handle.await;
-    }
-
-    Ok(())
-}

+ 1 - 2
apps/argus/src/lib.rs

@@ -1,9 +1,8 @@
-pub mod actors;
 pub mod adapters;
 pub mod api;
 pub mod command;
 pub mod config;
 pub mod keeper;
-pub mod keeper_shared;
+pub mod metrics;
 pub mod services;
 pub mod state;

+ 17 - 17
apps/argus/src/keeper/keeper_metrics.rs → apps/argus/src/metrics.rs

@@ -1,54 +1,54 @@
 use {
+    crate::state::ChainName,
     prometheus_client::{
         encoding::EncodeLabelSet,
         metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram},
         registry::Registry,
     },
-    std::sync::atomic::AtomicU64,
-    std::sync::Arc,
+    std::sync::{atomic::AtomicU64, Arc},
     tokio::sync::RwLock,
 };
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
-pub struct ChainIdLabel {
-    pub chain_id: String,
+pub struct ChainNameLabel {
+    pub chain_name: ChainName,
 }
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
 pub struct SubscriptionIdLabel {
-    pub chain_id: String,
+    pub chain_name: ChainName,
     pub subscription_id: String,
 }
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
 pub struct PriceFeedIdLabel {
-    pub chain_id: String,
+    pub chain_name: ChainName,
     pub price_feed_id: String,
 }
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
 pub struct KeeperIdLabel {
-    pub chain_id: String,
+    pub chain_name: ChainName,
     pub keeper_id: String,
 }
 
 pub struct KeeperMetrics {
     /// Number of active subscriptions per chain
-    pub active_subscriptions: Family<ChainIdLabel, Gauge>,
+    pub active_subscriptions: Family<ChainNameLabel, Gauge>,
     /// Number of price feeds per chain that are in an active subscription
-    pub active_price_feeds: Family<ChainIdLabel, Gauge>,
+    pub active_price_feeds: Family<ChainNameLabel, Gauge>,
     /// Last published time for an active price feed (Unix timestamp seconds)
     pub last_published_time_s: Family<PriceFeedIdLabel, Gauge<f64, AtomicU64>>,
     /// Total gas fee (in native token) spent on price updates per chain
-    pub total_gas_fee_spent: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
+    pub total_gas_fee_spent: Family<ChainNameLabel, Gauge<f64, AtomicU64>>,
     /// Total payment received (in native token) per chain
-    pub total_payment_received: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
+    pub total_payment_received: Family<ChainNameLabel, Gauge<f64, AtomicU64>>,
     /// Number of successful price updates per chain
-    pub successful_price_updates: Family<ChainIdLabel, Counter>,
+    pub successful_price_updates: Family<ChainNameLabel, Counter>,
     /// Number of failed price updates per chain
-    pub failed_price_updates: Family<ChainIdLabel, Counter>,
+    pub failed_price_updates: Family<ChainNameLabel, Counter>,
     /// Current gas price estimate (in Gwei) per chain
-    pub gas_price_estimate: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
+    pub gas_price_estimate: Family<ChainNameLabel, Gauge<f64, AtomicU64>>,
     /// Keeper wallet balance (in native token) per chain
     pub keeper_wallet_balance: Family<KeeperIdLabel, Gauge<f64, AtomicU64>>,
     /// Duration from the time the keeper notices an eligible update criteria to the time the keeper lands the update on-chain in milliseconds per chain
@@ -81,7 +81,7 @@ impl Default for KeeperMetrics {
 }
 
 impl KeeperMetrics {
-    pub async fn new(registry: Arc<RwLock<Registry>>, chain_ids: Vec<String>) -> Self {
+    pub async fn new(registry: Arc<RwLock<Registry>>, chain_names: Vec<String>) -> Self {
         let mut writable_registry = registry.write().await;
         let keeper_metrics = KeeperMetrics::default();
 
@@ -146,8 +146,8 @@ impl KeeperMetrics {
         );
 
         // Initialize metrics for each chain_id
-        for chain_id in chain_ids {
-            let chain_label = ChainIdLabel { chain_id };
+        for chain_name in chain_names {
+            let chain_label = ChainNameLabel { chain_name };
 
             let _ = keeper_metrics
                 .active_subscriptions

+ 5 - 2
apps/argus/src/services/chain_price_service.rs

@@ -7,10 +7,12 @@ use tokio::time;
 use tracing;
 
 use crate::adapters::contract::GetChainPrices;
+use crate::state::ChainName;
 use crate::services::Service;
 use crate::state::ChainPriceState;
 
 pub struct ChainPriceService {
+    chain_name: ChainName,
     name: String,
     contract: Arc<dyn GetChainPrices + Send + Sync>,
     poll_interval: Duration,
@@ -19,13 +21,14 @@ pub struct ChainPriceService {
 
 impl ChainPriceService {
     pub fn new(
-        chain_id: String,
+        chain_name: ChainName,
         contract: Arc<dyn GetChainPrices + Send + Sync>,
         poll_interval: Duration,
         chain_price_state: Arc<ChainPriceState>,
     ) -> Self {
         Self {
-            name: format!("ChainPriceService-{}", chain_id),
+            chain_name: chain_name.clone(),
+            name: format!("ChainPriceService-{}", chain_name),
             contract,
             poll_interval,
             chain_price_state,

+ 3 - 2
apps/argus/src/services/controller_service.rs

@@ -9,6 +9,7 @@ use tracing;
 use crate::adapters::types::{PriceId, SubscriptionId};
 use crate::services::types::PushRequest;
 use crate::services::Service;
+use crate::state::ChainName;
 use crate::state::{ChainPriceState, PythPriceState, SubscriptionState};
 
 pub struct ControllerService {
@@ -21,14 +22,14 @@ pub struct ControllerService {
 
 impl ControllerService {
     pub fn new(
-        chain_id: String,
+        chain_name: ChainName,
         update_interval: Duration,
         subscription_state: Arc<SubscriptionState>,
         pyth_price_state: Arc<PythPriceState>,
         chain_price_state: Arc<ChainPriceState>,
     ) -> Self {
         Self {
-            name: format!("ControllerService-{}", chain_id),
+            name: format!("ControllerService-{}", chain_name),
             update_interval,
             subscription_state,
             pyth_price_state,

+ 13 - 2
apps/argus/src/services/price_pusher_service.rs

@@ -8,8 +8,10 @@ use tracing;
 use crate::adapters::types::{ReadPythPrices, UpdateChainPrices};
 use crate::services::types::PushRequest;
 use crate::services::Service;
+use crate::state::ChainName;
 
 pub struct PricePusherService {
+    chain_name: ChainName,
     name: String,
     contract: Arc<dyn UpdateChainPrices + Send + Sync>,
     pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
@@ -20,7 +22,7 @@ pub struct PricePusherService {
 
 impl PricePusherService {
     pub fn new(
-        chain_id: String,
+        chain_name: ChainName,
         contract: Arc<dyn UpdateChainPrices + Send + Sync>,
         pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
         backoff_policy: ExponentialBackoff,
@@ -28,7 +30,8 @@ impl PricePusherService {
         let (request_tx, request_rx) = mpsc::channel(100);
 
         Self {
-            name: format!("PricePusherService-{}", chain_id),
+            chain_name: chain_name.clone(),
+            name: format!("PricePusherService-{}", chain_name),
             contract,
             pyth_price_client,
             backoff_policy,
@@ -41,6 +44,14 @@ impl PricePusherService {
         self.request_tx.clone()
     }
 
+    #[tracing::instrument(
+        skip(self),
+        fields(
+            name = "handle_request",
+            task = self.name,
+            subscription_id = request.subscription_id.to_string()
+        )
+    )]
     async fn handle_request(&self, request: PushRequest) {
         let price_ids = request.price_ids.clone();
 

+ 5 - 2
apps/argus/src/services/pyth_price_service.rs

@@ -6,9 +6,11 @@ use tokio::sync::watch;
 use tracing;
 
 use crate::adapters::types::ReadPythPrices;
+use crate::state::ChainName;
 use crate::services::Service;
 
 pub struct PythPriceService {
+    chain_name: ChainName,
     name: String,
     pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
     pyth_price_state: Arc<crate::state::PythPriceState>,
@@ -17,13 +19,14 @@ pub struct PythPriceService {
 
 impl PythPriceService {
     pub fn new(
-        chain_id: String,
+        chain_name: ChainName,
         poll_interval: Duration,
         pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
         pyth_price_state: Arc<crate::state::PythPriceState>,
     ) -> Self {
         Self {
-            name: format!("PythPriceService-{}", chain_id),
+            chain_name: chain_name.clone(),
+            name: format!("PythPriceService-{}", chain_name),
             poll_interval,
             pyth_price_client,
             pyth_price_state,

+ 6 - 3
apps/argus/src/services/subscription_service.rs

@@ -7,9 +7,11 @@ use tokio::time;
 use tracing;
 
 use crate::adapters::types::ReadChainSubscriptions;
+use crate::state::ChainName;
 use crate::services::Service;
 
 pub struct SubscriptionService {
+    chain_name: ChainName,
     name: String,
     contract: Arc<dyn ReadChainSubscriptions + Send + Sync>,
     poll_interval: Duration,
@@ -20,7 +22,7 @@ pub struct SubscriptionService {
 
 impl SubscriptionService {
     pub fn new(
-        chain_id: String,
+        chain_name: ChainName,
         contract: Arc<dyn ReadChainSubscriptions + Send + Sync>,
         poll_interval: Duration,
         subscription_state: Arc<crate::state::SubscriptionState>,
@@ -28,7 +30,8 @@ impl SubscriptionService {
         chain_price_state: Arc<crate::state::ChainPriceState>,
     ) -> Self {
         Self {
-            name: format!("SubscriptionService-{}", chain_id),
+            chain_name: chain_name.clone(),
+            name: format!("SubscriptionService-{}", chain_name),
             contract,
             poll_interval,
             subscription_state,
@@ -76,7 +79,7 @@ impl Service for SubscriptionService {
         let _ = self.refresh_subscriptions().await;
 
         // Subscribe to contract events
-        let event_stream = match self.contract.subscribe_to_subscription_events().await {
+        let _event_stream = match self.contract.subscribe_to_subscription_events().await {
             Ok(stream) => {
                 tracing::info!(
                     service_name = self.name,

+ 16 - 4
apps/argus/src/state.rs

@@ -2,22 +2,34 @@ use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, Mutex, RwLock};
 use tokio::sync::watch;
 
-use crate::adapters::ethereum::SubscriptionParams;
+use crate::adapters::ethereum::{BlockStatus, SubscriptionParams};
 use crate::adapters::types::{Price, PriceId, SubscriptionId};
 
+pub type ChainName = String;
+
+/// The state of the service for a single blockchain.
+#[derive(Clone)]
+pub struct BlockchainState {
+    /// The human friendly name for this blockchain, useful for logging
+    pub name: ChainName,
+    /// The BlockStatus of the block that is considered to be confirmed on the blockchain.
+    /// For eg., Finalized, Safe
+    pub confirmed_block_status: BlockStatus,
+}
+
 #[derive(Clone)]
 pub struct ArgusState {
-    pub chain_id: String,
     pub subscription_state: Arc<SubscriptionState>,
     pub pyth_price_state: Arc<PythPriceState>,
     pub chain_price_state: Arc<ChainPriceState>,
     pub stop_sender: Arc<Mutex<Option<watch::Sender<bool>>>>,
 }
 
+/// The state of Argus per chain.
+/// Each sub state object should be a singleton and shared across services.
 impl ArgusState {
-    pub fn new(chain_id: String) -> Self {
+    pub fn new() -> Self {
         Self {
-            chain_id,
             subscription_state: Arc::new(SubscriptionState::new()),
             pyth_price_state: Arc::new(PythPriceState::new()),
             chain_price_state: Arc::new(ChainPriceState::new()),