Преглед на файлове

feat(pyth-lazer-stk): correctly handle connection drops + other improvements (#2737)

* fix: pyth lazer js sdk

* increase version

* make resilient websocket configurable

* fix: config types

* handle onError

* fix

* fix example

* fix

* fix format

* fix typo

* fix format
Keyvan Khademi преди 5 месеца
родител
ревизия
b8ae2252e3
променени са 5 файла, в които са добавени 202 реда и са изтрити 199 реда
  1. 22 10
      lazer/sdk/js/examples/index.ts
  2. 1 1
      lazer/sdk/js/package.json
  3. 9 15
      lazer/sdk/js/src/client.ts
  4. 119 131
      lazer/sdk/js/src/socket/resilient-websocket.ts
  5. 51 42
      lazer/sdk/js/src/socket/websocket-pool.ts

+ 22 - 10
lazer/sdk/js/examples/index.ts

@@ -6,12 +6,24 @@ import { PythLazerClient } from "../src/index.js";
 // Ignore debug messages
 console.debug = () => {};
 
-const client = await PythLazerClient.create(
-  ["wss://pyth-lazer.dourolabs.app/v1/stream"],
-  "access_token",
-  3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
-  console, // Optionally log socket operations (to the console in this case.)
-);
+const client = await PythLazerClient.create({
+  urls: [
+    "wss://pyth-lazer-0.dourolabs.app/v1/stream",
+    "wss://pyth-lazer-1.dourolabs.app/v1/stream",
+  ],
+  token: "you-access-token-here", // Replace with your actual access token
+  numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4.
+  logger: console, // Optionally log socket operations (to the console in this case.)
+  onError: (error) => {
+    console.error("WebSocket error:", error);
+  },
+  // Optional configuration for resilient WebSocket connections
+  rwsConfig: {
+    heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds
+    maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds
+    logAfterRetryCount: 10, // Optional log after how many retries
+  },
+});
 
 // Read and process messages from the Lazer stream
 client.addMessageListener((message) => {
@@ -47,7 +59,7 @@ client.addAllConnectionsDownListener(() => {
 });
 
 // Create and remove one or more subscriptions on the fly
-await client.subscribe({
+client.subscribe({
   type: "subscribe",
   subscriptionId: 1,
   priceFeedIds: [1, 2],
@@ -58,7 +70,7 @@ await client.subscribe({
   parsed: false,
   jsonBinaryEncoding: "base64",
 });
-await client.subscribe({
+client.subscribe({
   type: "subscribe",
   subscriptionId: 2,
   priceFeedIds: [1, 2, 3, 4, 5],
@@ -72,6 +84,6 @@ await client.subscribe({
 
 await new Promise((resolve) => setTimeout(resolve, 10_000));
 
-await client.unsubscribe(1);
-await client.unsubscribe(2);
+client.unsubscribe(1);
+client.unsubscribe(2);
 client.shutdown();

+ 1 - 1
lazer/sdk/js/package.json

@@ -1,6 +1,6 @@
 {
   "name": "@pythnetwork/pyth-lazer-sdk",
-  "version": "0.5.1",
+  "version": "1.0.0",
   "description": "Pyth Lazer SDK",
   "publishConfig": {
     "access": "public"

+ 9 - 15
lazer/sdk/js/src/client.ts

@@ -1,9 +1,8 @@
 import WebSocket from "isomorphic-ws";
-import type { Logger } from "ts-log";
-import { dummyLogger } from "ts-log";
 
 import type { ParsedPayload, Request, Response } from "./protocol.js";
 import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js";
+import type { WebSocketPoolConfig } from "./socket/websocket-pool.js";
 import { WebSocketPool } from "./socket/websocket-pool.js";
 
 export type BinaryResponse = {
@@ -35,13 +34,8 @@ export class PythLazerClient {
    * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. The connections will round-robin across the provided URLs.
    * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
    */
-  static async create(
-    urls: string[],
-    token: string,
-    numConnections = 3,
-    logger: Logger = dummyLogger,
-  ): Promise<PythLazerClient> {
-    const wsp = await WebSocketPool.create(urls, token, numConnections, logger);
+  static async create(config: WebSocketPoolConfig): Promise<PythLazerClient> {
+    const wsp = await WebSocketPool.create(config);
     return new PythLazerClient(wsp);
   }
 
@@ -102,19 +96,19 @@ export class PythLazerClient {
     });
   }
 
-  async subscribe(request: Request): Promise<void> {
+  subscribe(request: Request) {
     if (request.type !== "subscribe") {
       throw new Error("Request must be a subscribe request");
     }
-    await this.wsp.addSubscription(request);
+    this.wsp.addSubscription(request);
   }
 
-  async unsubscribe(subscriptionId: number): Promise<void> {
-    await this.wsp.removeSubscription(subscriptionId);
+  unsubscribe(subscriptionId: number) {
+    this.wsp.removeSubscription(subscriptionId);
   }
 
-  async send(request: Request): Promise<void> {
-    await this.wsp.sendRequest(request);
+  send(request: Request) {
+    this.wsp.sendRequest(request);
   }
 
   /**

+ 119 - 131
lazer/sdk/js/src/socket/resilient-websocket.ts

@@ -3,49 +3,67 @@ import type { ClientRequestArgs } from "node:http";
 import type { ClientOptions, ErrorEvent } from "isomorphic-ws";
 import WebSocket from "isomorphic-ws";
 import type { Logger } from "ts-log";
+import { dummyLogger } from "ts-log";
 
-const HEARTBEAT_TIMEOUT_DURATION = 10_000;
-const CONNECTION_TIMEOUT = 5000;
+const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds
+const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second'
+const DEFAULT_LOG_AFTER_RETRY_COUNT = 10;
 
-export class ResilientWebSocket {
+export type ResilientWebSocketConfig = {
   endpoint: string;
+  wsOptions?: ClientOptions | ClientRequestArgs | undefined;
+  logger?: Logger;
+  heartbeatTimeoutDurationMs?: number;
+  maxRetryDelayMs?: number;
+  logAfterRetryCount?: number;
+};
+
+export class ResilientWebSocket {
+  private endpoint: string;
+  private wsOptions?: ClientOptions | ClientRequestArgs | undefined;
+  private logger: Logger;
+  private heartbeatTimeoutDurationMs: number;
+  private maxRetryDelayMs: number;
+  private logAfterRetryCount: number;
+
   wsClient: undefined | WebSocket;
-  wsUserClosed: boolean;
-  private wsOptions: ClientOptions | ClientRequestArgs | undefined;
+  wsUserClosed = false;
   private wsFailedAttempts: number;
-  private heartbeatTimeout: undefined | NodeJS.Timeout;
-  private logger: undefined | Logger;
-  private connectionPromise: Promise<void> | undefined;
-  private resolveConnection: (() => void) | undefined;
-  private rejectConnection: ((error: Error) => void) | undefined;
+  private heartbeatTimeout?: NodeJS.Timeout | undefined;
+  private retryTimeout?: NodeJS.Timeout | undefined;
   private _isReconnecting = false;
 
-  get isReconnecting(): boolean {
+  isReconnecting(): boolean {
     return this._isReconnecting;
   }
 
-  get isConnected(): boolean {
+  isConnected(): this is this & { wsClient: WebSocket } {
     return this.wsClient?.readyState === WebSocket.OPEN;
   }
 
+  private shouldLogRetry() {
+    return this.wsFailedAttempts % this.logAfterRetryCount === 0;
+  }
+
   onError: (error: ErrorEvent) => void;
   onMessage: (data: WebSocket.Data) => void;
   onReconnect: () => void;
 
-  constructor(
-    endpoint: string,
-    wsOptions?: ClientOptions | ClientRequestArgs,
-    logger?: Logger,
-  ) {
-    this.endpoint = endpoint;
-    this.wsOptions = wsOptions;
-    this.logger = logger;
+  constructor(config: ResilientWebSocketConfig) {
+    this.endpoint = config.endpoint;
+    this.wsOptions = config.wsOptions;
+    this.logger = config.logger ?? dummyLogger;
+    this.heartbeatTimeoutDurationMs =
+      config.heartbeatTimeoutDurationMs ??
+      DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS;
+    this.maxRetryDelayMs = config.maxRetryDelayMs ?? DEFAULT_MAX_RETRY_DELAY_MS;
+    this.logAfterRetryCount =
+      config.logAfterRetryCount ?? DEFAULT_LOG_AFTER_RETRY_COUNT;
 
     this.wsFailedAttempts = 0;
     this.onError = (error: ErrorEvent) => {
-      this.logger?.error(error.error);
+      void error;
     };
-    this.wsUserClosed = true;
     this.onMessage = (data: WebSocket.Data): void => {
       void data;
     };
@@ -54,62 +72,67 @@ export class ResilientWebSocket {
     };
   }
 
-  async send(data: string | Buffer) {
-    this.logger?.info(`Sending message`);
-
-    await this.waitForMaybeReadyWebSocket();
+  send(data: string | Buffer) {
+    this.logger.debug(`Sending message`);
 
-    if (this.wsClient === undefined) {
-      this.logger?.error(
-        "Couldn't connect to the websocket server. Error callback is called.",
-      );
-    } else {
+    if (this.isConnected()) {
       this.wsClient.send(data);
+    } else {
+      this.logger.warn(
+        `WebSocket to ${this.endpoint} is not connected. Cannot send message.`,
+      );
     }
   }
 
-  async startWebSocket(): Promise<void> {
-    if (this.wsClient !== undefined) {
-      // If there's an existing connection attempt, wait for it
-      if (this.connectionPromise) {
-        return this.connectionPromise;
-      }
+  startWebSocket() {
+    if (this.wsUserClosed) {
+      this.logger.error(
+        "Connection was explicitly closed by user. Will not reconnect.",
+      );
       return;
     }
 
-    this.logger?.info(`Creating Web Socket client`);
+    if (this.wsClient !== undefined) {
+      this.logger.info("WebSocket client already started.");
+      return;
+    }
 
-    // Create a new promise for this connection attempt
-    this.connectionPromise = new Promise((resolve, reject) => {
-      this.resolveConnection = resolve;
-      this.rejectConnection = reject;
-    });
+    if (this.wsFailedAttempts == 0) {
+      this.logger.info(`Creating Web Socket client`);
+    }
 
-    // Set a connection timeout
-    const timeoutId = setTimeout(() => {
-      if (this.rejectConnection) {
-        this.rejectConnection(
-          new Error(`Connection timeout after ${String(CONNECTION_TIMEOUT)}ms`),
-        );
-      }
-    }, CONNECTION_TIMEOUT);
+    if (this.retryTimeout !== undefined) {
+      clearTimeout(this.retryTimeout);
+      this.retryTimeout = undefined;
+    }
 
     this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
-    this.wsUserClosed = false;
 
     this.wsClient.addEventListener("open", () => {
+      this.logger.info("WebSocket connection established");
       this.wsFailedAttempts = 0;
-      this.resetHeartbeat();
-      clearTimeout(timeoutId);
       this._isReconnecting = false;
-      this.resolveConnection?.();
+      this.resetHeartbeat();
+      this.onReconnect();
+    });
+
+    this.wsClient.addEventListener("close", (e) => {
+      if (this.wsUserClosed) {
+        this.logger.info(
+          `WebSocket connection to ${this.endpoint} closed by user`,
+        );
+      } else {
+        if (this.shouldLogRetry()) {
+          this.logger.warn(
+            `WebSocket connection to ${this.endpoint} closed unexpectedly: Code: ${e.code.toString()}`,
+          );
+        }
+        this.handleReconnect();
+      }
     });
 
     this.wsClient.addEventListener("error", (event) => {
       this.onError(event);
-      if (this.rejectConnection) {
-        this.rejectConnection(new Error("WebSocket connection failed"));
-      }
     });
 
     this.wsClient.addEventListener("message", (event) => {
@@ -117,22 +140,12 @@ export class ResilientWebSocket {
       this.onMessage(event.data);
     });
 
-    this.wsClient.addEventListener("close", () => {
-      clearTimeout(timeoutId);
-      if (this.rejectConnection) {
-        this.rejectConnection(new Error("WebSocket closed before connecting"));
-      }
-      void this.handleClose();
-    });
-
     if ("on" in this.wsClient) {
       this.wsClient.on("ping", () => {
-        this.logger?.info("Ping received");
+        this.logger.info("Ping received");
         this.resetHeartbeat();
       });
     }
-
-    return this.connectionPromise;
   }
 
   private resetHeartbeat(): void {
@@ -141,91 +154,66 @@ export class ResilientWebSocket {
     }
 
     this.heartbeatTimeout = setTimeout(() => {
-      this.logger?.warn("Connection timed out. Reconnecting...");
+      this.logger.warn("Connection timed out. Reconnecting...");
       this.wsClient?.terminate();
-      void this.restartUnexpectedClosedWebsocket();
-    }, HEARTBEAT_TIMEOUT_DURATION);
+      this.handleReconnect();
+    }, this.heartbeatTimeoutDurationMs);
   }
 
-  private async waitForMaybeReadyWebSocket(): Promise<void> {
-    let waitedTime = 0;
-    while (
-      this.wsClient !== undefined &&
-      this.wsClient.readyState !== this.wsClient.OPEN
-    ) {
-      if (waitedTime > 5000) {
-        this.wsClient.close();
-        return;
-      } else {
-        waitedTime += 10;
-        await sleep(10);
-      }
+  private handleReconnect() {
+    if (this.wsUserClosed) {
+      this.logger.info(
+        "WebSocket connection closed by user, not reconnecting.",
+      );
+      return;
     }
-  }
 
-  private async handleClose(): Promise<void> {
     if (this.heartbeatTimeout !== undefined) {
       clearTimeout(this.heartbeatTimeout);
     }
 
-    if (this.wsUserClosed) {
-      this.logger?.info("The connection has been closed successfully.");
-    } else {
-      this.wsFailedAttempts += 1;
-      this.wsClient = undefined;
-      this.connectionPromise = undefined;
-      this.resolveConnection = undefined;
-      this.rejectConnection = undefined;
-
-      const waitTime = expoBackoff(this.wsFailedAttempts);
-
-      this._isReconnecting = true;
-      this.logger?.error(
-        "Connection closed unexpectedly or because of timeout. Reconnecting after " +
-          String(waitTime) +
-          "ms.",
-      );
-
-      await sleep(waitTime);
-      await this.restartUnexpectedClosedWebsocket();
+    if (this.retryTimeout !== undefined) {
+      clearTimeout(this.retryTimeout);
     }
-  }
 
-  private async restartUnexpectedClosedWebsocket(): Promise<void> {
-    if (this.wsUserClosed) {
-      return;
-    }
+    this.wsFailedAttempts += 1;
+    this.wsClient = undefined;
 
-    await this.startWebSocket();
-    await this.waitForMaybeReadyWebSocket();
+    this._isReconnecting = true;
 
-    if (this.wsClient === undefined) {
-      this.logger?.error(
-        "Couldn't reconnect to websocket. Error callback is called.",
+    if (this.shouldLogRetry()) {
+      this.logger.error(
+        "Connection closed unexpectedly or because of timeout. Reconnecting after " +
+          String(this.retryDelayMs()) +
+          "ms.",
       );
-      return;
     }
 
-    this.onReconnect();
+    this.retryTimeout = setTimeout(() => {
+      this.startWebSocket();
+    }, this.retryDelayMs());
   }
 
   closeWebSocket(): void {
     if (this.wsClient !== undefined) {
-      const client = this.wsClient;
+      this.wsClient.close();
       this.wsClient = undefined;
-      this.connectionPromise = undefined;
-      this.resolveConnection = undefined;
-      this.rejectConnection = undefined;
-      client.close();
     }
     this.wsUserClosed = true;
   }
-}
 
-async function sleep(ms: number): Promise<void> {
-  return new Promise((resolve) => setTimeout(resolve, ms));
-}
-
-function expoBackoff(attempts: number): number {
-  return 2 ** attempts * 100;
+  /**
+   * Calculates the delay in milliseconds for exponential backoff based on the number of failed attempts.
+   *
+   * The delay increases exponentially with each attempt, starting at 20ms for the first attempt,
+   * and is capped at maxRetryDelayMs for attempts greater than or equal to 10.
+   *
+   * @returns The calculated delay in milliseconds before the next retry.
+   */
+  private retryDelayMs(): number {
+    if (this.wsFailedAttempts >= 10) {
+      return this.maxRetryDelayMs;
+    }
+    return Math.min(2 ** this.wsFailedAttempts * 10, this.maxRetryDelayMs);
+  }
 }

+ 51 - 42
lazer/sdk/js/src/socket/websocket-pool.ts

@@ -1,12 +1,23 @@
 import TTLCache from "@isaacs/ttlcache";
+import type { ErrorEvent } from "isomorphic-ws";
 import WebSocket from "isomorphic-ws";
 import type { Logger } from "ts-log";
 import { dummyLogger } from "ts-log";
 
 import type { Request, Response } from "../protocol.js";
+import type { ResilientWebSocketConfig } from "./resilient-websocket.js";
 import { ResilientWebSocket } from "./resilient-websocket.js";
 
-const DEFAULT_NUM_CONNECTIONS = 3;
+const DEFAULT_NUM_CONNECTIONS = 4;
+
+export type WebSocketPoolConfig = {
+  urls: string[];
+  token: string;
+  numConnections?: number;
+  logger?: Logger;
+  rwsConfig?: Omit<ResilientWebSocketConfig, "logger" | "endpoint">;
+  onError?: (error: ErrorEvent) => void;
+};
 
 export class WebSocketPool {
   rwsPool: ResilientWebSocket[];
@@ -17,7 +28,7 @@ export class WebSocketPool {
   private wasAllDown = true;
   private checkConnectionStatesInterval: NodeJS.Timeout;
 
-  private constructor(private readonly logger: Logger = dummyLogger) {
+  private constructor(private readonly logger: Logger) {
     this.rwsPool = [];
     this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
     this.subscriptions = new Map();
@@ -38,32 +49,32 @@ export class WebSocketPool {
    * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3)
    * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
    */
-  static async create(
-    urls: string[],
-    token: string,
-    numConnections: number = DEFAULT_NUM_CONNECTIONS,
-    logger: Logger = dummyLogger,
-  ): Promise<WebSocketPool> {
-    if (urls.length === 0) {
+  static async create(config: WebSocketPoolConfig): Promise<WebSocketPool> {
+    if (config.urls.length === 0) {
       throw new Error("No URLs provided");
     }
 
+    const logger = config.logger ?? dummyLogger;
     const pool = new WebSocketPool(logger);
-
-    // Create all websocket instances
-    const connectionPromises: Promise<void>[] = [];
+    const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS;
 
     for (let i = 0; i < numConnections; i++) {
-      const url = urls[i % urls.length];
+      const url = config.urls[i % config.urls.length];
       if (!url) {
         throw new Error(`URLs must not be null or empty`);
       }
       const wsOptions = {
+        ...config.rwsConfig?.wsOptions,
         headers: {
-          Authorization: `Bearer ${token}`,
+          Authorization: `Bearer ${config.token}`,
         },
       };
-      const rws = new ResilientWebSocket(url, wsOptions, logger);
+      const rws = new ResilientWebSocket({
+        ...config.rwsConfig,
+        endpoint: url,
+        wsOptions,
+        logger,
+      });
 
       // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish
       // the connection and call the onReconnect callback.
@@ -73,7 +84,7 @@ export class WebSocketPool {
         }
         for (const [, request] of pool.subscriptions) {
           try {
-            void rws.send(JSON.stringify(request));
+            rws.send(JSON.stringify(request));
           } catch (error) {
             pool.logger.error(
               "Failed to resend subscription on reconnect:",
@@ -83,25 +94,25 @@ export class WebSocketPool {
         }
       };
 
+      if (config.onError) {
+        rws.onError = config.onError;
+      }
       // Handle all client messages ourselves. Dedupe before sending to registered message handlers.
       rws.onMessage = pool.dedupeHandler;
       pool.rwsPool.push(rws);
-
-      // Start the websocket and collect the promise
-      connectionPromises.push(rws.startWebSocket());
+      rws.startWebSocket();
     }
 
-    // Wait for all connections to be established
-    try {
-      await Promise.all(connectionPromises);
-    } catch (error) {
-      // If any connection fails, clean up and throw
-      pool.shutdown();
-      throw error;
+    pool.logger.info(
+      `Started WebSocketPool with ${numConnections.toString()} connections. Waiting for at least one to connect...`,
+    );
+
+    while (!pool.isAnyConnectionEstablished()) {
+      await new Promise((resolve) => setTimeout(resolve, 100));
     }
 
     pool.logger.info(
-      `Successfully established ${numConnections.toString()} redundant WebSocket connections`,
+      `At least one WebSocket connection is established. WebSocketPool is ready.`,
     );
 
     return pool;
@@ -149,33 +160,27 @@ export class WebSocketPool {
     }
   };
 
-  async sendRequest(request: Request): Promise<void> {
-    const sendPromises = this.rwsPool.map(async (rws) => {
-      try {
-        await rws.send(JSON.stringify(request));
-      } catch (error) {
-        this.logger.error("Failed to send request:", error);
-        throw error;
-      }
-    });
-    await Promise.all(sendPromises);
+  sendRequest(request: Request) {
+    for (const rws of this.rwsPool) {
+      rws.send(JSON.stringify(request));
+    }
   }
 
-  async addSubscription(request: Request): Promise<void> {
+  addSubscription(request: Request) {
     if (request.type !== "subscribe") {
       throw new Error("Request must be a subscribe request");
     }
     this.subscriptions.set(request.subscriptionId, request);
-    await this.sendRequest(request);
+    this.sendRequest(request);
   }
 
-  async removeSubscription(subscriptionId: number): Promise<void> {
+  removeSubscription(subscriptionId: number) {
     this.subscriptions.delete(subscriptionId);
     const request: Request = {
       type: "unsubscribe",
       subscriptionId,
     };
-    await this.sendRequest(request);
+    this.sendRequest(request);
   }
 
   addMessageListener(handler: (data: WebSocket.Data) => void): void {
@@ -191,7 +196,11 @@ export class WebSocketPool {
   }
 
   private areAllConnectionsDown(): boolean {
-    return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting);
+    return this.rwsPool.every((ws) => !ws.isConnected() || ws.isReconnecting());
+  }
+
+  private isAnyConnectionEstablished(): boolean {
+    return this.rwsPool.some((ws) => ws.isConnected());
   }
 
   private checkConnectionStates(): void {