Browse Source

fix(apps/price_pusher): fix bug causing price_pusher to hand when invalid price feed ids are passed in to hermes ws (#2297)

* fix

* add cleanup
Daniel Chew 10 tháng trước cách đây
mục cha
commit
52c2def74d

+ 2 - 1
apps/price_pusher/package.json

@@ -1,6 +1,6 @@
 {
   "name": "@pythnetwork/price-pusher",
-  "version": "8.3.2",
+  "version": "8.3.3",
   "description": "Pyth Price Pusher",
   "homepage": "https://pyth.network",
   "main": "lib/index.js",
@@ -24,6 +24,7 @@
     "format": "prettier --write \"src/**/*.ts\"",
     "test:lint": "eslint src/",
     "start": "node lib/index.js",
+    "test": "jest",
     "dev": "ts-node src/index.ts",
     "prepublishOnly": "pnpm run build && pnpm run test:lint",
     "preversion": "pnpm run test:lint",

+ 101 - 0
apps/price_pusher/src/__tests__/pyth-price-listener.test.ts

@@ -0,0 +1,101 @@
+import { PythPriceListener } from "../pyth-price-listener";
+import { PriceServiceConnection } from "@pythnetwork/price-service-client";
+import { Logger } from "pino";
+
+describe("PythPriceListener", () => {
+  let logger: Logger;
+  let connection: PriceServiceConnection;
+  let listener: PythPriceListener;
+  let originalConsoleError: typeof console.error;
+
+  beforeEach(() => {
+    // Save original console.error and mock it
+    originalConsoleError = console.error;
+    console.error = jest.fn();
+
+    logger = {
+      debug: jest.fn(),
+      error: jest.fn(),
+      info: jest.fn(),
+    } as unknown as Logger;
+
+    // Use real Hermes beta endpoint for testing
+    connection = new PriceServiceConnection("https://hermes.pyth.network");
+  });
+
+  afterEach(() => {
+    // Clean up websocket connection
+    connection.closeWebSocket();
+    // Clean up health check interval
+    if (listener) {
+      listener.cleanup();
+    }
+    // Restore original console.error
+    console.error = originalConsoleError;
+  });
+
+  it("should handle invalid price feeds gracefully", async () => {
+    const validFeedId =
+      "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"; // BTC/USD
+    const invalidFeedId =
+      "0000000000000000000000000000000000000000000000000000000000000000";
+
+    const priceItems = [
+      { id: validFeedId, alias: "BTC/USD" },
+      { id: invalidFeedId, alias: "INVALID/PRICE" },
+    ];
+
+    listener = new PythPriceListener(connection, priceItems, logger);
+
+    await listener.start();
+
+    // Wait for both error handlers to complete
+    await new Promise((resolve) => {
+      const checkInterval = setInterval(() => {
+        const errorCalls = (logger.error as jest.Mock).mock.calls;
+
+        // Check for both HTTP and websocket error logs
+        const hasHttpError = errorCalls.some(
+          (call) => call[0] === "Failed to get latest price feeds:"
+        );
+        const hasGetLatestError = errorCalls.some((call) =>
+          call[0].includes("not found for getLatestPriceFeeds")
+        );
+        const hasWsError = errorCalls.some((call) =>
+          call[0].includes("not found for subscribePriceFeedUpdates")
+        );
+
+        if (hasHttpError && hasGetLatestError && hasWsError) {
+          clearInterval(checkInterval);
+          resolve(true);
+        }
+      }, 100);
+    });
+
+    // Verify HTTP error was logged
+    expect(logger.error).toHaveBeenCalledWith(
+      "Failed to get latest price feeds:",
+      expect.objectContaining({
+        message: "Request failed with status code 404",
+      })
+    );
+
+    // Verify invalid feed error was logged
+    expect(logger.error).toHaveBeenCalledWith(
+      `Price feed ${invalidFeedId} (INVALID/PRICE) not found for getLatestPriceFeeds`
+    );
+
+    // Verify invalid feed error was logged
+    expect(logger.error).toHaveBeenCalledWith(
+      `Price feed ${invalidFeedId} (INVALID/PRICE) not found for subscribePriceFeedUpdates`
+    );
+
+    // Verify resubscription message was logged
+    expect(logger.info).toHaveBeenCalledWith(
+      "Resubscribing with valid feeds only"
+    );
+
+    // Verify priceIds was updated to only include valid feeds
+    expect(listener["priceIds"]).toEqual([validFeedId]);
+  });
+});

+ 105 - 13
apps/price_pusher/src/pyth-price-listener.ts

@@ -15,6 +15,7 @@ export class PythPriceListener implements IPriceListener {
   private latestPriceInfo: Map<HexString, PriceInfo>;
   private logger: Logger;
   private lastUpdated: TimestampInMs | undefined;
+  private healthCheckInterval?: NodeJS.Timeout;
 
   constructor(
     connection: PriceServiceConnection,
@@ -33,26 +34,111 @@ export class PythPriceListener implements IPriceListener {
   // This method should be awaited on and once it finishes it has the latest value
   // for the given price feeds (if they exist).
   async start() {
+    // Set custom error handler for websocket errors
+    this.connection.onWsError = (error: Error) => {
+      if (error.message.includes("not found")) {
+        // Extract invalid feed IDs from error message
+        const match = error.message.match(/\[(.*?)\]/);
+        if (match) {
+          const invalidFeedIds = match[1].split(",").map((id) => {
+            // Remove '0x' prefix if present to match our stored IDs
+            return id.trim().replace(/^0x/, "");
+          });
+
+          // Log invalid feeds with their aliases
+          invalidFeedIds.forEach((id) => {
+            this.logger.error(
+              `Price feed ${id} (${this.priceIdToAlias.get(
+                id
+              )}) not found for subscribePriceFeedUpdates`
+            );
+          });
+
+          // Filter out invalid feeds and resubscribe with valid ones
+          const validFeeds = this.priceIds.filter(
+            (id) => !invalidFeedIds.includes(id)
+          );
+
+          this.priceIds = validFeeds;
+
+          if (validFeeds.length > 0) {
+            this.logger.info("Resubscribing with valid feeds only");
+            this.connection.subscribePriceFeedUpdates(
+              validFeeds,
+              this.onNewPriceFeed.bind(this)
+            );
+          }
+        }
+      } else {
+        this.logger.error("Websocket error occurred:", error);
+      }
+    };
+
     this.connection.subscribePriceFeedUpdates(
       this.priceIds,
       this.onNewPriceFeed.bind(this)
     );
 
-    const priceFeeds = await this.connection.getLatestPriceFeeds(this.priceIds);
-    priceFeeds?.forEach((priceFeed) => {
-      // Getting unchecked because although it might be old
-      // but might not be there on the target chain.
-      const latestAvailablePrice = priceFeed.getPriceUnchecked();
-      this.latestPriceInfo.set(priceFeed.id, {
-        price: latestAvailablePrice.price,
-        conf: latestAvailablePrice.conf,
-        publishTime: latestAvailablePrice.publishTime,
+    try {
+      const priceFeeds = await this.connection.getLatestPriceFeeds(
+        this.priceIds
+      );
+      priceFeeds?.forEach((priceFeed) => {
+        const latestAvailablePrice = priceFeed.getPriceUnchecked();
+        this.latestPriceInfo.set(priceFeed.id, {
+          price: latestAvailablePrice.price,
+          conf: latestAvailablePrice.conf,
+          publishTime: latestAvailablePrice.publishTime,
+        });
       });
-    });
+    } catch (error: any) {
+      // Always log the HTTP error first
+      this.logger.error("Failed to get latest price feeds:", error);
+
+      if (error.response.data.includes("Price ids not found:")) {
+        // Extract invalid feed IDs from error message
+        const invalidFeedIds = error.response.data
+          .split("Price ids not found:")[1]
+          .split(",")
+          .map((id: string) => id.trim().replace(/^0x/, ""));
+
+        // Log invalid feeds with their aliases
+        invalidFeedIds.forEach((id: string) => {
+          this.logger.error(
+            `Price feed ${id} (${this.priceIdToAlias.get(
+              id
+            )}) not found for getLatestPriceFeeds`
+          );
+        });
 
-    // Check health of the price feeds 5 second. If the price feeds are not updating
-    // for more than 30s, throw an error.
-    setInterval(() => {
+        // Filter out invalid feeds and retry
+        const validFeeds = this.priceIds.filter(
+          (id) => !invalidFeedIds.includes(id)
+        );
+
+        this.priceIds = validFeeds;
+
+        if (validFeeds.length > 0) {
+          this.logger.info(
+            "Retrying getLatestPriceFeeds with valid feeds only"
+          );
+          const validPriceFeeds = await this.connection.getLatestPriceFeeds(
+            validFeeds
+          );
+          validPriceFeeds?.forEach((priceFeed) => {
+            const latestAvailablePrice = priceFeed.getPriceUnchecked();
+            this.latestPriceInfo.set(priceFeed.id, {
+              price: latestAvailablePrice.price,
+              conf: latestAvailablePrice.conf,
+              publishTime: latestAvailablePrice.publishTime,
+            });
+          });
+        }
+      }
+    }
+
+    // Store health check interval reference
+    this.healthCheckInterval = setInterval(() => {
       if (
         this.lastUpdated === undefined ||
         this.lastUpdated < Date.now() - 30 * 1000
@@ -88,4 +174,10 @@ export class PythPriceListener implements IPriceListener {
   getLatestPriceInfo(priceId: string): PriceInfo | undefined {
     return this.latestPriceInfo.get(priceId);
   }
+
+  cleanup() {
+    if (this.healthCheckInterval) {
+      clearInterval(this.healthCheckInterval);
+    }
+  }
 }