Переглянути джерело

feat(hip-3-pusher): Price fallback logic and staleness checks

Mike Rolish 2 місяців тому
батько
коміт
c6ead2ca4c

+ 10 - 4
apps/hip-3-pusher/config/config.toml

@@ -1,3 +1,5 @@
+stale_price_threshold_seconds = 5
+
 [hyperliquid]
 market_name = ""
 market_symbol = "BTC"
@@ -12,12 +14,16 @@ key_path = "/path/to/kms_key.txt"
 aws_region_name = "ap-northeast-1"
 
 [lazer]
-router_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
+lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
 api_key = "lazer_api_key"
 base_feed_id = 1   # BTC
+base_feed_exponent = -8
 quote_feed_id = 8  # USDT
+quote_feed_exponent = -8
 
 [hermes]
-urls = ["wss://hermes.pyth.network/ws"]
-base_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"   # BTC
-quote_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"  # USDT
+hermes_urls = ["wss://hermes.pyth.network/ws"]
+base_feed_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"   # BTC
+base_feed_exponent = -8
+quote_feed_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"  # USDT
+quote_feed_exponent = -8

+ 2 - 2
apps/hip-3-pusher/pyproject.toml

@@ -1,7 +1,7 @@
 [project]
 name = "hip-3-pusher"
