瀏覽代碼

feat(lazer/js-sdk): add promises for open and disconnected (#2254)

* feat: add promise for connection open

* feat: add promise for all connections down

* refactor: rename socket files

* fix: lint

* feat: bump ver

* feat: better interface for allConnectionsDown events

* fix: docs

* fix: naming
Tejas Badadare 10 月之前
父節點
當前提交
1c6530498e

+ 8 - 1
lazer/sdk/js/examples/index.ts

@@ -6,13 +6,14 @@ import { PythLazerClient } from "../src/index.js";
 // Ignore debug messages
 console.debug = () => {};
 
-const client = new PythLazerClient(
+const client = await PythLazerClient.create(
   ["wss://pyth-lazer.dourolabs.app/v1/stream"],
   "access_token",
   3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
   console // Optionally log socket operations (to the console in this case.)
 );
 
+// Read and process messages from the Lazer stream
 client.addMessageListener((message) => {
   console.info("got message:", message);
   switch (message.type) {
@@ -39,6 +40,12 @@ client.addMessageListener((message) => {
   }
 });
 
+// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down)
+// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown().
+client.addAllConnectionsDownListener(() => {
+  console.error("All connections are down!");
+});
+
 // Create and remove one or more subscriptions on the fly
 await client.subscribe({
   type: "subscribe",

+ 1 - 1
lazer/sdk/js/package.json

@@ -1,6 +1,6 @@
 {
   "name": "@pythnetwork/pyth-lazer-sdk",
-  "version": "0.2.1",
+  "version": "0.3.0",
   "description": "Pyth Lazer SDK",
   "publishConfig": {
     "access": "public"

+ 22 - 6
lazer/sdk/js/src/client.ts

@@ -10,7 +10,7 @@ import {
   type Response,
   SOLANA_FORMAT_MAGIC_BE,
 } from "./protocol.js";
-import { WebSocketPool } from "./socket/web-socket-pool.js";
+import { WebSocketPool } from "./socket/websocket-pool.js";
 
 export type BinaryResponse = {
   subscriptionId: number;
@@ -30,24 +30,31 @@ const UINT32_NUM_BYTES = 4;
 const UINT64_NUM_BYTES = 8;
 
 export class PythLazerClient {
-  wsp: WebSocketPool;
+  private constructor(private readonly wsp: WebSocketPool) {}
 
   /**
    * Creates a new PythLazerClient instance.
    * @param urls - List of WebSocket URLs of the Pyth Lazer service
    * @param token - The access token for authentication
-   * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
+   * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. The connections will round-robin across the provided URLs.
    * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
    */
-  constructor(
+  static async create(
     urls: string[],
     token: string,
     numConnections = 3,
     logger: Logger = dummyLogger
-  ) {
-    this.wsp = new WebSocketPool(urls, token, numConnections, logger);
+  ): Promise<PythLazerClient> {
+    const wsp = await WebSocketPool.create(urls, token, numConnections, logger);
+    return new PythLazerClient(wsp);
   }
 
+  /**
+   * Adds a message listener that receives either JSON or binary responses from the WebSocket connections.
+   * The listener will be called for each message received, with deduplication across redundant connections.
+   * @param handler - Callback function that receives the parsed message. The message can be either a JSON response
+   * or a binary response containing EVM, Solana, or parsed payload data.
+   */
   addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
     this.wsp.addMessageListener((data: WebSocket.Data) => {
       if (typeof data == "string") {
@@ -110,6 +117,15 @@ export class PythLazerClient {
     await this.wsp.sendRequest(request);
   }
 
+  /**
+   * Registers a handler function that will be called whenever all WebSocket connections are down or attempting to reconnect.
+   * The connections may still try to reconnect in the background. To shut down the pool, call `shutdown()`.
+   * @param handler - Function to be called when all connections are down
+   */
+  addAllConnectionsDownListener(handler: () => void): void {
+    this.wsp.addAllConnectionsDownListener(handler);
+  }
+
   shutdown(): void {
     this.wsp.shutdown();
   }

+ 55 - 20
lazer/sdk/js/src/socket/resilient-web-socket.ts → lazer/sdk/js/src/socket/resilient-websocket.ts

@@ -3,19 +3,9 @@ import type { ClientRequestArgs } from "node:http";
 import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws";
 import type { Logger } from "ts-log";
 
-// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
 const HEARTBEAT_TIMEOUT_DURATION = 10_000;
+const CONNECTION_TIMEOUT = 5000;
 
-/**
- * This class wraps websocket to provide a resilient web socket client.
- *
- * It will reconnect if connection fails with exponential backoff. Also, it will reconnect
- * if it receives no ping request or regular message from server within a while as indication
- * of timeout (assuming the server sends either regularly).
- *
- * This class also logs events if logger is given and by replacing onError method you can handle
- * connection errors yourself (e.g: do not retry and close the connection).
- */
 export class ResilientWebSocket {
   endpoint: string;
   wsClient: undefined | WebSocket;
@@ -24,10 +14,23 @@ export class ResilientWebSocket {
   private wsFailedAttempts: number;
   private heartbeatTimeout: undefined | NodeJS.Timeout;
   private logger: undefined | Logger;
+  private connectionPromise: Promise<void> | undefined;
+  private resolveConnection: (() => void) | undefined;
+  private rejectConnection: ((error: Error) => void) | undefined;
+  private _isReconnecting = false;
+
+  get isReconnecting(): boolean {
+    return this._isReconnecting;
+  }
+
+  get isConnected(): boolean {
+    return this.wsClient?.readyState === WebSocket.OPEN;
+  }
 
   onError: (error: ErrorEvent) => void;
   onMessage: (data: WebSocket.Data) => void;
   onReconnect: () => void;
+
   constructor(
     endpoint: string,
     wsOptions?: ClientOptions | ClientRequestArgs,
@@ -64,23 +67,48 @@ export class ResilientWebSocket {
     }
   }
 
-  startWebSocket(): void {
+  async startWebSocket(): Promise<void> {
     if (this.wsClient !== undefined) {
+      // If there's an existing connection attempt, wait for it
+      if (this.connectionPromise) {
+        return this.connectionPromise;
+      }
       return;
     }
 
     this.logger?.info(`Creating Web Socket client`);
 
+    // Create a new promise for this connection attempt
+    this.connectionPromise = new Promise((resolve, reject) => {
+      this.resolveConnection = resolve;
+      this.rejectConnection = reject;
+    });
+
+    // Set a connection timeout
+    const timeoutId = setTimeout(() => {
+      if (this.rejectConnection) {
+        this.rejectConnection(
+          new Error(`Connection timeout after ${String(CONNECTION_TIMEOUT)}ms`)
+        );
+      }
+    }, CONNECTION_TIMEOUT);
+
     this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
     this.wsUserClosed = false;
 
     this.wsClient.addEventListener("open", () => {
       this.wsFailedAttempts = 0;
       this.resetHeartbeat();
+      clearTimeout(timeoutId);
+      this._isReconnecting = false;
+      this.resolveConnection?.();
     });
 
     this.wsClient.addEventListener("error", (event) => {
       this.onError(event);
+      if (this.rejectConnection) {
+        this.rejectConnection(new Error("WebSocket connection failed"));
+      }
     });
 
     this.wsClient.addEventListener("message", (event) => {
@@ -89,24 +117,23 @@ export class ResilientWebSocket {
     });
 
     this.wsClient.addEventListener("close", () => {
+      clearTimeout(timeoutId);
+      if (this.rejectConnection) {
+        this.rejectConnection(new Error("WebSocket closed before connecting"));
+      }
       void this.handleClose();
     });
 
-    // Handle ping events if supported (Node.js only)
     if ("on" in this.wsClient) {
-      // Ping handler is undefined in browser side
       this.wsClient.on("ping", () => {
         this.logger?.info("Ping received");
         this.resetHeartbeat();
       });
     }
+
+    return this.connectionPromise;
   }
 
-  /**
-   * Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
-   * from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
-   * we assume the connection is dead and reconnect.
-   */
   private resetHeartbeat(): void {
     if (this.heartbeatTimeout !== undefined) {
       clearTimeout(this.heartbeatTimeout);
@@ -145,8 +172,13 @@ export class ResilientWebSocket {
     } else {
       this.wsFailedAttempts += 1;
       this.wsClient = undefined;
+      this.connectionPromise = undefined;
+      this.resolveConnection = undefined;
+      this.rejectConnection = undefined;
+
       const waitTime = expoBackoff(this.wsFailedAttempts);
 
+      this._isReconnecting = true;
       this.logger?.error(
         "Connection closed unexpectedly or because of timeout. Reconnecting after " +
           String(waitTime) +
@@ -163,7 +195,7 @@ export class ResilientWebSocket {
       return;
     }
 
-    this.startWebSocket();
+    await this.startWebSocket();
     await this.waitForMaybeReadyWebSocket();
 
     if (this.wsClient === undefined) {
@@ -180,6 +212,9 @@ export class ResilientWebSocket {
     if (this.wsClient !== undefined) {
       const client = this.wsClient;
       this.wsClient = undefined;
+      this.connectionPromise = undefined;
+      this.resolveConnection = undefined;
+      this.rejectConnection = undefined;
       client.close();
     }
     this.wsUserClosed = true;

+ 74 - 49
lazer/sdk/js/src/socket/web-socket-pool.ts → lazer/sdk/js/src/socket/websocket-pool.ts

@@ -2,10 +2,9 @@ import TTLCache from "@isaacs/ttlcache";
 import WebSocket from "isomorphic-ws";
 import { dummyLogger, type Logger } from "ts-log";
 
-import { ResilientWebSocket } from "./resilient-web-socket.js";
+import { ResilientWebSocket } from "./resilient-websocket.js";
 import type { Request, Response } from "../protocol.js";
 
-// Number of redundant parallel WebSocket connections
 const DEFAULT_NUM_CONNECTIONS = 3;
 
 export class WebSocketPool {
@@ -13,6 +12,21 @@ export class WebSocketPool {
   private cache: TTLCache<string, boolean>;
   private subscriptions: Map<number, Request>; // id -> subscription Request
   private messageListeners: ((event: WebSocket.Data) => void)[];
+  private allConnectionsDownListeners: (() => void)[];
+  private wasAllDown = true;
+
+  private constructor(private readonly logger: Logger = dummyLogger) {
+    this.rwsPool = [];
+    this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
+    this.subscriptions = new Map();
+    this.messageListeners = [];
+    this.allConnectionsDownListeners = [];
+
+    // Start monitoring connection states
+    setInterval(() => {
+      this.checkConnectionStates();
+    }, 100);
+  }
 
   /**
    * Creates a new WebSocketPool instance that uses multiple redundant WebSocket connections for reliability.
@@ -22,22 +36,21 @@ export class WebSocketPool {
    * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3)
    * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
    */
-  constructor(
+  static async create(
     urls: string[],
     token: string,
     numConnections: number = DEFAULT_NUM_CONNECTIONS,
-    private readonly logger: Logger = dummyLogger
-  ) {
+    logger: Logger = dummyLogger
+  ): Promise<WebSocketPool> {
     if (urls.length === 0) {
       throw new Error("No URLs provided");
     }
-    // This cache is used to deduplicate messages received across different websocket clients in the pool.
-    // A TTL cache is used to prevent unbounded memory usage. A very short TTL of 10 seconds is chosen since
-    // deduplication only needs to happen between messages received very close together in time.
-    this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
-    this.rwsPool = [];
-    this.subscriptions = new Map();
-    this.messageListeners = [];
+
+    const pool = new WebSocketPool(logger);
+
+    // Create all websocket instances
+    const connectionPromises: Promise<void>[] = [];
+
     for (let i = 0; i < numConnections; i++) {
       const url = urls[i % urls.length];
       if (!url) {
@@ -52,36 +65,44 @@ export class WebSocketPool {
 
       // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish
       // the connection and call the onReconnect callback.
-      // When we reconnect, replay all subscription messages to resume the data stream.
       rws.onReconnect = () => {
         if (rws.wsUserClosed) {
           return;
         }
-        for (const [, request] of this.subscriptions) {
+        for (const [, request] of pool.subscriptions) {
           try {
             void rws.send(JSON.stringify(request));
           } catch (error) {
-            this.logger.error(
+            pool.logger.error(
               "Failed to resend subscription on reconnect:",
               error
             );
           }
         }
       };
+
       // Handle all client messages ourselves. Dedupe before sending to registered message handlers.
-      rws.onMessage = this.dedupeHandler;
-      this.rwsPool.push(rws);
+      rws.onMessage = pool.dedupeHandler;
+      pool.rwsPool.push(rws);
+
+      // Start the websocket and collect the promise
+      connectionPromises.push(rws.startWebSocket());
     }
 
-    // Let it rip
-    // TODO: wait for sockets to receive `open` msg before subscribing?
-    for (const rws of this.rwsPool) {
-      rws.startWebSocket();
+    // Wait for all connections to be established
+    try {
+      await Promise.all(connectionPromises);
+    } catch (error) {
+      // If any connection fails, clean up and throw
+      pool.shutdown();
+      throw error;
     }
 
-    this.logger.info(
-      `Using ${numConnections.toString()} redundant WebSocket connections`
+    pool.logger.info(
+      `Successfully established ${numConnections.toString()} redundant WebSocket connections`
     );
+
+    return pool;
   }
 
   /**
@@ -105,23 +126,18 @@ export class WebSocketPool {
    * multiple connections before forwarding to registered handlers
    */
   dedupeHandler = (data: WebSocket.Data): void => {
-    // For string data, use the whole string as the cache key. This avoids expensive JSON parsing during deduping.
-    // For binary data, use the hex string representation as the cache key
     const cacheKey =
       typeof data === "string"
         ? data
         : Buffer.from(data as Buffer).toString("hex");
 
-    // If we've seen this exact message recently, drop it
     if (this.cache.has(cacheKey)) {
       this.logger.debug("Dropping duplicate message");
       return;
     }
 
-    // Haven't seen this message, cache it and forward to handlers
     this.cache.set(cacheKey, true);
 
-    // Check for errors in JSON responses
     if (typeof data === "string") {
       this.handleErrorMessages(data);
     }
@@ -131,28 +147,18 @@ export class WebSocketPool {
     }
   };
 
-  /**
-   * Sends a message to all websockets in the pool
-   * @param request - The request to send
-   */
   async sendRequest(request: Request): Promise<void> {
-    // Send to all websockets in the pool
     const sendPromises = this.rwsPool.map(async (rws) => {
       try {
         await rws.send(JSON.stringify(request));
       } catch (error) {
         this.logger.error("Failed to send request:", error);
-        throw error; // Re-throw the error
+        throw error;
       }
     });
     await Promise.all(sendPromises);
   }
 
-  /**
-   * Adds a subscription by sending a subscribe request to all websockets in the pool
-   * and storing it for replay on reconnection
-   * @param request - The subscription request to send
-   */
   async addSubscription(request: Request): Promise<void> {
     if (request.type !== "subscribe") {
       throw new Error("Request must be a subscribe request");
@@ -161,11 +167,6 @@ export class WebSocketPool {
     await this.sendRequest(request);
   }
 
-  /**
-   * Removes a subscription by sending an unsubscribe request to all websockets in the pool
-   * and removing it from stored subscriptions
-   * @param subscriptionId - The ID of the subscription to remove
-   */
   async removeSubscription(subscriptionId: number): Promise<void> {
     this.subscriptions.delete(subscriptionId);
     const request: Request = {
@@ -175,17 +176,40 @@ export class WebSocketPool {
     await this.sendRequest(request);
   }
 
-  /**
-   * Adds a message handler function to receive websocket messages
-   * @param handler - Function that will be called with each received message
-   */
   addMessageListener(handler: (data: WebSocket.Data) => void): void {
     this.messageListeners.push(handler);
   }
 
   /**
-   * Elegantly closes all websocket connections in the pool
+   * Calls the handler if all websocket connections are currently down or in reconnecting state.
+   * The connections may still try to reconnect in the background.
    */
+  addAllConnectionsDownListener(handler: () => void): void {
+    this.allConnectionsDownListeners.push(handler);
+  }
+
+  private areAllConnectionsDown(): boolean {
+    return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting);
+  }
+
+  private checkConnectionStates(): void {
+    const allDown = this.areAllConnectionsDown();
+
+    // If all connections just went down
+    if (allDown && !this.wasAllDown) {
+      this.wasAllDown = true;
+      this.logger.error("All WebSocket connections are down or reconnecting");
+      // Notify all listeners
+      for (const listener of this.allConnectionsDownListeners) {
+        listener();
+      }
+    }
+    // If at least one connection was restored
+    if (!allDown && this.wasAllDown) {
+      this.wasAllDown = false;
+    }
+  }
+
   shutdown(): void {
     for (const rws of this.rwsPool) {
       rws.closeWebSocket();
@@ -193,5 +217,6 @@ export class WebSocketPool {
     this.rwsPool = [];
     this.subscriptions.clear();
     this.messageListeners = [];
+    this.allConnectionsDownListeners = [];
   }
 }