PriceServiceConnection.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. /* eslint-disable @typescript-eslint/no-unsafe-member-access */
  2. /* eslint-disable @typescript-eslint/no-explicit-any */
  3. /* eslint-disable unicorn/no-process-exit */
  4. /* eslint-disable n/no-process-exit */
  5. /* eslint-disable @typescript-eslint/no-non-null-assertion */
  6. /* eslint-disable @typescript-eslint/restrict-template-expressions */
  7. /* eslint-disable @typescript-eslint/no-unnecessary-condition */
  8. /* eslint-disable @typescript-eslint/no-base-to-string */
  9. /* eslint-disable @typescript-eslint/no-floating-promises */
  10. /* eslint-disable @typescript-eslint/no-empty-function */
  11. /* eslint-disable no-console */
  12. /* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
  13. import type { HexString } from "@pythnetwork/price-service-sdk";
  14. import { PriceFeed } from "@pythnetwork/price-service-sdk";
  15. import type { AxiosInstance } from "axios";
  16. import axios from "axios";
  17. import axiosRetry from "axios-retry";
  18. import * as WebSocket from "isomorphic-ws";
  19. import type { Logger } from "ts-log";
  20. import { ResilientWebSocket } from "./ResillientWebSocket";
  21. import { makeWebsocketUrl, removeLeading0xIfExists } from "./utils";
  22. export type DurationInMs = number;
  23. export type PriceFeedRequestConfig = {
  24. /* Optional verbose to request for verbose information from the service */
  25. verbose?: boolean | undefined;
  26. /* Optional binary to include the price feeds binary update data */
  27. binary?: boolean | undefined;
  28. /* Optional config for the websocket subscription to receive out of order updates */
  29. allowOutOfOrder?: boolean | undefined;
  30. };
  31. export type PriceServiceConnectionConfig = {
  32. /* Timeout of each request (for all of retries). Default: 5000ms */
  33. timeout?: DurationInMs | undefined;
  34. /**
  35. * Number of times a HTTP request will be retried before the API returns a failure. Default: 3.
  36. *
  37. * The connection uses exponential back-off for the delay between retries. However,
  38. * it will timeout regardless of the retries at the configured `timeout` time.
  39. */
  40. httpRetries?: number | undefined;
  41. /* Optional logger (e.g: console or any logging library) to log internal events */
  42. logger?: Logger | undefined;
  43. /* Deprecated: please use priceFeedRequestConfig.verbose instead */
  44. verbose?: boolean | undefined;
  45. /* Configuration for the price feed requests */
  46. priceFeedRequestConfig?: PriceFeedRequestConfig | undefined;
  47. };
  48. type ClientMessage = {
  49. type: "subscribe" | "unsubscribe";
  50. ids: HexString[];
  51. verbose?: boolean | undefined;
  52. binary?: boolean | undefined;
  53. allow_out_of_order?: boolean | undefined;
  54. };
  55. type ServerResponse = {
  56. type: "response";
  57. status: "success" | "error";
  58. error?: string;
  59. };
  60. type ServerPriceUpdate = {
  61. type: "price_update";
  62. price_feed: any;
  63. };
  64. type ServerMessage = ServerResponse | ServerPriceUpdate;
  65. export type PriceFeedUpdateCallback = (priceFeed: PriceFeed) => void;
  66. export class PriceServiceConnection {
  67. private httpClient: AxiosInstance;
  68. private priceFeedCallbacks: Map<HexString, Set<PriceFeedUpdateCallback>>;
  69. private wsClient: undefined | ResilientWebSocket;
  70. private wsEndpoint: undefined | string;
  71. private logger: Logger;
  72. private priceFeedRequestConfig: PriceFeedRequestConfig;
  73. /**
  74. * Custom handler for web socket errors (connection and message parsing).
  75. *
  76. * Default handler only logs the errors.
  77. */
  78. onWsError: (error: Error) => void;
  79. /**
  80. * Constructs a new Connection.
  81. *
  82. * @param endpoint - endpoint URL to the price service. Example: https://website/example/
  83. * @param config - Optional PriceServiceConnectionConfig for custom configurations.
  84. */
  85. constructor(endpoint: string, config?: PriceServiceConnectionConfig) {
  86. this.httpClient = axios.create({
  87. baseURL: endpoint,
  88. timeout: config?.timeout || 5000,
  89. });
  90. axiosRetry(this.httpClient, {
  91. retries: config?.httpRetries || 3,
  92. // eslint-disable-next-line import/no-named-as-default-member
  93. retryDelay: axiosRetry.exponentialDelay.bind(axiosRetry),
  94. });
  95. this.priceFeedRequestConfig = {
  96. binary: config?.priceFeedRequestConfig?.binary,
  97. verbose: config?.priceFeedRequestConfig?.verbose ?? config?.verbose,
  98. allowOutOfOrder: config?.priceFeedRequestConfig?.allowOutOfOrder,
  99. };
  100. this.priceFeedCallbacks = new Map();
  101. // Default logger is console for only warnings and errors.
  102. this.logger = config?.logger || {
  103. trace: () => {},
  104. debug: () => {},
  105. info: () => {},
  106. warn: console.warn,
  107. error: console.error,
  108. };
  109. this.onWsError = (error: Error) => {
  110. this.logger.error(error);
  111. // Exit the process if it is running in node.
  112. if (
  113. typeof process !== "undefined" &&
  114. typeof process.exit === "function"
  115. ) {
  116. this.logger.error("Halting the process due to the websocket error");
  117. process.exit(1);
  118. } else {
  119. this.logger.error(
  120. "Cannot halt process. Please handle the websocket error.",
  121. );
  122. }
  123. };
  124. this.wsEndpoint = makeWebsocketUrl(endpoint);
  125. }
  126. /**
  127. * Fetch Latest PriceFeeds of given price ids.
  128. * This will throw an axios error if there is a network problem or the price service returns a non-ok response (e.g: Invalid price ids)
  129. *
  130. * @param priceIds - Array of hex-encoded price ids.
  131. * @returns Array of PriceFeeds
  132. */
  133. async getLatestPriceFeeds(
  134. priceIds: HexString[],
  135. ): Promise<PriceFeed[] | undefined> {
  136. if (priceIds.length === 0) {
  137. return [];
  138. }
  139. const response = await this.httpClient.get("/api/latest_price_feeds", {
  140. params: {
  141. ids: priceIds,
  142. verbose: this.priceFeedRequestConfig.verbose,
  143. binary: this.priceFeedRequestConfig.binary,
  144. },
  145. });
  146. const priceFeedsJson = response.data as any[];
  147. return priceFeedsJson.map((priceFeedJson) =>
  148. PriceFeed.fromJson(priceFeedJson),
  149. );
  150. }
  151. /**
  152. * Fetch latest VAA of given price ids.
  153. * This will throw an axios error if there is a network problem or the price service returns a non-ok response (e.g: Invalid price ids)
  154. *
  155. * This function is coupled to wormhole implemntation.
  156. *
  157. * @param priceIds - Array of hex-encoded price ids.
  158. * @returns Array of base64 encoded VAAs.
  159. */
  160. async getLatestVaas(priceIds: HexString[]): Promise<string[]> {
  161. const response = await this.httpClient.get("/api/latest_vaas", {
  162. params: {
  163. ids: priceIds,
  164. },
  165. });
  166. return response.data as string[];
  167. }
  168. /**
  169. * Fetch the earliest VAA of the given price id that is published since the given publish time.
  170. * This will throw an error if the given publish time is in the future, or if the publish time
  171. * is old and the price service endpoint does not have a db backend for historical requests.
  172. * This will throw an axios error if there is a network problem or the price service returns a non-ok response (e.g: Invalid price id)
  173. *
  174. * This function is coupled to wormhole implemntation.
  175. *
  176. * @param priceId - Hex-encoded price id.
  177. * @param publishTime - Epoch timestamp in seconds.
  178. * @returns Tuple of VAA and publishTime.
  179. */
  180. async getVaa(
  181. priceId: HexString,
  182. publishTime: EpochTimeStamp,
  183. ): Promise<[string, EpochTimeStamp]> {
  184. const response = await this.httpClient.get("/api/get_vaa", {
  185. params: {
  186. id: priceId,
  187. publish_time: publishTime,
  188. },
  189. });
  190. return [response.data.vaa, response.data.publishTime];
  191. }
  192. /**
  193. * Fetch the PriceFeed of the given price id that is published since the given publish time.
  194. * This will throw an error if the given publish time is in the future, or if the publish time
  195. * is old and the price service endpoint does not have a db backend for historical requests.
  196. * This will throw an axios error if there is a network problem or the price service returns a non-ok response (e.g: Invalid price id)
  197. *
  198. * @param priceId - Hex-encoded price id.
  199. * @param publishTime - Epoch timestamp in seconds.
  200. * @returns PriceFeed
  201. */
  202. async getPriceFeed(
  203. priceId: HexString,
  204. publishTime: EpochTimeStamp,
  205. ): Promise<PriceFeed> {
  206. const response = await this.httpClient.get("/api/get_price_feed", {
  207. params: {
  208. id: priceId,
  209. publish_time: publishTime,
  210. verbose: this.priceFeedRequestConfig.verbose,
  211. binary: this.priceFeedRequestConfig.binary,
  212. },
  213. });
  214. return PriceFeed.fromJson(response.data);
  215. }
  216. /**
  217. * Fetch the list of available price feed ids.
  218. * This will throw an axios error if there is a network problem or the price service returns a non-ok response.
  219. *
  220. * @returns Array of hex-encoded price ids.
  221. */
  222. async getPriceFeedIds(): Promise<HexString[]> {
  223. const response = await this.httpClient.get("/api/price_feed_ids");
  224. return response.data as HexString[];
  225. }
  226. /**
  227. * Subscribe to updates for given price ids.
  228. *
  229. * It will start a websocket connection if it's not started yet.
  230. * Also, it won't throw any exception if given price ids are invalid or connection errors. Instead,
  231. * it calls `connection.onWsError`. If you want to handle the errors you should set the
  232. * `onWsError` function to your custom error handler.
  233. *
  234. * @param priceIds - Array of hex-encoded price ids.
  235. * @param cb - Callback function that is called with a PriceFeed upon updates to given price ids.
  236. */
  237. async subscribePriceFeedUpdates(
  238. priceIds: HexString[],
  239. cb: PriceFeedUpdateCallback,
  240. ) {
  241. if (this.wsClient === undefined) {
  242. await this.startWebSocket();
  243. }
  244. priceIds = priceIds.map((priceId) => removeLeading0xIfExists(priceId));
  245. const newPriceIds: HexString[] = [];
  246. for (const id of priceIds) {
  247. if (!this.priceFeedCallbacks.has(id)) {
  248. this.priceFeedCallbacks.set(id, new Set());
  249. newPriceIds.push(id);
  250. }
  251. this.priceFeedCallbacks.get(id)!.add(cb);
  252. }
  253. const message: ClientMessage = {
  254. ids: newPriceIds,
  255. type: "subscribe",
  256. verbose: this.priceFeedRequestConfig.verbose,
  257. binary: this.priceFeedRequestConfig.binary,
  258. allow_out_of_order: this.priceFeedRequestConfig.allowOutOfOrder,
  259. };
  260. await this.wsClient?.send(JSON.stringify(message));
  261. }
  262. /**
  263. * Unsubscribe from updates for given price ids.
  264. *
  265. * It will close the websocket connection if it's not subscribed to any price feed updates anymore.
  266. * Also, it won't throw any exception if given price ids are invalid or connection errors. Instead,
  267. * it calls `connection.onWsError`. If you want to handle the errors you should set the
  268. * `onWsError` function to your custom error handler.
  269. *
  270. * @param priceIds - Array of hex-encoded price ids.
  271. * @param cb - Optional callback, if set it will only unsubscribe this callback from updates for given price ids.
  272. */
  273. async unsubscribePriceFeedUpdates(
  274. priceIds: HexString[],
  275. cb?: PriceFeedUpdateCallback,
  276. ) {
  277. if (this.wsClient === undefined) {
  278. await this.startWebSocket();
  279. }
  280. priceIds = priceIds.map((priceId) => removeLeading0xIfExists(priceId));
  281. const removedPriceIds: HexString[] = [];
  282. for (const id of priceIds) {
  283. if (this.priceFeedCallbacks.has(id)) {
  284. let idRemoved = false;
  285. if (cb === undefined) {
  286. this.priceFeedCallbacks.delete(id);
  287. idRemoved = true;
  288. } else {
  289. this.priceFeedCallbacks.get(id)!.delete(cb);
  290. if (this.priceFeedCallbacks.get(id)!.size === 0) {
  291. this.priceFeedCallbacks.delete(id);
  292. idRemoved = true;
  293. }
  294. }
  295. if (idRemoved) {
  296. removedPriceIds.push(id);
  297. }
  298. }
  299. }
  300. const message: ClientMessage = {
  301. ids: removedPriceIds,
  302. type: "unsubscribe",
  303. };
  304. await this.wsClient?.send(JSON.stringify(message));
  305. if (this.priceFeedCallbacks.size === 0) {
  306. this.closeWebSocket();
  307. }
  308. }
  309. /**
  310. * Starts connection websocket.
  311. *
  312. * This function is called automatically upon subscribing to price feed updates.
  313. */
  314. async startWebSocket() {
  315. if (this.wsEndpoint === undefined) {
  316. throw new Error("Websocket endpoint is undefined.");
  317. }
  318. this.wsClient = new ResilientWebSocket(this.wsEndpoint, this.logger);
  319. this.wsClient.onError = this.onWsError;
  320. this.wsClient.onReconnect = () => {
  321. if (this.priceFeedCallbacks.size > 0) {
  322. const message: ClientMessage = {
  323. ids: [...this.priceFeedCallbacks.keys()],
  324. type: "subscribe",
  325. verbose: this.priceFeedRequestConfig.verbose,
  326. binary: this.priceFeedRequestConfig.binary,
  327. allow_out_of_order: this.priceFeedRequestConfig.allowOutOfOrder,
  328. };
  329. this.logger.info("Resubscribing to existing price feeds.");
  330. this.wsClient?.send(JSON.stringify(message));
  331. }
  332. };
  333. this.wsClient.onMessage = (data: WebSocket.Data) => {
  334. this.logger.info(`Received message ${data.toString()}`);
  335. let message: ServerMessage;
  336. try {
  337. message = JSON.parse(data.toString()) as ServerMessage;
  338. } catch (error: unknown) {
  339. this.logger.error(`Error parsing message ${data.toString()} as JSON.`);
  340. this.logger.error(error);
  341. this.onWsError(error as Error);
  342. return;
  343. }
  344. if (message.type === "response") {
  345. if (message.status === "error") {
  346. this.logger.error(
  347. `Error response from the websocket server ${message.error}.`,
  348. );
  349. this.onWsError(new Error(message.error));
  350. }
  351. } else if (message.type === "price_update") {
  352. let priceFeed;
  353. try {
  354. priceFeed = PriceFeed.fromJson(message.price_feed);
  355. } catch (error: unknown) {
  356. this.logger.error(
  357. `Error parsing price feeds from message ${data.toString()}.`,
  358. );
  359. this.logger.error(error);
  360. this.onWsError(error as Error);
  361. return;
  362. }
  363. if (this.priceFeedCallbacks.has(priceFeed.id)) {
  364. for (const cb of this.priceFeedCallbacks.get(priceFeed.id)!) {
  365. cb(priceFeed);
  366. }
  367. }
  368. } else {
  369. this.logger.warn(
  370. `Ignoring unsupported server response ${data.toString()}.`,
  371. );
  372. }
  373. };
  374. await this.wsClient.startWebSocket();
  375. }
  376. /**
  377. * Closes connection websocket.
  378. *
  379. * At termination, the websocket should be closed to finish the
  380. * process elegantly. It will automatically close when the connection
  381. * is subscribed to no price feeds.
  382. */
  383. closeWebSocket() {
  384. this.wsClient?.closeWebSocket();
  385. this.wsClient = undefined;
  386. this.priceFeedCallbacks.clear();
  387. }
  388. }