| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- #!/usr/bin/env python3
- from pyth_utils import *
- from http.server import HTTPServer, BaseHTTPRequestHandler
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import json
- import os
- import random
- import sys
- import threading
- import time
- PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "9"))
- class PythAccEndpoint(BaseHTTPRequestHandler):
- """
- A dumb endpoint to respond with a JSON containing Pyth symbol and mapping addresses
- """
- def do_GET(self):
- print(f"Got path {self.path}")
- sys.stdout.flush()
- data = json.dumps(HTTP_ENDPOINT_DATA).encode("utf-8")
- print(f"Sending:\n{data}")
- self.send_response(200)
- self.send_header("Content-Type", "application/json")
- self.send_header("Content-Length", str(len(data)))
- self.end_headers()
- self.wfile.write(data)
- self.wfile.flush()
- # Test publisher state that gets served via the HTTP endpoint. Note: the schema of this dict is extended here and there
- HTTP_ENDPOINT_DATA = {"symbols": [], "mapping_address": None}
- def publisher_random_update(price_pubkey):
- """
- Update the specified price with random values
- """
- value = random.randrange(1000, 2000)
- confidence = random.randrange(1, 10)
- pyth_run_or_die("upd_price_val", args=[
- price_pubkey, str(value), str(confidence), "trading"
- ])
- print(f"Price {price_pubkey} value updated to {str(value)}!")
- def accounts_endpoint():
- """
- Run a barebones HTTP server to share the dynamic Pyth
- mapping/product/price account addresses
- """
- server_address = ('', 4242)
- httpd = HTTPServer(server_address, PythAccEndpoint)
- httpd.serve_forever()
- def add_symbol(num: int):
- symbol_name = f"Test symbol {num}"
- # Add a product
- prod_pubkey = pyth_admin_run_or_die(
- "add_product", capture_output=True).stdout.strip()
- print(f"{symbol_name}: Added product {prod_pubkey}")
- # Add a price
- price_pubkey = pyth_admin_run_or_die(
- "add_price",
- args=[prod_pubkey, "price"],
- capture_output=True
- ).stdout.strip()
- print(f"{symbol_name}: Added price {price_pubkey}")
- # Become a publisher for the new price
- pyth_admin_run_or_die(
- "add_publisher", args=[publisher_pubkey, price_pubkey],
- debug=True,
- capture_output=True)
- print(f"{symbol_name}: Added publisher {publisher_pubkey}")
- # Update the prices as the newly added publisher
- publisher_random_update(price_pubkey)
- sym = {
- "name": symbol_name,
- "product": prod_pubkey,
- "price": price_pubkey
- }
- HTTP_ENDPOINT_DATA["symbols"].append(sym)
- sys.stdout.flush()
- return num
- # Fund the publisher
- sol_run_or_die("airdrop", [
- str(SOL_AIRDROP_AMT),
- "--keypair", PYTH_PUBLISHER_KEYPAIR,
- "--commitment", "finalized",
- ])
- # Create a mapping
- pyth_admin_run_or_die("init_mapping", capture_output=True)
- mapping_addr = sol_run_or_die("address", args=[
- "--keypair", PYTH_MAPPING_KEYPAIR
- ], capture_output=True).stdout.strip()
- HTTP_ENDPOINT_DATA["mapping_addr"] = mapping_addr
- print(f"New mapping at {mapping_addr}")
- print(f"Creating {PYTH_TEST_SYMBOL_COUNT} test Pyth symbols")
- publisher_pubkey = sol_run_or_die("address", args=[
- "--keypair", PYTH_PUBLISHER_KEYPAIR
- ], capture_output=True).stdout.strip()
- with ThreadPoolExecutor(max_workers=10) as executor:
- add_symbol_futures = {executor.submit(add_symbol, sym_id) for sym_id in range(PYTH_TEST_SYMBOL_COUNT)}
-
- for future in as_completed(add_symbol_futures):
- print(f"Completed {future.result()}")
- print(
- f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL)} seconds")
- # Spin off the readiness probe endpoint into a separate thread
- readiness_thread = threading.Thread(target=readiness, daemon=True)
- # Start an HTTP endpoint for looking up test product/price addresses
- http_service = threading.Thread(target=accounts_endpoint, daemon=True)
- readiness_thread.start()
- http_service.start()
- while True:
- for sym in HTTP_ENDPOINT_DATA["symbols"]:
- publisher_random_update(sym["price"])
- time.sleep(PYTH_PUBLISHER_INTERVAL)
- sys.stdout.flush()
- readiness_thread.join()
- http_service.join()
|