ソースを参照

feat(hip-3-pusher): Mark price support (#3206)

Mike Rolish 2 日 前
コミット
cbb5dd5883

+ 1 - 1
apps/hip-3-pusher/pyproject.toml

@@ -1,6 +1,6 @@
 [project]
 name = "hip-3-pusher"
-version = "0.2.0"
+version = "0.2.1"
 description = "Hyperliquid HIP-3 market oracle pusher"
 readme = "README.md"
 requires-python = "==3.13.*"

+ 7 - 2
apps/hip-3-pusher/src/pusher/config.py

@@ -56,7 +56,7 @@ class SedaConfig(BaseModel):
     poll_interval: float
     poll_failure_interval: float
     poll_timeout: float
-    feeds: dict[str, SedaFeedConfig]
+    feeds: Optional[dict[str, SedaFeedConfig]] = {}
 
 
 class PriceSource(BaseModel):
@@ -81,7 +81,12 @@ class ConstantSourceConfig(BaseModel):
     value: str
 
 
-PriceSourceConfig = SingleSourceConfig | PairSourceConfig | ConstantSourceConfig
+class OracleMidAverageConfig(BaseModel):
+    source_type: Literal["oracle_mid_average"]
+    symbol: str
+
+
+PriceSourceConfig = SingleSourceConfig | PairSourceConfig | ConstantSourceConfig | OracleMidAverageConfig
 
 
 class PriceConfig(BaseModel):

+ 4 - 0
apps/hip-3-pusher/src/pusher/hermes_listener.py

@@ -30,6 +30,10 @@ class HermesListener:
         }
 
     async def subscribe_all(self):
