Просмотр исходного кода

chore(pyth-perf): crypto sockets connect

benduran 3 дней назад
Родитель
Сommit
4fd46b39ea

+ 1 - 0
apps/pyth-pro-perf-dash/package.json

@@ -59,6 +59,7 @@
     "react-aria": "catalog:",
     "react-aria-components": "catalog:",
     "react-dom": "catalog:",
+    "sockette": "^2.0.6",
     "swr": "catalog:",
     "zod": "catalog:",
     "zustand": "^5.0.8"

+ 8 - 0
apps/pyth-pro-perf-dash/src/components/SelectSourceStats/selected-source-stats.tsx

@@ -1,3 +1,11 @@
+"use client";
+
+import { useSelectedSourceStats, useSelectedSourceStream } from "../../hooks";
+
 export function SelectSourceStats() {
+  /** hooks */
+  const { crypto, equities, forex, selectedSource } = useSelectedSourceStats();
+  useSelectedSourceStream();
+
   return <div>stats</div>;
 }

+ 2 - 0
apps/pyth-pro-perf-dash/src/hooks/index.ts

@@ -0,0 +1,2 @@
+export * from "./use-selected-source-stats";
+export * from "./use-selected-source-stream";

+ 42 - 0
apps/pyth-pro-perf-dash/src/hooks/use-fetch-usdt-stuff.ts

@@ -0,0 +1,42 @@
+/* eslint-disable @typescript-eslint/no-explicit-any */
+/* eslint-disable @typescript-eslint/no-unsafe-member-access */
+import { useEffect, useMemo, useRef, useState } from "react";
+
+/**
+ * fetches the latest USDT (token) rate to the USD actual dollar rate
+ * from Pyth
+ */
+export function useFetchUsdToUsdRate(
+  url = "https://hermes.pyth.network/v2/updates/price/latest?ids%5B%5D=2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b",
+) {
+  /** state */
+  const [usdtToUsdRate, setUsdtToUsdRate] = useState(1);
+  const [error, setError] = useState<Error | undefined>(undefined);
+
+  /** refs */
+  const abortSignal = useRef<AbortController | undefined>(undefined);
+
+  /** effects */
+  useEffect(() => {
+    if (abortSignal.current) {
+      abortSignal.current.abort();
+    }
+
+    const abt = new AbortController();
+    abortSignal.current = abt;
+
+    fetch(url, { mode: "cors", signal: abt.signal })
+      .then((r) => r.json())
+      .then((data: any) => {
+        const price = Number(data.parsed?.[0].price.price) / Math.pow(10, 8);
+        setUsdtToUsdRate(price);
+      })
+      .catch(setError);
+
+    return () => {
+      abt.abort();
+    };
+  }, [url]);
+
+  return useMemo(() => ({ error, usdtToUsdRate }), [error, usdtToUsdRate]);
+}

+ 22 - 0
apps/pyth-pro-perf-dash/src/hooks/use-selected-source-stats.ts

@@ -0,0 +1,22 @@
+import { useMemo } from "react";
+import { useShallow } from "zustand/shallow";
+
+import { useUIStateStore } from "../state/ui-state";
+
+/**
+ * selects a portion of the UI state
+ * required to display stats, metrics
+ * and plot data
+ */
+export function useSelectedSourceStats() {
+  const out = useUIStateStore(
+    useShallow((state) => ({
+      crypto: state.crypto,
+      equities: state.equities,
+      forex: state.forex,
+      selectedSource: state.selectedSource,
+    })),
+  );
+
+  return useMemo(() => out, [out]);
+}

+ 354 - 0
apps/pyth-pro-perf-dash/src/hooks/use-selected-source-stream.ts

