Эх сурвалжийг харах

Better timeout and error handling

Mike Rolish 1 сар өмнө
parent
commit
4d480d15a6

+ 1 - 0
apps/hip-3-pusher/config/config.toml

@@ -8,6 +8,7 @@ market_symbol = "BTC"
 use_testnet = false
 oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt"
 publish_interval = 3.0
+publish_timeout = 5.0
 enable_publish = false
 
 [kms]

+ 1 - 0
apps/hip-3-pusher/src/pusher/config.py

@@ -35,6 +35,7 @@ class HyperliquidConfig(BaseModel):
     use_testnet: bool
     oracle_pusher_key_path: FilePath
     publish_interval: float
+    publish_timeout: float
     enable_publish: bool
 
     @model_validator(mode="after")

+ 5 - 1
apps/hip-3-pusher/src/pusher/exception.py

@@ -1,2 +1,6 @@
-class StaleConnection(Exception):
+class StaleConnectionError(Exception):
+    pass
+
+
+class PushError(Exception):
     pass

+ 3 - 3
apps/hip-3-pusher/src/pusher/hermes_listener.py

@@ -6,7 +6,7 @@ import websockets
 from tenacity import retry, retry_if_exception_type, wait_exponential
 
 from pusher.config import Config, STALE_TIMEOUT_SECONDS
-from pusher.exception import StaleConnection
+from pusher.exception import StaleConnectionError
 from pusher.price_state import PriceState, PriceUpdate
 
 
@@ -34,7 +34,7 @@ class HermesListener:
         await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
 
     @retry(
-        retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
+        retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
         wait=wait_exponential(multiplier=1, min=1, max=10),
         reraise=True,
     )
@@ -55,7 +55,7 @@ class HermesListener:
                     data = json.loads(message)
                     self.parse_hermes_message(data)
                 except asyncio.TimeoutError:
-                    raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
+                    raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
                 except websockets.ConnectionClosed:
                     raise
                 except json.JSONDecodeError as e:

+ 3 - 3
apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

@@ -6,7 +6,7 @@ from tenacity import retry, retry_if_exception_type, wait_exponential
 import time
 
 from pusher.config import Config, STALE_TIMEOUT_SECONDS
-from pusher.exception import StaleConnection
+from pusher.exception import StaleConnectionError
 from pusher.price_state import PriceState, PriceUpdate
 
 # This will be in config, but note here.
@@ -35,7 +35,7 @@ class HyperliquidListener:
         await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))
 
     @retry(
-        retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
+        retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
         wait=wait_exponential(multiplier=1, min=1, max=10),
         reraise=True,
     )
@@ -65,7 +65,7 @@ class HyperliquidListener:
                     else:
                         logger.error("Received unknown channel: {}", channel)
                 except asyncio.TimeoutError:
-                    raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
+                    raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
                 except websockets.ConnectionClosed:
                     raise
                 except json.JSONDecodeError as e:

+ 2 - 2
apps/hip-3-pusher/src/pusher/kms_signer.py

@@ -1,4 +1,3 @@
-
 import boto3
 from cryptography.hazmat.primitives import serialization
 from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
@@ -12,6 +11,7 @@ from loguru import logger
 from pathlib import Path
 
 from pusher.config import Config
+from pusher.exception import PushError
 
 SECP256K1_N_HALF = SECP256K1_N // 2
 
@@ -94,7 +94,7 @@ class KMSSigner:
             except Exception as e:
                 logger.exception("perp_deploy_set_oracle exception for endpoint: {} error: {}", exchange.base_url, repr(e))
 
-        return None
+        raise PushError("all push endpoints failed")
 
     def sign_l1_action(self, action, nonce, is_mainnet):
         hash = action_hash(action, vault_address=None, nonce=nonce, expires_after=None)

+ 3 - 3
apps/hip-3-pusher/src/pusher/lazer_listener.py

@@ -6,7 +6,7 @@ import websockets
 from tenacity import retry, retry_if_exception_type, wait_exponential
 
 from pusher.config import Config, STALE_TIMEOUT_SECONDS
