Browse Source

Sui pusher updates (#914)

* split into ptbs

* update sui logic

* fix stuff

* revert

* use Map
Jayant Krishnamurthy 2 năm trước cách đây
mục cha
commit
742c37ed88
3 tập tin đã thay đổi với 104 bổ sung26 xóa
  1. 1 1
      price_pusher/package.json
  2. 12 0
      price_pusher/src/sui/command.ts
  3. 91 25
      price_pusher/src/sui/sui.ts

+ 1 - 1
price_pusher/package.json

@@ -1,6 +1,6 @@
 {
   "name": "@pythnetwork/price-pusher",
-  "version": "5.3.2",
+  "version": "5.4.0",
   "description": "Pyth Price Pusher",
   "homepage": "https://pyth.network",
   "main": "lib/index.js",

+ 12 - 0
price_pusher/src/sui/command.ts

@@ -58,6 +58,13 @@ export default {
       type: "string",
       required: true,
     } as Options,
+    "max-vaas-per-ptb": {
+      description:
+        "Maximum number of VAAs that can be included in a single PTB.",
+      type: "number",
+      required: true,
+      default: 1,
+    } as Options,
     ...options.priceConfigFile,
     ...options.priceServiceEndpoint,
     ...options.mnemonicFile,
@@ -77,6 +84,7 @@ export default {
       wormholePackageId,
       wormholeStateId,
       priceFeedToPriceInfoObjectTableId,
+      maxVaasPerPtb,
     } = argv;
 
     const priceConfigs = readPriceConfigFile(priceConfigFile);
@@ -91,6 +99,9 @@ export default {
           debug: () => undefined,
           trace: () => undefined,
         },
+        priceFeedRequestConfig: {
+          binary: true,
+        },
       }
     );
     const mnemonic = fs.readFileSync(mnemonicFile, "utf-8").trim();
@@ -116,6 +127,7 @@ export default {
       wormholePackageId,
       wormholeStateId,
       priceFeedToPriceInfoObjectTableId,
+      maxVaasPerPtb,
       endpoint,
       mnemonic
     );

+ 91 - 25
price_pusher/src/sui/sui.ts

