streaming.ts 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. /* eslint-disable no-console */
  2. /* eslint-disable @typescript-eslint/no-empty-function */
  3. /* eslint-disable unicorn/prefer-top-level-await */
  4. import { renderFeeds, refreshFeedDisplay } from "./util.js";
  5. import type { JsonUpdate } from "../src/index.js";
  6. import { PythLazerClient } from "../src/index.js";
  7. // Ignore debug messages
  8. console.debug = () => {};
  9. // Store feed data for in-place updates
  10. const feedData = new Map<
  11. string,
  12. {
  13. priceFeedId: number;
  14. price: number;
  15. confidence: number | undefined;
  16. exponent: number;
  17. lastUpdate: Date;
  18. }
  19. >();
  20. const symbolsMap = new Map<number, string>();
  21. const client = await PythLazerClient.create({
  22. token: "your-token-here", // Replace with your actual access token
  23. logger: console, // Optionally log operations (to the console in this case.)
  24. webSocketPoolConfig: {
  25. numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4.
  26. onError: (error) => {
  27. console.error("WebSocket error:", error);
  28. },
  29. // Optional configuration for resilient WebSocket connections
  30. rwsConfig: {
  31. heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds
  32. maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds
  33. logAfterRetryCount: 10, // Optional log after how many retries
  34. },
  35. },
  36. });
  37. // Fetch current map of price feeds
  38. void client.get_symbols().then((symbols) => {
  39. for (const symbol of symbols) {
  40. symbolsMap.set(symbol.pyth_lazer_id, symbol.symbol);
  41. }
  42. });
  43. // Add a listener to read and display messages from the Lazer stream
  44. client.addMessageListener((message) => {
  45. switch (message.type) {
  46. case "json": {
  47. if (message.value.type == "streamUpdated") {
  48. refreshFeedDisplay(message.value as JsonUpdate, feedData, symbolsMap);
  49. }
  50. break;
  51. }
  52. case "binary": {
  53. // Print out the binary hex messages if you want:
  54. // if ("solana" in message.value) {
  55. // console.info("solana message:", message.value.solana?.toString("hex"));
  56. // }
  57. // if ("evm" in message.value) {
  58. // console.info("evm message:", message.value.evm?.toString("hex"));
  59. // }
  60. break;
  61. }
  62. }
  63. });
  64. // Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down)
  65. // The connections may still try to reconnect in the background. To shut down the client completely, call shutdown().
  66. client.addAllConnectionsDownListener(() => {
  67. console.error("All connections are down!");
  68. });
  69. renderFeeds(feedData, symbolsMap);
  70. // Create and remove one or more subscriptions on the fly
  71. client.subscribe({
  72. type: "subscribe",
  73. subscriptionId: 1,
  74. priceFeedIds: [1, 2],
  75. properties: ["price"],
  76. formats: ["solana"],
  77. deliveryFormat: "binary",
  78. channel: "fixed_rate@200ms",
  79. parsed: false,
  80. jsonBinaryEncoding: "base64",
  81. });
  82. client.subscribe({
  83. type: "subscribe",
  84. subscriptionId: 2,
  85. priceFeedIds: [1, 2, 3, 4, 5],
  86. properties: ["price", "exponent", "publisherCount", "confidence"],
  87. formats: ["evm"],
  88. deliveryFormat: "json",
  89. channel: "fixed_rate@50ms",
  90. parsed: true,
  91. jsonBinaryEncoding: "hex",
  92. });
  93. client.subscribe({
  94. type: "subscribe",
  95. subscriptionId: 3,
  96. priceFeedIds: [1],
  97. properties: ["price", "confidence"],
  98. formats: ["solana"],
  99. deliveryFormat: "json",
  100. channel: "real_time",
  101. parsed: true,
  102. jsonBinaryEncoding: "hex",
  103. });
  104. await new Promise((resolve) => setTimeout(resolve, 30_000));
  105. client.unsubscribe(1);
  106. client.unsubscribe(2);
  107. client.unsubscribe(3);
  108. // Clear screen and move cursor to top
  109. process.stdout.write("\u001B[2J\u001B[H");
  110. console.log("🛑 Shutting down Pyth Lazer demo after 30 seconds...");
  111. console.log("👋 Goodbye!");
  112. client.shutdown();