|
|
@@ -12,8 +12,6 @@ import sys
|
|
|
import threading
|
|
|
import time
|
|
|
|
|
|
-PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "9"))
|
|
|
-
|
|
|
class PythAccEndpoint(BaseHTTPRequestHandler):
|
|
|
"""
|
|
|
A dumb endpoint to respond with a JSON containing Pyth symbol and mapping addresses
|
|
|
@@ -60,6 +58,9 @@ def accounts_endpoint():
|
|
|
|
|
|
|
|
|
def add_symbol(num: int):
|
|
|
+ """
|
|
|
+ NOTE: Updates HTTP_ENDPOINT_DATA
|
|
|
+ """
|
|
|
symbol_name = f"Test symbol {num}"
|
|
|
# Add a product
|
|
|
prod_pubkey = pyth_admin_run_or_die(
|
|
|
@@ -96,6 +97,8 @@ def add_symbol(num: int):
|
|
|
|
|
|
sys.stdout.flush()
|
|
|
|
|
|
+ print(f"New symbol: {num}")
|
|
|
+
|
|
|
return num
|
|
|
|
|
|
# Fund the publisher
|
|
|
@@ -122,14 +125,14 @@ publisher_pubkey = sol_run_or_die("address", args=[
|
|
|
"--keypair", PYTH_PUBLISHER_KEYPAIR
|
|
|
], capture_output=True).stdout.strip()
|
|
|
|
|
|
-with ThreadPoolExecutor(max_workers=10) as executor:
|
|
|
+with ThreadPoolExecutor(max_workers=PYTH_TEST_SYMBOL_COUNT) as executor:
|
|
|
add_symbol_futures = {executor.submit(add_symbol, sym_id) for sym_id in range(PYTH_TEST_SYMBOL_COUNT)}
|
|
|
|
|
|
for future in as_completed(add_symbol_futures):
|
|
|
print(f"Completed {future.result()}")
|
|
|
|
|
|
print(
|
|
|
- f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL)} seconds")
|
|
|
+ f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL_SECS)} seconds")
|
|
|
|
|
|
# Spin off the readiness probe endpoint into a separate thread
|
|
|
readiness_thread = threading.Thread(target=readiness, daemon=True)
|
|
|
@@ -140,12 +143,26 @@ http_service = threading.Thread(target=accounts_endpoint, daemon=True)
|
|
|
readiness_thread.start()
|
|
|
http_service.start()
|
|
|
|
|
|
-while True:
|
|
|
- for sym in HTTP_ENDPOINT_DATA["symbols"]:
|
|
|
- publisher_random_update(sym["price"])
|
|
|
+next_new_symbol_id = PYTH_TEST_SYMBOL_COUNT
|
|
|
+last_new_sym_added_at = time.monotonic()
|
|
|
+
|
|
|
+with ThreadPoolExecutor() as executor: # Used for async adding of products and prices
|
|
|
+ while True:
|
|
|
+ for sym in HTTP_ENDPOINT_DATA["symbols"]:
|
|
|
+ publisher_random_update(sym["price"])
|
|
|
+
|
|
|
+ # Add a symbol if new symbol interval configured
|
|
|
+ if PYTH_NEW_SYMBOL_INTERVAL_SECS > 0:
|
|
|
+ # Do it if enough time passed
|
|
|
+ now = time.monotonic()
|
|
|
+ if (now - last_new_sym_added_at) >= PYTH_NEW_SYMBOL_INTERVAL_SECS:
|
|
|
+ executor.submit(add_symbol, next_new_symbol_id) # Returns immediately, runs in background
|
|
|
+ last_sym_added_at = now
|
|
|
+ next_new_symbol_id += 1
|
|
|
+
|
|
|
+ time.sleep(PYTH_PUBLISHER_INTERVAL_SECS)
|
|
|
+ sys.stdout.flush()
|
|
|
|
|
|
- time.sleep(PYTH_PUBLISHER_INTERVAL)
|
|
|
- sys.stdout.flush()
|
|
|
|
|
|
readiness_thread.join()
|
|
|
http_service.join()
|