-from pusher.exception import StaleConnection
+from pusher.exception import StaleConnectionError
 from pusher.price_state import PriceState, PriceUpdate
 
 
@@ -38,7 +38,7 @@ class LazerListener:
         await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
 
     @retry(
-        retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
+        retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
         wait=wait_exponential(multiplier=1, min=1, max=10),
         reraise=True,
     )
@@ -63,7 +63,7 @@ class LazerListener:
                     data = json.loads(message)
                     self.parse_lazer_message(data)
                 except asyncio.TimeoutError:
-                    raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
+                    raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
                 except websockets.ConnectionClosed:
                     raise
                 except json.JSONDecodeError as e:

+ 28 - 23
apps/hip-3-pusher/src/pusher/publisher.py

@@ -7,6 +7,7 @@ from eth_account.signers.local import LocalAccount
 from hyperliquid.exchange import Exchange
 
 from pusher.config import Config
+from pusher.exception import PushError
 from pusher.kms_signer import KMSSigner
 from pusher.metrics import Metrics
 from pusher.price_state import PriceState
@@ -30,7 +31,10 @@ class Publisher:
             oracle_pusher_key = Path(config.hyperliquid.oracle_pusher_key_path).read_text().strip()
             oracle_account: LocalAccount = Account.from_key(oracle_pusher_key)
             logger.info("oracle pusher local pubkey: {}", oracle_account.address)
-        self.publisher_exchanges = [Exchange(wallet=oracle_account, base_url=url) for url in self.push_urls]
+        self.publisher_exchanges = [Exchange(wallet=oracle_account,
+                                             base_url=url,
+                                             timeout=config.hyperliquid.publish_timeout)
+                                    for url in self.push_urls]
         if config.kms.enable_kms:
             self.enable_kms = True
             self.kms_signer = KMSSigner(config, self.publisher_exchanges)
@@ -70,23 +74,29 @@ class Publisher:
         # TODO: "Each update can change oraclePx and markPx by at most 1%."
         # TODO: "The markPx cannot be updated such that open interest would be 10x the open interest cap."
 
-        push_response = None
         if self.enable_publish:
-            if self.enable_kms:
-                push_response = self.kms_signer.set_oracle(
-                    dex=self.market_name,
-                    oracle_pxs=oracle_pxs,
-                    all_mark_pxs=mark_pxs,
-                    external_perp_pxs=external_perp_pxs,
-                )
-            else:
-                push_response = self._send_update(
-                    oracle_pxs=oracle_pxs,
-                    all_mark_pxs=mark_pxs,
-                    external_perp_pxs=external_perp_pxs,
-                )
-
-        self._handle_response(push_response)
+            try:
+                if self.enable_kms:
+                    push_response = self.kms_signer.set_oracle(
+                        dex=self.market_name,
+                        oracle_pxs=oracle_pxs,
+                        all_mark_pxs=mark_pxs,
+                        external_perp_pxs=external_perp_pxs,
+                    )
+                else:
+                    push_response = self._send_update(
+                        oracle_pxs=oracle_pxs,
+                        all_mark_pxs=mark_pxs,
+                        external_perp_pxs=external_perp_pxs,
+                    )
+                self._handle_response(push_response)
+            except PushError:
+                logger.error("All push attempts failed")
+                self.metrics.failed_push_counter.add(1, self.metrics_labels)
+            except Exception as e:
+                logger.exception("Unexpected exception in push request: {}", repr(e))
+        else:
+            logger.debug("push disabled")
 
     def _send_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs):
         for exchange in self.publisher_exchanges:
@@ -100,14 +110,9 @@ class Publisher:
             except Exception as e:
                 logger.exception("perp_deploy_set_oracle exception for endpoint: {} error: {}", exchange.base_url, repr(e))
 
-        return None
+        raise PushError("all push endpoints failed")
 
     def _handle_response(self, response):
-        if response is None:
-            logger.error("Push API call failed")
-            self.metrics.failed_push_counter.add(1, self.metrics_labels)
-            return
-
         logger.debug("publish: push response: {} {}", response, type(response))
         status = response.get("status")
         if status == "ok":