+        if not self.feed_ids:
+            logger.info("No Hermes subscriptions needed")
+            return
+
         await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
 
     @retry(

+ 26 - 5
apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

@@ -20,11 +20,13 @@ class HyperliquidListener:
     Subscribe to any relevant Hyperliquid websocket streams
     See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
     """
-    def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_state: PriceSourceState):
+    def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_state: PriceSourceState, hl_mid_state: PriceSourceState):
+        self.market_name = config.hyperliquid.market_name
         self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls
         self.asset_context_symbols = config.hyperliquid.asset_context_symbols
         self.hl_oracle_state = hl_oracle_state
         self.hl_mark_state = hl_mark_state
+        self.hl_mid_state = hl_mid_state
 
     def get_subscribe_request(self, asset):
         return {
@@ -50,6 +52,13 @@ class HyperliquidListener:
                 await ws.send(json.dumps(subscribe_request))
                 logger.info("Sent subscribe request for symbol: {} to {}", symbol,  url)
 
+            subscribe_all_mids_request = {
+                "method": "subscribe",
+                "subscription": {"type": "allMids", "dex": self.market_name}
+            }
+            await ws.send(json.dumps(subscribe_all_mids_request))
+            logger.info("Sent subscribe request for allMids for dex: {} to {}", self.market_name, url)
+
             # listen for updates
             while True:
                 try:
@@ -63,7 +72,9 @@ class HyperliquidListener:
                     elif channel == "error":
                         logger.error("Received Hyperliquid error response: {}", data)
                     elif channel == "activeAssetCtx":
-                        self.parse_hyperliquid_ws_message(data)
+                        self.parse_hyperliquid_active_asset_ctx_update(data)
+                    elif channel == "allMids":
+                        self.parse_hyperliquid_all_mids_update(data)
                     else:
                         logger.error("Received unknown channel: {}", channel)
                 except asyncio.TimeoutError:
@@ -75,13 +86,23 @@ class HyperliquidListener:
                 except Exception as e:
                     logger.error("Unexpected exception: {}", e)
 
-    def parse_hyperliquid_ws_message(self, message):
+    def parse_hyperliquid_active_asset_ctx_update(self, message):
         try:
             ctx = message["data"]["ctx"]
             symbol = message["data"]["coin"]
             now = time.time()
             self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now))
             self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now))
-            logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", ctx["oraclePx"], ctx["markPx"])
+            logger.debug("activeAssetCtx symbol: {} oraclePx: {} markPx: {}", symbol, ctx["oraclePx"], ctx["markPx"])
+        except Exception as e:
+            logger.error("parse_hyperliquid_active_asset_ctx_update error: message: {} e: {}", message, e)
+
+    def parse_hyperliquid_all_mids_update(self, message):
+        try:
+            mids = message["data"]["mids"]
+            now = time.time()
+            for mid in mids:
+                self.hl_mid_state.put(mid, PriceUpdate(mids[mid], now))
+            logger.debug("allMids: {}", mids)
         except Exception as e:
-            logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e)
+            logger.error("parse_hyperliquid_all_mids_update error: message: {} e: {}", message, e)

+ 4 - 0
apps/hip-3-pusher/src/pusher/lazer_listener.py

@@ -34,6 +34,10 @@ class LazerListener:
         }
 
     async def subscribe_all(self):
+        if not self.feed_ids:
+            logger.info("No Lazer subscriptions needed")
+            return
+
         await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
 
     @retry(

+ 1 - 1
apps/hip-3-pusher/src/pusher/main.py

@@ -46,7 +46,7 @@ async def main():
     metrics = Metrics(config)
 
     publisher = Publisher(config, price_state, metrics)
-    hyperliquid_listener = HyperliquidListener(config, price_state.hl_oracle_state, price_state.hl_mark_state)
+    hyperliquid_listener = HyperliquidListener(config, price_state.hl_oracle_state, price_state.hl_mark_state, price_state.hl_mid_state)
     lazer_listener = LazerListener(config, price_state.lazer_state)
     hermes_listener = HermesListener(config, price_state.hermes_state)
     seda_listener = SedaListener(config, price_state.seda_state)

+ 41 - 11
apps/hip-3-pusher/src/pusher/price_state.py

@@ -3,7 +3,7 @@ from loguru import logger
 import time
 
 from pusher.config import Config, PriceSource, PriceSourceConfig, ConstantSourceConfig, SingleSourceConfig, \
-    PairSourceConfig
+    PairSourceConfig, OracleMidAverageConfig
 
 DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5
 
@@ -17,6 +17,13 @@ class PriceUpdate:
         return now - self.timestamp
 
 
+@dataclass
+class OracleUpdate:
+    oracle: dict[str, str]
+    mark: dict[str, str]
+    external: dict[str, str]
+
+
 class PriceSourceState:
     def __init__(self, name: str):
         self.name = name
@@ -35,6 +42,7 @@ class PriceSourceState:
 class PriceState:
     HL_ORACLE = "hl_oracle"
     HL_MARK = "hl_mark"
+    HL_MID = "hl_mid"
     LAZER = "lazer"
     HERMES = "hermes"
     SEDA = "seda"
@@ -43,11 +51,13 @@ class PriceState:
     Maintain latest prices seen across listeners and publisher.
     """
     def __init__(self, config: Config):
+        self.market_name = config.hyperliquid.market_name
         self.stale_price_threshold_seconds = config.stale_price_threshold_seconds
         self.price_config = config.price
 
         self.hl_oracle_state = PriceSourceState(self.HL_ORACLE)
         self.hl_mark_state = PriceSourceState(self.HL_MARK)
+        self.hl_mid_state = PriceSourceState(self.HL_MID)
         self.lazer_state = PriceSourceState(self.LAZER)
         self.hermes_state = PriceSourceState(self.HERMES)
         self.seda_state = PriceSourceState(self.SEDA)
@@ -55,38 +65,42 @@ class PriceState:
         self.all_states = {
             self.HL_ORACLE: self.hl_oracle_state,
             self.HL_MARK: self.hl_mark_state,
+            self.HL_MID: self.hl_mid_state,
             self.LAZER: self.lazer_state,
             self.HERMES: self.hermes_state,
             self.SEDA: self.seda_state,
         }
 
-    def get_all_prices(self, market_name):
+    def get_all_prices(self) -> OracleUpdate:
         logger.debug("get_all_prices state: {}", self.all_states)
 
-        return (
-            self.get_prices(self.price_config.oracle, market_name),
-            self.get_prices(self.price_config.mark, market_name),
-            self.get_prices(self.price_config.external, market_name)
-        )
+        oracle_update = OracleUpdate({}, {}, {})
+        oracle_update.oracle = self.get_prices(self.price_config.oracle, oracle_update)
+        oracle_update.mark = self.get_prices(self.price_config.mark, oracle_update)
+        oracle_update.external = self.get_prices(self.price_config.external, oracle_update)
 
-    def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], market_name: str):
+        return oracle_update
+
+    def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], oracle_update: OracleUpdate):
         pxs = {}
         for symbol in symbol_configs:
             for source_config in symbol_configs[symbol]:
                 # find first valid price in the waterfall
