Преглед на файлове

Merge pull request #3139 from pyth-network/bduran/lazer-js-sdk/browser-support

fix(lazer-sdk): implemented function to convert all non-string WS frames to an array buffer, then leverage an isomorphic buffer implementation
Ben Duran преди 1 месец
родител
ревизия
487ae1e898

+ 57 - 0
lazer/sdk/js/README.md

@@ -3,3 +3,60 @@
 ## Contributing & Development
 
 See [contributing.md](docs/contributing/contributing.md) for information on how to develop or contribute to this project!
+
+## How to use
+
+```javascript
+import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk";
+
+const c = await PythLazerClient.create({
+  token: "YOUR-AUTH-TOKEN-HERE",
+  logger: console, // Optionally log operations (to the console in this case.)
+  webSocketPoolConfig: {
+    numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4.
+    onError: (error) => {
+      console.error("⛔️ WebSocket error:", error.message);
+    },
+    // Optional configuration for resilient WebSocket connections
+    rwsConfig: {
+      heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds
+      maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds
+      logAfterRetryCount: 10, // Optional log after how many retries
+    },
+  },
+});
+
+c.addMessageListener((message) => {
+  console.info("received the following from the Lazer stream:", message);
+});
+
+// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down)
+// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown().
+c.addAllConnectionsDownListener(() => {
+  console.error("All connections are down!");
+});
+
+// Create and remove one or more subscriptions on the fly
+c.subscribe({
+  type: "subscribe",
+  subscriptionId: 1,
+  priceFeedIds: [1, 2],
+  properties: ["price"],
+  formats: ["solana"],
+  deliveryFormat: "binary",
+  channel: "fixed_rate@200ms",
+  parsed: false,
+  jsonBinaryEncoding: "base64",
+});
+c.subscribe({
+  type: "subscribe",
+  subscriptionId: 2,
+  priceFeedIds: [1, 2, 3, 4, 5],
+  properties: ["price", "exponent", "publisherCount", "confidence"],
+  formats: ["evm"],
+  deliveryFormat: "json",
+  channel: "fixed_rate@200ms",
+  parsed: true,
+  jsonBinaryEncoding: "hex",
+});
+```

+ 5 - 2
lazer/sdk/js/package.json