@@ -0,0 +1,354 @@
+/* eslint-disable no-console */
+/* eslint-disable @typescript-eslint/no-unsafe-member-access */
+/* eslint-disable @typescript-eslint/no-unsafe-assignment */
+import { useCallback, useEffect, useMemo, useRef } from "react";
+
+import { useFetchUsdToUsdRate } from "./use-fetch-usdt-stuff";
+import { useSelectedSourceStats } from "./use-selected-source-stats";
+import type { UseWebSocketOpts } from "./use-websocket";
+import { useWebsockets } from "./use-websocket";
+
+/**
+ * given the user's currently selected source,
+ * opens websocket connections to get the data
+ * and starts piping this data into the UIState.
+ * Any previously-opened connections are closed
+ */
+export function useSelectedSourceStream() {
+  /** store */
+  const { selectedSource } = useSelectedSourceStats();
+
+  /** refs */
+  const coinbaseOrderbookRef = useRef<{
+    bids: Map<string, string>; // price -> quantity
+    asks: Map<string, string>; // price -> quantity
+  }>({
+    bids: new Map(),
+    asks: new Map(),
+  });
+
+  /** queries */
+  const { usdtToUsdRate } = useFetchUsdToUsdRate();
+
+  /** callbacks */
+  const calculateCoinbaseMidPrice = useCallback(() => {
+    const bids = coinbaseOrderbookRef.current.bids;
+    const asks = coinbaseOrderbookRef.current.asks;
+
+    if (bids.size === 0 || asks.size === 0) {
+      return;
+    }
+
+    // Get best bid (highest price)
+    const bestBidPrice = Math.max(...[...bids.keys()].map(Number));
+    // Get best ask (lowest price)
+    const bestAskPrice = Math.min(...[...asks.keys()].map(Number));
+
+    if (Number.isFinite(bestBidPrice) && Number.isFinite(bestAskPrice)) {
+      const midPrice = (bestBidPrice + bestAskPrice) / 2;
+      return midPrice;
+    }
+
+    return;
+  }, []);
+
+  /** memos */
+  const websocketConnectionOpts = useMemo<UseWebSocketOpts[]>(() => {
+    if (selectedSource.crypto) {
+      return [
+        {
+          id: "coinbase",
+          onConnected: (_, s) => {
+            const subscribeMessage = {
+              type: "subscribe",
+              product_ids: [`${selectedSource.id}-USD`],
+              channel: "level2",
+            };
+
+            s.json(subscribeMessage);
+          },
+          onMessage: (msg) => {
+            try {
+              const data = JSON.parse(String(msg));
+
+              // Handle Advanced Trade level2 orderbook messages
+              if (data.channel === "l2_data" && data.events) {
+                for (const event of data.events) {
+                  // console.log('Coinbase: Processing event type:', event.type, 'for product:', event.product_id);
+                  if (event.product_id === `${selectedSource.id}-USD`) {
+                    // Handle snapshot (initial orderbook state)
+                    if (event.type === "snapshot") {
+                      const snapshotEvent = event as
+                        | CoinbaseLevel2Snapshot["events"][0]
+                        | null
+                        | undefined;
+
+                      // Clear existing orderbook
+                      coinbaseOrderbookRef.current.bids.clear();
+                      coinbaseOrderbookRef.current.asks.clear();
+
+                      // Load bids
+                      if (snapshotEvent?.bids?.length) {
+                        for (const bid of snapshotEvent.bids) {
+                          if (Number.parseFloat(bid.new_quantity) > 0) {
+                            coinbaseOrderbookRef.current.bids.set(
+                              bid.price_level,
+                              bid.new_quantity,
+                            );
+                          }
+                        }
+                      }
+
+                      // Load asks
+                      if (snapshotEvent?.asks?.length) {
+                        for (const ask of snapshotEvent.asks) {
+                          if (Number.parseFloat(ask.new_quantity) > 0) {
+                            coinbaseOrderbookRef.current.asks.set(
+                              ask.price_level,
+                              ask.new_quantity,
+                            );
+                          }
+                        }
+                      }
+
+                      //console.log('Coinbase: Loaded orderbook snapshot');
+
+                      // Calculate and emit price after snapshot
+                      const midPrice = calculateCoinbaseMidPrice();
+                      if (typeof midPrice === "number") {
+                        // TODO: emit into UI state
+                        // onPriceUpdate({
+                        //   price: midPrice,
+                        //   timestamp: Date.now(),
+                        //   source: "coinbase",
+                        // });
+                      }
+                    }
+                    // Handle updates (incremental changes)
+                    else if (event.type === "update") {
+                      const updateEvent = event as
+                        | CoinbaseAdvancedTradeLevel2Message["events"][0]
+                        | null
+                        | undefined;
+
+                      if (updateEvent?.updates.length) {
+                        for (const update of updateEvent.updates) {
+                          const quantity = Number.parseFloat(
+                            update.new_quantity,
+                          );
+
+                          if (update.side === "bid") {
+                            if (quantity === 0) {
+                              // Remove the price level
+                              coinbaseOrderbookRef.current.bids.delete(
+                                update.price_level,
+                              );
+                            } else {
+                              // Update the price level
+                              coinbaseOrderbookRef.current.bids.set(
+                                update.price_level,
+                                update.new_quantity,
+                              );
+                            }
+                          } else if (update.side === "offer") {
+                            if (quantity === 0) {
+                              // Remove the price level
+                              coinbaseOrderbookRef.current.asks.delete(
+                                update.price_level,
+                              );
+                            } else {
+                              // Update the price level
+                              coinbaseOrderbookRef.current.asks.set(
+                                update.price_level,
+                                update.new_quantity,
+                              );
+                            }
+                          }
+                        }
+
+                        // Calculate and emit price after updates
+                        const midPrice = calculateCoinbaseMidPrice();
+                        if (typeof midPrice === "number") {
+                          // TODO: emit into ui state
+                          // onPriceUpdate({
+                          //   price: midPrice,
+                          //   timestamp: Date.now(),
+                          //   source: "coinbase",
+                          // });
+                        }
+                      }
+                    }
+                  }
+                }
+              }
+              // Handle subscription confirmations
+              else if (data.type === "subscriptions") {
+                console.log("Coinbase: Subscription confirmed:", data);
+              }
+              // Handle errors
+              else if (data.type === "error") {
+                console.error("Coinbase: Subscription error:", data);
+              }
+            } catch (error) {
+              console.error("Error parsing Coinbase message:", error);
+            }
+          },
+          url: "wss://advanced-trade-ws.coinbase.com",
+        },
+        {
+          id: "binance",
+          onMessage: (msg) => {
+            try {
+              const data = JSON.parse(String(msg)) as BinanceOrderBookData;
+              if (data.s === `${selectedSource.id}USDT`) {
+                // Calculate mid price from best bid and best ask
+                const bestBid = Number.parseFloat(data.b);
+                const bestAsk = Number.parseFloat(data.a);
+                const midPriceUSDT = (bestBid + bestAsk) / 2;
+
+                // Convert USDT to USD using the fetched rate
+                const midPriceUSD = midPriceUSDT * usdtToUsdRate;
+
+                // TODO: need to emit this into ui state
+                // onPriceUpdate({
+                //   price: midPriceUSD,
+                //   timestamp: Date.now(),
+                //   source: "binance",
+                // });
+              }
+            } catch (error) {
+              console.error("Error parsing Binance message:", error);
+            }
+          },
+          url: `wss://stream.binance.com:9443/ws/${selectedSource.id.toLowerCase()}usdt@bookTicker`,
+        },
+        {
+          id: "bybit",
+          onConnected: (_, s) => {
+            const subscribeMessage = {
+              op: "subscribe",
+              args: [`orderbook.1.${selectedSource.id}USDT`],
+            };
+            s.json(subscribeMessage);
+          },
+          onMessage: (msg) => {
+            try {
+              const data = JSON.parse(String(msg));
+              // console.log('Bybit: Received message:', data);
+
+              // Handle orderbook updates
+              if (
+                (data.topic === `orderbook.1.${selectedSource.id}USDT` &&
+                  data.type === "snapshot") ||
+                data.type === "delta"
+              ) {
+                const orderBookData = data as BybitOrderBookData;
+                const bookData = orderBookData.data;
+
+                if (bookData.b.length > 0 && bookData.a.length > 0) {
+                  // Get best bid and ask (first elements in the arrays)
+                  const bestBid = Number.parseFloat(bookData.b[0]?.[0] ?? "");
+                  const bestAsk = Number.parseFloat(bookData.a[0]?.[0] ?? "");
+                  const midPriceUSDT = (bestBid + bestAsk) / 2;
+
+                  const midPriceUSD = midPriceUSDT * usdtToUsdRate;
+
+                  // TODO: Need to emit this event into the UI state
+                  // onPriceUpdate({
+                  //   price: midPriceUSD,
+                  //   timestamp: Date.now(),
+                  //   source: "bybit",
+                  // });
+                }
+              }
+              // Handle subscription confirmations
+              else if (data.success === true) {
+                console.info("Bybit: Subscription confirmed:", data);
+              }
+              // Handle errors
+              else if (data.success === false) {
+                console.error("Bybit: Subscription error:", data);
+              }
+            } catch (error) {
+              console.error("Error parsing Bybit message:", error);
+            }
+          },
+          url: "wss://stream.bybit.com/v5/public/spot",
+        },
+      ];
+    }
+    return [];
+  }, [
+    calculateCoinbaseMidPrice,
+    selectedSource.crypto,
+    selectedSource.id,
+    usdtToUsdRate,
+  ]);
+
+  /** hooks */
+  const sockets = useWebsockets(websocketConnectionOpts);
+
+  /** effects */
+  useEffect(() => {
+    return () => {
+      sockets.disconnectAll();
+    };
+  }, [sockets]);
+}
+
+type BybitOrderBookData = {
+  topic: string;
+  type: string;
+  ts: number;
+  data: {
+    s: string; // symbol
+    b: [string, string][]; // bids [price, size]
+    a: [string, string][]; // asks [price, size]
+    u: number; // update id
+    seq: number; // sequence number
+  };
+};
+
+type BinanceOrderBookData = {
+  s: string; // symbol
+  b: string; // best bid price
+  B: string; // best bid quantity
+  a: string; // best ask price
+  A: string; // best ask quantity
+};
+
+type CoinbaseAdvancedTradeLevel2Message = {
+  channel: string;
+  client_id: string;
+  timestamp: string;
+  sequence_num: number;
+  events: {
+    type: string;
+    product_id: string;
+    updates: {
+      side: string; // "bid" or "offer"
+      event_time: string;
+      price_level: string;
+      new_quantity: string;
+    }[];
+  }[];
+};
+
+type CoinbaseLevel2Snapshot = {
+  channel: string;
+  client_id: string;
+  timestamp: string;
+  sequence_num: number;
+  events: {
+    type: string;
+    product_id: string;
+    bids?: {
+      price_level: string;
+      new_quantity: string;
+    }[];
+    asks?: {
+      price_level: string;
+      new_quantity: string;
+    }[];
+  }[];
+};

