Browse Source

[price-pusher] refactor price listener (#634)

* update pyth-evm-js dependency to pyth-common-js

* refactor price listener
Dev Kalra 2 years ago
parent
commit
40cf49ec05

+ 3 - 3
price_pusher/src/controller.ts

@@ -1,14 +1,14 @@
 import { UnixTimestamp } from "@pythnetwork/pyth-evm-js";
 import { DurationInSeconds, sleep } from "./utils";
-import { ChainPricePusher, PriceListener } from "./interface";
+import { ChainPricePusher, IPriceListener } from "./interface";
 import { PriceConfig, shouldUpdate } from "./price-config";
 
 export class Controller {
   private cooldownDuration: DurationInSeconds;
   constructor(
     private priceConfigs: PriceConfig[],
-    private sourcePriceListener: PriceListener,
-    private targetPriceListener: PriceListener,
+    private sourcePriceListener: IPriceListener,
+    private targetPriceListener: IPriceListener,
     private targetChainPricePusher: ChainPricePusher,
     config: {
       cooldownDuration: DurationInSeconds;

+ 9 - 44
price_pusher/src/evm.ts

@@ -5,7 +5,7 @@ import {
 } from "@pythnetwork/pyth-evm-js";
 import { Contract, EventData } from "web3-eth-contract";
 import { PriceConfig } from "./price-config";
-import { ChainPricePusher, PriceInfo, PriceListener } from "./interface";
+import { ChainPricePusher, PriceInfo, ChainPriceListener } from "./interface";
 import { TransactionReceipt } from "ethereum-protocol";
 import { addLeading0x, DurationInSeconds, removeLeading0x } from "./utils";
 import AbstractPythAbi from "@pythnetwork/pyth-sdk-solidity/abis/AbstractPyth.json";
@@ -14,15 +14,11 @@ import { Provider } from "web3/providers";
 import Web3 from "web3";
 import { isWsEndpoint } from "./utils";
 
-export class EvmPriceListener implements PriceListener {
+export class EvmPriceListener extends ChainPriceListener {
   private pythContractFactory: PythContractFactory;
   private pythContract: Contract;
-  private latestPriceInfo: Map<HexString, PriceInfo>;
-  private priceIds: HexString[];
   private priceIdToAlias: Map<HexString, string>;
 
-  private pollingFrequency: DurationInSeconds;
-
   constructor(
     pythContractFactory: PythContractFactory,
     priceConfigs: PriceConfig[],
@@ -30,14 +26,15 @@ export class EvmPriceListener implements PriceListener {
       pollingFrequency: DurationInSeconds;
     }
   ) {
-    this.latestPriceInfo = new Map();
-    this.priceIds = priceConfigs.map((priceConfig) => priceConfig.id);
+    super(
+      "Evm",
+      config.pollingFrequency,
+      priceConfigs.map((priceConfig) => priceConfig.id)
+    );
     this.priceIdToAlias = new Map(
       priceConfigs.map((priceConfig) => [priceConfig.id, priceConfig.alias])
     );
 
-    this.pollingFrequency = config.pollingFrequency;
-
     this.pythContractFactory = pythContractFactory;
     this.pythContract = this.pythContractFactory.createPythContract();
   }
@@ -55,10 +52,8 @@ export class EvmPriceListener implements PriceListener {
       );
     }
 
-    console.log(`Polling the prices every ${this.pollingFrequency} seconds...`);
-    setInterval(this.pollPrices.bind(this), this.pollingFrequency * 1000);
-
-    await this.pollPrices();
+    // base class for polling
+    await super.start();
   }
 
   private async startSubscription() {
@@ -97,20 +92,6 @@ export class EvmPriceListener implements PriceListener {
     this.updateLatestPriceInfo(priceId, priceInfo);
   }
 
-  private async pollPrices() {
-    console.log("Polling evm prices...");
-    for (const priceId of this.priceIds) {
-      const currentPriceInfo = await this.getOnChainPriceInfo(priceId);
-      if (currentPriceInfo !== undefined) {
-        this.updateLatestPriceInfo(priceId, currentPriceInfo);
-      }
-    }
-  }
-
-  getLatestPriceInfo(priceId: string): PriceInfo | undefined {
-    return this.latestPriceInfo.get(priceId);
-  }
-
   async getOnChainPriceInfo(
     priceId: HexString
   ): Promise<PriceInfo | undefined> {
@@ -131,22 +112,6 @@ export class EvmPriceListener implements PriceListener {
       publishTime: Number(priceRaw.publishTime),
     };
   }
-
-  private updateLatestPriceInfo(priceId: HexString, observedPrice: PriceInfo) {
-    const cachedLatestPriceInfo = this.getLatestPriceInfo(priceId);
-
-    // Ignore the observed price if the cache already has newer
-    // price. This could happen because we are using polling and
-    // subscription at the same time.
-    if (
-      cachedLatestPriceInfo !== undefined &&
-      cachedLatestPriceInfo.publishTime > observedPrice.publishTime
-    ) {
-      return;
-    }
-
-    this.latestPriceInfo.set(priceId, observedPrice);
-  }
 }
 
 export class EvmPricePusher implements ChainPricePusher {

+ 7 - 49
price_pusher/src/injective.ts

@@ -1,5 +1,5 @@
 import { HexString, PriceServiceConnection } from "@pythnetwork/pyth-common-js";
-import { ChainPricePusher, PriceInfo, PriceListener } from "./interface";
+import { ChainPricePusher, PriceInfo, ChainPriceListener } from "./interface";
 import { DurationInSeconds } from "./utils";
 import { PriceConfig } from "./price-config";
 import {
@@ -32,13 +32,7 @@ type UpdateFeeResponse = {
 };
 
 // this use price without leading 0x
-// FIXME: implement common methods in the parent class
-export class InjectivePriceListener implements PriceListener {
-  private latestPriceInfo: Map<HexString, PriceInfo>;
-  private priceIds: HexString[];
-
-  private pollingFrequency: DurationInSeconds;
-
+export class InjectivePriceListener extends ChainPriceListener {
   constructor(
     private contractAddress: string,
     private grpcEndpoint: string,
@@ -47,27 +41,11 @@ export class InjectivePriceListener implements PriceListener {
       pollingFrequency: DurationInSeconds;
     }
   ) {
-    this.latestPriceInfo = new Map();
-    this.priceIds = priceConfigs.map((priceConfig) => priceConfig.id);
-
-    this.pollingFrequency = config.pollingFrequency;
-  }
-
-  async start() {
-    console.log(`Polling the prices every ${this.pollingFrequency} seconds...`);
-    setInterval(this.pollPrices.bind(this), this.pollingFrequency * 1000);
-
-    await this.pollPrices();
-  }
-
-  private async pollPrices() {
-    console.log("Polling injective prices...");
-    for (const priceId of this.priceIds) {
-      const currentPriceInfo = await this.getOnChainPriceInfo(priceId);
-      if (currentPriceInfo !== undefined) {
-        this.updateLatestPriceInfo(priceId, currentPriceInfo);
-      }
-    }
+    super(
+      "Injective",
+      config.pollingFrequency,
+      priceConfigs.map((priceConfig) => priceConfig.id)
+    );
   }
 
   async getOnChainPriceInfo(
@@ -95,26 +73,6 @@ export class InjectivePriceListener implements PriceListener {
       publishTime: priceQueryResponse.price_feed.price.publish_time,
     };
   }
-
-  private updateLatestPriceInfo(priceId: HexString, observedPrice: PriceInfo) {
-    const cachedLatestPriceInfo = this.getLatestPriceInfo(priceId);
-
-    // Ignore the observed price if the cache already has newer
-    // price. This could happen because we are using polling and
-    // subscription at the same time.
-    if (
-      cachedLatestPriceInfo !== undefined &&
-      cachedLatestPriceInfo.publishTime > observedPrice.publishTime
-    ) {
-      return;
-    }
-
-    this.latestPriceInfo.set(priceId, observedPrice);
-  }
-
-  getLatestPriceInfo(priceId: string): PriceInfo | undefined {
-    return this.latestPriceInfo.get(priceId);
-  }
 }
 
 export class InjectivePricePusher implements ChainPricePusher {

+ 60 - 3
price_pusher/src/interface.ts

@@ -1,4 +1,5 @@
-import { HexString, UnixTimestamp } from "@pythnetwork/pyth-evm-js";
+import { HexString, UnixTimestamp } from "@pythnetwork/pyth-common-js";
+import { DurationInSeconds } from "./utils";
 
 export type PriceInfo = {
   price: string;
@@ -6,9 +7,65 @@ export type PriceInfo = {
   publishTime: UnixTimestamp;
 };
 
-export interface PriceListener {
+export interface IPriceListener {
+  getLatestPriceInfo(priceId: string): PriceInfo | undefined;
+}
+
+export abstract class ChainPriceListener implements IPriceListener {
+  private latestPriceInfo: Map<HexString, PriceInfo>;
+
+  constructor(
+    private chain: string,
+    private pollingFrequency: DurationInSeconds,
+    protected priceIds: HexString[]
+  ) {
+    this.latestPriceInfo = new Map();
+  }
+
+  async start() {
+    console.log(`Polling the prices every ${this.pollingFrequency} seconds...`);
+    setInterval(this.pollPrices.bind(this), this.pollingFrequency * 1000);
+
+    await this.pollPrices();
+  }
+
+  private async pollPrices() {
+    console.log(`Polling ${this.chain} prices...`);
+    for (const priceId of this.priceIds) {
+      const currentPriceInfo = await this.getOnChainPriceInfo(priceId);
+      if (currentPriceInfo !== undefined) {
+        this.updateLatestPriceInfo(priceId, currentPriceInfo);
+      }
+    }
+  }
+
+  protected updateLatestPriceInfo(
+    priceId: HexString,
+    observedPrice: PriceInfo
+  ) {
+    const cachedLatestPriceInfo = this.getLatestPriceInfo(priceId);
+
+    // Ignore the observed price if the cache already has newer
+    // price. This could happen because we are using polling and
+    // subscription at the same time.
+    if (
+      cachedLatestPriceInfo !== undefined &&
+      cachedLatestPriceInfo.publishTime > observedPrice.publishTime
+    ) {
+      return;
+    }
+
+    this.latestPriceInfo.set(priceId, observedPrice);
+  }
+
   // Should return undefined only when the price does not exist.
-  getLatestPriceInfo(priceId: HexString): undefined | PriceInfo;
+  getLatestPriceInfo(priceId: string): PriceInfo | undefined {
+    return this.latestPriceInfo.get(priceId);
+  }
+
+  abstract getOnChainPriceInfo(
+    priceId: HexString
+  ): Promise<PriceInfo | undefined>;
 }
 
 export interface ChainPricePusher {

+ 1 - 1
price_pusher/src/price-config.ts

@@ -1,4 +1,4 @@
-import { HexString } from "@pythnetwork/pyth-evm-js";
+import { HexString } from "@pythnetwork/pyth-common-js";
 import Joi from "joi";
 import YAML from "yaml";
 import fs from "fs";

+ 7 - 4
price_pusher/src/pyth-price-listener.ts

@@ -1,9 +1,12 @@
-import { HexString, PriceFeed } from "@pythnetwork/pyth-evm-js";
-import { PriceServiceConnection } from "@pythnetwork/pyth-common-js";
+import {
+  PriceServiceConnection,
+  HexString,
+  PriceFeed,
+} from "@pythnetwork/pyth-common-js";
 import { PriceConfig } from "./price-config";
-import { PriceInfo, PriceListener } from "./interface";
+import { PriceInfo, IPriceListener } from "./interface";
 
-export class PythPriceListener implements PriceListener {
+export class PythPriceListener implements IPriceListener {
   private connection: PriceServiceConnection;
   private priceIds: HexString[];
   private priceIdToAlias: Map<HexString, string>;