-version = "0.1.0"
-description = "Add your description here"
+version = "0.1.1"
+description = "Hyperliquid HIP-3 market oracle pusher"
 readme = "README.md"
 requires-python = ">=3.13"
 dependencies = [

+ 9 - 8
apps/hip-3-pusher/src/hermes_listener.py

@@ -1,6 +1,7 @@
 import asyncio
 import json
 from loguru import logger
+import time
 import websockets
 
 from price_state import PriceState
@@ -9,18 +10,17 @@ from price_state import PriceState
 class HermesListener:
     """
     Subscribe to Hermes price updates for needed feeds.
-    TODO: Will need to handle specific conversions/factors and exponents.
     """
     def __init__(self, config, price_state: PriceState):
-        self.urls = config["hermes"]["urls"]
-        self.base_id = config["hermes"]["base_id"]
-        self.quote_id = config["hermes"]["quote_id"]
+        self.hermes_urls = config["hermes"]["hermes_urls"]
+        self.base_feed_id = config["hermes"]["base_feed_id"]
+        self.quote_feed_id = config["hermes"]["quote_feed_id"]
         self.price_state = price_state
 
     def get_subscribe_request(self):
         return {
             "type": "subscribe",
-            "ids": [self.base_id, self.quote_id],
+            "ids": [self.base_feed_id, self.quote_feed_id],
             "verbose": False,
             "binary": True,
             "allow_out_of_order": False,
@@ -28,7 +28,7 @@ class HermesListener:
         }
 
     async def subscribe_all(self):
-        await asyncio.gather(*(self.subscribe_single(url) for url in self.urls))
+        await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
 
     async def subscribe_single(self, url):
         while True:
@@ -71,9 +71,10 @@ class HermesListener:
             expo = price_object["expo"]
             publish_time = price_object["publish_time"]
             logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
-            if id == self.base_id:
+            if id == self.base_feed_id:
                 self.price_state.hermes_base_price = price
-            if id == self.quote_id:
+            if id == self.quote_feed_id:
                 self.price_state.hermes_quote_price = price
+            self.price_state.latest_hermes_timestamp = time.time()
         except Exception as e:
             logger.error("parse_hermes_message error: {}", e)

+ 5 - 3
apps/hip-3-pusher/src/hyperliquid_listener.py

@@ -1,4 +1,5 @@
 from loguru import logger
+import time
 
 from hyperliquid.info import Info
 from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
@@ -28,6 +29,7 @@ class HyperliquidListener:
         :return: None
         """
         ctx = message["data"]["ctx"]
-        self.price_state.latest_oracle_price = ctx["oraclePx"]
-        self.price_state.latest_mark_price = ctx["markPx"]
-        logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.latest_oracle_price, self.price_state.latest_mark_price)
+        self.price_state.hl_oracle_price = ctx["oraclePx"]
+        self.price_state.hl_mark_price = ctx["markPx"]
+        logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, self.price_state.hl_mark_price)
+        self.price_state.latest_hl_timestamp = time.time()

+ 6 - 5
apps/hip-3-pusher/src/lazer_listener.py

@@ -1,6 +1,7 @@
 import asyncio
 import json
 from loguru import logger
+import time
 import websockets
 
 from price_state import PriceState
@@ -9,11 +10,10 @@ from price_state import PriceState
 class LazerListener:
     """
     Subscribe to Lazer price updates for needed feeds.
-    TODO: Will need to handle specific conversions/factors and exponents.
     """
     def __init__(self, config, price_state: PriceState):
-        self.router_urls = config["lazer"]["router_urls"]
-        self.api_key = config["lazer"]["api_key"]
+        self.lazer_urls = config["lazer"]["lazer_urls"]
+        self.api_key = config["lazer"]["lazer_api_key"]
         self.base_feed_id = config["lazer"]["base_feed_id"]
         self.quote_feed_id = config["lazer"]["quote_feed_id"]
         self.price_state = price_state
@@ -32,7 +32,7 @@ class LazerListener:
         }
 
     async def subscribe_all(self):
-        await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.router_urls))
+        await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
 
     async def subscribe_single(self, router_url):
         while True:
@@ -52,7 +52,7 @@ class LazerListener:
             subscribe_request = self.get_subscribe_request(1)
 
             await ws.send(json.dumps(subscribe_request))
-            logger.info("Sent Lazer subscribe request to {}", self.router_urls[0])
+            logger.info("Sent Lazer subscribe request to {}", self.lazer_urls[0])
 
             # listen for updates
             async for message in ws:
@@ -83,5 +83,6 @@ class LazerListener:
                     self.price_state.lazer_base_price = price
                 if feed_id == self.quote_feed_id:
                     self.price_state.lazer_quote_price = price
+            self.price_state.latest_lazer_timestamp = time.time()
         except Exception as e:
             logger.error("parse_lazer_message error: {}", e)

+ 18 - 5
apps/hip-3-pusher/src/main.py

@@ -1,6 +1,8 @@
 import argparse
 import asyncio
 from loguru import logger
+import os
+import sys
 import toml
 
 from hyperliquid_listener import HyperliquidListener
@@ -23,25 +25,36 @@ def load_config():
     return config
 
 
+def init_logging():
+    logger.remove()
+    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
+    # serialize=True if we want json logging
+    logger.add(sys.stderr, level=log_level, serialize=False)
+
+
 async def main():
-    logger.info("Starting hip3-agent...")
+    init_logging()
+    logger.info("Starting hip-3-pusher...")
     config = load_config()
 
-    price_state = PriceState()
+    price_state = PriceState(config)
     publisher = Publisher(config, price_state)
     hyperliquid_listener = HyperliquidListener(config, price_state)
     lazer_listener = LazerListener(config, price_state)
     hermes_listener = HermesListener(config, price_state)
 
-    # TODO: Probably pull this out of the sdk.
+    # TODO: Probably pull this out of the sdk so we can handle reconnects.
     hyperliquid_listener.subscribe()
     await asyncio.gather(
         publisher.run(),
         lazer_listener.subscribe_all(),
         hermes_listener.subscribe_all(),
     )
-    logger.info("Exiting hip3-agent...")
+    logger.info("Exiting hip-3-pusher..")
 
 
 if __name__ == "__main__":
-    asyncio.run(main())
+    try:
+        asyncio.run(main())
+    except Exception as e:
+        logger.exception("Uncaught exception, exiting: {}", e)

+ 65 - 3
apps/hip-3-pusher/src/price_state.py

@@ -1,11 +1,73 @@
+from loguru import logger
+import time
+
+DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5
+
+
 class PriceState:
     """
     Maintain latest prices seen across listeners and publisher.
     """
-    def __init__(self):
-        self.latest_oracle_price = None
-        self.latest_mark_price = None
+    def __init__(self, config):
+        self.stale_price_threshold_seconds = config.get("stale_price_threshold_seconds", DEFAULT_STALE_PRICE_THRESHOLD_SECONDS)
+        now = time.time()
+
+        self.hl_oracle_price = None
+        self.hl_mark_price = None
+        self.latest_hl_timestamp = now
+
         self.lazer_base_price = None
+        self.lazer_base_exponent = config["lazer"]["base_feed_exponent"]
         self.lazer_quote_price = None
+        self.lazer_quote_exponent = config["lazer"]["quote_feed_exponent"]
+        self.latest_lazer_timestamp = now
+
         self.hermes_base_price = None
+        self.hermes_base_exponent = config["hermes"]["base_feed_exponent"]
         self.hermes_quote_price = None
+        self.hermes_quote_exponent = config["hermes"]["quote_feed_exponent"]
+        self.latest_hermes_timestamp = now
+
+    def get_current_oracle_price(self):
+        now = time.time()
+        if self.hl_oracle_price:
+            time_diff = now - self.latest_hl_timestamp
+            if time_diff < self.stale_price_threshold_seconds:
+                return self.hl_oracle_price
+            else:
+                logger.error("Hyperliquid oracle price stale by {} seconds", time_diff)
+        else:
+            logger.error("Hyperliquid oracle price not received yet")
+
+        # fall back to Hermes
+        if self.hermes_base_price and self.hermes_quote_price:
+            time_diff = now - self.latest_hermes_timestamp
+            if time_diff < self.stale_price_threshold_seconds:
+                return self.get_hermes_price()
+            else:
+                logger.error("Hermes price stale by {} seconds", time_diff)
+        else:
+            logger.error("Hermes base/quote prices not received yet")
+
+        # fall back to Lazer
+        if self.lazer_base_price and self.lazer_quote_price:
+            time_diff = now - self.latest_lazer_timestamp
+            if time_diff < self.stale_price_threshold_seconds:
+                return self.get_lazer_price()
+            else:
+                logger.error("Lazer price stale by {} seconds", time_diff)
+        else:
+            logger.error("Lazer base/quote prices not received yet")
+
+        logger.error("All prices missing or stale!")
+        return None
+
+    def get_hermes_price(self):
+        base_price = float(self.hermes_base_price) / (10.0 ** -self.hermes_base_exponent)
+        quote_price = float(self.hermes_quote_price) / (10.0 ** -self.hermes_quote_exponent)
+        return str(round(base_price / quote_price, 2))
+
+    def get_lazer_price(self):
+        base_price = float(self.lazer_base_price) / (10.0 ** -self.lazer_base_exponent)
+        quote_price = float(self.lazer_quote_price) / (10.0 ** -self.lazer_quote_exponent)
+        return str(round(base_price / quote_price, 2))

+ 11 - 5
apps/hip-3-pusher/src/publisher.py

@@ -47,15 +47,21 @@ class Publisher:
     async def run(self):
         while True:
             await asyncio.sleep(self.publish_interval)
-            logger.debug("publish price_state: {}", vars(self.price_state))
 
             oracle_pxs = {}
+            oracle_px = self.price_state.get_current_oracle_price()
+            if not oracle_px:
+                logger.error("No valid oracle price available!")
+                return
+            else:
+                logger.debug("Current oracle price: {}", oracle_px)
+                oracle_pxs[self.market_symbol] = oracle_px
+
             mark_pxs = []
+            #if self.price_state.hl_mark_price:
+            #    mark_pxs.append({self.market_symbol: self.price_state.hl_mark_price})
+
             external_perp_pxs = {}
-            if self.price_state.latest_oracle_price:
-                oracle_pxs[self.market_symbol] = self.price_state.latest_oracle_price
-            if self.price_state.latest_mark_price:
-                mark_pxs.append({self.market_symbol: self.price_state.latest_mark_price})
 
             if self.enable_publish:
                 if self.enable_kms:

+ 1 - 1
apps/hip-3-pusher/uv.lock

@@ -365,7 +365,7 @@ wheels = [
 
 [[package]]
 name = "hip-3-pusher"
-version = "0.1.0"
+version = "0.1.1"
 source = { virtual = "." }
 dependencies = [
     { name = "asn1crypto" },