Jelajahi Sumber

fix(lazer-sdk-js): updated the websocket to be truly isomorphic and also add Buffer support when running in the browser (plus better runtime env detection)

benduran 1 bulan lalu
induk
melakukan
61893978d1

+ 4 - 1
lazer/sdk/js/package.json

@@ -2,6 +2,9 @@
   "name": "@pythnetwork/pyth-lazer-sdk",
   "version": "4.0.0",
   "description": "Pyth Lazer SDK",
+  "engines": {
+    "node": ">=20"
+  },
   "publishConfig": {
     "access": "public"
   },
@@ -61,7 +64,7 @@
   "license": "Apache-2.0",
   "dependencies": {
     "@isaacs/ttlcache": "^1.4.1",
-    "cross-fetch": "^4.0.0",
+    "buffer": "^6.0.3",
     "isomorphic-ws": "^5.0.0",
     "ts-log": "^2.2.7",
     "ws": "^8.18.0"

+ 43 - 44
lazer/sdk/js/src/client.ts

@@ -1,4 +1,3 @@
-import fetch from "cross-fetch";
 import WebSocket from "isomorphic-ws";
 import type { Logger } from "ts-log";
 import { dummyLogger } from "ts-log";
@@ -20,6 +19,7 @@ import type {
 import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js";
 import type { WebSocketPoolConfig } from "./socket/websocket-pool.js";
 import { WebSocketPool } from "./socket/websocket-pool.js";
+import { IsomorphicBuffer } from "./util/index.js";
 
 export type BinaryResponse = {
   subscriptionId: number;
@@ -31,9 +31,9 @@ export type BinaryResponse = {
 };
 export type JsonOrBinaryResponse =
   | {
-      type: "json";
-      value: Response;
-    }
+    type: "json";
+    value: Response;
+  }
   | { type: "binary"; value: BinaryResponse };
 
 const UINT16_NUM_BYTES = 2;
@@ -55,7 +55,7 @@ export class PythLazerClient {
     private readonly priceServiceUrl: string,
     private readonly logger: Logger,
     private readonly wsp?: WebSocketPool,
-  ) {}
+  ) { }
 
   /**
    * Gets the WebSocket pool. If the WebSocket pool is not configured, an error is thrown.
@@ -113,53 +113,52 @@ export class PythLazerClient {
    */
   addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
     const wsp = this.getWebSocketPool();
-    wsp.addMessageListener((data: WebSocket.Data) => {
+    wsp.addMessageListener(async (data: WebSocket.Data) => {
       if (typeof data == "string") {
         handler({
           type: "json",
           value: JSON.parse(data) as Response,
         });
-      } else if (Buffer.isBuffer(data)) {
-        let pos = 0;
-        const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE();
-        pos += UINT32_NUM_BYTES;
-        if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
-          throw new Error("binary update format magic mismatch");
-        }
-        // TODO: some uint64 values may not be representable as Number.
-        const subscriptionId = Number(
-          data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
-        );
-        pos += UINT64_NUM_BYTES;
+        return;
+      }
+      const buffData = await IsomorphicBuffer.fromWebsocketData(data);
+      let pos = 0;
+      const magic = buffData.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE();
+      pos += UINT32_NUM_BYTES;
+      if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
+        throw new Error("binary update format magic mismatch");
+      }
+      // TODO: some uint64 values may not be representable as Number.
+      const subscriptionId = Number(
+        buffData.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
+      );
+      pos += UINT64_NUM_BYTES;
 
-        const value: BinaryResponse = { subscriptionId };
-        while (pos < data.length) {
-          const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
-          pos += UINT16_NUM_BYTES;
-          const magic = data
-            .subarray(pos, pos + UINT32_NUM_BYTES)
-            .readUint32LE();
-          if (magic == FORMAT_MAGICS_LE.EVM) {
-            value.evm = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.SOLANA) {
-            value.solana = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
-            value.leEcdsa = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
-            value.leUnsigned = data.subarray(pos, pos + len);
-          } else if (magic == FORMAT_MAGICS_LE.JSON) {
-            value.parsed = JSON.parse(
-              data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
-            ) as ParsedPayload;
-          } else {
-            throw new Error("unknown magic: " + magic.toString());
-          }
-          pos += len;
+      const value: BinaryResponse = { subscriptionId };
+      while (pos < buffData.length) {
+        const len = buffData.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
+        pos += UINT16_NUM_BYTES;
+        const magic = buffData
+          .subarray(pos, pos + UINT32_NUM_BYTES)
+          .readUint32LE();
+        if (magic == FORMAT_MAGICS_LE.EVM) {
+          value.evm = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.SOLANA) {
+          value.solana = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
+          value.leEcdsa = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
+          value.leUnsigned = buffData.subarray(pos, pos + len);
+        } else if (magic == FORMAT_MAGICS_LE.JSON) {
+          value.parsed = JSON.parse(
+            buffData.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
+          ) as ParsedPayload;
+        } else {
+          throw new Error(`unknown magic:  ${magic.toString()}`);
         }
-        handler({ type: "binary", value });
-      } else {
-        throw new TypeError("unexpected event data type");
+        pos += len;
       }
+      handler({ type: "binary", value });
     });
   }
 

+ 8 - 3
lazer/sdk/js/src/socket/resilient-websocket.ts

@@ -5,6 +5,8 @@ import WebSocket from "isomorphic-ws";
 import type { Logger } from "ts-log";
 import { dummyLogger } from "ts-log";
 
+import { envIsServiceOrWebWorker } from "../util/env-util.js";
+
 const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds
 const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second'
 const DEFAULT_LOG_AFTER_RETRY_COUNT = 10;
@@ -106,7 +108,10 @@ export class ResilientWebSocket {
       this.retryTimeout = undefined;
     }
 
-    this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
+    // browser constructor supports a different 2nd argument for the constructor,
+    // so we need to ensure it's not included if we're running in that environment:
+    // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols
+    this.wsClient = new WebSocket(this.endpoint, envIsServiceOrWebWorker() ? this.wsOptions : undefined);
 
     this.wsClient.addEventListener("open", () => {
       this.logger.info("WebSocket connection established");
@@ -184,8 +189,8 @@ export class ResilientWebSocket {
     if (this.shouldLogRetry()) {
       this.logger.error(
         "Connection closed unexpectedly or because of timeout. Reconnecting after " +
-          String(this.retryDelayMs()) +
-          "ms.",
+        String(this.retryDelayMs()) +
+        "ms.",
       );
     }
 

+ 35 - 16
lazer/sdk/js/src/socket/websocket-pool.ts

@@ -11,9 +11,12 @@ import {
   DEFAULT_STREAM_SERVICE_0_URL,
   DEFAULT_STREAM_SERVICE_1_URL,
 } from "../constants.js";
+import { envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js";
 
 const DEFAULT_NUM_CONNECTIONS = 4;
 
+type WebSocketOnMessageCallback = (data: WebSocket.Data) => void | Promise<void>;
+
 export type WebSocketPoolConfig = {
   urls?: string[];
   numConnections?: number;
@@ -25,7 +28,7 @@ export class WebSocketPool {
   rwsPool: ResilientWebSocket[];
   private cache: TTLCache<string, boolean>;
   private subscriptions: Map<number, Request>; // id -> subscription Request
-  private messageListeners: ((event: WebSocket.Data) => void)[];
+  private messageListeners: WebSocketOnMessageCallback[];
   private allConnectionsDownListeners: (() => void)[];
   private wasAllDown = true;
   private checkConnectionStatesInterval: NodeJS.Timeout;
@@ -65,16 +68,28 @@ export class WebSocketPool {
     const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS;
 
     for (let i = 0; i < numConnections; i++) {
-      const url = urls[i % urls.length];
+      let url = urls[i % urls.length];
       if (!url) {
         throw new Error(`URLs must not be null or empty`);
       }
-      const wsOptions = {
+      const wsOptions: ResilientWebSocketConfig['wsOptions'] = {
         ...config.rwsConfig?.wsOptions,
-        headers: {
-          Authorization: `Bearer ${token}`,
-        },
       };
+
+      if (envIsBrowserOrWorker()) {
+        // we are in a browser environment where the websocket protocol
+        // doesn't support sending headers in the initial upgrade request,
+        // so we add the token as a query param, which the server already supports
+        const parsedUrl = new URL(url);
+        parsedUrl.searchParams.set('ACCESS_TOKEN', token);
+        url = parsedUrl.toString();
+      } else {
+        // we are in a server-side javascript runtime context
+        wsOptions.headers = {
+          Authorization: `Bearer ${token}`,
+        };
+      }
+
       const rws = new ResilientWebSocket({
         ...config.rwsConfig,
         endpoint: url,
@@ -104,7 +119,9 @@ export class WebSocketPool {
         rws.onError = config.onError;
       }
       // Handle all client messages ourselves. Dedupe before sending to registered message handlers.
-      rws.onMessage = pool.dedupeHandler;
+      rws.onMessage = data => {
+        void pool.dedupeHandler(data);
+      };
       pool.rwsPool.push(rws);
       rws.startWebSocket();
     }
@@ -144,11 +161,14 @@ export class WebSocketPool {
    * Handles incoming websocket messages by deduplicating identical messages received across
    * multiple connections before forwarding to registered handlers
    */
-  dedupeHandler = (data: WebSocket.Data): void => {
-    const cacheKey =
-      typeof data === "string"
-        ? data
-        : Buffer.from(data as Buffer).toString("hex");
+  dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
+    let cacheKey = '';
+    if (typeof data === 'string') {
+      cacheKey = data;
+    } else {
+      const buff = await IsomorphicBuffer.fromWebsocketData(data);
+      cacheKey = buff.toString('hex');
+    }
 
     if (this.cache.has(cacheKey)) {
       this.logger.debug("Dropping duplicate message");
@@ -161,9 +181,8 @@ export class WebSocketPool {
       this.handleErrorMessages(data);
     }
 
-    for (const handler of this.messageListeners) {
-      handler(data);
-    }
+    await Promise.all(this.messageListeners.map(handler => handler(data)));
+
   };
 
   sendRequest(request: Request) {
@@ -189,7 +208,7 @@ export class WebSocketPool {
     this.sendRequest(request);
   }
 
-  addMessageListener(handler: (data: WebSocket.Data) => void): void {
+  addMessageListener(handler: WebSocketOnMessageCallback): void {
     this.messageListeners.push(handler);
   }
 

+ 38 - 0
lazer/sdk/js/src/util/buffer-util.ts

@@ -0,0 +1,38 @@
+// the linting rules don't allow importing anything that might clash with
+// a global, top-level import. we disable this rule because we need this
+// imported from our installed dependency
+// eslint-disable-next-line unicorn/prefer-node-protocol
+import { Buffer as BrowserBuffer } from 'buffer';
+
+import type { Data } from 'isomorphic-ws';
+
+const { Buffer: PossibleBuiltInBuffer } = globalThis as Partial<{ Buffer: typeof Buffer }>;
+
+const BufferClassToUse = PossibleBuiltInBuffer ?? BrowserBuffer;
+
+export class IsomorphicBuffer extends BufferClassToUse {
+  /**
+ * given a relatively unknown websocket frame data object,
+ * returns a valid Buffer instance that is safe to use
+ * isomorphically in any JS runtime environment
+ */
+  static async fromWebsocketData(data: Data) {
+    if (typeof data === 'string') {
+      return BufferClassToUse.from(new TextEncoder().encode(data).buffer);
+    }
+    if (data instanceof Blob) {
+      // let the uncaught promise exception bubble up if there's an issue
+      return BufferClassToUse.from(await data.arrayBuffer());
+    }
+    if (data instanceof ArrayBuffer) return BufferClassToUse.from(data);
+    if (Buffer.isBuffer(data)) {
+      const arrBuffer = new ArrayBuffer(data.length);
+      const v = new Uint8Array(arrBuffer);
+      for (const [i, item] of data.entries()) {
+        v[i] = item;
+      }
+      return BufferClassToUse.from(arrBuffer);
+    }
+    throw new TypeError("unexpected event data type found when IsomorphicBuffer.fromWebsocketData() called");
+  }
+}

+ 30 - 0
lazer/sdk/js/src/util/env-util.ts

@@ -0,0 +1,30 @@
+// we create this local-only type, which has assertions made to indicate
+// that we do not know and cannot guarantee which JS environment we are in
+const g = globalThis as Partial<{ self: typeof globalThis.self; window: typeof globalThis.window }>
+
+/**
+ * Detects if this code is running within any Service or WebWorker context.
+ * @returns true if in a worker of some kind, false if otherwise
+ */
+export function envIsServiceOrWebWorker() {
+  const possiblyInAWorker = typeof WorkerGlobalScope !== 'undefined' && g.self !== undefined;
+  return possiblyInAWorker && g.self instanceof WorkerGlobalScope;
+}
+
+/**
+ * Detects if the code is running in a regular DOM or Web Worker context.
+ * @returns true if running in a DOM or Web Worker context, false if running in Node.js
+ */
+export function envIsBrowser() {
+  return g.window !== undefined;
+}
+
+/**
+ * a convenience method that returns whether or not
+ * this code is executing in some type of browser-centric environment
+ * 
+ * @returns true if in the browser's main UI thread or in a worker, false if otherwise
+ */
+export function envIsBrowserOrWorker() {
+  return envIsServiceOrWebWorker() || envIsBrowser();
+}

+ 2 - 0
lazer/sdk/js/src/util/index.ts

@@ -0,0 +1,2 @@
+export * from "./buffer-util.js";
+export * from "./env-util.js";

+ 4 - 1
lazer/sdk/js/tsconfig.json

@@ -1,4 +1,7 @@
 {
   "extends": "@cprussin/tsconfig/base.json",
-  "exclude": ["node_modules", "dist"]
+  "exclude": ["node_modules", "dist"],
+  "compilerOptions": {
+    "lib": ["DOM", "DOM.Iterable", "WebWorker"]
+  }
 }

+ 3 - 3
pnpm-lock.yaml

@@ -2077,9 +2077,9 @@ importers:
       '@isaacs/ttlcache':
         specifier: ^1.4.1
         version: 1.4.1
-      cross-fetch:
-        specifier: ^4.0.0
-        version: 4.1.0(encoding@0.1.13)
+      buffer:
+        specifier: ^6.0.3
+        version: 6.0.3
       isomorphic-ws:
         specifier: ^5.0.0
         version: 5.0.0(ws@8.18.1(bufferutil@4.0.9)(utf-8-validate@6.0.3))