@@ -83,6 +83,10 @@ export class SuiPriceListener extends ChainPriceListener {
 
 export class SuiPricePusher implements IPricePusher {
   private readonly signer: RawSigner;
+  // Sui transactions can error if they're sent concurrently. This flag tracks whether an update is in-flight,
+  // so we can skip sending another update at the same time.
+  private isAwaitingTx: boolean;
+
   constructor(
     private priceServiceConnection: PriceServiceConnection,
     private pythPackageId: string,
@@ -90,6 +94,7 @@ export class SuiPricePusher implements IPricePusher {
     private wormholePackageId: string,
     private wormholeStateId: string,
     private priceFeedToPriceInfoObjectTableId: string,
+    private maxVaasPerPtb: number,
     endpoint: string,
     mnemonic: string
   ) {
@@ -97,6 +102,7 @@ export class SuiPricePusher implements IPricePusher {
       Ed25519Keypair.deriveKeypair(mnemonic),
       new JsonRpcProvider(new Connection({ fullnode: endpoint }))
     );
+    this.isAwaitingTx = false;
   }
 
   async updatePriceFeed(
@@ -110,10 +116,64 @@ export class SuiPricePusher implements IPricePusher {
     if (priceIds.length !== pubTimesToPush.length)
       throw new Error("Invalid arguments");
 
-    const tx = new TransactionBlock();
+    if (this.isAwaitingTx) {
+      console.log(
+        "Skipping update: previous price update transaction(s) have not completed."
+      );
+      return;
+    }
+
+    const priceFeeds = await this.priceServiceConnection.getLatestPriceFeeds(
+      priceIds
+    );
+    if (priceFeeds === undefined) {
+      console.log("Failed to fetch price updates. Skipping push.");
+      return;
+    }
 
-    const vaas = await this.priceServiceConnection.getLatestVaas(priceIds);
+    const vaaToPriceFeedIds: Map<string, string[]> = new Map();
+    for (const priceFeed of priceFeeds) {
+      // The ! will succeed as long as the priceServiceConnection is configured to return binary vaa data (which it is).
+      const vaa = priceFeed.getVAA()!;
+      if (!vaaToPriceFeedIds.has(vaa)) {
+        vaaToPriceFeedIds.set(vaa, []);
+      }
+      vaaToPriceFeedIds.get(vaa)!.push(priceFeed.id);
+    }
+
+    const txs = [];
+    let currentBatchVaas = [];
+    let currentBatchPriceFeedIds = [];
+    for (const [vaa, priceFeedIds] of Object.entries(vaaToPriceFeedIds)) {
+      currentBatchVaas.push(vaa);
+      currentBatchPriceFeedIds.push(...priceFeedIds);
+      if (currentBatchVaas.length >= this.maxVaasPerPtb) {
+        const tx = await this.createPriceUpdateTransaction(
+          currentBatchVaas,
+          currentBatchPriceFeedIds
+        );
+        if (tx !== undefined) {
+          txs.push(tx);
+        }
 
+        currentBatchVaas = [];
+        currentBatchPriceFeedIds = [];
+      }
+    }
+
+    try {
+      this.isAwaitingTx = true;
+      await this.sendTransactionBlocks(txs);
+    } finally {
+      this.isAwaitingTx = false;
+    }
+  }
+
+  private async createPriceUpdateTransaction(
+    vaas: string[],
+    priceIds: string[]
+  ): Promise<TransactionBlock | undefined> {
+    const tx = new TransactionBlock();
     // Parse our batch price attestation VAA bytes using Wormhole.
     // Check out the Wormhole cross-chain bridge and generic messaging protocol here:
     //     https://github.com/wormhole-foundation/wormhole
@@ -158,7 +218,7 @@ export class SuiPricePusher implements IPricePusher {
       } catch (e) {
         console.log("Error fetching price info object id for ", priceId);
         console.error(e);
-        return;
+        return undefined;
       }
       const coin = tx.splitCoins(tx.gas, [tx.pure(1)]);
       [price_updates_hot_potato] = tx.moveCall({
@@ -181,30 +241,36 @@ export class SuiPricePusher implements IPricePusher {
       typeArguments: [`${this.pythPackageId}::price_info::PriceInfo`],
     });
 
-    try {
-      const result = await this.signer.signAndExecuteTransactionBlock({
-        transactionBlock: tx,
-        options: {
-          showInput: true,
-          showEffects: true,
-          showEvents: true,
-          showObjectChanges: true,
-          showBalanceChanges: true,
-        },
-      });
+    return tx;
+  }
 
-      console.log(
-        "Successfully updated price with transaction digest ",
-        result.digest
-      );
-    } catch (e) {
-      console.log("Error when signAndExecuteTransactionBlock");
-      if (String(e).includes("GasBalanceTooLow")) {
-        console.log("Insufficient Gas Amount. Please top up your account");
-        process.exit();
+  /** Send every transaction in txs sequentially, returning when all transactions have completed. */
+  private async sendTransactionBlocks(txs: TransactionBlock[]): Promise<void> {
+    for (const tx of txs) {
+      try {
+        const result = await this.signer.signAndExecuteTransactionBlock({
+          transactionBlock: tx,
+          options: {
+            showInput: true,
+            showEffects: true,
+            showEvents: true,
+            showObjectChanges: true,
+            showBalanceChanges: true,
+          },
+        });
+
+        console.log(
+          "Successfully updated price with transaction digest ",
+          result.digest
+        );
+      } catch (e) {
+        console.log("Error when signAndExecuteTransactionBlock");
+        if (String(e).includes("GasBalanceTooLow")) {
+          console.log("Insufficient Gas Amount. Please top up your account");
+          process.exit();
+        }
+        console.error(e);
       }
-      console.error(e);
-      return;
     }
   }
 }