Ver código fonte

feat(hip-3-pusher): Configurable price sources (#3169)

Mike Rolish 1 semana atrás
pai
commit
d773ffa047

+ 43 - 0
apps/hip-3-pusher/config/config.sample.toml

@@ -0,0 +1,43 @@
+stale_price_threshold_seconds = 5
+prometheus_port = 9090
+
+[hyperliquid]
+hyperliquid_ws_urls = ["wss://api.hyperliquid-testnet.xyz/ws"]
+market_name = "pyth"
+asset_context_symbols = ["BTC"]
+use_testnet = false
+oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt"
+publish_interval = 3.0
+publish_timeout = 5.0
+enable_publish = false
+
+[multisig]
+enable_multisig = false
+
+[kms]
+enable_kms = false
+aws_kms_key_id_path = "/path/to/aws_kms_key_id.txt"
+
+[lazer]
+lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
+lazer_api_key = "lazer_api_key"
+feed_ids = [1, 8] # BTC, USDT
+
+[hermes]
+hermes_urls = ["wss://hermes.pyth.network/ws"]
+feed_ids = [
+    "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", # BTC
+    "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"  # USDT
+]
+
+[price.oracle]
+BTC = [
+    { source_type = "single", source = { source_name = "hl_oracle", source_id = "BTC" } },
+    { source_type = "pair", base_source = { source_name = "lazer", source_id = 1, exponent = -8 }, quote_source = { source_name = "lazer", source_id = 8, exponent = -8 } },
+    { source_type = "pair", base_source = { source_name = "hermes", source_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", exponent = -8 }, quote_source = { source_name = "hermes", source_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", exponent = -8 } },
+]
+
+[price.external]
+BTC = [{ source_type = "single", source = { source_name = "hl_mark", source_id = "BTC" } }]
+PYTH = [{ source_type = "constant", value = "0.10" }]
+FOGO = [{ source_type = "constant", value = "0.01" }]

+ 0 - 35
apps/hip-3-pusher/config/config.toml

@@ -1,35 +0,0 @@
-stale_price_threshold_seconds = 5
-prometheus_port = 9090
-
-[hyperliquid]
-hyperliquid_ws_urls = ["wss://api.hyperliquid-testnet.xyz/ws"]
-market_name = ""
-market_symbol = "BTC"
-use_testnet = false
-oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt"
-publish_interval = 3.0
-publish_timeout = 5.0
-enable_publish = false
-
-[multisig]
-enable_multisig = false
-multisig_address = "0x0000000000000000000000000000000000000005"
-
-[kms]
-enable_kms = false
-aws_kms_key_id_path = "/path/to/aws_kms_key_id.txt"
-
-[lazer]
-lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
-lazer_api_key = "lazer_api_key"
-base_feed_id = 1   # BTC
-base_feed_exponent = -8
-quote_feed_id = 8  # USDT
-quote_feed_exponent = -8
-
-[hermes]
-hermes_urls = ["wss://hermes.pyth.network/ws"]
-base_feed_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"   # BTC
-base_feed_exponent = -8
-quote_feed_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"  # USDT
-quote_feed_exponent = -8

+ 2 - 1
apps/hip-3-pusher/pyproject.toml

@@ -1,12 +1,13 @@
 [project]
 name = "hip-3-pusher"
-version = "0.1.7"
+version = "0.2.0"
 description = "Hyperliquid HIP-3 market oracle pusher"
 readme = "README.md"
 requires-python = "==3.13.*"
 dependencies = [
     "boto3~=1.40.38",
     "cryptography~=46.0.1",
+    "httpx~=0.28.1",
     "hyperliquid-python-sdk~=0.19.0",
     "loguru~=0.7.3",
     "opentelemetry-exporter-prometheus~=0.58b0",

+ 51 - 9
apps/hip-3-pusher/src/pusher/config.py

@@ -1,6 +1,7 @@
 from hyperliquid.utils.constants import MAINNET_API_URL, TESTNET_API_URL
 from pydantic import BaseModel, FilePath, model_validator
 from typing import Optional
+from typing import Literal
 
 STALE_TIMEOUT_SECONDS = 5
 
@@ -18,25 +19,19 @@ class MultisigConfig(BaseModel):
 class LazerConfig(BaseModel):
     lazer_urls: list[str]
     lazer_api_key: str
-    base_feed_id: int
-    base_feed_exponent: int
-    quote_feed_id: int
-    quote_feed_exponent: int
+    feed_ids: list[int]
 
 
 class HermesConfig(BaseModel):
     hermes_urls: list[str]
-    base_feed_id: str
-    base_feed_exponent: int
-    quote_feed_id: str
-    quote_feed_exponent: int
+    feed_ids: list[str]
 
 
 class HyperliquidConfig(BaseModel):
     hyperliquid_ws_urls: list[str]
     push_urls: Optional[list[str]] = None
     market_name: str
-    market_symbol: str
+    asset_context_symbols: list[str]
     use_testnet: bool
     oracle_pusher_key_path: Optional[FilePath] = None
     publish_interval: float
@@ -50,6 +45,51 @@ class HyperliquidConfig(BaseModel):
         return self
 
 
+class SedaFeedConfig(BaseModel):
+    exec_program_id: str
+    exec_inputs: str
+
+
+class SedaConfig(BaseModel):
+    url: str
+    api_key_path: Optional[FilePath] = None
+    poll_interval: float
+    poll_failure_interval: float
+    poll_timeout: float
+    feeds: dict[str, SedaFeedConfig]
+
+
+class PriceSource(BaseModel):
+    source_name: str
+    source_id: str | int
+    exponent: Optional[int] = None
+
+
+class SingleSourceConfig(BaseModel):
+    source_type: Literal["single"]
+    source: PriceSource
+
+
+class PairSourceConfig(BaseModel):
+    source_type: Literal["pair"]
+    base_source: PriceSource
+    quote_source: PriceSource
+
+
+class ConstantSourceConfig(BaseModel):
+    source_type: Literal["constant"]
+    value: str
+
+
+PriceSourceConfig = SingleSourceConfig | PairSourceConfig | ConstantSourceConfig
+
+
+class PriceConfig(BaseModel):
+    oracle: dict[str, list[PriceSourceConfig]] = {}
+    mark: dict[str, list[PriceSourceConfig]] = {}
+    external: dict[str, list[PriceSourceConfig]] = {}
+
+
 class Config(BaseModel):
     stale_price_threshold_seconds: int
     prometheus_port: int
@@ -57,4 +97,6 @@ class Config(BaseModel):
     kms: KMSConfig
     lazer: LazerConfig
     hermes: HermesConfig
+    seda: SedaConfig
     multisig: MultisigConfig
+    price: PriceConfig

+ 6 - 10
apps/hip-3-pusher/src/pusher/hermes_listener.py

@@ -7,23 +7,22 @@ from tenacity import retry, retry_if_exception_type, wait_exponential
 
 from pusher.config import Config, STALE_TIMEOUT_SECONDS
 from pusher.exception import StaleConnectionError
-from pusher.price_state import PriceState, PriceUpdate
+from pusher.price_state import PriceSourceState, PriceUpdate
 
 
 class HermesListener:
     """
     Subscribe to Hermes price updates for needed feeds.
     """
-    def __init__(self, config: Config, price_state: PriceState):
+    def __init__(self, config: Config, hermes_state: PriceSourceState):
         self.hermes_urls = config.hermes.hermes_urls
-        self.base_feed_id = config.hermes.base_feed_id
-        self.quote_feed_id = config.hermes.quote_feed_id
-        self.price_state = price_state
+        self.feed_ids = config.hermes.feed_ids
+        self.hermes_state = hermes_state
 
     def get_subscribe_request(self):
         return {
             "type": "subscribe",
-            "ids": [self.base_feed_id, self.quote_feed_id],
+            "ids": self.feed_ids,
             "verbose": False,
             "binary": True,
             "allow_out_of_order": False,
@@ -81,9 +80,6 @@ class HermesListener:
             publish_time = price_object["publish_time"]
             logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
             now = time.time()
-            if id == self.base_feed_id:
-                self.price_state.hermes_base_price = PriceUpdate(price, now)
-            if id == self.quote_feed_id:
-                self.price_state.hermes_quote_price = PriceUpdate(price, now)
+            self.hermes_state.put(id, PriceUpdate(price, now))
         except Exception as e:
             logger.error("parse_hermes_message error: {}", e)

+ 13 - 11
apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

@@ -7,7 +7,7 @@ import time
 
 from pusher.config import Config, STALE_TIMEOUT_SECONDS
 from pusher.exception import StaleConnectionError
-from pusher.price_state import PriceState, PriceUpdate
+from pusher.price_state import PriceSourceState, PriceUpdate
 
 # This will be in config, but note here.
 # Other RPC providers exist but so far we've seen their support is incomplete.
@@ -20,10 +20,11 @@ class HyperliquidListener:
     Subscribe to any relevant Hyperliquid websocket streams
     See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
     """
-    def __init__(self, config: Config, price_state: PriceState):
+    def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_state: PriceSourceState):
         self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls
-        self.market_symbol = config.hyperliquid.market_symbol
-        self.price_state = price_state
+        self.asset_context_symbols = config.hyperliquid.asset_context_symbols
+        self.hl_oracle_state = hl_oracle_state
+        self.hl_mark_state = hl_mark_state
 
     def get_subscribe_request(self, asset):
         return {
@@ -44,9 +45,10 @@ class HyperliquidListener:
 
     async def subscribe_single_inner(self, url):
         async with websockets.connect(url) as ws:
-            subscribe_request = self.get_subscribe_request(self.market_symbol)
-            await ws.send(json.dumps(subscribe_request))
-            logger.info("Sent subscribe request to {}", url)
+            for symbol in self.asset_context_symbols:
+                subscribe_request = self.get_subscribe_request(symbol)
+                await ws.send(json.dumps(subscribe_request))
+                logger.info("Sent subscribe request for symbol: {} to {}", symbol,  url)
 
             # listen for updates
             while True:
@@ -76,10 +78,10 @@ class HyperliquidListener:
     def parse_hyperliquid_ws_message(self, message):
         try:
             ctx = message["data"]["ctx"]
+            symbol = message["data"]["coin"]
             now = time.time()
-            self.price_state.hl_oracle_price = PriceUpdate(ctx["oraclePx"], now)
-            self.price_state.hl_mark_price = PriceUpdate(ctx["markPx"], now)
-            logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price,
-                         self.price_state.hl_mark_price)
+            self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now))
+            self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now))
+            logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", ctx["oraclePx"], ctx["markPx"])
         except Exception as e:
             logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e)

+ 8 - 11
apps/hip-3-pusher/src/pusher/lazer_listener.py

@@ -7,25 +7,24 @@ from tenacity import retry, retry_if_exception_type, wait_exponential
 
 from pusher.config import Config, STALE_TIMEOUT_SECONDS
 from pusher.exception import StaleConnectionError
-from pusher.price_state import PriceState, PriceUpdate
+from pusher.price_state import PriceSourceState, PriceUpdate
 
 
 class LazerListener:
     """
     Subscribe to Lazer price updates for needed feeds.
     """
-    def __init__(self, config: Config, price_state: PriceState):
+    def __init__(self, config: Config, lazer_state: PriceSourceState):
         self.lazer_urls = config.lazer.lazer_urls
         self.api_key = config.lazer.lazer_api_key
-        self.base_feed_id = config.lazer.base_feed_id
-        self.quote_feed_id = config.lazer.quote_feed_id
-        self.price_state = price_state
+        self.feed_ids = config.lazer.feed_ids
+        self.lazer_state = lazer_state
 
     def get_subscribe_request(self, subscription_id: int):
         return {
             "type": "subscribe",
             "subscriptionId": subscription_id,
-            "priceFeedIds": [self.base_feed_id, self.quote_feed_id],
+            "priceFeedIds": self.feed_ids,
             "properties": ["price"],
             "formats": [],
             "deliveryFormat": "json",
@@ -54,7 +53,7 @@ class LazerListener:
             subscribe_request = self.get_subscribe_request(1)
 
             await ws.send(json.dumps(subscribe_request))
-            logger.info("Sent Lazer subscribe request to {}", router_url)
+            logger.info("Sent Lazer subscribe request to {} feed_ids {}", router_url, self.feed_ids)
 
             # listen for updates
             while True:
@@ -89,9 +88,7 @@ class LazerListener:
                 price = feed_update.get("price", None)
                 if feed_id is None or price is None:
                     continue
-                if feed_id == self.base_feed_id:
-                    self.price_state.lazer_base_price = PriceUpdate(price, now)
-                if feed_id == self.quote_feed_id:
-                    self.price_state.lazer_quote_price = PriceUpdate(price, now)
+                else:
+                    self.lazer_state.put(feed_id, PriceUpdate(price, now))
         except Exception as e:
             logger.error("parse_lazer_message error: {}", e)

+ 6 - 3
apps/hip-3-pusher/src/pusher/main.py

@@ -9,6 +9,7 @@ from pusher.config import Config
 from pusher.hyperliquid_listener import HyperliquidListener
 from pusher.lazer_listener import LazerListener
 from pusher.hermes_listener import HermesListener
+from pusher.seda_listener import SedaListener
 from pusher.price_state import PriceState
 from pusher.publisher import Publisher
 from pusher.metrics import Metrics
@@ -45,15 +46,17 @@ async def main():
     metrics = Metrics(config)
 
     publisher = Publisher(config, price_state, metrics)
-    hyperliquid_listener = HyperliquidListener(config, price_state)
-    lazer_listener = LazerListener(config, price_state)
-    hermes_listener = HermesListener(config, price_state)
+    hyperliquid_listener = HyperliquidListener(config, price_state.hl_oracle_state, price_state.hl_mark_state)
+    lazer_listener = LazerListener(config, price_state.lazer_state)
+    hermes_listener = HermesListener(config, price_state.hermes_state)
+    seda_listener = SedaListener(config, price_state.seda_state)
 
     await asyncio.gather(
         publisher.run(),
         hyperliquid_listener.subscribe_all(),
         lazer_listener.subscribe_all(),
         hermes_listener.subscribe_all(),
+        seda_listener.run(),
     )
     logger.info("Exiting hip-3-pusher..")
 

+ 93 - 59
apps/hip-3-pusher/src/pusher/price_state.py

@@ -1,83 +1,117 @@
+from dataclasses import dataclass
 from loguru import logger
 import time
 
-from pusher.config import Config
+from pusher.config import Config, PriceSource, PriceSourceConfig, ConstantSourceConfig, SingleSourceConfig, \
+    PairSourceConfig
 
 DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5
 
 
+@dataclass
 class PriceUpdate:
-    def __init__(self, price, timestamp):
-        self.price = price
-        self.timestamp = timestamp
-
-    def __str__(self):
-        return f"PriceUpdate(price={self.price}, timestamp={self.timestamp})"
+    price: float | str
+    timestamp: float
 
     def time_diff(self, now):
         return now - self.timestamp
 
 
+class PriceSourceState:
+    def __init__(self, name: str):
+        self.name = name
+        self.state: dict[str, PriceUpdate] = {}
+
+    def __repr__(self):
+        return f"PriceSourceState(name={self.name} state={self.state})"
+
+    def get(self, symbol: str) -> PriceUpdate | None:
+        return self.state.get(symbol)
+
+    def put(self, symbol: str, value: PriceUpdate):
+        self.state[symbol] = value
+
+
 class PriceState:
+    HL_ORACLE = "hl_oracle"
+    HL_MARK = "hl_mark"
+    LAZER = "lazer"
+    HERMES = "hermes"
+    SEDA = "seda"
+
     """
     Maintain latest prices seen across listeners and publisher.
     """
     def __init__(self, config: Config):
         self.stale_price_threshold_seconds = config.stale_price_threshold_seconds
+        self.price_config = config.price
+
+        self.hl_oracle_state = PriceSourceState(self.HL_ORACLE)
+        self.hl_mark_state = PriceSourceState(self.HL_MARK)
+        self.lazer_state = PriceSourceState(self.LAZER)
+        self.hermes_state = PriceSourceState(self.HERMES)
+        self.seda_state = PriceSourceState(self.SEDA)
+
+        self.all_states = {
+            self.HL_ORACLE: self.hl_oracle_state,
+            self.HL_MARK: self.hl_mark_state,
+            self.LAZER: self.lazer_state,
+            self.HERMES: self.hermes_state,
+            self.SEDA: self.seda_state,
+        }
+
+    def get_all_prices(self, market_name):
+        logger.debug("get_all_prices state: {}", self.all_states)
+
+        return (
+            self.get_prices(self.price_config.oracle, market_name),
+            self.get_prices(self.price_config.mark, market_name),
+            self.get_prices(self.price_config.external, market_name)
+        )
+
+    def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], market_name: str):
+        pxs = {}
+        for symbol in symbol_configs:
+            for source_config in symbol_configs[symbol]:
+                # find first valid price in the waterfall
+                px = self.get_price(source_config)
+                if px is not None:
+                    pxs[f"{market_name}:{symbol}"] = px
+                    break
+        return pxs
+
+    def get_price(self, price_source_config: PriceSourceConfig):
+        if isinstance(price_source_config, ConstantSourceConfig):
+            return price_source_config.value
+        elif isinstance(price_source_config, SingleSourceConfig):
+            return self.get_price_from_single_source(price_source_config.source)
+        elif isinstance(price_source_config, PairSourceConfig):
+            return self.get_price_from_pair_source(price_source_config.base_source, price_source_config.quote_source)
+        else:
+            raise ValueError
 
-        self.hl_oracle_price: PriceUpdate | None = None
-        self.hl_mark_price: PriceUpdate | None = None
-
-        self.lazer_base_price: PriceUpdate | None = None
-        self.lazer_base_exponent = config.lazer.base_feed_exponent
-        self.lazer_quote_price: PriceUpdate | None = None
-        self.lazer_quote_exponent = config.lazer.quote_feed_exponent
-
-        self.hermes_base_price: PriceUpdate | None = None
-        self.hermes_base_exponent = config.hermes.base_feed_exponent
-        self.hermes_quote_price: PriceUpdate | None = None
-        self.hermes_quote_exponent = config.hermes.quote_feed_exponent
-
-    def get_current_oracle_price(self):
+    def get_price_from_single_source(self, source: PriceSource):
         now = time.time()
-        if self.hl_oracle_price:
-            time_diff = self.hl_oracle_price.time_diff(now)
-            if time_diff < self.stale_price_threshold_seconds:
-                return self.hl_oracle_price.price
-            else:
-                logger.error("Hyperliquid oracle price stale by {} seconds", time_diff)
+        update: PriceUpdate | None = self.all_states.get(source.source_name, {}).get(source.source_id)
+        if update is None:
+            logger.warning("source {} id {} is missing", source.source_name, source.source_id)
+            return None
+        time_diff = update.time_diff(now)
+        if time_diff >= self.stale_price_threshold_seconds:
+            logger.warning("source {} id {} is stale by {} seconds", source.source_name, source.source_id, time_diff)
+            return None
+        # valid price found
+        if source.exponent is not None:
+            return float(update.price) / (10.0 ** -source.exponent)
         else:
-            logger.error("Hyperliquid oracle price not received yet")
-
-        # fall back to Lazer
-        if self.lazer_base_price and self.lazer_quote_price:
-            max_time_diff = max(self.lazer_base_price.time_diff(now), self.lazer_quote_price.time_diff(now))
-            if max_time_diff < self.stale_price_threshold_seconds:
-                return self.get_lazer_price()
-            else:
-                logger.error("Lazer price stale by {} seconds", max_time_diff)
-        else:
-            logger.error("Lazer base/quote prices not received yet")
-
-        # fall back to Hermes
-        if self.hermes_base_price and self.hermes_quote_price:
-            max_time_diff = max(self.hermes_base_price.time_diff(now), self.hermes_quote_price.time_diff(now))
-            if max_time_diff < self.stale_price_threshold_seconds:
-                return self.get_hermes_price()
-            else:
-                logger.error("Hermes price stale by {} seconds", max_time_diff)
-        else:
-            logger.error("Hermes base/quote prices not received yet")
-
-        logger.error("All prices missing or stale!")
-        return None
+            return update.price
 
-    def get_hermes_price(self):
-        base_price = float(self.hermes_base_price.price) / (10.0 ** -self.hermes_base_exponent)
-        quote_price = float(self.hermes_quote_price.price) / (10.0 ** -self.hermes_quote_exponent)
-        return str(round(base_price / quote_price, 2))
+    def get_price_from_pair_source(self, base_source: PriceSource, quote_source: PriceSource):
+        base_price = self.get_price_from_single_source(base_source)
+        if base_price is None:
+            return None
+        quote_price = self.get_price_from_single_source(quote_source)
+        if quote_price is None:
+            return None
 
-    def get_lazer_price(self):
-        base_price = float(self.lazer_base_price.price) / (10.0 ** -self.lazer_base_exponent)
-        quote_price = float(self.lazer_quote_price.price) / (10.0 ** -self.lazer_quote_exponent)
-        return str(round(base_price / quote_price, 2))
+        return str(round(float(base_price) / float(quote_price), 2))

+ 9 - 14
apps/hip-3-pusher/src/pusher/publisher.py

@@ -53,7 +53,6 @@ class Publisher:
             self.multisig_address = config.multisig.multisig_address
 
         self.market_name = config.hyperliquid.market_name
-        self.market_symbol = config.hyperliquid.market_symbol
         self.enable_publish = config.hyperliquid.enable_publish
 
         self.price_state = price_state
@@ -70,20 +69,16 @@ class Publisher:
                 logger.exception("Publisher.publish() exception: {}", repr(e))
 
     def publish(self):
-        oracle_pxs = {}
-        oracle_px = self.price_state.get_current_oracle_price()
-        if not oracle_px:
-            logger.error("No valid oracle price available")
-            self.metrics.no_oracle_price_counter.add(1, self.metrics_labels)
-            return
-        else:
-            logger.debug("Current oracle price: {}", oracle_px)
-            oracle_pxs[f"{self.market_name}:{self.market_symbol}"] = oracle_px
+        oracle_pxs, mark_pxs, external_perp_pxs = self.price_state.get_all_prices(self.market_name)
+        logger.debug("oracle_pxs: {}", oracle_pxs)
+        logger.debug("mark_pxs: {}", mark_pxs)
+        logger.debug("external_perp_pxs: {}", external_perp_pxs)
 
-        mark_pxs = []
-        external_perp_pxs = {}
-        if self.price_state.hl_mark_price:
-            external_perp_pxs[f"{self.market_name}:{self.market_symbol}"] = self.price_state.hl_mark_price.price
+        if not oracle_pxs:
+            logger.error("No valid oracle prices available")
+            self.metrics.no_oracle_price_counter.add(1, self.metrics_labels)
+        # markPxs is a list of dicts of length 0-2, and so can be empty
+        mark_pxs = [mark_pxs] if mark_pxs else []
 
         # TODO: "Each update can change oraclePx and markPx by at most 1%."
         # TODO: "The markPx cannot be updated such that open interest would be 10x the open interest cap."

+ 70 - 0
apps/hip-3-pusher/src/pusher/seda_listener.py

@@ -0,0 +1,70 @@
+import asyncio
+import datetime
+import httpx
+import json
+from loguru import logger
+from pathlib import Path
+
+from pusher.config import Config, SedaFeedConfig
+from pusher.price_state import PriceSourceState, PriceUpdate
+
+
+class SedaListener:
+    SOURCE_NAME = "seda"
+
+    """
+    Subscribe to SEDA price updates for needed feeds.
+    """
+    def __init__(self, config: Config, seda_state: PriceSourceState):
+        self.url = config.seda.url
+        self.api_key = Path(config.seda.api_key_path).read_text().strip()
+        self.feeds = config.seda.feeds
+        self.poll_interval = config.seda.poll_interval
+        self.poll_failure_interval = config.seda.poll_failure_interval
+        self.poll_timeout = config.seda.poll_timeout
+        self.seda_state = seda_state
+
+    async def run(self):
+        await asyncio.gather(*[self._run_single(feed_name, self.feeds[feed_name]) for feed_name in self.feeds])
+
+    async def _run_single(self, feed_name: str, feed_config: SedaFeedConfig) -> None:
+        headers = {
+            "Accept": "application/json",
+            "Authorization": f"Bearer {self.api_key}",
+        }
+        params = {
+            "execProgramId": feed_config.exec_program_id,
+            "execInputs": feed_config.exec_inputs,
+            "encoding": "utf8",
+        }
+
+        async with httpx.AsyncClient(timeout=self.poll_timeout) as client:
+            while True:
+                result = await self._poll(client, headers, params)
+                if result["ok"]:
+                    self._parse_seda_message(feed_name, result["json"])
+                else:
+                    logger.error("SEDA poll request for {} failed: {}", feed_name, result)
+
+                await asyncio.sleep(self.poll_interval if result.get("ok") else self.poll_failure_interval)
+
+    async def _poll(self,
+        client: httpx.AsyncClient,
+        headers: dict[str, str],
+        params: dict[str, str],
+    ) -> dict:
+        try:
+            resp = await client.get(self.url, headers=headers, params=params)
+            resp.raise_for_status()
+            return {"ok": True, "status": resp.status_code, "json": resp.json()}
+        except httpx.HTTPStatusError as e:
+            return {"ok": False, "status": e.response.status_code, "error": str(e)}
+        except Exception as e:
+            return {"ok": False, "status": None, "error": str(e)}
+
+    def _parse_seda_message(self, feed_name, message):
+        result = json.loads(message["data"]["result"])
+        price = result["composite_rate"]
+        timestamp = datetime.datetime.fromisoformat(result["timestamp"]).timestamp()
+        logger.debug("Parsed SEDA update for feed: {} price: {} timestamp: {}", feed_name, price, timestamp)
+        self.seda_state.put(feed_name, PriceUpdate(price, timestamp))

+ 63 - 31
apps/hip-3-pusher/tests/test_price_state.py

@@ -1,70 +1,102 @@
 import time
 
-from pusher.config import Config, LazerConfig, HermesConfig
+from pusher.config import Config, LazerConfig, HermesConfig, PriceConfig, PriceSource, SingleSourceConfig, \
+    PairSourceConfig, HyperliquidConfig
 from pusher.price_state import PriceState, PriceUpdate
 
+DEX = "pyth"
+SYMBOL = "BTC"
+
 
 def get_config():
     config: Config = Config.model_construct()
     config.stale_price_threshold_seconds = 5
+    config.hyperliquid = HyperliquidConfig.model_construct()
+    config.hyperliquid.asset_context_symbols = [SYMBOL]
     config.lazer = LazerConfig.model_construct()
-    config.lazer.base_feed_exponent = -8
-    config.lazer.quote_feed_exponent = -8
+    config.lazer.feed_ids = [1, 8]
     config.hermes = HermesConfig.model_construct()
-    config.hermes.base_feed_exponent = -8
-    config.hermes.quote_feed_exponent = -8
+    config.hermes.feed_ids = ["e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"]
+    config.price = PriceConfig(
+        oracle={
+            SYMBOL: [
+                SingleSourceConfig(source_type="single", source=PriceSource(source_name="hl_oracle", source_id="BTC", exponent=None)),
+                PairSourceConfig(source_type="pair",
+                                 base_source=PriceSource(source_name="lazer", source_id=1, exponent=-8),
+                                 quote_source=PriceSource(source_name="lazer", source_id=8, exponent=-8)),
+                PairSourceConfig(source_type="pair",
+                                 base_source=PriceSource(source_name="hermes", source_id="e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", exponent=-8),
+                                 quote_source=PriceSource(source_name="hermes", source_id="2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", exponent=-8))
+            ]
+        },
+        mark={},
+        external={}
+    )
     return config
 
 
 def test_good_hl_price():
+    """
+    Pass through fresh HL oracle price.
+    """
     config = get_config()
     price_state = PriceState(config)
     now = time.time()
-    price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0)
-
-    oracle_px = price_state.get_current_oracle_price()
-    assert oracle_px == price_state.hl_oracle_price.price
-    assert oracle_px == "110000.0"
+    price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0))
 
+    oracle_px, _, _ = price_state.get_all_prices(DEX)
+    assert oracle_px == {f"{DEX}:{SYMBOL}": "110000.0"}
 
 
 def test_fallback_lazer():
+    """
+    HL oracle price is stale, so fall back to fresh Lazer price.
+    """
     config = get_config()
     price_state = PriceState(config)
     now = time.time()
-    price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0)
-    price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)
+    price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0))
+    price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0))
+    price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0))
 
-    oracle_px = price_state.get_current_oracle_price()
-    assert oracle_px == price_state.get_lazer_price()
-    assert oracle_px == "111616.16"
+    oracle_px, _, _ = price_state.get_all_prices(DEX)
+    assert oracle_px == {f"{DEX}:{SYMBOL}": "111616.16"}
 
 
 
 def test_fallback_hermes():
+    """
+    HL oracle price and Lazer prices are stale, so fall back to fresh Hermes price.
+    """
     config = get_config()
     price_state = PriceState(config)
     now = time.time()
-    price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)
-    price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0)
-    price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0)
+    price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0))
+    price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0))
+    price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0))
+    price_state.hermes_state.put("e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43",
+                                 PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0))
+    price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b",
+                                 PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0))
 
-    oracle_px = price_state.get_current_oracle_price()
-    assert oracle_px == price_state.get_hermes_price()
-    assert oracle_px == "113265.31"
+    oracle_px, _, _ = price_state.get_all_prices(DEX)
+    assert oracle_px == {f"{DEX}:{SYMBOL}": "113265.31"}
 
 
 def test_all_fail():
+    """
+    All prices are stale, so return nothing.
+    """
     config = get_config()
     price_state = PriceState(config)
     now = time.time()
-    price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0)
-    price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0)
-    assert price_state.get_current_oracle_price() is None
+    price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0))
+    price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0))
+    price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0))
+    price_state.hermes_state.put("e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43",
+                                 PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0))
+    price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b",
+                                 PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0))
+
+    oracle_px, _, _ = price_state.get_all_prices(DEX)
+    assert oracle_px == {}

+ 62 - 1
apps/hip-3-pusher/uv.lock

@@ -11,6 +11,19 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/78/b6/6307fbef88d9b5ee7421e68d78a9f162e0da4900bc5f5793f6d3d0e34fb8/annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", size = 13643, upload-time = "2024-05-20T21:33:24.1Z" },
 ]
 
+[[package]]
+name = "anyio"
+version = "4.11.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "idna" },
+    { name = "sniffio" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/c6/78/7d432127c41b50bccba979505f272c16cbcadcc33645d5fa3a738110ae75/anyio-4.11.0.tar.gz", hash = "sha256:82a8d0b81e318cc5ce71a5f1f8b5c4e63619620b63141ef8c995fa0db95a57c4", size = 219094, upload-time = "2025-09-23T09:19:12.58Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/15/b3/9b1a8074496371342ec1e796a96f99c82c945a339cd81a8e73de28b4cf9e/anyio-4.11.0-py3-none-any.whl", hash = "sha256:0287e96f4d26d4149305414d4e3bc32f0dcd0862365a4bddea19d7a1ec38c4fc", size = 109097, upload-time = "2025-09-23T09:19:10.601Z" },
+]
+
 [[package]]
 name = "bitarray"
 version = "3.7.1"
@@ -318,6 +331,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/bf/4d/257cdc01ada430b8e84b9f2385c2553f33218f5b47da9adf0a616308d4b7/eth_utils-5.3.1-py3-none-any.whl", hash = "sha256:1f5476d8f29588d25b8ae4987e1ffdfae6d4c09026e476c4aad13b32dda3ead0", size = 102529, upload-time = "2025-08-27T16:37:15.449Z" },
 ]
 
+[[package]]
+name = "h11"
+version = "0.16.0"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
+]
+
 [[package]]
 name = "hexbytes"
 version = "1.3.1"
@@ -329,11 +351,12 @@ wheels = [
 
 [[package]]
 name = "hip-3-pusher"
-version = "0.1.7"
+version = "0.2.0"
 source = { editable = "." }
 dependencies = [
     { name = "boto3" },
     { name = "cryptography" },
+    { name = "httpx" },
     { name = "hyperliquid-python-sdk" },
     { name = "loguru" },
     { name = "opentelemetry-exporter-prometheus" },
@@ -352,6 +375,7 @@ dev = [
 requires-dist = [
     { name = "boto3", specifier = "~=1.40.38" },
     { name = "cryptography", specifier = "~=46.0.1" },
+    { name = "httpx", specifier = "~=0.28.1" },
     { name = "hyperliquid-python-sdk", specifier = "~=0.19.0" },
     { name = "loguru", specifier = "~=0.7.3" },
     { name = "opentelemetry-exporter-prometheus", specifier = "~=0.58b0" },
@@ -364,6 +388,34 @@ requires-dist = [
 [package.metadata.requires-dev]
 dev = [{ name = "pytest", specifier = "~=8.4.2" }]
 
+[[package]]
+name = "httpcore"
+version = "1.0.9"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "certifi" },
+    { name = "h11" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" },
+]
+
+[[package]]
+name = "httpx"
+version = "0.28.1"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "anyio" },
+    { name = "certifi" },
+    { name = "httpcore" },
+    { name = "idna" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
+]
+
 [[package]]
 name = "hyperliquid-python-sdk"
 version = "0.19.0"
@@ -732,6 +784,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" },
 ]
 
+[[package]]
+name = "sniffio"
+version = "1.3.1"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/a2/87/a6771e1546d97e7e041b6ae58d80074f81b7d5121207425c964ddf5cfdbd/sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc", size = 20372, upload-time = "2024-02-25T23:20:04.057Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" },
+]
+
 [[package]]
 name = "tenacity"
 version = "9.1.2"