// websocket-pool.ts (7.9 KB)
  1. import TTLCache from "@isaacs/ttlcache";
  2. import type { ErrorEvent } from "isomorphic-ws";
  3. import WebSocket from "isomorphic-ws";
  4. import type { Logger } from "ts-log";
  5. import { dummyLogger } from "ts-log";
  6. import type { Request, Response } from "../protocol.js";
  7. import type { ResilientWebSocketConfig } from "./resilient-websocket.js";
  8. import { ResilientWebSocket } from "./resilient-websocket.js";
  9. import {
  10. DEFAULT_STREAM_SERVICE_0_URL,
  11. DEFAULT_STREAM_SERVICE_1_URL,
  12. } from "../constants.js";
  13. import { addAuthTokenToWebSocketUrl, envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js";
  14. const DEFAULT_NUM_CONNECTIONS = 4;
  15. type WebSocketOnMessageCallback = (
  16. data: WebSocket.Data,
  17. ) => void | Promise<void>;
  18. export type WebSocketPoolConfig = {
  19. urls?: string[];
  20. numConnections?: number;
  21. rwsConfig?: Omit<ResilientWebSocketConfig, "logger" | "endpoint">;
  22. onError?: (error: ErrorEvent) => void;
  23. };
  24. export class WebSocketPool {
  25. rwsPool: ResilientWebSocket[];
  26. private cache: TTLCache<string, boolean>;
  27. private subscriptions: Map<number, Request>; // id -> subscription Request
  28. private messageListeners: WebSocketOnMessageCallback[];
  29. private allConnectionsDownListeners: (() => void)[];
  30. private wasAllDown = true;
  31. private checkConnectionStatesInterval: NodeJS.Timeout;
  32. private constructor(private readonly logger: Logger) {
  33. this.rwsPool = [];
  34. this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
  35. this.subscriptions = new Map();
  36. this.messageListeners = [];
  37. this.allConnectionsDownListeners = [];
  38. // Start monitoring connection states
  39. this.checkConnectionStatesInterval = setInterval(() => {
  40. this.checkConnectionStates();
  41. }, 100);
  42. }
  43. /**
  44. * Creates a new WebSocketPool instance that uses multiple redundant WebSocket connections for reliability.
  45. * Usage semantics are similar to using a regular WebSocket client.
  46. * @param urls - List of WebSocket URLs to connect to
  47. * @param token - Authentication token to use for the connections
  48. * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3)
  49. * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
  50. */
  51. static async create(
  52. config: WebSocketPoolConfig,
  53. token: string,
  54. logger?: Logger,
  55. ): Promise<WebSocketPool> {
  56. const urls = config.urls ?? [
  57. DEFAULT_STREAM_SERVICE_0_URL,
  58. DEFAULT_STREAM_SERVICE_1_URL,
  59. ];
  60. const log = logger ?? dummyLogger;
  61. const pool = new WebSocketPool(log);
  62. const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS;
  63. for (let i = 0; i < numConnections; i++) {
  64. const baseUrl = urls[i % urls.length];
  65. const isBrowser = envIsBrowserOrWorker();
  66. const url = isBrowser ? addAuthTokenToWebSocketUrl(baseUrl, token) : baseUrl;
  67. if (!url) {
  68. throw new Error(`URLs must not be null or empty`);
  69. }
  70. const wsOptions: ResilientWebSocketConfig["wsOptions"] = {
  71. ...config.rwsConfig?.wsOptions,
  72. headers: isBrowser ? undefined : { Authorization: `Bearer ${token}` },
  73. };
  74. const rws = new ResilientWebSocket({
  75. ...config.rwsConfig,
  76. endpoint: url,
  77. wsOptions,
  78. logger: log,
  79. });
  80. // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish
  81. // the connection and call the onReconnect callback.
  82. rws.onReconnect = () => {
  83. if (rws.wsUserClosed) {
  84. return;
  85. }
  86. for (const [, request] of pool.subscriptions) {
  87. try {
  88. rws.send(JSON.stringify(request));
  89. } catch (error) {
  90. pool.logger.error(
  91. "Failed to resend subscription on reconnect:",
  92. error,
  93. );
  94. }
  95. }
  96. };
  97. if (config.onError) {
  98. rws.onError = config.onError;
  99. }
  100. // Handle all client messages ourselves. Dedupe before sending to registered message handlers.
  101. rws.onMessage = (data) => {
  102. pool.dedupeHandler(data).catch((error: unknown) => {
  103. const errMsg = `An error occurred in the WebSocket pool's dedupeHandler: ${error instanceof Error ? error.message : String(error)}`;
  104. throw new Error(errMsg);
  105. });
  106. };
  107. pool.rwsPool.push(rws);
  108. rws.startWebSocket();
  109. }
  110. pool.logger.info(
  111. `Started WebSocketPool with ${numConnections.toString()} connections. Waiting for at least one to connect...`,
  112. );
  113. while (!pool.isAnyConnectionEstablished()) {
  114. await new Promise((resolve) => setTimeout(resolve, 100));
  115. }
  116. pool.logger.info(
  117. `At least one WebSocket connection is established. WebSocketPool is ready.`,
  118. );
  119. return pool;
  120. }
  121. /**
  122. * Checks for error responses in JSON messages and throws appropriate errors
  123. */
  124. private handleErrorMessages(data: string): void {
  125. const message = JSON.parse(data) as Response;
  126. if (message.type === "subscriptionError") {
  127. throw new Error(
  128. `Error occurred for subscription ID ${String(
  129. message.subscriptionId,
  130. )}: ${message.error}`,
  131. );
  132. } else if (message.type === "error") {
  133. throw new Error(`Error: ${message.error}`);
  134. }
  135. }
  136. /**
  137. * Handles incoming websocket messages by deduplicating identical messages received across
  138. * multiple connections before forwarding to registered handlers
  139. */
  140. dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
  141. let cacheKey = "";
  142. if (typeof data === "string") {
  143. cacheKey = data;
  144. } else {
  145. const buff = await IsomorphicBuffer.fromWebsocketData(data);
  146. cacheKey = buff.toString("hex");
  147. }
  148. if (this.cache.has(cacheKey)) {
  149. this.logger.debug("Dropping duplicate message");
  150. return;
  151. }
  152. this.cache.set(cacheKey, true);
  153. if (typeof data === "string") {
  154. this.handleErrorMessages(data);
  155. }
  156. await Promise.all(this.messageListeners.map((handler) => handler(data)));
  157. };
  158. sendRequest(request: Request) {
  159. for (const rws of this.rwsPool) {
  160. rws.send(JSON.stringify(request));
  161. }
  162. }
  163. addSubscription(request: Request) {
  164. if (request.type !== "subscribe") {
  165. throw new Error("Request must be a subscribe request");
  166. }
  167. this.subscriptions.set(request.subscriptionId, request);
  168. this.sendRequest(request);
  169. }
  170. removeSubscription(subscriptionId: number) {
  171. this.subscriptions.delete(subscriptionId);
  172. const request: Request = {
  173. type: "unsubscribe",
  174. subscriptionId,
  175. };
  176. this.sendRequest(request);
  177. }
  178. addMessageListener(handler: WebSocketOnMessageCallback): void {
  179. this.messageListeners.push(handler);
  180. }
  181. /**
  182. * Calls the handler if all websocket connections are currently down or in reconnecting state.
  183. * The connections may still try to reconnect in the background.
  184. */
  185. addAllConnectionsDownListener(handler: () => void): void {
  186. this.allConnectionsDownListeners.push(handler);
  187. }
  188. private areAllConnectionsDown(): boolean {
  189. return this.rwsPool.every((ws) => !ws.isConnected() || ws.isReconnecting());
  190. }
  191. private isAnyConnectionEstablished(): boolean {
  192. return this.rwsPool.some((ws) => ws.isConnected());
  193. }
  194. private checkConnectionStates(): void {
  195. const allDown = this.areAllConnectionsDown();
  196. // If all connections just went down
  197. if (allDown && !this.wasAllDown) {
  198. this.wasAllDown = true;
  199. this.logger.error("All WebSocket connections are down or reconnecting");
  200. // Notify all listeners
  201. for (const listener of this.allConnectionsDownListeners) {
  202. listener();
  203. }
  204. }
  205. // If at least one connection was restored
  206. if (!allDown && this.wasAllDown) {
  207. this.wasAllDown = false;
  208. }
  209. }
  210. shutdown(): void {
  211. for (const rws of this.rwsPool) {
  212. rws.closeWebSocket();
  213. }
  214. this.rwsPool = [];
  215. this.subscriptions.clear();
  216. this.messageListeners = [];
  217. this.allConnectionsDownListeners = [];
  218. clearInterval(this.checkConnectionStatesInterval);
  219. }
  220. }