Prechádzať zdrojové kódy

refactor(apps/price_pusher): crash on RPC failures (#1730)

* refactor(appts/price_pusher): fix warnings

* refactor(apps/price_pusher): crash on rpc issues

* refactor(apps/price_pusher): crash on stale hermes data

* fix: run linter

* chore: bump version

* fix: do not crash on sendtx failure in solana

* fix: address issues raised in review
Ali Behjati 1 rok pred
rodič
commit
87aea6f63c

+ 1 - 1
apps/price_pusher/package.json

@@ -1,6 +1,6 @@
 {
   "name": "@pythnetwork/price-pusher",
-  "version": "7.0.0-alpha",
+  "version": "7.0.0",
   "description": "Pyth Price Pusher",
   "homepage": "https://pyth.network",
   "main": "lib/index.js",

+ 25 - 28
apps/price_pusher/src/aptos/aptos.ts

@@ -23,14 +23,14 @@ export class AptosPriceListener extends ChainPriceListener {
   }
 
   async getOnChainPriceInfo(priceId: string): Promise<PriceInfo | undefined> {
-    try {
-      const client = new AptosClient(this.endpoint);
+    const client = new AptosClient(this.endpoint);
 
-      const res = await client.getAccountResource(
-        this.pythModule,
-        `${this.pythModule}::state::LatestPriceInfo`
-      );
+    const res = await client.getAccountResource(
+      this.pythModule,
+      `${this.pythModule}::state::LatestPriceInfo`
+    );
 
+    try {
       // This depends upon the pyth contract storage on Aptos and should not be undefined.
       // If undefined, there has been some change and we would need to update accordingly.
       const handle = (res.data as any).info.handle;
@@ -134,29 +134,26 @@ export class AptosPricePusher implements IPricePusher {
       return;
     }
 
-    try {
-      const account = AptosAccount.fromDerivePath(
-        APTOS_ACCOUNT_HD_PATH,
-        this.mnemonic
-      );
-      const client = new AptosClient(this.endpoint);
-
-      const sequenceNumber = await this.tryGetNextSequenceNumber(
-        client,
-        account
-      );
-      const rawTx = await client.generateTransaction(
-        account.address(),
-        {
-          function: `${this.pythContractAddress}::pyth::update_price_feeds_with_funder`,
-          type_arguments: [],
-          arguments: [priceFeedUpdateData],
-        },
-        {
-          sequence_number: sequenceNumber.toFixed(),
-        }
-      );
+    const account = AptosAccount.fromDerivePath(
+      APTOS_ACCOUNT_HD_PATH,
+      this.mnemonic
+    );
+    const client = new AptosClient(this.endpoint);
+
+    const sequenceNumber = await this.tryGetNextSequenceNumber(client, account);
+    const rawTx = await client.generateTransaction(
+      account.address(),
+      {
+        function: `${this.pythContractAddress}::pyth::update_price_feeds_with_funder`,
+        type_arguments: [],
+        arguments: [priceFeedUpdateData],
+      },
+      {
+        sequence_number: sequenceNumber.toFixed(),
+      }
+    );
 
+    try {
       const signedTx = await client.signTransaction(account, rawTx);
       const pendingTx = await client.submitTransaction(signedTx);
 

+ 2 - 1
apps/price_pusher/src/injective/injective.ts

@@ -232,7 +232,8 @@ export class InjectivePricePusher implements IPricePusher {
       updateFeeQueryResponse = JSON.parse(json);
     } catch (err) {
       this.logger.error(err, "Error fetching update fee");
-      return;
+      // Throwing an error because it is likely an RPC issue
+      throw err;
     }
 
     try {

+ 15 - 0
apps/price_pusher/src/pyth-price-listener.ts

@@ -6,12 +6,15 @@ import {
 import { PriceInfo, IPriceListener, PriceItem } from "./interface";
 import { Logger } from "pino";
 
+type TimestampInMs = number & { readonly _: unique symbol };
+
 export class PythPriceListener implements IPriceListener {
   private connection: PriceServiceConnection;
   private priceIds: HexString[];
   private priceIdToAlias: Map<HexString, string>;
   private latestPriceInfo: Map<HexString, PriceInfo>;
   private logger: Logger;
+  private lastUpdated: TimestampInMs | undefined;
 
   constructor(
     connection: PriceServiceConnection,
@@ -46,6 +49,17 @@ export class PythPriceListener implements IPriceListener {
         publishTime: latestAvailablePrice.publishTime,
       });
     });
+
+    // Check health of the price feeds 5 second. If the price feeds are not updating
+    // for more than 30s, throw an error.
+    setInterval(() => {
+      if (
+        this.lastUpdated === undefined ||
+        this.lastUpdated < Date.now() - 30 * 1000
+      ) {
+        throw new Error("Hermes Price feeds are not updating.");
+      }
+    }, 5000);
   }
 
   private onNewPriceFeed(priceFeed: PriceFeed) {
@@ -68,6 +82,7 @@ export class PythPriceListener implements IPriceListener {
     };
 
     this.latestPriceInfo.set(priceFeed.id, priceInfo);
+    this.lastUpdated = Date.now() as TimestampInMs;
   }
 
   getLatestPriceInfo(priceId: string): PriceInfo | undefined {

+ 19 - 0
apps/price_pusher/src/solana/solana.ts

@@ -28,6 +28,25 @@ export class SolanaPriceListener extends ChainPriceListener {
     super(config.pollingFrequency, priceItems);
   }
 
+  // Checking the health of the Solana connection by checking the last block time
+  // and ensuring it is not older than 30 seconds.
+  private async checkHealth() {
+    const slot = await this.pythSolanaReceiver.connection.getSlot();
+    const blockTime = await this.pythSolanaReceiver.connection.getBlockTime(
+      slot
+    );
+    if (blockTime === null || blockTime < Date.now() / 1000 - 30) {
+      throw new Error("Solana connection is unhealthy");
+    }
+  }
+
+  async start() {
+    // Frequently check the RPC connection to ensure it is healthy
+    setInterval(this.checkHealth.bind(this), 5000);
+
+    await super.start();
+  }
+
   async getOnChainPriceInfo(priceId: string): Promise<PriceInfo | undefined> {
     try {
       const priceFeedAccount =

+ 11 - 36
apps/price_pusher/src/sui/sui.ts

@@ -112,12 +112,6 @@ export class SuiPricePusher implements IPricePusher {
     private readonly provider: SuiClient,
     private logger: Logger,
     private priceServiceConnection: PriceServiceConnection,
-    private pythPackageId: string,
-    private pythStateId: string,
-    private wormholePackageId: string,
-    private wormholeStateId: string,
-    endpoint: string,
-    keypair: Ed25519Keypair,
     private gasBudget: number,
     private gasPool: SuiObjectRef[],
     private pythClient: SuiPythClient
@@ -180,14 +174,6 @@ export class SuiPricePusher implements IPricePusher {
     }
 
     const provider = new SuiClient({ url: endpoint });
-    const pythPackageId = await SuiPricePusher.getPackageId(
-      provider,
-      pythStateId
-    );
-    const wormholePackageId = await SuiPricePusher.getPackageId(
-      provider,
-      wormholeStateId
-    );
 
     const gasPool = await SuiPricePusher.initializeGasPool(
       keypair,
@@ -208,12 +194,6 @@ export class SuiPricePusher implements IPricePusher {
       provider,
       logger,
       priceServiceConnection,
-      pythPackageId,
-      pythStateId,
-      wormholePackageId,
-      wormholeStateId,
-      endpoint,
-      keypair,
       gasBudget,
       gasPool,
       pythClient
@@ -337,7 +317,7 @@ export class SuiPricePusher implements IPricePusher {
     ignoreGasObjects: string[],
     logger: Logger
   ): Promise<SuiObjectRef[]> {
-    const signerAddress = await signer.toSuiAddress();
+    const signerAddress = signer.toSuiAddress();
 
     if (ignoreGasObjects.length > 0) {
       logger.info(
@@ -383,25 +363,20 @@ export class SuiPricePusher implements IPricePusher {
   }
 
   // Attempt to refresh the version of the provided object reference to point to the current version
-  // of the object. Return the provided object reference if an error occurs or the object could not
-  // be retrieved.
+  // of the object. Throws an error if the object cannot be refreshed.
   private static async tryRefreshObjectReference(
     provider: SuiClient,
     ref: SuiObjectRef
   ): Promise<SuiObjectRef> {
-    try {
-      const objectResponse = await provider.getObject({ id: ref.objectId });
-      if (objectResponse.data !== undefined) {
-        return {
-          digest: objectResponse.data!.digest,
-          objectId: objectResponse.data!.objectId,
-          version: objectResponse.data!.version,
-        };
-      } else {
-        return ref;
-      }
-    } catch (error) {
-      return ref;
+    const objectResponse = await provider.getObject({ id: ref.objectId });
+    if (objectResponse.data !== undefined) {
+      return {
+        digest: objectResponse.data!.digest,
+        objectId: objectResponse.data!.objectId,
+        version: objectResponse.data!.version,
+      };
+    } else {
+      throw new Error("Failed to refresh object reference");
     }
   }