pyth_publisher.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #!/usr/bin/env python3
  2. from pyth_utils import *
  3. from http.server import HTTPServer, BaseHTTPRequestHandler
  4. from concurrent.futures import ThreadPoolExecutor, as_completed
  5. import json
  6. import os
  7. import random
  8. import sys
  9. import threading
  10. import time
  11. # The mock publisher needs to fund the publisher identity account,
  12. # unable to use a separate payer
  13. SOL_AIRDROP_AMT = int(os.environ.get("SOL_AIRDROP_AMT", 0))
  14. class PythAccEndpoint(BaseHTTPRequestHandler):
  15. """
  16. A dumb endpoint to respond with a JSON containing Pyth symbol and mapping addresses
  17. """
  18. def do_GET(self):
  19. print(f"Got path {self.path}")
  20. sys.stdout.flush()
  21. data = json.dumps(HTTP_ENDPOINT_DATA).encode("utf-8")
  22. print(f"Sending:\n{data}")
  23. self.send_response(200)
  24. self.send_header("Content-Type", "application/json")
  25. self.send_header("Content-Length", str(len(data)))
  26. self.end_headers()
  27. self.wfile.write(data)
  28. self.wfile.flush()
  29. # Test publisher state that gets served via the HTTP endpoint. Note: the schema of this dict is extended here and there
  30. # all_symbols_added is set to True once all dynamically-created symbols are added to the on-chain program. This
  31. # flag allows the integration test in check_attestations.py to determine that every on-chain symbol is being attested.
  32. HTTP_ENDPOINT_DATA = {"symbols": [], "mapping_address": None, "all_symbols_added": False}
  33. def publisher_random_update(price_pubkey):
  34. """
  35. Update the specified price with random values
  36. """
  37. value = random.randrange(1000, 2000)
  38. confidence = random.randrange(1, 10)
  39. pyth_run_or_die("upd_price_val", args=[
  40. price_pubkey, str(value), str(confidence), "trading"
  41. ])
  42. print(f"Price {price_pubkey} value updated to {str(value)}!")
  43. def accounts_endpoint():
  44. """
  45. Run a barebones HTTP server to share the dynamic Pyth
  46. mapping/product/price account addresses
  47. """
  48. server_address = ('', 4242)
  49. httpd = HTTPServer(server_address, PythAccEndpoint)
  50. httpd.serve_forever()
  51. def add_symbol(num: int):
  52. """
  53. NOTE: Updates HTTP_ENDPOINT_DATA
  54. """
  55. symbol_name = f"Test symbol {num}"
  56. # Add a product
  57. prod_pubkey = pyth_admin_run_or_die(
  58. "add_product", capture_output=True).stdout.strip()
  59. print(f"{symbol_name}: Added product {prod_pubkey}")
  60. # Add a price
  61. price_pubkey = pyth_admin_run_or_die(
  62. "add_price",
  63. args=[prod_pubkey, "price"],
  64. capture_output=True
  65. ).stdout.strip()
  66. print(f"{symbol_name}: Added price {price_pubkey}")
  67. # Become a publisher for the new price
  68. pyth_admin_run_or_die(
  69. "add_publisher", args=[publisher_pubkey, price_pubkey],
  70. debug=True,
  71. capture_output=True)
  72. print(f"{symbol_name}: Added publisher {publisher_pubkey}")
  73. # Update the prices as the newly added publisher
  74. publisher_random_update(price_pubkey)
  75. sym = {
  76. "name": symbol_name,
  77. "product": prod_pubkey,
  78. "price": price_pubkey
  79. }
  80. HTTP_ENDPOINT_DATA["symbols"].append(sym)
  81. sys.stdout.flush()
  82. print(f"New symbol: {num}")
  83. return num
  84. # Fund the publisher
  85. sol_run_or_die("airdrop", [
  86. str(SOL_AIRDROP_AMT),
  87. "--keypair", PYTH_PUBLISHER_KEYPAIR,
  88. "--commitment", "finalized",
  89. ])
  90. # Create a mapping
  91. pyth_admin_run_or_die("init_mapping", capture_output=True)
  92. mapping_addr = sol_run_or_die("address", args=[
  93. "--keypair", PYTH_MAPPING_KEYPAIR
  94. ], capture_output=True).stdout.strip()
  95. HTTP_ENDPOINT_DATA["mapping_addr"] = mapping_addr
  96. print(f"New mapping at {mapping_addr}")
  97. print(f"Creating {PYTH_TEST_SYMBOL_COUNT} test Pyth symbols")
  98. publisher_pubkey = sol_run_or_die("address", args=[
  99. "--keypair", PYTH_PUBLISHER_KEYPAIR
  100. ], capture_output=True).stdout.strip()
  101. with ThreadPoolExecutor(max_workers=PYTH_TEST_SYMBOL_COUNT) as executor:
  102. add_symbol_futures = {executor.submit(add_symbol, sym_id) for sym_id in range(PYTH_TEST_SYMBOL_COUNT)}
  103. for future in as_completed(add_symbol_futures):
  104. print(f"Completed {future.result()}")
  105. print(
  106. f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL_SECS)} seconds")
  107. # Spin off the readiness probe endpoint into a separate thread
  108. readiness_thread = threading.Thread(target=readiness, daemon=True)
  109. # Start an HTTP endpoint for looking up test product/price addresses
  110. http_service = threading.Thread(target=accounts_endpoint, daemon=True)
  111. readiness_thread.start()
  112. http_service.start()
  113. next_new_symbol_id = PYTH_TEST_SYMBOL_COUNT
  114. last_new_sym_added_at = time.monotonic()
  115. with ThreadPoolExecutor() as executor: # Used for async adding of products and prices
  116. dynamically_added_symbols = 0
  117. while True:
  118. for sym in HTTP_ENDPOINT_DATA["symbols"]:
  119. publisher_random_update(sym["price"])
  120. # Add a symbol if new symbol interval configured. This will add a new symbol if PYTH_NEW_SYMBOL_INTERVAL_SECS
  121. # is passed since adding the previous symbol. The second constraint ensures that
  122. # at most PYTH_DYNAMIC_SYMBOL_COUNT new price symbols are created.
  123. if PYTH_NEW_SYMBOL_INTERVAL_SECS > 0 and dynamically_added_symbols < PYTH_DYNAMIC_SYMBOL_COUNT:
  124. # Do it if enough time passed
  125. now = time.monotonic()
  126. if (now - last_new_sym_added_at) >= PYTH_NEW_SYMBOL_INTERVAL_SECS:
  127. executor.submit(add_symbol, next_new_symbol_id) # Returns immediately, runs in background
  128. last_sym_added_at = now
  129. next_new_symbol_id += 1
  130. dynamically_added_symbols += 1
  131. if dynamically_added_symbols >= PYTH_DYNAMIC_SYMBOL_COUNT:
  132. HTTP_ENDPOINT_DATA["all_symbols_added"] = True
  133. time.sleep(PYTH_PUBLISHER_INTERVAL_SECS)
  134. sys.stdout.flush()
  135. readiness_thread.join()
  136. http_service.join()