+ 86 - 0
apps/pyth-pro-perf-dash/src/hooks/use-websocket.ts

@@ -0,0 +1,86 @@
+/* eslint-disable no-console */
+import { useEffect, useMemo, useRef, useState } from "react";
+import Sockette from "sockette";
+
+export type UseWebSocketOpts<T = unknown> = {
+  /**
+   * unique ID for this websocket connection for logging purposes
+   */
+  id: string;
+
+  /**
+   * fired when the connection first opens
+   */
+  onConnected?: (url: string, socket: Sockette) => void;
+
+  /**
+   * fired whenever a message is received
+   * over the socket. it is up to you
+   * to determine how to parse it
+   */
+  onMessage?: (msg: T, url: string, socket: Sockette) => void;
+
+  /**
+   * url to the WSS endpoint used for the connection
+   */
+  url: string;
+};
+
+/**
+ * abstracts away the whole websocket connection dance,
+ * and allows you to connect to one or more websockets
+ * through a single, convenient hook
+ */
+export function useWebsockets(opts: UseWebSocketOpts[]) {
+  /** state */
+  const [connectedMap, setConnectedMap] = useState<Record<string, boolean>>({});
+
+  /** refs */
+  const socketMap = useRef<Record<string, Sockette | undefined>>({});
+
+  /** effects */
+  useEffect(() => {
+    for (const opt of opts) {
+      const handleCloseOrError = () => {
+        console.warn(`${opt.id}: connection was closed.`);
+        setConnectedMap((prev) => ({ ...prev, [opt.url]: false }));
+      };
+
+      const s = new Sockette(opt.url, {
+        onclose: handleCloseOrError,
+        onerror: handleCloseOrError,
+        onmessage: (e) => {
+          opt.onMessage?.(e.data, opt.url, s);
+        },
+        onopen: () => {
+          console.info(`${opt.id}: connected!`);
+          opt.onConnected?.(opt.url, s);
+          setConnectedMap((prev) => ({ ...prev, [opt.url]: true }));
+        },
+      });
+
+      socketMap.current[opt.url] = s;
+    }
+  }, [opts]);
+
+  /** memos */
+  return useMemo(() => {
+    const disconnect = (url: string) => {
+      const s = socketMap.current[url];
+      if (!s) return;
+
+      s.close();
+      socketMap.current[url] = undefined;
+    };
+
+    return {
+      disconnect,
+      disconnectAll: () => {
+        for (const url of Object.keys(socketMap.current)) {
+          disconnect(url);
+        }
+      },
+      connectedMap,
+    };
+  }, [connectedMap]);
+}

