pyth_publisher.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #!/usr/bin/env python3
  2. from pyth_utils import *
  3. from http.server import HTTPServer, BaseHTTPRequestHandler
  4. from concurrent.futures import ThreadPoolExecutor, as_completed
  5. import json
  6. import os
  7. import random
  8. import sys
  9. import threading
  10. import time
  11. PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "9"))
  12. class PythAccEndpoint(BaseHTTPRequestHandler):
  13. """
  14. A dumb endpoint to respond with a JSON containing Pyth account addresses
  15. """
  16. def do_GET(self):
  17. print(f"Got path {self.path}")
  18. sys.stdout.flush()
  19. data = json.dumps(TEST_SYMBOLS).encode("utf-8")
  20. print(f"Sending:\n{data}")
  21. self.send_response(200)
  22. self.send_header("Content-Type", "application/json")
  23. self.send_header("Content-Length", str(len(data)))
  24. self.end_headers()
  25. self.wfile.write(data)
  26. self.wfile.flush()
  27. TEST_SYMBOLS = []
  28. def publisher_random_update(price_pubkey):
  29. """
  30. Update the specified price with random values
  31. """
  32. value = random.randrange(1024)
  33. confidence = 5
  34. pyth_run_or_die("upd_price_val", args=[
  35. price_pubkey, str(value), str(confidence), "trading"
  36. ])
  37. print(f"Price {price_pubkey} value updated to {str(value)}!")
  38. def accounts_endpoint():
  39. """
  40. Run a barebones HTTP server to share the dynamic Pyth
  41. mapping/product/price account addresses
  42. """
  43. server_address = ('', 4242)
  44. httpd = HTTPServer(server_address, PythAccEndpoint)
  45. httpd.serve_forever()
  46. def add_symbol(num: int):
  47. symbol_name = f"Test symbol {num}"
  48. # Add a product
  49. prod_pubkey = pyth_run_or_die(
  50. "add_product", capture_output=True).stdout.strip()
  51. print(f"{symbol_name}: Added product {prod_pubkey}")
  52. # Add a price
  53. price_pubkey = pyth_run_or_die(
  54. "add_price",
  55. args=[prod_pubkey, "price"],
  56. confirm=False,
  57. capture_output=True
  58. ).stdout.strip()
  59. print(f"{symbol_name}: Added price {price_pubkey}")
  60. # Become a publisher for the new price
  61. pyth_run_or_die(
  62. "add_publisher", args=[publisher_pubkey, price_pubkey],
  63. confirm=False,
  64. debug=True,
  65. capture_output=True)
  66. print(f"{symbol_name}: Added publisher {publisher_pubkey}")
  67. # Update the prices as the newly added publisher
  68. publisher_random_update(price_pubkey)
  69. sym = {
  70. "name": symbol_name,
  71. "product": prod_pubkey,
  72. "price": price_pubkey
  73. }
  74. TEST_SYMBOLS.append(sym)
  75. sys.stdout.flush()
  76. return num
  77. # Fund the publisher
  78. sol_run_or_die("airdrop", [
  79. str(SOL_AIRDROP_AMT),
  80. "--keypair", PYTH_PUBLISHER_KEYPAIR,
  81. "--commitment", "finalized",
  82. ])
  83. # Create a mapping
  84. pyth_run_or_die("init_mapping")
  85. print(f"Creating {PYTH_TEST_SYMBOL_COUNT} test Pyth symbols")
  86. publisher_pubkey = sol_run_or_die("address", args=[
  87. "--keypair", PYTH_PUBLISHER_KEYPAIR
  88. ], capture_output=True).stdout.strip()
  89. with ThreadPoolExecutor(max_workers=10) as executor:
  90. add_symbol_futures = {executor.submit(add_symbol, sym_id) for sym_id in range(PYTH_TEST_SYMBOL_COUNT)}
  91. for future in as_completed(add_symbol_futures):
  92. print(f"Completed {future.result()}")
  93. print(
  94. f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL)} seconds")
  95. # Spin off the readiness probe endpoint into a separate thread
  96. readiness_thread = threading.Thread(target=readiness, daemon=True)
  97. # Start an HTTP endpoint for looking up test product/price addresses
  98. http_service = threading.Thread(target=accounts_endpoint, daemon=True)
  99. readiness_thread.start()
  100. http_service.start()
  101. while True:
  102. for sym in TEST_SYMBOLS:
  103. publisher_random_update(sym["price"])
  104. time.sleep(PYTH_PUBLISHER_INTERVAL)
  105. sys.stdout.flush()
  106. readiness_thread.join()
  107. http_service.join()