| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- import TTLCache from "@isaacs/ttlcache";
- import type { ErrorEvent } from "isomorphic-ws";
- import WebSocket from "isomorphic-ws";
- import type { Logger } from "ts-log";
- import { dummyLogger } from "ts-log";
- import type { Request, Response } from "../protocol.js";
- import type { ResilientWebSocketConfig } from "./resilient-websocket.js";
- import { ResilientWebSocket } from "./resilient-websocket.js";
- import {
- DEFAULT_STREAM_SERVICE_0_URL,
- DEFAULT_STREAM_SERVICE_1_URL,
- } from "../constants.js";
- import { addAuthTokenToWebSocketUrl, envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js";
/** Number of connections opened when `WebSocketPoolConfig.numConnections` is not set. */
const DEFAULT_NUM_CONNECTIONS = 4;

/** Listener signature for incoming messages; it may finish synchronously or return a promise. */
type WebSocketOnMessageCallback = (data: WebSocket.Data) => Promise<void> | void;

/** Options accepted when creating a WebSocket pool. Every field is optional. */
export type WebSocketPoolConfig = {
  /** WebSocket endpoints to connect to. */
  urls?: string[];
  /** How many parallel connections to open. */
  numConnections?: number;
  /** Per-connection settings; `logger` and `endpoint` are excluded from this type. */
  rwsConfig?: Omit<ResilientWebSocketConfig, "logger" | "endpoint">;
  /** Callback receiving socket error events. */
  onError?: (error: ErrorEvent) => void;
};
/**
 * Pool of redundant {@link ResilientWebSocket} connections exposed through a single,
 * WebSocket-like interface.
 *
 * Requests are broadcast to every connection in the pool. Identical messages received
 * on several connections are forwarded to listeners only once (deduplication entries
 * expire after 10 seconds). Active subscriptions are remembered so they can be re-sent
 * on a connection after it reconnects.
 *
 * Instances are built with {@link WebSocketPool.create}; the constructor is private.
 */
export class WebSocketPool {
  /** Underlying connections. Emptied by {@link WebSocketPool.shutdown}. */
  rwsPool: ResilientWebSocket[];
  /** Recently seen message keys, used to drop duplicates across connections. */
  private cache: TTLCache<string, boolean>;
  private subscriptions: Map<number, Request>; // id -> subscription Request
  private messageListeners: WebSocketOnMessageCallback[];
  private allConnectionsDownListeners: (() => void)[];
  /**
   * Result of the previous connection-state check. Starts as `true`, so the initial
   * "nothing connected yet" state does not notify the all-connections-down listeners.
   */
  private wasAllDown = true;
  // `ReturnType<typeof setInterval>` rather than `NodeJS.Timeout`: this class also runs
  // in browser/worker environments, where the timer handle is not a Node.js object.
  private checkConnectionStatesInterval: ReturnType<typeof setInterval>;

  private constructor(private readonly logger: Logger) {
    this.rwsPool = [];
    this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
    this.subscriptions = new Map();
    this.messageListeners = [];
    this.allConnectionsDownListeners = [];

    // Start monitoring connection states (polled every 100 ms until shutdown()).
    this.checkConnectionStatesInterval = setInterval(() => {
      this.checkConnectionStates();
    }, 100);
  }

  /**
   * Creates a new WebSocketPool instance that uses multiple redundant WebSocket connections for reliability.
   * Usage semantics are similar to using a regular WebSocket client.
   *
   * The returned promise resolves once at least one connection is established. There is
   * no timeout: it stays pending for as long as no connection succeeds.
   *
   * @param config - Pool options. `urls` defaults to the two default stream service URLs
   *   and `numConnections` defaults to `DEFAULT_NUM_CONNECTIONS`. Connections are assigned
   *   to `urls` round-robin.
   * @param token - Authentication token to use for the connections
   * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
   * @returns The pool, ready to send requests.
   * @throws Error if `numConnections` is not greater than 0, or if a URL selected for a
   *   connection is null or empty.
   */
  static async create(
    config: WebSocketPoolConfig,
    token: string,
    logger?: Logger,
  ): Promise<WebSocketPool> {
    const urls = config.urls ?? [
      DEFAULT_STREAM_SERVICE_0_URL,
      DEFAULT_STREAM_SERVICE_1_URL,
    ];
    const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS;

    // A count that is zero, negative or NaN would open no connection at all, and the
    // readiness wait below would then never finish. Fail fast instead, before any
    // timer or socket has been created.
    if (!(numConnections > 0)) {
      throw new Error(
        `numConnections must be greater than 0, got ${String(numConnections)}`,
      );
    }

    const log = logger ?? dummyLogger;
    const pool = new WebSocketPool(log);
    // The environment does not change between iterations, so detect it once.
    const isBrowser = envIsBrowserOrWorker();

    try {
      for (let i = 0; i < numConnections; i++) {
        const baseUrl = urls[i % urls.length];
        // In browser/worker environments the token is embedded in the URL; elsewhere
        // it is sent as a Bearer `Authorization` header (see `wsOptions` below).
        const url = isBrowser
          ? addAuthTokenToWebSocketUrl(baseUrl, token)
          : baseUrl;
        if (!url) {
          throw new Error(`URLs must not be null or empty`);
        }

        // Note: `headers` here replaces any `headers` supplied through
        // `config.rwsConfig.wsOptions`, because it is listed after the spread.
        const wsOptions: ResilientWebSocketConfig["wsOptions"] = {
          ...config.rwsConfig?.wsOptions,
          headers: isBrowser ? undefined : { Authorization: `Bearer ${token}` },
        };

        const rws = new ResilientWebSocket({
          ...config.rwsConfig,
          endpoint: url,
          wsOptions,
          logger: log,
        });

        // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish
        // the connection and call the onReconnect callback.
        rws.onReconnect = () => {
          if (rws.wsUserClosed) {
            return;
          }
          // Replay every active subscription on the fresh connection. A failure for one
          // subscription is logged and does not stop the remaining ones.
          for (const [, request] of pool.subscriptions) {
            try {
              rws.send(JSON.stringify(request));
            } catch (error) {
              pool.logger.error(
                "Failed to resend subscription on reconnect:",
                error,
              );
            }
          }
        };

        if (config.onError) {
          rws.onError = config.onError;
        }

        // Handle all client messages ourselves. Dedupe before sending to registered message handlers.
        rws.onMessage = (data) => {
          // NOTE(review): this promise chain is not awaited or returned, so the error
          // thrown below surfaces as an unhandled promise rejection rather than
          // reaching a caller. Kept as-is to preserve how errors are surfaced today.
          pool.dedupeHandler(data).catch((error: unknown) => {
            const errMsg = `An error occurred in the WebSocket pool's dedupeHandler: ${error instanceof Error ? error.message : String(error)}`;
            throw new Error(errMsg);
          });
        };

        pool.rwsPool.push(rws);
        rws.startWebSocket();
      }
    } catch (error) {
      // Setup failed part-way: stop the monitoring timer and close any sockets that
      // were already started, so nothing is left running behind the rejected promise.
      pool.shutdown();
      throw error;
    }

    pool.logger.info(
      `Started WebSocketPool with ${numConnections.toString()} connections. Waiting for at least one to connect...`,
    );

    // Poll every 100 ms until the first connection is up.
    while (!pool.isAnyConnectionEstablished()) {
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    pool.logger.info(
      `At least one WebSocket connection is established. WebSocketPool is ready.`,
    );

    return pool;
  }

  /**
   * Checks for error responses in JSON messages and throws appropriate errors
   *
   * @param data - Text message; it is parsed with `JSON.parse`, which itself throws
   *   if the text is not valid JSON.
   * @throws Error when the message type is `subscriptionError` or `error`.
   */
  private handleErrorMessages(data: string): void {
    const message = JSON.parse(data) as Response;
    if (message.type === "subscriptionError") {
      throw new Error(
        `Error occurred for subscription ID ${String(
          message.subscriptionId,
        )}: ${message.error}`,
      );
    } else if (message.type === "error") {
      throw new Error(`Error: ${message.error}`);
    }
  }

  /**
   * Handles incoming websocket messages by deduplicating identical messages received across
   * multiple connections before forwarding to registered handlers
   *
   * Text messages are keyed by their content, other payloads by their hex encoding.
   * Text messages are additionally checked for error responses before being forwarded,
   * so an error response rejects the returned promise and is not passed to listeners.
   *
   * @param data - Raw payload received from one of the pooled connections.
   * @returns A promise that settles once every listener has handled the message.
   */
  dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
    let cacheKey = "";
    if (typeof data === "string") {
      cacheKey = data;
    } else {
      const buff = await IsomorphicBuffer.fromWebsocketData(data);
      cacheKey = buff.toString("hex");
    }

    if (this.cache.has(cacheKey)) {
      this.logger.debug("Dropping duplicate message");
      return;
    }

    this.cache.set(cacheKey, true);

    if (typeof data === "string") {
      this.handleErrorMessages(data);
    }

    await Promise.all(this.messageListeners.map((handler) => handler(data)));
  };

  /**
   * Sends the request, serialized as JSON, on every connection in the pool.
   *
   * @param request - Request to broadcast.
   */
  sendRequest(request: Request) {
    for (const rws of this.rwsPool) {
      rws.send(JSON.stringify(request));
    }
  }

  /**
   * Records a subscription, so it can be re-sent after a reconnect, and broadcasts it.
   *
   * @param request - Must have `type` equal to `"subscribe"`.
   * @throws Error if the request is not a subscribe request.
   */
  addSubscription(request: Request) {
    if (request.type !== "subscribe") {
      throw new Error("Request must be a subscribe request");
    }
    this.subscriptions.set(request.subscriptionId, request);
    this.sendRequest(request);
  }

  /**
   * Forgets a subscription and broadcasts the matching unsubscribe request.
   *
   * @param subscriptionId - Id of the subscription to cancel.
   */
  removeSubscription(subscriptionId: number) {
    this.subscriptions.delete(subscriptionId);
    const request: Request = {
      type: "unsubscribe",
      subscriptionId,
    };
    this.sendRequest(request);
  }

  /**
   * Registers a handler that receives every deduplicated message.
   *
   * @param handler - Callback invoked with the raw message payload.
   */
  addMessageListener(handler: WebSocketOnMessageCallback): void {
    this.messageListeners.push(handler);
  }

  /**
   * Calls the handler if all websocket connections are currently down or in reconnecting state.
   * The connections may still try to reconnect in the background.
   */
  addAllConnectionsDownListener(handler: () => void): void {
    this.allConnectionsDownListeners.push(handler);
  }

  /** Returns true when no connection is both connected and not reconnecting. */
  private areAllConnectionsDown(): boolean {
    return this.rwsPool.every((ws) => !ws.isConnected() || ws.isReconnecting());
  }

  /** Returns true when at least one connection reports itself as connected. */
  private isAnyConnectionEstablished(): boolean {
    return this.rwsPool.some((ws) => ws.isConnected());
  }

  /**
   * Compares the current connection state with the previous check and notifies the
   * all-connections-down listeners on the transition from "some up" to "all down".
   */
  private checkConnectionStates(): void {
    const allDown = this.areAllConnectionsDown();

    // If all connections just went down
    if (allDown && !this.wasAllDown) {
      this.wasAllDown = true;
      this.logger.error("All WebSocket connections are down or reconnecting");
      // Notify all listeners
      for (const listener of this.allConnectionsDownListeners) {
        listener();
      }
    }

    // If at least one connection was restored
    if (!allDown && this.wasAllDown) {
      this.wasAllDown = false;
    }
  }

  /**
   * Closes every connection, drops all subscriptions and listeners, and stops the
   * connection-state monitoring timer.
   */
  shutdown(): void {
    for (const rws of this.rwsPool) {
      rws.closeWebSocket();
    }
    this.rwsPool = [];
    this.subscriptions.clear();
    this.messageListeners = [];
    this.allConnectionsDownListeners = [];
    clearInterval(this.checkConnectionStatesInterval);
  }
}
|