+ 8 - 0
pnpm-lock.yaml

@@ -1226,6 +1226,9 @@ importers:
       react-dom:
         specifier: 'catalog:'
         version: 19.1.0(react@19.1.0)
+      sockette:
+        specifier: ^2.0.6
+        version: 2.0.6
       swr:
         specifier: 'catalog:'
         version: 2.3.3(react@19.1.0)
@@ -19879,6 +19882,9 @@ packages:
     resolution: {integrity: sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==}
     engines: {node: '>=10.0.0'}
 
+  sockette@2.0.6:
+    resolution: {integrity: sha512-W6iG8RGV6Zife3Cj+FhuyHV447E6fqFM2hKmnaQrTvg3OydINV3Msj3WPFbX76blUlUxvQSMMMdrJxce8NqI5Q==}
+
   socks-proxy-agent@8.0.5:
     resolution: {integrity: sha512-HehCEsotFqbPW9sJ8WVYB6UbmIMv7kUUORIF2Nncq4VQvBfNBLibW9YZR5dlYCSUhwcD628pRllm7n+E+YTzJw==}
     engines: {node: '>= 14'}
@@ -48876,6 +48882,8 @@ snapshots:
     transitivePeerDependencies:
       - supports-color
 
+  sockette@2.0.6: {}
+
   socks-proxy-agent@8.0.5:
     dependencies:
       agent-base: 7.1.3