浏览代码

Add readiness to Pyth Price Service (#183)

Also add it to Tilt
Ali Behjati 3 年之前
父节点
当前提交
2dd5357b46

+ 17 - 0
Tiltfile

@@ -266,6 +266,23 @@ if pyth:
         labels = ["pyth"]
     )
 
+    # Pyth Price service
+    docker_build(
+        ref = "pyth-price-service",
+        context = ".",
+        dockerfile = "third_party/pyth/price-service/Dockerfile.price_service",
+    )
+    k8s_yaml_with_ns("devnet/pyth-price-service.yaml")
+    k8s_resource(
+        "pyth-price-service",
+        resource_deps = ["pyth", "p2w-attest", "spy", "eth-devnet"],
+        port_forwards = [
+            port_forward(4202, container_port = 4200, name = "Rest API (Status + Query) [:4202]", host = webHost),
+            port_forward(8083, container_port = 8081, name = "Prometheus [:8083]", host = webHost)],
+        labels = ["pyth"]
+    )
+
+
 k8s_yaml_with_ns("devnet/eth-devnet.yaml")
 
 k8s_resource(

+ 78 - 0
devnet/pyth-price-service.yaml

@@ -0,0 +1,78 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: pyth-price-service
+  labels:
+    app: pyth-price-service
+spec:
+  ports:
+    - port: 8081
+      name: prometheus
+      protocol: TCP
+    - port: 4200
+      name: rest-api
+      protocol: TCP
+  clusterIP: None
+  selector:
+    app: pyth-price-service
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: pyth-price-service
+spec:
+  selector:
+    matchLabels:
+      app: pyth-price-service
+  serviceName: pyth-price-service
+  replicas: 2
+  template:
+    metadata:
+      labels:
+        app: pyth-price-service
+    spec:
+      terminationGracePeriodSeconds: 0
+      containers:
+        - name: pyth-price-service
+          image: pyth-price-service
+          ports:
+            - containerPort: 8081
+              name: prometheus
+              protocol: TCP
+            - containerPort: 4200
+              name: rest-api
+              protocol: TCP
+          readinessProbe:
+            httpGet:
+              path: '/ready'
+              port: 4200
+            initialDelaySeconds: 10
+            periodSeconds: 1
+            failureThreshold: 1
+          livenessProbe:
+            httpGet:
+              path: '/live'
+              port: 4200
+            initialDelaySeconds: 20
+            periodSeconds: 30
+            timeoutSeconds: 30
+          resources:
+            limits:
+              memory: "128Mi"
+              cpu: "500m"
+          env:
+            - name: SPY_SERVICE_HOST
+              value: spy:7072
+            - name: SPY_SERVICE_FILTERS
+              value: '[{"chain_id":1,"emitter_address":"71f8dcb863d176e2c420ad6610cf687359612b6fb392e0642b0ca6b1f186aa3b"}]'
+            - name: REST_PORT
+              value: '4200'
+            - name: PROM_PORT
+              value: '8081'
+            - name: READINESS_SPY_SYNC_TIME_SECONDS
+              value: '5'
+            - name: READINESS_NUM_LOADED_SYMBOLS
+              value: '6'
+            - name: LOG_LEVEL
+              value: debug

+ 4 - 0
third_party/pyth/price-service/.env.sample

@@ -12,6 +12,10 @@ SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"71f8dcb863d176e2c420ad6610
 #SPY_SERVICE_HOST=0.0.0.0:7074
 #SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"6bb14509a612f01fbbc4cffeebd4bbfb492a86df717ebe92eb6df432a3f00a25"}]
 
+# Number of seconds to sync with spy to be sure to have latest messages
+READINESS_SPY_SYNC_TIME_SECONDS=60
+READINESS_NUM_LOADED_SYMBOLS=5
+
 REST_PORT=4200
 PROM_PORT=8081
 

+ 0 - 21
third_party/pyth/price-service/Dockerfile.p2w_api

@@ -1,21 +0,0 @@
-FROM node:16-alpine@sha256:72a490e7ed8aed68e16b8dc8f37b5bcc35c5b5c56ee3256effcdee63e2546f93
-
-ARG P2W_BASE_PATH=/usr/src/pyth2wormhole
-
-WORKDIR ${P2W_BASE_PATH}
-RUN addgroup -S pyth -g 10001 && adduser -S pyth -G pyth -u 10001
-USER pyth
-
-ARG P2W_API_REL_PATH=third_party/pyth/price-service
-
-WORKDIR ${P2W_BASE_PATH}/${P2W_API_REL_PATH} 
-ADD --chown=pyth:pyth ${P2W_API_REL_PATH} .
-
-RUN npm ci && npm run build && npm cache clean --force
-
-# If you are building for production
-# RUN npm ci --only=production
-
-RUN mkdir -p ${P2W_BASE_PATH}/${P2W_API_REL_PATH}/logs
-
-CMD [ "npm", "run", "start" ]

+ 30 - 0
third_party/pyth/price-service/Dockerfile.price_service

@@ -0,0 +1,30 @@
+FROM node:16-alpine@sha256:72a490e7ed8aed68e16b8dc8f37b5bcc35c5b5c56ee3256effcdee63e2546f93
+
+ARG BASE_PATH=/usr/src/pyth2wormhole
+
+WORKDIR ${BASE_PATH}
+RUN addgroup -S pyth -g 10001 && adduser -S pyth -G pyth -u 10001
+USER pyth
+
+# Adds p2w-sdk/js dependency
+ARG P2W_SDK_REL_PATH=third_party/pyth/p2w-sdk/js
+
+WORKDIR ${BASE_PATH}/${P2W_SDK_REL_PATH}
+ADD --chown=pyth:pyth ${P2W_SDK_REL_PATH} .
+
+RUN npm ci && npm run build && npm cache clean --force
+
+
+ARG PRICE_SERVICE_REL_PATH=third_party/pyth/price-service
+
+WORKDIR ${BASE_PATH}/${PRICE_SERVICE_REL_PATH} 
+ADD --chown=pyth:pyth ${PRICE_SERVICE_REL_PATH} .
+
+RUN npm ci && npm run build && npm cache clean --force
+
+# If you are building for production
+# RUN npm ci --only=production
+
+RUN mkdir -p ${BASE_PATH}/${PRICE_SERVICE_REL_PATH}/logs
+
+CMD [ "npm", "run", "start" ]

+ 9 - 2
third_party/pyth/price-service/src/index.ts

@@ -27,12 +27,19 @@ const promClient = new PromClient({
 
 const listener = new Listener({
   spyServiceHost: envOrErr("SPY_SERVICE_HOST"),
-  filtersRaw: process.env.SPY_SERVICE_FILTERS
+  filtersRaw: process.env.SPY_SERVICE_FILTERS,
+  readiness: {
+    spySyncTimeSeconds: parseInt(envOrErr("READINESS_SPY_SYNC_TIME_SECONDS")),
+    numLoadedSymbols: parseInt(envOrErr("READINESS_NUM_LOADED_SYMBOLS"))
+  }
 }, promClient);
 
+// In future if we have more components we will modify it to include them all
+const isReady = () => listener.isReady();
+
 const restAPI = new RestAPI({
   port: parseInt(envOrErr("REST_PORT"))
-}, listener);
+}, listener, isReady);
 
 listener.run();
 restAPI.run();

+ 33 - 1
third_party/pyth/price-service/src/listen.ts

@@ -18,6 +18,9 @@ import { ClientReadableStream } from "@grpc/grpc-js";
 import { FilterEntry, SubscribeSignedVAAResponse } from "@certusone/wormhole-spydk/lib/cjs/proto/spy/v1/spy";
 import { logger } from "./logging";
 
+// Timestamp (in seconds)
+type Timestamp = number;
+
 export type VaaInfo = {
   vaaBytes: string,
   seqNum: number;
@@ -27,17 +30,31 @@ export interface PriceFeedVaaInfo {
   getLatestVaaForPriceFeed(priceFeedId: string): VaaInfo | undefined;
 }
 
+type ListenerReadinessConfig = {
+  spySyncTimeSeconds: number,
+  numLoadedSymbols: number,
+};
+
+type ListenerConfig = {
+  spyServiceHost: string,
+  filtersRaw?: string,
+  readiness: ListenerReadinessConfig,
+};
+
 export class Listener implements PriceFeedVaaInfo {
   // Mapping of Price Feed Id to Vaa
   private priceFeedVaaMap = new Map<string, VaaInfo>();
   private promClient: PromClient;
   private spyServiceHost: string;
   private filters: FilterEntry[] = [];
+  private spyConnectionTime: Timestamp | undefined;
+  private readinessConfig: ListenerReadinessConfig;
 
-  constructor(config: { spyServiceHost: string, filtersRaw?: string; }, promClient: PromClient) {
+  constructor(config: ListenerConfig, promClient: PromClient) {
     this.promClient = promClient;
     this.spyServiceHost = config.spyServiceHost;
     this.loadFilters(config.filtersRaw);
+    this.readinessConfig = config.readiness;
   }
 
   private loadFilters(filtersRaw?: string) {
@@ -88,6 +105,8 @@ export class Listener implements PriceFeedVaaInfo {
           this.processVaa(vaaBytes);
         });
 
+        this.spyConnectionTime = (new Date()).getTime() / 1000;
+
         let connected = true;
         stream!.on("error", (err: any) => {
           logger.error("spy service returned an error: %o", err);
@@ -111,6 +130,7 @@ export class Listener implements PriceFeedVaaInfo {
       if (stream) {
         stream.destroy();
       }
+      this.spyConnectionTime = undefined;
 
       await sleep(1000);
       logger.info("attempting to reconnect to the spy service");
@@ -180,4 +200,16 @@ export class Listener implements PriceFeedVaaInfo {
   getLatestVaaForPriceFeed(priceFeedId: string): VaaInfo | undefined {
     return this.priceFeedVaaMap.get(priceFeedId);
   }
+
+  isReady(): boolean {
+    let currentTime: Timestamp = (new Date()).getTime() / 1000;
+    if (this.spyConnectionTime === undefined ||
+      currentTime < this.spyConnectionTime + this.readinessConfig.spySyncTimeSeconds) {
+      return false;
+    }
+    if (this.priceFeedVaaMap.size < this.readinessConfig.numLoadedSymbols) {
+      return false;
+    }
+    return true;
+  }
 }

+ 37 - 2
third_party/pyth/price-service/src/rest.ts

@@ -8,10 +8,14 @@ import { logger } from "./logging";
 export class RestAPI {
   private port: number;
   private priceFeedVaaInfo: PriceFeedVaaInfo;
+  private isReady: () => boolean;
 
-  constructor(config: { port: number; }, priceFeedVaaInfo: PriceFeedVaaInfo) {
+  constructor(config: { port: number; }, 
+    priceFeedVaaInfo: PriceFeedVaaInfo,
+    isReady: () => boolean) {
     this.port = config.port;
     this.priceFeedVaaInfo = priceFeedVaaInfo;
+    this.isReady = isReady;
   }
 
   // Run this function without blocking (`await`) if you want to run it async.
@@ -36,8 +40,39 @@ export class RestAPI {
       res.end();
     });
 
+    let endpoints: string[] = [];
+    
+    app.get("/latest_vaa_bytes/:price_feed_id", (req: Request, res: Response) => {
+      let latestVaa = this.priceFeedVaaInfo.getLatestVaaForPriceFeed(req.params.price_feed_id);
+
+      if (latestVaa === undefined) {
+        res.sendStatus(404);
+        return;
+      }
+
+      res.status(200);
+      res.write(latestVaa.vaaBytes);
+      res.end();
+    });
+    endpoints.push("latest_vaa_bytes/<price_feed_id>");
+
+    app.get("/ready", (_, res: Response) => {
+      if (this.isReady!()) {
+        res.sendStatus(200);
+      } else {
+        res.sendStatus(503);
+      }
+    });
+    endpoints.push('ready');
+
+    app.get("/live", (_, res: Response) => {
+      res.sendStatus(200);
+    });
+    endpoints.push("live");
+
+
     app.get("/", (_, res: Response) =>
-      res.json(["latest_vaa_bytes/<price_feed_id>"])
+      res.json(endpoints)
     );
   }
 }