@@ -1,7 +1,10 @@
 {
   "name": "@pythnetwork/pyth-lazer-sdk",
-  "version": "4.0.0",
+  "version": "5.0.0",
   "description": "Pyth Lazer SDK",
+  "engines": {
+    "node": ">=22"
+  },
   "publishConfig": {
     "access": "public"
   },
@@ -61,7 +64,7 @@
   "license": "Apache-2.0",
   "dependencies": {
     "@isaacs/ttlcache": "^1.4.1",
-    "cross-fetch": "^4.0.0",
+    "buffer": "^6.0.3",
     "isomorphic-ws": "^5.0.0",
     "ts-log": "^2.2.7",
     "ws": "^8.18.0"

+ 43 - 40
lazer/sdk/js/src/client.ts

@@ -1,4 +1,3 @@
-import fetch from "cross-fetch";
 import WebSocket from "isomorphic-ws";
 import type { Logger } from "ts-log";
 import { dummyLogger } from "ts-log";
@@ -20,6 +19,7 @@ import type {
 import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js";
 import type { WebSocketPoolConfig } from "./socket/websocket-pool.js";
 import { WebSocketPool } from "./socket/websocket-pool.js";
+import { bufferFromWebsocketData } from "./util/buffer-util.js";
 
 export type BinaryResponse = {
   subscriptionId: number;
@@ -113,53 +113,56 @@ export class PythLazerClient {
    */
   addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
     const wsp = this.getWebSocketPool();
-    wsp.addMessageListener((data: WebSocket.Data) => {
+    wsp.addMessageListener(async (data: WebSocket.Data) => {
       if (typeof data == "string") {
         handler({
           type: "json",
           value: JSON.parse(data) as Response,
         });
-      } else if (Buffer.isBuffer(data)) {
-        let pos = 0;
-        const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE();
-        pos += UINT32_NUM_BYTES;
-        if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
-          throw new Error("binary update format magic mismatch");
-        }
-        // TODO: some uint64 values may not be representable as Number.
-        const subscriptionId = Number(
-          data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
-        );
-        pos += UINT64_NUM_BYTES;
+        return;
+      }
+      const buffData = await bufferFromWebsocketData(data);
+      let pos = 0;
+      const magic = buffData
+        .subarray(pos, pos + UINT32_NUM_BYTES)
+        .readUint32LE();
+      pos += UINT32_NUM_BYTES;
+      if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
+        throw new Error("binary update format magic mismatch");
+      }
+      // TODO: some uint64 values may not be representable as Number.
+      const subscriptionId = Number(
+        buffData.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
+      );
+      pos += UINT64_NUM_BYTES;
 
-        const value: BinaryResponse = { subscriptionId };
-        while (pos < data.length) {
-          const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
-          pos += UINT16_NUM_BYTES;
-          const magic = data
-            .subarray(pos, pos + UINT32_NUM_BYTES)
-            .readUint32LE();
-          if (magic == FORMAT_MAGICS_LE.EVM) {
-            value.evm = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.SOLANA) {
-            value.solana = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
-            value.leEcdsa = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
-            value.leUnsigned = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.JSON) {
-            value.parsed = JSON.parse(
-              data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
-            ) as ParsedPayload;
-          } else {
-            throw new Error("unknown magic: " + magic.toString());
-          }
-          pos += len;
+      const value: BinaryResponse = { subscriptionId };
+      while (pos < buffData.length) {
+        const len = buffData
+          .subarray(pos, pos + UINT16_NUM_BYTES)
+          .readUint16BE();
+        pos += UINT16_NUM_BYTES;
+        const magic = buffData
+          .subarray(pos, pos + UINT32_NUM_BYTES)
+          .readUint32LE();
+        if (magic == FORMAT_MAGICS_LE.EVM) {
+          value.evm = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.SOLANA) {
+          value.solana = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
+          value.leEcdsa = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
+          value.leUnsigned = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.JSON) {
+          value.parsed = JSON.parse(
+            buffData.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
+          ) as ParsedPayload;
+        } else {
+          throw new Error(`unknown magic:  ${magic.toString()}`);
         }
-        handler({ type: "binary", value });
-      } else {
-        throw new TypeError("unexpected event data type");
+        pos += len;
       }
+      handler({ type: "binary", value });
     });
   }
 

+ 4 - 0
lazer/sdk/js/src/protocol.ts

@@ -158,3 +158,7 @@ export type JsonUpdate = {
   leEcdsa?: JsonBinaryData;
   leUnsigned?: JsonBinaryData;
 };
+
+export enum CustomSocketClosureCodes {
+  CLIENT_TIMEOUT_BUT_RECONNECTING = 4000,
+}

+ 41 - 4
lazer/sdk/js/src/socket/resilient-websocket.ts

@@ -5,6 +5,9 @@ import WebSocket from "isomorphic-ws";
 import type { Logger } from "ts-log";
 import { dummyLogger } from "ts-log";
 
+import { CustomSocketClosureCodes } from "../protocol.js";
+import { envIsBrowserOrWorker } from "../util/env-util.js";
+
 const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds
 const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second'
 const DEFAULT_LOG_AFTER_RETRY_COUNT = 10;
@@ -18,6 +21,21 @@ export type ResilientWebSocketConfig = {
   logAfterRetryCount?: number;
 };
 
+/**
+ * the isomorphic-ws package ships with some slightly-erroneous typings.
+ * namely, it returns a WebSocket with typings that indicate the "terminate()" function
+ * is available on all platforms.
+ * Given that, under the hood, it is using the globalThis.WebSocket class, if it's available,
+ * and falling back to using the https://www.npmjs.com/package/ws package, this
+ * means there are API differences between the native WebSocket (the one in a web browser)
+ * and the server-side version from the "ws" package.
+ *
+ * This type creates a WebSocket type reference we use to indicate the unknown
+ * nature of the env in which is code is run.
+ */
+type UnsafeWebSocket = Omit<WebSocket, "terminate"> &
+  Partial<Pick<WebSocket, "terminate">>;
+
 export class ResilientWebSocket {
   private endpoint: string;
   private wsOptions?: ClientOptions | ClientRequestArgs | undefined;
@@ -26,7 +44,7 @@ export class ResilientWebSocket {
   private maxRetryDelayMs: number;
   private logAfterRetryCount: number;
 
-  wsClient: undefined | WebSocket;
+  wsClient: UnsafeWebSocket | undefined;
   wsUserClosed = false;
   private wsFailedAttempts: number;
   private heartbeatTimeout?: NodeJS.Timeout | undefined;
@@ -106,7 +124,13 @@ export class ResilientWebSocket {
       this.retryTimeout = undefined;
     }
 
-    this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
+    // browser constructor supports a different 2nd argument for the constructor,
+    // so we need to ensure it's not included if we're running in that environment:
+    // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols
+    this.wsClient = new WebSocket(
+      this.endpoint,
+      envIsBrowserOrWorker() ? undefined : this.wsOptions,
+    );
 
     this.wsClient.addEventListener("open", () => {
       this.logger.info("WebSocket connection established");
@@ -154,8 +178,21 @@ export class ResilientWebSocket {
     }
 
     this.heartbeatTimeout = setTimeout(() => {
-      this.logger.warn("Connection timed out. Reconnecting...");
-      this.wsClient?.terminate();
+      const warnMsg = "Connection timed out. Reconnecting...";
+      this.logger.warn(warnMsg);
+      if (this.wsClient) {
+        if (typeof this.wsClient.terminate === "function") {
+          this.wsClient.terminate();
+        } else {
+          // terminate is an implementation detail of the node-friendly
+          // https://www.npmjs.com/package/ws package, but is not a native WebSocket API,
+          // so we have to use the close method
+          this.wsClient.close(
+            CustomSocketClosureCodes.CLIENT_TIMEOUT_BUT_RECONNECTING,
+            warnMsg,
+          );
+        }
+      }
       this.handleReconnect();
     }, this.heartbeatTimeoutDurationMs);
   }

+ 34 - 16
lazer/sdk/js/src/socket/websocket-pool.ts

@@ -11,9 +11,18 @@ import {
   DEFAULT_STREAM_SERVICE_0_URL,
   DEFAULT_STREAM_SERVICE_1_URL,
 } from "../constants.js";
+import {
+  addAuthTokenToWebSocketUrl,
+  bufferFromWebsocketData,
+  envIsBrowserOrWorker,
+} from "../util/index.js";
 
 const DEFAULT_NUM_CONNECTIONS = 4;
 
+type WebSocketOnMessageCallback = (
+  data: WebSocket.Data,
+) => void | Promise<void>;
+
 export type WebSocketPoolConfig = {
   urls?: string[];
   numConnections?: number;
@@ -25,7 +34,7 @@ export class WebSocketPool {
   rwsPool: ResilientWebSocket[];
   private cache: TTLCache<string, boolean>;
   private subscriptions: Map<number, Request>; // id -> subscription Request
-  private messageListeners: ((event: WebSocket.Data) => void)[];
+  private messageListeners: WebSocketOnMessageCallback[];
   private allConnectionsDownListeners: (() => void)[];
   private wasAllDown = true;
   private checkConnectionStatesInterval: NodeJS.Timeout;
@@ -65,16 +74,19 @@ export class WebSocketPool {
     const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS;
 
     for (let i = 0; i < numConnections; i++) {
-      const url = urls[i % urls.length];
+      const baseUrl = urls[i % urls.length];
+      const isBrowser = envIsBrowserOrWorker();
+      const url = isBrowser
+        ? addAuthTokenToWebSocketUrl(baseUrl, token)
+        : baseUrl;
       if (!url) {
         throw new Error(`URLs must not be null or empty`);
       }
-      const wsOptions = {
+      const wsOptions: ResilientWebSocketConfig["wsOptions"] = {
         ...config.rwsConfig?.wsOptions,
-        headers: {
-          Authorization: `Bearer ${token}`,
-        },
+        headers: isBrowser ? undefined : { Authorization: `Bearer ${token}` },
       };
+
       const rws = new ResilientWebSocket({
         ...config.rwsConfig,
         endpoint: url,
@@ -104,7 +116,12 @@ export class WebSocketPool {
         rws.onError = config.onError;
       }
       // Handle all client messages ourselves. Dedupe before sending to registered message handlers.
-      rws.onMessage = pool.dedupeHandler;
+      rws.onMessage = (data) => {
+        pool.dedupeHandler(data).catch((error: unknown) => {
+          const errMsg = `An error occurred in the WebSocket pool's dedupeHandler: ${error instanceof Error ? error.message : String(error)}`;
+          throw new Error(errMsg);
+        });
+      };
       pool.rwsPool.push(rws);
       rws.startWebSocket();
     }
@@ -140,15 +157,18 @@ export class WebSocketPool {
     }
   }
 
+  private async constructCacheKeyFromWebsocketData(data: WebSocket.Data) {
+    if (typeof data === "string") return data;
+    const buff = await bufferFromWebsocketData(data);
+    return buff.toString("hex");
+  }
+
   /**
    * Handles incoming websocket messages by deduplicating identical messages received across
    * multiple connections before forwarding to registered handlers
    */
-  dedupeHandler = (data: WebSocket.Data): void => {
-    const cacheKey =
-      typeof data === "string"
-        ? data
-        : Buffer.from(data as Buffer).toString("hex");
+  dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
+    const cacheKey = await this.constructCacheKeyFromWebsocketData(data);
 
     if (this.cache.has(cacheKey)) {
       this.logger.debug("Dropping duplicate message");
@@ -161,9 +181,7 @@ export class WebSocketPool {
       this.handleErrorMessages(data);
     }
 
-    for (const handler of this.messageListeners) {
-      handler(data);
-    }
+    await Promise.all(this.messageListeners.map((handler) => handler(data)));
   };
 
   sendRequest(request: Request) {
@@ -189,7 +207,7 @@ export class WebSocketPool {
     this.sendRequest(request);
   }
 
-  addMessageListener(handler: (data: WebSocket.Data) => void): void {
+  addMessageListener(handler: WebSocketOnMessageCallback): void {
     this.messageListeners.push(handler);
   }
 

+ 38 - 0
lazer/sdk/js/src/util/buffer-util.ts

@@ -0,0 +1,38 @@
+// the linting rules don't allow importing anything that might clash with
+// a global, top-level import. we disable this rule because we need this
+// imported from our installed dependency
+// eslint-disable-next-line unicorn/prefer-node-protocol
+import { Buffer as BrowserBuffer } from "buffer";
+
+import type { Data } from "isomorphic-ws";
+
+const BufferClassToUse =
+  "Buffer" in globalThis ? globalThis.Buffer : BrowserBuffer;
+
+/**
+ * given a relatively unknown websocket frame data object,
+ * returns a valid Buffer instance that is safe to use
+ * isomorphically in any JS runtime environment
+ */
+export async function bufferFromWebsocketData(data: Data): Promise<Buffer> {
+  if (typeof data === "string") {
+    return BufferClassToUse.from(new TextEncoder().encode(data).buffer);
+  }
+
+  if (data instanceof BufferClassToUse) return data;
+
+  if (data instanceof Blob) {
+    // let the uncaught promise exception bubble up if there's an issue
+    return BufferClassToUse.from(await data.arrayBuffer());
+  }
+
+  if (data instanceof ArrayBuffer) return BufferClassToUse.from(data);
+
+  if (Array.isArray(data)) {
+    // an array of buffers is highly unlikely, but it is a possibility
+    // indicated by the WebSocket Data interface
+    return BufferClassToUse.concat(data);
+  }
+
+  return data;
+}

+ 35 - 0
lazer/sdk/js/src/util/env-util.ts

@@ -0,0 +1,35 @@
+// we create this local-only type, which has assertions made to indicate
+// that we do not know and cannot guarantee which JS environment we are in
+const g = globalThis as Partial<{
+  self: typeof globalThis.self;
+  window: typeof globalThis.window;
+}>;
+
+/**
+ * Detects if this code is running within any Service or WebWorker context.
+ * @returns true if in a worker of some kind, false if otherwise
+ */
+export function envIsServiceOrWebWorker() {
+  return (
+    typeof WorkerGlobalScope !== "undefined" &&
+    g.self instanceof WorkerGlobalScope
+  );
+}
+
+/**
+ * Detects if the code is running in a regular DOM or Web Worker context.
+ * @returns true if running in a DOM or Web Worker context, false if running in Node.js
+ */
+export function envIsBrowser() {
+  return g.window !== undefined;
+}
+
+/**
+ * a convenience method that returns whether or not
+ * this code is executing in some type of browser-centric environment
+ *
+ * @returns true if in the browser's main UI thread or in a worker, false if otherwise
+ */
+export function envIsBrowserOrWorker() {
+  return envIsServiceOrWebWorker() || envIsBrowser();
+}

+ 3 - 0
lazer/sdk/js/src/util/index.ts

@@ -0,0 +1,3 @@
+export * from "./buffer-util.js";
+export * from "./env-util.js";
+export * from "./url-util.js";

+ 19 - 0
lazer/sdk/js/src/util/url-util.ts

@@ -0,0 +1,19 @@
+const ACCESS_TOKEN_QUERY_PARAM_KEY = "ACCESS_TOKEN";
+
+/**
+ * Given a URL to a hosted lazer stream service and a possible auth token,
+ * appends the auth token as a query parameter and returns the URL with the token
+ * contained within.
+ * If the URL provided is nullish, it is returned as-is (in the same nullish format).
+ * If the token is nullish, the baseUrl given is returned, instead.
+ */
+export function addAuthTokenToWebSocketUrl(
+  baseUrl: string | null | undefined,
+  authToken: string | null | undefined,
+) {
+  if (!baseUrl || !authToken) return baseUrl;
+  const parsedUrl = new URL(baseUrl);
+  parsedUrl.searchParams.set(ACCESS_TOKEN_QUERY_PARAM_KEY, authToken);
+
+  return parsedUrl.toString();
+}

+ 4 - 1
lazer/sdk/js/tsconfig.json

@@ -1,4 +1,7 @@
 {
   "extends": "@cprussin/tsconfig/base.json",
-  "exclude": ["node_modules", "dist"]
+  "exclude": ["node_modules", "dist"],
+  "compilerOptions": {
+    "lib": ["DOM", "DOM.Iterable", "WebWorker"]
+  }
 }

+ 3 - 3
pnpm-lock.yaml

@@ -2077,9 +2077,9 @@ importers:
       '@isaacs/ttlcache':
         specifier: ^1.4.1
         version: 1.4.1
-      cross-fetch:
-        specifier: ^4.0.0
-        version: 4.1.0(encoding@0.1.13)
+      buffer:
+        specifier: ^6.0.3
+        version: 6.0.3
       isomorphic-ws:
         specifier: ^5.0.0
         version: 5.0.0(ws@8.18.1(bufferutil@4.0.9)(utf-8-validate@6.0.3))