Explorar o código

feat(lazer): Improve JS SDK reliability via redundant parallel websocket cxns (#2236)

* feat: improve js sdk reliability via redundant parallel websocket cxns

* doc: update comment

* feat: add error handler, bump ver

* feat: improve promise handling, fix eslint

* fix: eslint

* doc: fix comment
Tejas Badadare hai 10 meses
pai
achega
7cb1725be8

+ 46 - 17
lazer/sdk/js/examples/index.ts

@@ -1,13 +1,24 @@
+/* eslint-disable no-console */
+/* eslint-disable @typescript-eslint/no-empty-function */
+
 import { PythLazerClient } from "../src/index.js";
 
-/* eslint-disable no-console */
-const client = new PythLazerClient("ws://127.0.0.1:1234/v1/stream", "ctoken1");
+// Ignore debug messages
+console.debug = () => {};
+
+const client = new PythLazerClient(
+  ["wss://pyth-lazer.dourolabs.app/v1/stream"],
+  "access_token",
+  3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
+  console // Optionally log socket operations (to the console in this case.)
+);
+
 client.addMessageListener((message) => {
-  console.log("got message:", message);
+  console.info("got message:", message);
   switch (message.type) {
     case "json": {
       if (message.value.type == "streamUpdated") {
-        console.log(
+        console.info(
           "stream updated for subscription",
           message.value.subscriptionId,
           ":",
@@ -18,24 +29,42 @@ client.addMessageListener((message) => {
     }
     case "binary": {
       if ("solana" in message.value) {
-        console.log("solana message:", message.value.solana?.toString("hex"));
+        console.info("solana message:", message.value.solana?.toString("hex"));
       }
       if ("evm" in message.value) {
-        console.log("evm message:", message.value.evm?.toString("hex"));
+        console.info("evm message:", message.value.evm?.toString("hex"));
       }
       break;
     }
   }
 });
-client.ws.addEventListener("open", () => {
-  client.send({
-    type: "subscribe",
-    subscriptionId: 1,
-    priceFeedIds: [1, 2],
-    properties: ["price"],
-    chains: ["solana"],
-    deliveryFormat: "json",
-    channel: "fixed_rate@200ms",
-    jsonBinaryEncoding: "hex",
-  });
+
+// Create and remove one or more subscriptions on the fly
+await client.subscribe({
+  type: "subscribe",
+  subscriptionId: 1,
+  priceFeedIds: [1, 2],
+  properties: ["price"],
+  chains: ["solana"],
+  deliveryFormat: "binary",
+  channel: "fixed_rate@200ms",
+  parsed: false,
+  jsonBinaryEncoding: "base64",
+});
+await client.subscribe({
+  type: "subscribe",
+  subscriptionId: 2,
+  priceFeedIds: [1, 2, 3, 4, 5],
+  properties: ["price"],
+  chains: ["evm"],
+  deliveryFormat: "json",
+  channel: "fixed_rate@200ms",
+  parsed: true,
+  jsonBinaryEncoding: "hex",
 });
+
+await new Promise((resolve) => setTimeout(resolve, 10_000));
+
+await client.unsubscribe(1);
+await client.unsubscribe(2);
+client.shutdown();

+ 4 - 2
lazer/sdk/js/package.json

@@ -1,6 +1,6 @@
 {
   "name": "@pythnetwork/pyth-lazer-sdk",
-  "version": "0.1.2",
+  "version": "0.2.0",
   "description": "Pyth Lazer SDK",
   "publishConfig": {
     "access": "public"
@@ -63,6 +63,8 @@
     "@solana/buffer-layout": "^4.0.1",
     "@solana/web3.js": "^1.98.0",
     "isomorphic-ws": "^5.0.0",
-    "ws": "^8.18.0"
+    "ws": "^8.18.0",
+    "@isaacs/ttlcache": "^1.4.1",
+    "ts-log": "^2.2.7"
   }
 }

+ 46 - 23
lazer/sdk/js/src/client.ts

@@ -1,4 +1,5 @@
 import WebSocket from "isomorphic-ws";
+import { dummyLogger, type Logger } from "ts-log";
 
 import {
   BINARY_UPDATE_FORMAT_MAGIC,
@@ -9,6 +10,7 @@ import {
   type Response,
   SOLANA_FORMAT_MAGIC_BE,
 } from "./protocol.js";
+import { WebSocketPool } from "./socket/web-socket-pool.js";
 
 export type BinaryResponse = {
   subscriptionId: number;
@@ -28,52 +30,58 @@ const UINT32_NUM_BYTES = 4;
 const UINT64_NUM_BYTES = 8;
 
 export class PythLazerClient {
-  ws: WebSocket;
+  wsp: WebSocketPool;
 
-  constructor(url: string, token: string) {
-    const finalUrl = new URL(url);
-    finalUrl.searchParams.append("ACCESS_TOKEN", token);
-    this.ws = new WebSocket(finalUrl);
+  /**
+   * Creates a new PythLazerClient instance.
+   * @param urls - List of WebSocket URLs of the Pyth Lazer service
+   * @param token - The access token for authentication
+   * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
+   * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
+   */
+  constructor(
+    urls: string[],
+    token: string,
+    numConnections = 3,
+    logger: Logger = dummyLogger
+  ) {
+    this.wsp = new WebSocketPool(urls, token, numConnections, logger);
   }
 
   addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
-    this.ws.addEventListener("message", (event: WebSocket.MessageEvent) => {
-      if (typeof event.data == "string") {
+    this.wsp.addMessageListener((data: WebSocket.Data) => {
+      if (typeof data == "string") {
         handler({
           type: "json",
-          value: JSON.parse(event.data) as Response,
+          value: JSON.parse(data) as Response,
         });
-      } else if (Buffer.isBuffer(event.data)) {
+      } else if (Buffer.isBuffer(data)) {
         let pos = 0;
-        const magic = event.data
-          .subarray(pos, pos + UINT32_NUM_BYTES)
-          .readUint32BE();
+        const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32BE();
         pos += UINT32_NUM_BYTES;
         if (magic != BINARY_UPDATE_FORMAT_MAGIC) {
           throw new Error("binary update format magic mismatch");
         }
         // TODO: some uint64 values may not be representable as Number.
         const subscriptionId = Number(
-          event.data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
+          data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
         );
         pos += UINT64_NUM_BYTES;
 
         const value: BinaryResponse = { subscriptionId };
-        while (pos < event.data.length) {
-          const len = event.data
-            .subarray(pos, pos + UINT16_NUM_BYTES)
-            .readUint16BE();
+        while (pos < data.length) {
+          const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
           pos += UINT16_NUM_BYTES;
-          const magic = event.data
+          const magic = data
             .subarray(pos, pos + UINT32_NUM_BYTES)
             .readUint32BE();
           if (magic == EVM_FORMAT_MAGIC) {
-            value.evm = event.data.subarray(pos, pos + len);
+            value.evm = data.subarray(pos, pos + len);
           } else if (magic == SOLANA_FORMAT_MAGIC_BE) {
-            value.solana = event.data.subarray(pos, pos + len);
+            value.solana = data.subarray(pos, pos + len);
           } else if (magic == PARSED_FORMAT_MAGIC) {
             value.parsed = JSON.parse(
-              event.data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
+              data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
             ) as ParsedPayload;
           } else {
             throw new Error("unknown magic: " + magic.toString());
@@ -87,7 +95,22 @@ export class PythLazerClient {
     });
   }
 
-  send(request: Request) {
-    this.ws.send(JSON.stringify(request));
+  async subscribe(request: Request): Promise<void> {
+    if (request.type !== "subscribe") {
+      throw new Error("Request must be a subscribe request");
+    }
+    await this.wsp.addSubscription(request);
+  }
+
+  async unsubscribe(subscriptionId: number): Promise<void> {
+    await this.wsp.removeSubscription(subscriptionId);
+  }
+
+  async send(request: Request): Promise<void> {
+    await this.wsp.sendRequest(request);
+  }
+
+  shutdown(): void {
+    this.wsp.shutdown();
   }
 }

+ 195 - 0
lazer/sdk/js/src/socket/resilient-web-socket.ts

@@ -0,0 +1,195 @@
+import type { ClientRequestArgs } from "node:http";
+
+import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws";
+import type { Logger } from "ts-log";
+
+// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
+const HEARTBEAT_TIMEOUT_DURATION = 10_000;
+
+/**
+ * This class wraps websocket to provide a resilient web socket client.
+ *
+ * It will reconnect if connection fails with exponential backoff. Also, it will reconnect
+ * if it receives no ping request or regular message from server within a while as indication
+ * of timeout (assuming the server sends either regularly).
+ *
+ * This class also logs events if logger is given and by replacing onError method you can handle
+ * connection errors yourself (e.g: do not retry and close the connection).
+ */
+export class ResilientWebSocket {
+  endpoint: string;
+  wsClient: undefined | WebSocket;
+  wsUserClosed: boolean;
+  private wsOptions: ClientOptions | ClientRequestArgs | undefined;
+  private wsFailedAttempts: number;
+  private heartbeatTimeout: undefined | NodeJS.Timeout;
+  private logger: undefined | Logger;
+
+  onError: (error: ErrorEvent) => void;
+  onMessage: (data: WebSocket.Data) => void;
+  onReconnect: () => void;
+  constructor(
+    endpoint: string,
+    wsOptions?: ClientOptions | ClientRequestArgs,
+    logger?: Logger
+  ) {
+    this.endpoint = endpoint;
+    this.wsOptions = wsOptions;
+    this.logger = logger;
+
+    this.wsFailedAttempts = 0;
+    this.onError = (error: ErrorEvent) => {
+      this.logger?.error(error.error);
+    };
+    this.wsUserClosed = true;
+    this.onMessage = (data: WebSocket.Data): void => {
+      void data;
+    };
+    this.onReconnect = (): void => {
+      // Empty function, can be set by the user.
+    };
+  }
+
+  async send(data: string | Buffer) {
+    this.logger?.info(`Sending message`);
+
+    await this.waitForMaybeReadyWebSocket();
+
+    if (this.wsClient === undefined) {
+      this.logger?.error(
+        "Couldn't connect to the websocket server. Error callback is called."
+      );
+    } else {
+      this.wsClient.send(data);
+    }
+  }
+
+  startWebSocket(): void {
+    if (this.wsClient !== undefined) {
+      return;
+    }
+
+    this.logger?.info(`Creating Web Socket client`);
+
+    this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
+    this.wsUserClosed = false;
+
+    this.wsClient.addEventListener("open", () => {
+      this.wsFailedAttempts = 0;
+      this.resetHeartbeat();
+    });
+
+    this.wsClient.addEventListener("error", (event) => {
+      this.onError(event);
+    });
+
+    this.wsClient.addEventListener("message", (event) => {
+      this.resetHeartbeat();
+      this.onMessage(event.data);
+    });
+
+    this.wsClient.addEventListener("close", () => {
+      void this.handleClose();
+    });
+
+    // Handle ping events if supported (Node.js only)
+    if ("on" in this.wsClient) {
+      // Ping handler is undefined in browser side
+      this.wsClient.on("ping", () => {
+        this.logger?.info("Ping received");
+        this.resetHeartbeat();
+      });
+    }
+  }
+
+  /**
+   * Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
+   * from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
+   * we assume the connection is dead and reconnect.
+   */
+  private resetHeartbeat(): void {
+    if (this.heartbeatTimeout !== undefined) {
+      clearTimeout(this.heartbeatTimeout);
+    }
+
+    this.heartbeatTimeout = setTimeout(() => {
+      this.logger?.warn("Connection timed out. Reconnecting...");
+      this.wsClient?.terminate();
+      void this.restartUnexpectedClosedWebsocket();
+    }, HEARTBEAT_TIMEOUT_DURATION);
+  }
+
+  private async waitForMaybeReadyWebSocket(): Promise<void> {
+    let waitedTime = 0;
+    while (
+      this.wsClient !== undefined &&
+      this.wsClient.readyState !== this.wsClient.OPEN
+    ) {
+      if (waitedTime > 5000) {
+        this.wsClient.close();
+        return;
+      } else {
+        waitedTime += 10;
+        await sleep(10);
+      }
+    }
+  }
+
+  private async handleClose(): Promise<void> {
+    if (this.heartbeatTimeout !== undefined) {
+      clearTimeout(this.heartbeatTimeout);
+    }
+
+    if (this.wsUserClosed) {
+      this.logger?.info("The connection has been closed successfully.");
+    } else {
+      this.wsFailedAttempts += 1;
+      this.wsClient = undefined;
+      const waitTime = expoBackoff(this.wsFailedAttempts);
+
+      this.logger?.error(
+        "Connection closed unexpectedly or because of timeout. Reconnecting after " +
+          String(waitTime) +
+          "ms."
+      );
+
+      await sleep(waitTime);
+      await this.restartUnexpectedClosedWebsocket();
+    }
+  }
+
+  private async restartUnexpectedClosedWebsocket(): Promise<void> {
+    if (this.wsUserClosed) {
+      return;
+    }
+
+    this.startWebSocket();
+    await this.waitForMaybeReadyWebSocket();
+
+    if (this.wsClient === undefined) {
+      this.logger?.error(
+        "Couldn't reconnect to websocket. Error callback is called."
+      );
+      return;
+    }
+
+    this.onReconnect();
+  }
+
+  closeWebSocket(): void {
+    if (this.wsClient !== undefined) {
+      const client = this.wsClient;
+      this.wsClient = undefined;
+      client.close();
+    }
+    this.wsUserClosed = true;
+  }
+}
+
+async function sleep(ms: number): Promise<void> {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+function expoBackoff(attempts: number): number {
+  return 2 ** attempts * 100;
+}

+ 197 - 0
lazer/sdk/js/src/socket/web-socket-pool.ts

@@ -0,0 +1,197 @@
+import TTLCache from "@isaacs/ttlcache";
+import WebSocket from "isomorphic-ws";
+import { dummyLogger, type Logger } from "ts-log";
+
+import { ResilientWebSocket } from "./resilient-web-socket.js";
+import type { Request, Response } from "../protocol.js";
+
+// Number of redundant parallel WebSocket connections
+const DEFAULT_NUM_CONNECTIONS = 3;
+
+export class WebSocketPool {
+  rwsPool: ResilientWebSocket[];
+  private cache: TTLCache<string, boolean>;
+  private subscriptions: Map<number, Request>; // id -> subscription Request
+  private messageListeners: ((event: WebSocket.Data) => void)[];
+
+  /**
+   * Creates a new WebSocketPool instance that uses multiple redundant WebSocket connections for reliability.
+   * Usage semantics are similar to using a regular WebSocket client.
+   * @param urls - List of WebSocket URLs to connect to
+   * @param token - Authentication token to use for the connections
+   * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3)
+   * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
+   */
+  constructor(
+    urls: string[],
+    token: string,
+    numConnections: number = DEFAULT_NUM_CONNECTIONS,
+    private readonly logger: Logger = dummyLogger
+  ) {
+    if (urls.length === 0) {
+      throw new Error("No URLs provided");
+    }
+    // This cache is used to deduplicate messages received across different websocket clients in the pool.
+    // A TTL cache is used to prevent unbounded memory usage. A very short TTL of 10 seconds is chosen since
+    // deduplication only needs to happen between messages received very close together in time.
+    this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
+    this.rwsPool = [];
+    this.subscriptions = new Map();
+    this.messageListeners = [];
+    for (let i = 0; i < numConnections; i++) {
+      const url = urls[i % urls.length];
+      if (!url) {
+        throw new Error(`URLs must not be null or empty`);
+      }
+      const wsOptions = {
+        headers: {
+          Authorization: `Bearer ${token}`,
+        },
+      };
+      const rws = new ResilientWebSocket(url, wsOptions, logger);
+
+      // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish
+      // the connection and call the onReconnect callback.
+      // When we reconnect, replay all subscription messages to resume the data stream.
+      rws.onReconnect = () => {
+        if (rws.wsUserClosed) {
+          return;
+        }
+        for (const [, request] of this.subscriptions) {
+          try {
+            void rws.send(JSON.stringify(request));
+          } catch (error) {
+            this.logger.error(
+              "Failed to resend subscription on reconnect:",
+              error
+            );
+          }
+        }
+      };
+      // Handle all client messages ourselves. Dedupe before sending to registered message handlers.
+      rws.onMessage = this.dedupeHandler;
+      this.rwsPool.push(rws);
+    }
+
+    // Let it rip
+    // TODO: wait for sockets to receive `open` msg before subscribing?
+    for (const rws of this.rwsPool) {
+      rws.startWebSocket();
+    }
+
+    this.logger.info(
+      `Using ${numConnections.toString()} redundant WebSocket connections`
+    );
+  }
+
+  /**
+   * Checks for error responses in JSON messages and throws appropriate errors
+   */
+  private handleErrorMessages(data: string): void {
+    const message = JSON.parse(data) as Response;
+    if (message.type === "subscriptionError") {
+      throw new Error(
+        `Error occurred for subscription ID ${String(
+          message.subscriptionId
+        )}: ${message.error}`
+      );
+    } else if (message.type === "error") {
+      throw new Error(`Error: ${message.error}`);
+    }
+  }
+
+  /**
+   * Handles incoming websocket messages by deduplicating identical messages received across
+   * multiple connections before forwarding to registered handlers
+   */
+  dedupeHandler = (data: WebSocket.Data): void => {
+    // For string data, use the whole string as the cache key. This avoids expensive JSON parsing during deduping.
+    // For binary data, use the hex string representation as the cache key
+    const cacheKey =
+      typeof data === "string"
+        ? data
+        : Buffer.from(data as Buffer).toString("hex");
+
+    // If we've seen this exact message recently, drop it
+    if (this.cache.has(cacheKey)) {
+      this.logger.debug("Dropping duplicate message");
+      return;
+    }
+
+    // Haven't seen this message, cache it and forward to handlers
+    this.cache.set(cacheKey, true);
+
+    // Check for errors in JSON responses
+    if (typeof data === "string") {
+      this.handleErrorMessages(data);
+    }
+
+    for (const handler of this.messageListeners) {
+      handler(data);
+    }
+  };
+
+  /**
+   * Sends a message to all websockets in the pool
+   * @param request - The request to send
+   */
+  async sendRequest(request: Request): Promise<void> {
+    // Send to all websockets in the pool
+    const sendPromises = this.rwsPool.map(async (rws) => {
+      try {
+        await rws.send(JSON.stringify(request));
+      } catch (error) {
+        this.logger.error("Failed to send request:", error);
+        throw error; // Re-throw the error
+      }
+    });
+    await Promise.all(sendPromises);
+  }
+
+  /**
+   * Adds a subscription by sending a subscribe request to all websockets in the pool
+   * and storing it for replay on reconnection
+   * @param request - The subscription request to send
+   */
+  async addSubscription(request: Request): Promise<void> {
+    if (request.type !== "subscribe") {
+      throw new Error("Request must be a subscribe request");
+    }
+    this.subscriptions.set(request.subscriptionId, request);
+    await this.sendRequest(request);
+  }
+
+  /**
+   * Removes a subscription by sending an unsubscribe request to all websockets in the pool
+   * and removing it from stored subscriptions
+   * @param subscriptionId - The ID of the subscription to remove
+   */
+  async removeSubscription(subscriptionId: number): Promise<void> {
+    this.subscriptions.delete(subscriptionId);
+    const request: Request = {
+      type: "unsubscribe",
+      subscriptionId,
+    };
+    await this.sendRequest(request);
+  }
+
+  /**
+   * Adds a message handler function to receive websocket messages
+   * @param handler - Function that will be called with each received message
+   */
+  addMessageListener(handler: (data: WebSocket.Data) => void): void {
+    this.messageListeners.push(handler);
+  }
+
+  /**
+   * Elegantly closes all websocket connections in the pool
+   */
+  shutdown(): void {
+    for (const rws of this.rwsPool) {
+      rws.closeWebSocket();
+    }
+    this.rwsPool = [];
+    this.subscriptions.clear();
+    this.messageListeners = [];
+  }
+}

+ 15 - 4
pnpm-lock.yaml

@@ -1525,6 +1525,9 @@ importers:
 
   lazer/sdk/js:
     dependencies:
+      '@isaacs/ttlcache':
+        specifier: ^1.4.1
+        version: 1.4.1
       '@solana/buffer-layout':
         specifier: ^4.0.1
         version: 4.0.1
@@ -1534,6 +1537,9 @@ importers:
       isomorphic-ws:
         specifier: ^5.0.0
         version: 5.0.0(ws@8.18.0(bufferutil@4.0.8)(utf-8-validate@5.0.10))
+      ts-log:
+        specifier: ^2.2.7
+        version: 2.2.7
       ws:
         specifier: ^8.18.0
         version: 8.18.0(bufferutil@4.0.8)(utf-8-validate@5.0.10)
@@ -21667,6 +21673,9 @@ packages:
   ts-log@2.2.5:
     resolution: {integrity: sha512-PGcnJoTBnVGy6yYNFxWVNkdcAuAMstvutN9MgDJIV6L0oG8fB+ZNNy1T+wJzah8RPGor1mZuPQkVfXNDpy9eHA==}
 
+  ts-log@2.2.7:
+    resolution: {integrity: sha512-320x5Ggei84AxzlXp91QkIGSw5wgaLT6GeAH0KsqDmRZdVWW2OiSeVvElVoatk3f7nicwXlElXsoFkARiGE2yg==}
+
   ts-mixer@6.0.4:
     resolution: {integrity: sha512-ufKpbmrugz5Aou4wcr5Wc1UUFWOLhq+Fm6qa6P0w0K5Qw2yhaUoiWszhCVuNQyNwrlGiscHOmqYoAox1PtvgjA==}
 
@@ -33901,7 +33910,7 @@ snapshots:
       axios: 1.7.2
       axios-retry: 3.9.1
       isomorphic-ws: 4.0.1(ws@8.18.0(bufferutil@4.0.8)(utf-8-validate@6.0.4))
-      ts-log: 2.2.5
+      ts-log: 2.2.7
       ws: 8.18.0(bufferutil@4.0.8)(utf-8-validate@6.0.4)
     transitivePeerDependencies:
       - bufferutil
@@ -35972,13 +35981,13 @@ snapshots:
     dependencies:
       '@noble/hashes': 1.1.5
       '@noble/secp256k1': 1.6.3
-      '@scure/base': 1.1.7
+      '@scure/base': 1.1.9
 
   '@scure/bip32@1.1.5':
     dependencies:
       '@noble/hashes': 1.2.0
       '@noble/secp256k1': 1.7.1
-      '@scure/base': 1.1.7
+      '@scure/base': 1.1.9
 
   '@scure/bip32@1.3.2':
     dependencies:
@@ -36012,7 +36021,7 @@ snapshots:
   '@scure/bip39@1.1.1':
     dependencies:
       '@noble/hashes': 1.2.0
-      '@scure/base': 1.1.7
+      '@scure/base': 1.1.9
 
   '@scure/bip39@1.2.1':
     dependencies:
@@ -58744,6 +58753,8 @@ snapshots:
 
   ts-log@2.2.5: {}
 
+  ts-log@2.2.7: {}
+
   ts-mixer@6.0.4: {}
 
   ts-mocha@10.0.0(mocha@9.2.2):