pyth_publisher.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #!/usr/bin/env python3
  2. from pyth_utils import *
  3. from http.server import HTTPServer, BaseHTTPRequestHandler
  4. from concurrent.futures import ThreadPoolExecutor, as_completed
  5. import json
  6. import os
  7. import random
  8. import sys
  9. import threading
  10. import time
  11. PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "9"))
  12. class PythAccEndpoint(BaseHTTPRequestHandler):
  13. """
  14. A dumb endpoint to respond with a JSON containing Pyth symbol and mapping addresses
  15. """
  16. def do_GET(self):
  17. print(f"Got path {self.path}")
  18. sys.stdout.flush()
  19. data = json.dumps(HTTP_ENDPOINT_DATA).encode("utf-8")
  20. print(f"Sending:\n{data}")
  21. self.send_response(200)
  22. self.send_header("Content-Type", "application/json")
  23. self.send_header("Content-Length", str(len(data)))
  24. self.end_headers()
  25. self.wfile.write(data)
  26. self.wfile.flush()
  27. # Test publisher state that gets served via the HTTP endpoint. Note: the schema of this dict is extended here and there
  28. HTTP_ENDPOINT_DATA = {"symbols": [], "mapping_address": None}
  29. def publisher_random_update(price_pubkey):
  30. """
  31. Update the specified price with random values
  32. """
  33. value = random.randrange(1000, 2000)
  34. confidence = random.randrange(1, 10)
  35. pyth_run_or_die("upd_price_val", args=[
  36. price_pubkey, str(value), str(confidence), "trading"
  37. ])
  38. print(f"Price {price_pubkey} value updated to {str(value)}!")
  39. def accounts_endpoint():
  40. """
  41. Run a barebones HTTP server to share the dynamic Pyth
  42. mapping/product/price account addresses
  43. """
  44. server_address = ('', 4242)
  45. httpd = HTTPServer(server_address, PythAccEndpoint)
  46. httpd.serve_forever()
  47. def add_symbol(num: int):
  48. symbol_name = f"Test symbol {num}"
  49. # Add a product
  50. prod_pubkey = pyth_admin_run_or_die(
  51. "add_product", capture_output=True).stdout.strip()
  52. print(f"{symbol_name}: Added product {prod_pubkey}")
  53. # Add a price
  54. price_pubkey = pyth_admin_run_or_die(
  55. "add_price",
  56. args=[prod_pubkey, "price"],
  57. capture_output=True
  58. ).stdout.strip()
  59. print(f"{symbol_name}: Added price {price_pubkey}")
  60. # Become a publisher for the new price
  61. pyth_admin_run_or_die(
  62. "add_publisher", args=[publisher_pubkey, price_pubkey],
  63. debug=True,
  64. capture_output=True)
  65. print(f"{symbol_name}: Added publisher {publisher_pubkey}")
  66. # Update the prices as the newly added publisher
  67. publisher_random_update(price_pubkey)
  68. sym = {
  69. "name": symbol_name,
  70. "product": prod_pubkey,
  71. "price": price_pubkey
  72. }
  73. HTTP_ENDPOINT_DATA["symbols"].append(sym)
  74. sys.stdout.flush()
  75. return num
  76. # Fund the publisher
  77. sol_run_or_die("airdrop", [
  78. str(SOL_AIRDROP_AMT),
  79. "--keypair", PYTH_PUBLISHER_KEYPAIR,
  80. "--commitment", "finalized",
  81. ])
  82. # Create a mapping
  83. pyth_admin_run_or_die("init_mapping", capture_output=True)
  84. mapping_addr = sol_run_or_die("address", args=[
  85. "--keypair", PYTH_MAPPING_KEYPAIR
  86. ], capture_output=True).stdout.strip()
  87. HTTP_ENDPOINT_DATA["mapping_addr"] = mapping_addr
  88. print(f"New mapping at {mapping_addr}")
  89. print(f"Creating {PYTH_TEST_SYMBOL_COUNT} test Pyth symbols")
  90. publisher_pubkey = sol_run_or_die("address", args=[
  91. "--keypair", PYTH_PUBLISHER_KEYPAIR
  92. ], capture_output=True).stdout.strip()
  93. with ThreadPoolExecutor(max_workers=10) as executor:
  94. add_symbol_futures = {executor.submit(add_symbol, sym_id) for sym_id in range(PYTH_TEST_SYMBOL_COUNT)}
  95. for future in as_completed(add_symbol_futures):
  96. print(f"Completed {future.result()}")
  97. print(
  98. f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL)} seconds")
  99. # Spin off the readiness probe endpoint into a separate thread
  100. readiness_thread = threading.Thread(target=readiness, daemon=True)
  101. # Start an HTTP endpoint for looking up test product/price addresses
  102. http_service = threading.Thread(target=accounts_endpoint, daemon=True)
  103. readiness_thread.start()
  104. http_service.start()
  105. while True:
  106. for sym in HTTP_ENDPOINT_DATA["symbols"]:
  107. publisher_random_update(sym["price"])
  108. time.sleep(PYTH_PUBLISHER_INTERVAL)
  109. sys.stdout.flush()
  110. readiness_thread.join()
  111. http_service.join()