Browse Source

feat: define Argus keeper architecture and interfaces

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>
Devin AI 6 months ago
parent
commit
af8d38e78f

+ 110 - 0
apps/argus/src/chain_price_listener.rs

@@ -0,0 +1,110 @@
+use {
+    crate::{
+        chain::ethereum::PythContract,
+        config::EthereumConfig,
+    },
+    anyhow::Result,
+    async_trait::async_trait,
+    ethers::{
+        contract::EthEvent,
+        types::{Address, U256},
+    },
+    std::{collections::HashSet, sync::Arc},
+    tokio::sync::RwLock,
+};
+
+#[derive(Debug, Clone, EthEvent)]
+#[ethevent(name = "PriceUpdated")]
+pub struct PriceUpdatedEvent {
+    #[ethevent(indexed)]
+    pub subscription_id: U256,
+    #[ethevent(indexed)]
+    pub price_id: [u8; 32],
+    pub publish_time: u64,
+}
+
+#[derive(Debug, Clone)]
+pub struct ChainPrice {
+    pub feed_id: [u8; 32],
+    pub price: i64,
+    pub conf: u64,
+    pub expo: i32,
+    pub publish_time: u64,
+}
+
+#[async_trait]
+pub trait ChainPriceListener: Send + Sync {
+    async fn initialize(&self) -> Result<()>;
+
+    async fn update_feed_id_set(&self, feed_ids: HashSet<[u8; 32]>) -> Result<()>;
+
+    async fn get_latest_price(&self, feed_id: [u8; 32]) -> Result<Option<ChainPrice>>;
+
+    async fn get_latest_prices(&self, feed_ids: &[[u8; 32]]) -> Result<Vec<Option<ChainPrice>>>;
+}
+
+pub struct EthereumChainPriceListener {
+    contract: Arc<PythContract>,
+    config: EthereumConfig,
+    feed_ids: Arc<RwLock<HashSet<[u8; 32]>>>,
+    latest_prices: Arc<RwLock<std::collections::HashMap<[u8; 32], ChainPrice>>>,
+}
+
+impl EthereumChainPriceListener {
+    pub fn new(contract: Arc<PythContract>, config: EthereumConfig) -> Self {
+        Self {
+            contract,
+            config,
+            feed_ids: Arc::new(RwLock::new(HashSet::new())),
+            latest_prices: Arc::new(RwLock::new(std::collections::HashMap::new())),
+        }
+    }
+
+    async fn subscribe_to_price_updates(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn poll_on_chain_prices(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn handle_price_update_event(&self, event: PriceUpdatedEvent) -> Result<()> {
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl ChainPriceListener for EthereumChainPriceListener {
+    async fn initialize(&self) -> Result<()> {
+        self.subscribe_to_price_updates().await?;
+        
+        self.poll_on_chain_prices().await?;
+        
+        Ok(())
+    }
+
+    async fn update_feed_id_set(&self, feed_ids: HashSet<[u8; 32]>) -> Result<()> {
+        let mut current_feed_ids = self.feed_ids.write().await;
+        
+        if *current_feed_ids != feed_ids {
+            *current_feed_ids = feed_ids;
+            self.poll_on_chain_prices().await?;
+        }
+        
+        Ok(())
+    }
+
+    async fn get_latest_price(&self, feed_id: [u8; 32]) -> Result<Option<ChainPrice>> {
+        let prices = self.latest_prices.read().await;
+        Ok(prices.get(&feed_id).cloned())
+    }
+
+    async fn get_latest_prices(&self, feed_ids: &[[u8; 32]]) -> Result<Vec<Option<ChainPrice>>> {
+        let prices = self.latest_prices.read().await;
+        let result = feed_ids
+            .iter()
+            .map(|id| prices.get(id).cloned())
+            .collect();
+        Ok(result)
+    }
+}

+ 215 - 0
apps/argus/src/controller.rs

@@ -0,0 +1,215 @@
+use {
+    crate::{
+        chain_price_listener::{ChainPrice, ChainPriceListener},
+        config::EthereumConfig,
+        metrics::ControllerMetrics,
+        price_pusher::{PriceUpdateRequest, PricePusher},
+        pyth_price_listener::{PythPrice, PythPriceListener},
+        subscription_listener::{Subscription, SubscriptionListener, UpdateCriteria},
+    },
+    anyhow::Result,
+    async_trait::async_trait,
+    std::{collections::HashSet, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}},
+    tokio::time,
+};
+
+#[async_trait]
+pub trait Controller: Send + Sync {
+    async fn initialize(&self) -> Result<()>;
+
+    async fn run(&self) -> Result<()>;
+}
+
+pub struct PulseController {
+    chain_id: String,
+    subscription_listener: Arc<dyn SubscriptionListener>,
+    pyth_price_listener: Arc<dyn PythPriceListener>,
+    chain_price_listener: Arc<dyn ChainPriceListener>,
+    price_pusher: Arc<dyn PricePusher>,
+    metrics: Arc<ControllerMetrics>,
+    update_interval: Duration,
+}
+
+impl PulseController {
+    pub fn new(
+        chain_id: String,
+        subscription_listener: Arc<dyn SubscriptionListener>,
+        pyth_price_listener: Arc<dyn PythPriceListener>,
+        chain_price_listener: Arc<dyn ChainPriceListener>,
+        price_pusher: Arc<dyn PricePusher>,
+        metrics: Arc<ControllerMetrics>,
+        update_interval: Duration,
+    ) -> Self {
+        Self {
+            chain_id,
+            subscription_listener,
+            pyth_price_listener,
+            chain_price_listener,
+            price_pusher,
+            metrics,
+            update_interval,
+        }
+    }
+
+    async fn update_feed_id_sets(&self) -> Result<()> {
+        let feed_ids = self.subscription_listener.get_all_active_feed_ids().await?;
+        
+        self.pyth_price_listener.update_feed_id_set(feed_ids.clone()).await?;
+        self.chain_price_listener.update_feed_id_set(feed_ids).await?;
+        
+        Ok(())
+    }
+
+    async fn check_subscription_update_needed(
+        &self,
+        subscription: &Subscription,
+    ) -> Result<(bool, Vec<[u8; 32]>)> {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_secs();
+        
+        let pyth_prices = self.pyth_price_listener.get_latest_prices(&subscription.price_ids).await?;
+        let chain_prices = self.chain_price_listener.get_latest_prices(&subscription.price_ids).await?;
+        
+        let mut update_needed = false;
+        let mut feeds_to_update = Vec::new();
+        
+        for (i, feed_id) in subscription.price_ids.iter().enumerate() {
+            let pyth_price = match pyth_prices[i] {
+                Some(ref price) => price,
+                None => continue, // Skip if no Pyth price
+            };
+            
+            let chain_price = match chain_prices[i] {
+                Some(ref price) => price,
+                None => {
+                    update_needed = true;
+                    feeds_to_update.push(*feed_id);
+                    continue;
+                }
+            };
+            
+            if subscription.update_criteria.update_on_heartbeat {
+                let time_since_last_update = now.saturating_sub(chain_price.publish_time);
+                if time_since_last_update >= subscription.update_criteria.heartbeat_seconds as u64 {
+                    update_needed = true;
+                    feeds_to_update.push(*feed_id);
+                    continue;
+                }
+            }
+            
+            if subscription.update_criteria.update_on_deviation {
+                let deviation = self.calculate_price_deviation(pyth_price, chain_price);
+                if deviation >= subscription.update_criteria.deviation_threshold_bps as u64 {
+                    update_needed = true;
+                    feeds_to_update.push(*feed_id);
+                    continue;
+                }
+            }
+        }
+        
+        Ok((update_needed, feeds_to_update))
+    }
+
+    fn calculate_price_deviation(&self, pyth_price: &PythPrice, chain_price: &ChainPrice) -> u64 {
+        if pyth_price.expo != chain_price.expo {
+            return 10000; // 100% deviation if exponents don't match
+        }
+        
+        let pyth_price_abs = pyth_price.price.abs() as u64;
+        let chain_price_abs = chain_price.price.abs() as u64;
+        
+        if pyth_price_abs == 0 && chain_price_abs == 0 {
+            return 0;
+        }
+        
+        if pyth_price_abs == 0 || chain_price_abs == 0 {
+            return 10000; // 100% deviation if one price is zero
+        }
+        
+        let max_price = std::cmp::max(pyth_price_abs, chain_price_abs);
+        let min_price = std::cmp::min(pyth_price_abs, chain_price_abs);
+        
+        let deviation = (max_price - min_price) * 10000 / max_price;
+        deviation
+    }
+
+    async fn process_subscription(&self, subscription: &Subscription) -> Result<()> {
+        if !subscription.is_active {
+            return Ok(());
+        }
+        
+        let (update_needed, feeds_to_update) = self.check_subscription_update_needed(subscription).await?;
+        
+        if update_needed {
+            let request = PriceUpdateRequest {
+                subscription_id: subscription.id,
+                price_ids: subscription.price_ids.clone(),
+            };
+            
+            match self.price_pusher.push_price_updates(request).await {
+                Ok(tx_hash) => {
+                    tracing::info!(
+                        "Successfully pushed price updates for subscription {} with tx hash {}",
+                        subscription.id,
+                        tx_hash
+                    );
+                    self.metrics.record_update_success(subscription.id.as_u64());
+                }
+                Err(err) => {
+                    tracing::error!(
+                        "Failed to push price updates for subscription {}: {}",
+                        subscription.id,
+                        err
+                    );
+                    self.metrics.record_update_failure(subscription.id.as_u64());
+                }
+            }
+        }
+        
+        Ok(())
+    }
+
+    async fn run_update_loop_once(&self) -> Result<()> {
+        let subscriptions = self.subscription_listener.get_active_subscriptions().await?;
+        
+        self.update_feed_id_sets().await?;
+        
+        for subscription in subscriptions {
+            if let Err(err) = self.process_subscription(&subscription).await {
+                tracing::error!(
+                    "Error processing subscription {}: {}",
+                    subscription.id,
+                    err
+                );
+            }
+        }
+        
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Controller for PulseController {
+    async fn initialize(&self) -> Result<()> {
+        self.subscription_listener.initialize().await?;
+        self.pyth_price_listener.initialize().await?;
+        self.chain_price_listener.initialize().await?;
+        self.price_pusher.initialize().await?;
+        
+        Ok(())
+    }
+
+    async fn run(&self) -> Result<()> {
+        tracing::info!("Starting controller for chain {}", self.chain_id);
+        
+        loop {
+            if let Err(err) = self.run_update_loop_once().await {
+                tracing::error!("Error in update loop: {}", err);
+            }
+            
+            time::sleep(self.update_interval).await;
+        }
+    }
+}

+ 123 - 0
apps/argus/src/keeper/mod.rs

@@ -0,0 +1,123 @@
+use {
+    crate::{
+        api::BlockchainState,
+        chain::ethereum::InstrumentedSignablePythContract,
+        chain_price_listener::{ChainPriceListener, EthereumChainPriceListener},
+        config::EthereumConfig,
+        controller::{Controller, PulseController},
+        keeper::keeper_metrics::KeeperMetrics,
+        metrics::{ControllerMetrics, PricePusherMetrics},
+        price_pusher::{EthereumPricePusher, PricePusher},
+        pyth_price_listener::{HermesPythPriceListener, PythPriceListener},
+        subscription_listener::{EthereumSubscriptionListener, SubscriptionListener},
+    },
+    anyhow::Result,
+    ethers::signers::Signer,
+    fortuna::eth_utils::traced_client::RpcMetrics,
+    std::{sync::Arc, time::Duration},
+    tokio::spawn,
+    tracing,
+};
+
+pub(crate) mod keeper_metrics;
+
+const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(5);
+
+const DEFAULT_HERMES_WS_URL: &str = "wss://hermes.pyth.network/ws";
+
+const DEFAULT_HERMES_API_URL: &str = "https://hermes.pyth.network/api";
+
+#[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
+pub async fn run_keeper_threads(
+    private_key: String,
+    chain_eth_config: EthereumConfig,
+    chain_state: BlockchainState,
+    metrics: Arc<KeeperMetrics>,
+    rpc_metrics: Arc<RpcMetrics>,
+) {
+    tracing::info!("starting keeper");
+
+    let contract = Arc::new(
+        InstrumentedSignablePythContract::from_config(
+            &chain_eth_config,
+            &private_key,
+            chain_state.id.clone(),
+            rpc_metrics.clone(),
+        )
+        .await
+        .expect("Chain config should be valid"),
+    );
+    let keeper_address = contract.wallet().address();
+    tracing::info!("Keeper address: {}", keeper_address);
+
+    let read_contract = Arc::new(
+        crate::chain::ethereum::InstrumentedPythContract::from_config(
+            &chain_eth_config,
+            chain_state.id.clone(),
+            rpc_metrics.clone(),
+        )
+        .expect("Chain config should be valid"),
+    );
+
+    let registry = metrics.registry();
+    let controller_metrics = Arc::new(
+        ControllerMetrics::new(registry.clone(), chain_state.id.clone()).await
+    );
+    let price_pusher_metrics = Arc::new(
+        PricePusherMetrics::new(registry.clone(), chain_state.id.clone()).await
+    );
+
+    let subscription_listener = Arc::new(
+        EthereumSubscriptionListener::new(
+            read_contract.clone(),
+            contract.clone(),
+            chain_eth_config.clone(),
+        )
+    );
+
+    let pyth_price_listener = Arc::new(
+        HermesPythPriceListener::new(
+            DEFAULT_HERMES_WS_URL.to_string(),
+        )
+    );
+
+    let chain_price_listener = Arc::new(
+        EthereumChainPriceListener::new(
+            read_contract.clone(),
+            chain_eth_config.clone(),
+        )
+    );
+
+    let price_pusher = Arc::new(
+        EthereumPricePusher::new(
+            contract.clone(),
+            chain_eth_config.clone(),
+            pyth_price_listener.clone(),
+            price_pusher_metrics.clone(),
+            DEFAULT_HERMES_API_URL.to_string(),
+        )
+    );
+
+    let controller = Arc::new(
+        PulseController::new(
+            chain_state.id.clone(),
+            subscription_listener,
+            pyth_price_listener,
+            chain_price_listener,
+            price_pusher,
+            controller_metrics,
+            DEFAULT_UPDATE_INTERVAL,
+        )
+    );
+
+    if let Err(err) = controller.initialize().await {
+        tracing::error!("Failed to initialize controller: {}", err);
+        return;
+    }
+
+    spawn(async move {
+        if let Err(err) = controller.run().await {
+            tracing::error!("Controller error: {}", err);
+        }
+    });
+}

+ 7 - 0
apps/argus/src/lib.rs

@@ -3,3 +3,10 @@ pub mod chain;
 pub mod command;
 pub mod config;
 pub mod keeper;
+
+pub mod subscription_listener;
+pub mod pyth_price_listener;
+pub mod chain_price_listener;
+pub mod controller;
+pub mod price_pusher;
+pub mod metrics;

+ 118 - 0
apps/argus/src/metrics.rs

@@ -0,0 +1,118 @@
+use {
+    prometheus_client::{
+        encoding::EncodeLabelSet,
+        metrics::{counter::Counter, family::Family, gauge::Gauge},
+        registry::Registry,
+    },
+    std::sync::Arc,
+    tokio::sync::RwLock,
+};
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct SubscriptionLabel {
+    pub subscription_id: String,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct ChainLabel {
+    pub chain_id: String,
+}
+
+pub struct ControllerMetrics {
+    registry: Arc<RwLock<Registry>>,
+    update_success_counter: Family<SubscriptionLabel, Counter>,
+    update_failure_counter: Family<SubscriptionLabel, Counter>,
+}
+
+impl ControllerMetrics {
+    pub async fn new(registry: Arc<RwLock<Registry>>, chain_id: String) -> Self {
+        let update_success_counter = Family::<SubscriptionLabel, Counter>::default();
+        let update_failure_counter = Family::<SubscriptionLabel, Counter>::default();
+        
+        registry.write().await.register(
+            format!("controller_{}_update_success", chain_id),
+            "Number of successful price updates per subscription",
+            update_success_counter.clone(),
+        );
+        
+        registry.write().await.register(
+            format!("controller_{}_update_failure", chain_id),
+            "Number of failed price updates per subscription",
+            update_failure_counter.clone(),
+        );
+        
+        Self {
+            registry,
+            update_success_counter,
+            update_failure_counter,
+        }
+    }
+
+    pub fn record_update_success(&self, subscription_id: u64) {
+        self.update_success_counter
+            .get_or_create(&SubscriptionLabel {
+                subscription_id: subscription_id.to_string(),
+            })
+            .inc();
+    }
+
+    pub fn record_update_failure(&self, subscription_id: u64) {
+        self.update_failure_counter
+            .get_or_create(&SubscriptionLabel {
+                subscription_id: subscription_id.to_string(),
+            })
+            .inc();
+    }
+}
+
+pub struct PricePusherMetrics {
+    registry: Arc<RwLock<Registry>>,
+    success_counter: Counter,
+    failure_counter: Counter,
+    gas_used_gauge: Gauge,
+}
+
+impl PricePusherMetrics {
+    pub async fn new(registry: Arc<RwLock<Registry>>, chain_id: String) -> Self {
+        let success_counter = Counter::default();
+        let failure_counter = Counter::default();
+        let gas_used_gauge = Gauge::default();
+        
+        registry.write().await.register(
+            format!("price_pusher_{}_success", chain_id),
+            "Number of successful price pushes",
+            success_counter.clone(),
+        );
+        
+        registry.write().await.register(
+            format!("price_pusher_{}_failure", chain_id),
+            "Number of failed price pushes",
+            failure_counter.clone(),
+        );
+        
+        registry.write().await.register(
+            format!("price_pusher_{}_gas_used", chain_id),
+            "Gas used for price pushes",
+            gas_used_gauge.clone(),
+        );
+        
+        Self {
+            registry,
+            success_counter,
+            failure_counter,
+            gas_used_gauge,
+        }
+    }
+
+    pub fn record_success(&self) {
+        self.success_counter.inc();
+    }
+
+    pub fn record_failure(&self) {
+        self.failure_counter.inc();
+    }
+
+    pub fn record_gas_used(&self, gas_used: u64) {
+        self.gas_used_gauge.set(gas_used as i64);
+    }
+}

+ 113 - 0
apps/argus/src/price_pusher.rs

@@ -0,0 +1,113 @@
+use {
+    crate::{
+        chain::ethereum::SignablePythContract,
+        config::EthereumConfig,
+        metrics::PricePusherMetrics,
+        pyth_price_listener::{PythPrice, PythPriceListener},
+    },
+    anyhow::Result,
+    async_trait::async_trait,
+    backoff::{backoff::Backoff, ExponentialBackoff},
+    ethers::types::{U256, H256},
+    std::{sync::Arc, time::Duration},
+};
+
+#[derive(Debug, Clone)]
+pub struct PriceUpdateRequest {
+    pub subscription_id: U256,
+    pub price_ids: Vec<[u8; 32]>,
+}
+
+#[async_trait]
+pub trait PricePusher: Send + Sync {
+    async fn initialize(&self) -> Result<()>;
+
+    async fn push_price_updates(&self, request: PriceUpdateRequest) -> Result<H256>;
+}
+
+pub struct EthereumPricePusher {
+    contract: Arc<SignablePythContract>,
+    config: EthereumConfig,
+    pyth_price_listener: Arc<dyn PythPriceListener>,
+    metrics: Arc<PricePusherMetrics>,
+    hermes_api_url: String,
+}
+
+impl EthereumPricePusher {
+    pub fn new(
+        contract: Arc<SignablePythContract>,
+        config: EthereumConfig,
+        pyth_price_listener: Arc<dyn PythPriceListener>,
+        metrics: Arc<PricePusherMetrics>,
+        hermes_api_url: String,
+    ) -> Self {
+        Self {
+            contract,
+            config,
+            pyth_price_listener,
+            metrics,
+            hermes_api_url,
+        }
+    }
+
+    async fn fetch_price_update_data(&self, price_ids: &[[u8; 32]]) -> Result<Vec<Vec<u8>>> {
+        Ok(Vec::new())
+    }
+
+    async fn push_with_retry(&self, request: PriceUpdateRequest) -> Result<H256> {
+        let mut backoff = ExponentialBackoff {
+            initial_interval: Duration::from_secs(1),
+            max_interval: Duration::from_secs(60),
+            multiplier: 1.5,
+            max_elapsed_time: Some(Duration::from_secs(300)),
+            ..Default::default()
+        };
+
+        let mut last_error = None;
+
+        loop {
+            match self.push_price_updates_once(&request).await {
+                Ok(tx_hash) => {
+                    self.metrics.record_success();
+                    return Ok(tx_hash);
+                }
+                Err(err) => {
+                    last_error = Some(err);
+                    self.metrics.record_failure();
+
+                    match backoff.next_backoff() {
+                        Some(duration) => {
+                            tokio::time::sleep(duration).await;
+                        }
+                        None => {
+                            return Err(last_error.unwrap());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    async fn push_price_updates_once(&self, request: &PriceUpdateRequest) -> Result<H256> {
+        let update_data = self.fetch_price_update_data(&request.price_ids).await?;
+        
+        let tx = self.contract.update_price_feeds(
+            request.subscription_id,
+            update_data,
+            request.price_ids.iter().map(|id| H256::from(*id)).collect(),
+        ).send().await?;
+        
+        Ok(tx.tx_hash())
+    }
+}
+
+#[async_trait]
+impl PricePusher for EthereumPricePusher {
+    async fn initialize(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn push_price_updates(&self, request: PriceUpdateRequest) -> Result<H256> {
+        self.push_with_retry(request).await
+    }
+}

+ 93 - 0
apps/argus/src/pyth_price_listener.rs

@@ -0,0 +1,93 @@
+use {
+    anyhow::Result,
+    async_trait::async_trait,
+    std::{collections::HashSet, sync::Arc},
+    tokio::sync::RwLock,
+};
+
+#[derive(Debug, Clone)]
+pub struct PythPrice {
+    pub feed_id: [u8; 32],
+    pub price: i64,
+    pub conf: u64,
+    pub expo: i32,
+    pub publish_time: u64,
+    pub slot: u64,
+}
+
+#[async_trait]
+pub trait PythPriceListener: Send + Sync {
+    async fn initialize(&self) -> Result<()>;
+
+    async fn update_feed_id_set(&self, feed_ids: HashSet<[u8; 32]>) -> Result<()>;
+
+    async fn get_latest_price(&self, feed_id: [u8; 32]) -> Result<Option<PythPrice>>;
+
+    async fn get_latest_prices(&self, feed_ids: &[[u8; 32]]) -> Result<Vec<Option<PythPrice>>>;
+}
+
+pub struct HermesPythPriceListener {
+    hermes_ws_url: String,
+    feed_ids: Arc<RwLock<HashSet<[u8; 32]>>>,
+    latest_prices: Arc<RwLock<std::collections::HashMap<[u8; 32], PythPrice>>>,
+}
+
+impl HermesPythPriceListener {
+    pub fn new(hermes_ws_url: String) -> Self {
+        Self {
+            hermes_ws_url,
+            feed_ids: Arc::new(RwLock::new(HashSet::new())),
+            latest_prices: Arc::new(RwLock::new(std::collections::HashMap::new())),
+        }
+    }
+
+    async fn connect_to_hermes(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn subscribe_to_price_updates(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn handle_price_update(&self, price: PythPrice) -> Result<()> {
+        let mut prices = self.latest_prices.write().await;
+        prices.insert(price.feed_id, price);
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl PythPriceListener for HermesPythPriceListener {
+    async fn initialize(&self) -> Result<()> {
+        self.connect_to_hermes().await?;
+        
+        self.subscribe_to_price_updates().await?;
+        
+        Ok(())
+    }
+
+    async fn update_feed_id_set(&self, feed_ids: HashSet<[u8; 32]>) -> Result<()> {
+        let mut current_feed_ids = self.feed_ids.write().await;
+        
+        if *current_feed_ids != feed_ids {
+            *current_feed_ids = feed_ids;
+            self.subscribe_to_price_updates().await?;
+        }
+        
+        Ok(())
+    }
+
+    async fn get_latest_price(&self, feed_id: [u8; 32]) -> Result<Option<PythPrice>> {
+        let prices = self.latest_prices.read().await;
+        Ok(prices.get(&feed_id).cloned())
+    }
+
+    async fn get_latest_prices(&self, feed_ids: &[[u8; 32]]) -> Result<Vec<Option<PythPrice>>> {
+        let prices = self.latest_prices.read().await;
+        let result = feed_ids
+            .iter()
+            .map(|id| prices.get(id).cloned())
+            .collect();
+        Ok(result)
+    }
+}

+ 132 - 0
apps/argus/src/subscription_listener.rs

@@ -0,0 +1,132 @@
+use {
+    crate::{
+        chain::ethereum::{PythContract, SignablePythContract},
+        config::EthereumConfig,
+    },
+    anyhow::Result,
+    async_trait::async_trait,
+    ethers::{
+        contract::EthEvent,
+        types::{Address, U256},
+    },
+    std::{collections::HashSet, sync::Arc},
+    tokio::sync::RwLock,
+};
+
+#[derive(Debug, Clone, EthEvent)]
+#[ethevent(name = "SubscriptionUpdated")]
+pub struct SubscriptionUpdatedEvent {
+    #[ethevent(indexed)]
+    pub subscription_id: U256,
+    pub is_active: bool,
+}
+
+#[derive(Debug, Clone, EthEvent)]
+#[ethevent(name = "SubscriptionFunded")]
+pub struct SubscriptionFundedEvent {
+    #[ethevent(indexed)]
+    pub subscription_id: U256,
+    pub amount: U256,
+}
+
+#[derive(Debug, Clone)]
+pub struct Subscription {
+    pub id: U256,
+    pub price_ids: Vec<[u8; 32]>,
+    pub is_active: bool,
+    pub update_criteria: UpdateCriteria,
+}
+
+#[derive(Debug, Clone)]
+pub struct UpdateCriteria {
+    pub update_on_heartbeat: bool,
+    pub heartbeat_seconds: u32,
+    pub update_on_deviation: bool,
+    pub deviation_threshold_bps: u32,
+}
+
+#[async_trait]
+pub trait SubscriptionListener: Send + Sync {
+    async fn initialize(&self) -> Result<()>;
+
+    async fn get_active_subscriptions(&self) -> Result<Vec<Subscription>>;
+
+    async fn get_all_active_feed_ids(&self) -> Result<HashSet<[u8; 32]>>;
+}
+
+pub struct EthereumSubscriptionListener {
+    contract: Arc<PythContract>,
+    signable_contract: Arc<SignablePythContract>,
+    config: EthereumConfig,
+    active_subscriptions: Arc<RwLock<Vec<Subscription>>>,
+    active_feed_ids: Arc<RwLock<HashSet<[u8; 32]>>>,
+}
+
+impl EthereumSubscriptionListener {
+    pub fn new(
+        contract: Arc<PythContract>,
+        signable_contract: Arc<SignablePythContract>,
+        config: EthereumConfig,
+    ) -> Self {
+        Self {
+            contract,
+            signable_contract,
+            config,
+            active_subscriptions: Arc::new(RwLock::new(Vec::new())),
+            active_feed_ids: Arc::new(RwLock::new(HashSet::new())),
+        }
+    }
+
+    async fn subscribe_to_events(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn poll_active_subscriptions(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn update_subscription_cache(&self, subscriptions: Vec<Subscription>) -> Result<()> {
+        let mut active_feed_ids = HashSet::new();
+        
+        for subscription in &subscriptions {
+            if subscription.is_active {
+                for feed_id in &subscription.price_ids {
+                    active_feed_ids.insert(*feed_id);
+                }
+            }
+        }
+
+        {
+            let mut subscriptions_lock = self.active_subscriptions.write().await;
+            *subscriptions_lock = subscriptions;
+        }
+        
+        {
+            let mut feed_ids_lock = self.active_feed_ids.write().await;
+            *feed_ids_lock = active_feed_ids;
+        }
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl SubscriptionListener for EthereumSubscriptionListener {
+    async fn initialize(&self) -> Result<()> {
+        self.subscribe_to_events().await?;
+        
+        self.poll_active_subscriptions().await?;
+        
+        Ok(())
+    }
+
+    async fn get_active_subscriptions(&self) -> Result<Vec<Subscription>> {
+        let subscriptions = self.active_subscriptions.read().await.clone();
+        Ok(subscriptions)
+    }
+
+    async fn get_all_active_feed_ids(&self) -> Result<HashSet<[u8; 32]>> {
+        let feed_ids = self.active_feed_ids.read().await.clone();
+        Ok(feed_ids)
+    }
+}