|
@@ -11,7 +11,11 @@ import {
|
|
|
DEFAULT_STREAM_SERVICE_0_URL,
|
|
DEFAULT_STREAM_SERVICE_0_URL,
|
|
|
DEFAULT_STREAM_SERVICE_1_URL,
|
|
DEFAULT_STREAM_SERVICE_1_URL,
|
|
|
} from "../constants.js";
|
|
} from "../constants.js";
|
|
|
-import { addAuthTokenToWebSocketUrl, envIsBrowserOrWorker, IsomorphicBuffer } from "../util/index.js";
|
|
|
|
|
|
|
+import {
|
|
|
|
|
+ addAuthTokenToWebSocketUrl,
|
|
|
|
|
+ bufferFromWebsocketData,
|
|
|
|
|
+ envIsBrowserOrWorker,
|
|
|
|
|
+} from "../util/index.js";
|
|
|
|
|
|
|
|
const DEFAULT_NUM_CONNECTIONS = 4;
|
|
const DEFAULT_NUM_CONNECTIONS = 4;
|
|
|
|
|
|
|
@@ -72,7 +76,9 @@ export class WebSocketPool {
|
|
|
for (let i = 0; i < numConnections; i++) {
|
|
for (let i = 0; i < numConnections; i++) {
|
|
|
const baseUrl = urls[i % urls.length];
|
|
const baseUrl = urls[i % urls.length];
|
|
|
const isBrowser = envIsBrowserOrWorker();
|
|
const isBrowser = envIsBrowserOrWorker();
|
|
|
- const url = isBrowser ? addAuthTokenToWebSocketUrl(baseUrl, token) : baseUrl;
|
|
|
|
|
|
|
+ const url = isBrowser
|
|
|
|
|
+ ? addAuthTokenToWebSocketUrl(baseUrl, token)
|
|
|
|
|
+ : baseUrl;
|
|
|
if (!url) {
|
|
if (!url) {
|
|
|
throw new Error(`URLs must not be null or empty`);
|
|
throw new Error(`URLs must not be null or empty`);
|
|
|
}
|
|
}
|
|
@@ -151,18 +157,18 @@ export class WebSocketPool {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ private async constructCacheKeyFromWebsocketData(data: WebSocket.Data) {
|
|
|
|
|
+ if (typeof data === "string") return data;
|
|
|
|
|
+ const buff = await bufferFromWebsocketData(data);
|
|
|
|
|
+ return buff.toString("hex");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* Handles incoming websocket messages by deduplicating identical messages received across
|
|
* Handles incoming websocket messages by deduplicating identical messages received across
|
|
|
* multiple connections before forwarding to registered handlers
|
|
* multiple connections before forwarding to registered handlers
|
|
|
*/
|
|
*/
|
|
|
dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
|
|
dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
|
|
|
- let cacheKey = "";
|
|
|
|
|
- if (typeof data === "string") {
|
|
|
|
|
- cacheKey = data;
|
|
|
|
|
- } else {
|
|
|
|
|
- const buff = await IsomorphicBuffer.fromWebsocketData(data);
|
|
|
|
|
- cacheKey = buff.toString("hex");
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ const cacheKey = await this.constructCacheKeyFromWebsocketData(data);
|
|
|
|
|
|
|
|
if (this.cache.has(cacheKey)) {
|
|
if (this.cache.has(cacheKey)) {
|
|
|
this.logger.debug("Dropping duplicate message");
|
|
this.logger.debug("Dropping duplicate message");
|