-                px = self.get_price(source_config)
+                px = self.get_price(source_config, oracle_update)
                 if px is not None:
-                    pxs[f"{market_name}:{symbol}"] = px
+                    pxs[f"{self.market_name}:{symbol}"] = str(px)
                     break
         return pxs
 
-    def get_price(self, price_source_config: PriceSourceConfig):
+    def get_price(self, price_source_config: PriceSourceConfig, oracle_update: OracleUpdate):
         if isinstance(price_source_config, ConstantSourceConfig):
             return price_source_config.value
         elif isinstance(price_source_config, SingleSourceConfig):
             return self.get_price_from_single_source(price_source_config.source)
         elif isinstance(price_source_config, PairSourceConfig):
             return self.get_price_from_pair_source(price_source_config.base_source, price_source_config.quote_source)
+        elif isinstance(price_source_config, OracleMidAverageConfig):
+            return self.get_price_from_oracle_mid_average(price_source_config.symbol, oracle_update)
         else:
             raise ValueError
 
@@ -115,3 +129,19 @@ class PriceState:
             return None
 
         return str(round(float(base_price) / float(quote_price), 2))
+
+    def get_price_from_oracle_mid_average(self, symbol: str, oracle_update: OracleUpdate):
+        oracle_price = oracle_update.oracle.get(symbol)
+        if oracle_price is None:
+            return None
+
+        mid_price_update: PriceUpdate | None = self.hl_mid_state.get(symbol)
+        if mid_price_update is None:
+            logger.warning("mid price for {} is missing", symbol)
+            return None
+        time_diff = mid_price_update.time_diff(time.time())
+        if time_diff >= self.stale_price_threshold_seconds:
+            logger.warning("mid price for {} is stale by {} seconds", symbol, time_diff)
+            return None
+
+        return (float(oracle_price) + float(mid_price_update.price)) / 2.0

+ 5 - 6
apps/hip-3-pusher/src/pusher/publisher.py

@@ -69,11 +69,10 @@ class Publisher:
                 logger.exception("Publisher.publish() exception: {}", repr(e))
 
     def publish(self):
-        oracle_pxs, mark_pxs, external_perp_pxs = self.price_state.get_all_prices(self.market_name)
-        logger.debug("oracle_pxs: {}", oracle_pxs)
-        logger.debug("mark_pxs: {}", mark_pxs)
-        logger.debug("external_perp_pxs: {}", external_perp_pxs)
+        oracle_update = self.price_state.get_all_prices()
+        logger.debug("oracle_update: {}", oracle_update)
 
+        oracle_pxs, mark_pxs, external_perp_pxs = oracle_update.oracle, oracle_update.mark, oracle_update.external
         if not oracle_pxs:
             logger.error("No valid oracle prices available")
             self.metrics.no_oracle_price_counter.add(1, self.metrics_labels)
@@ -130,13 +129,13 @@ class Publisher:
         raise PushError("all push endpoints failed")
 
     def _handle_response(self, response):
-        logger.debug("publish: push response: {} {}", response, type(response))
+        logger.debug("oracle update response: {}", response)
         status = response.get("status")
         if status == "ok":
             self.metrics.successful_push_counter.add(1, self.metrics_labels)
         elif status == "err":
             self.metrics.failed_push_counter.add(1, self.metrics_labels)
-            logger.error("publish: publish error: {}", response)
+            logger.error("oracle update error response: {}", response)
 
     def _record_push_interval_metric(self):
         now = time.time()

+ 4 - 0
apps/hip-3-pusher/src/pusher/seda_listener.py

@@ -25,6 +25,10 @@ class SedaListener:
         self.seda_state = seda_state
 
     async def run(self):
+        if not self.feeds:
+            logger.info("No SEDA feeds needed")
+            return
+
         await asyncio.gather(*[self._run_single(feed_name, self.feeds[feed_name]) for feed_name in self.feeds])
 
     async def _run_single(self, feed_name: str, feed_config: SedaFeedConfig) -> None:

+ 1 - 1
apps/hip-3-pusher/uv.lock

@@ -351,7 +351,7 @@ wheels = [
 
 [[package]]
 name = "hip-3-pusher"
-version = "0.2.0"
+version = "0.2.1"
 source = { editable = "." }
 dependencies = [
     { name = "boto3" },