Ver Fonte

Merge pull request #98 from pyth-network/drozdziak1/p2w-relay-iface-integrate-terra

Drozdziak1/p2w relay iface integrate terra
Stanisław Drozd há 3 anos atrás
pai
commit
a3c1a66bf3

+ 13 - 1
third_party/pyth/p2w-terra-relay/src/index.ts

@@ -7,6 +7,9 @@ import * as helpers from "./helpers";
 import { logger } from "./helpers";
 import { PromHelper } from "./promHelpers";
 
+import { Relay } from "./relay/iface";
+import { TerraRelay } from "./relay/terra";
+
 let configFile: string = ".env";
 if (process.env.PYTH_RELAY_CONFIG) {
   configFile = process.env.PYTH_RELAY_CONFIG;
@@ -22,6 +25,7 @@ helpers.initLogger();
 
 let error: boolean = false;
 let listenOnly: boolean = false;
+let relayImpl: Relay;
 for (let idx = 0; idx < process.argv.length; ++idx) {
   if (process.argv[idx] === "--listen_only") {
     logger.info("running in listen only mode, will not relay anything!");
@@ -29,10 +33,18 @@ for (let idx = 0; idx < process.argv.length; ++idx) {
   }
 }
 
+relayImpl = new TerraRelay({
+  nodeUrl: helpers.envOrErr("TERRA_NODE_URL"),
+  terraChainId: helpers.envOrErr("TERRA_CHAIN_ID"),
+  walletPrivateKey: helpers.envOrErr("TERRA_PRIVATE_KEY"),
+  coin: helpers.envOrErr("TERRA_COIN"),
+  contractAddress: helpers.envOrErr("TERRA_PYTH_CONTRACT_ADDRESS"),
+});
+
 if (
   !error &&
   listen.init(listenOnly) &&
-  worker.init(!listenOnly) &&
+  worker.init(!listenOnly, relayImpl) &&
   rest.init(!listenOnly)
 ) {
   // Start the Prometheus client with the app name and http port

+ 27 - 1
third_party/pyth/p2w-terra-relay/src/relay/iface.ts

@@ -3,10 +3,36 @@
 
 export type PriceId = string;
 
+/// Describes the possible outcomes of a relay() call on target chain
+/// NOTE(2022-03-21): order reflects the historically used constants
+export enum RelayRetcode {
+  Success = 0,
+  Fail, // Generic failure
+  AlreadyExecuted, // TODO(2022-03-18): Terra-specific leak, remove ASAP
+  Timeout, // Our desired timeout expired
+  SeqNumMismatch, // TODO(2022-03-18): Terra-specific leak, remove ASAP
+  InsufficientFunds, // Payer's too poor
+}
+
+/// relay() return type
+export class RelayResult {
+  code: RelayRetcode;
+  txHashes: Array<string>; /// One or more tx hashes produced by a successful relay() call
+
+  constructor(code: RelayRetcode, hashes: Array<string>) {
+    this.code = code;
+    this.txHashes = hashes;
+  }
+
+  is_ok(): boolean {
+    return this.code == RelayRetcode.Success;
+  }
+}
+
 /// Represents a target chain relay client generically.
 export interface Relay {
   /// Relay a signed Wormhole payload to this chain
-  relay(signedVAAs: Array<string>): Promise<any>;
+  relay(signedVAAs: Array<string>): Promise<RelayResult>;
 
   /// Query price data on this chain
   query(priceId: PriceId): Promise<any>;

+ 0 - 72
third_party/pyth/p2w-terra-relay/src/relay/main.ts

@@ -1,72 +0,0 @@
-import {
-  connectToTerra,
-  queryBalanceOnTerra,
-  queryTerra,
-  relayTerra,
-  setAccountNumOnTerra,
-  setSeqNumOnTerra,
-  TerraConnectionData,
-} from "./terra";
-
-export type ConnectionData = {
-  terraData: TerraConnectionData;
-};
-
-import { logger } from "../helpers";
-
-export function connectRelayer(): ConnectionData {
-  let td = connectToTerra();
-  return { terraData: td };
-}
-
-export async function setAccountNum(connectionData: ConnectionData) {
-  try {
-    await setAccountNumOnTerra(connectionData.terraData);
-  } catch (e) {
-    logger.error("setAccountNum: query failed: %o", e);
-  }
-}
-
-export async function setSeqNum(connectionData: ConnectionData) {
-  try {
-    await setSeqNumOnTerra(connectionData.terraData);
-  } catch (e) {
-    logger.error("setSeqNum: query failed: %o", e);
-  }
-}
-
-// Exceptions from this method are caught at the higher level.
-export async function relay(
-  signedVAAs: Array<string>,
-  connectionData: ConnectionData
-): Promise<any> {
-  return await relayTerra(connectionData.terraData, signedVAAs);
-}
-
-export async function query(
-  priceIdStr: string
-): Promise<any> {
-  let result: any;
-  try {
-    let terraData = connectToTerra();
-    result = await queryTerra(terraData, priceIdStr);
-  } catch (e) {
-    logger.error("query failed: %o", e);
-    result = "Error: unhandled exception";
-  }
-
-  return result;
-}
-
-export async function queryBalance(
-  connectionData: ConnectionData
-): Promise<number> {
-  let balance: number = NaN;
-  try {
-    balance = await queryBalanceOnTerra(connectionData.terraData);
-  } catch (e) {
-    logger.error("balance query failed: %o", e);
-  }
-
-  return balance;
-}

+ 98 - 113
third_party/pyth/p2w-terra-relay/src/relay/terra.ts

@@ -10,7 +10,7 @@ import { redeemOnTerra } from "@certusone/wormhole-sdk";
 
 import { logger, envOrErr } from "../helpers";
 
-import { Relay, PriceId } from "./iface";
+import { Relay, RelayResult, RelayRetcode, PriceId } from "./iface";
 
 export class TerraRelay implements Relay {
   readonly nodeUrl: string;
@@ -19,8 +19,6 @@ export class TerraRelay implements Relay {
   readonly coin: string;
   readonly contractAddress: string;
   readonly lcdConfig: LCDClientConfig;
-  walletSeqNum: number = 0;
-  walletAccountNum: number = 0;
 
   constructor(cfg: {
     nodeUrl: string;
@@ -53,53 +51,105 @@ export class TerraRelay implements Relay {
   }
 
   async relay(signedVAAs: Array<string>) {
-    logger.debug("relaying " + signedVAAs.length + " messages to terra");
-
-    logger.debug("TIME: connecting to terra");
-    const lcdClient = new LCDClient(this.lcdConfig);
+    let terraRes;
+    try {
+      logger.debug("relaying " + signedVAAs.length + " messages to terra");
+
+      logger.debug("TIME: connecting to terra");
+      const lcdClient = new LCDClient(this.lcdConfig);
+
+      const mk = new MnemonicKey({
+        mnemonic: this.walletPrivateKey,
+      });
+
+      const wallet = lcdClient.wallet(mk);
+
+      logger.debug("TIME: creating messages");
+      let msgs = new Array<MsgExecuteContract>();
+      for (let idx = 0; idx < signedVAAs.length; ++idx) {
+        const msg = new MsgExecuteContract(
+          wallet.key.accAddress,
+          this.contractAddress,
+          {
+            submit_vaa: {
+              data: Buffer.from(signedVAAs[idx], "hex").toString("base64"),
+            },
+          }
+        );
+
+        msgs.push(msg);
+      }
 
-    const mk = new MnemonicKey({
-      mnemonic: this.walletPrivateKey,
-    });
+      const tx = await wallet.createAndSignTx({
+        msgs: msgs,
+        memo: "P2T",
+        feeDenoms: [this.coin],
+      });
 
-    const wallet = lcdClient.wallet(mk);
+      logger.debug("TIME: sending msg");
+      terraRes = await lcdClient.tx.broadcastSync(tx);
+      logger.debug(
+        `TIME:submitted to terra: terraRes: ${JSON.stringify(terraRes)}`
+      );
+      // Act on known Terra errors
 
-    logger.debug("TIME: creating messages");
-    let msgs = new Array<MsgExecuteContract>();
-    for (let idx = 0; idx < signedVAAs.length; ++idx) {
-      const msg = new MsgExecuteContract(
-        wallet.key.accAddress,
-        this.contractAddress,
-        {
-          submit_vaa: {
-            data: Buffer.from(signedVAAs[idx], "hex").toString("base64"),
-          },
+      if (terraRes.raw_log) {
+        if (terraRes.raw_log.search("VaaAlreadyExecuted") >= 0) {
+          logger.error(
+            "Already Executed:",
+            terraRes.txhash
+              ? terraRes.txhash
+              : "<INTERNAL: no txhash for AlreadyExecuted>"
+          );
+          return new RelayResult(RelayRetcode.AlreadyExecuted, []);
+        } else if (terraRes.raw_log.search("insufficient funds") >= 0) {
+          logger.error(
+            "relay failed due to insufficient funds: ",
+            JSON.stringify(terraRes)
+          );
+          return new RelayResult(RelayRetcode.InsufficientFunds, []);
+        } else if (terraRes.raw_log.search("failed") >= 0) {
+          logger.error(
+            "relay seems to have failed: ",
+            JSON.stringify(terraRes)
+          );
+          return new RelayResult(RelayRetcode.Fail, []);
         }
-      );
+      } else {
+        logger.warn("No logs were found, result: ", JSON.stringify(terraRes));
+      }
 
-      msgs.push(msg);
+      // Base case, no errors were detected and no exceptions were thrown
+      if (terraRes.txhash) {
+        return new RelayResult(RelayRetcode.Success, [terraRes.txhash]);
+      }
+    } catch (e: any) {
+      // Act on known Terra exceptions
+      if (
+        e.message &&
+        e.message.search("timeout") >= 0 &&
+        e.message.search("exceeded") >= 0
+      ) {
+        logger.error("relay timed out: %o", e);
+        return new RelayResult(RelayRetcode.Timeout, []);
+      } else if (
+        e.response?.data?.error &&
+        e.response.data.error.search("VaaAlreadyExecuted") >= 0
+      ) {
+        return new RelayResult(RelayRetcode.AlreadyExecuted, []);
+      } else if (
+        e.response?.data?.message &&
+        e.response.data.message.search("account sequence mismatch") >= 0
+      ) {
+        return new RelayResult(RelayRetcode.SeqNumMismatch, []);
+      } else {
+        logger.error("Unknown error:", e.toString());
+        return new RelayResult(RelayRetcode.Fail, []);
+      }
     }
 
-    logger.debug(
-      "TIME: creating transaction using seq number " +
-        this.walletSeqNum +
-        " and account number " +
-        this.walletAccountNum
-    );
-    const tx = await wallet.createAndSignTx({
-      sequence: this.walletSeqNum,
-      accountNumber: this.walletAccountNum,
-      msgs: msgs,
-      memo: "P2T",
-      feeDenoms: [this.coin],
-    });
-
-    this.walletSeqNum = this.walletSeqNum + 1;
-
-    logger.debug("TIME: sending msg");
-    const receipt = await lcdClient.tx.broadcastSync(tx);
-    logger.debug("TIME:submitted to terra: receipt: %o", receipt);
-    return receipt;
+    logger.error("INTERNAL: Terra relay() logic failed to produce a result");
+    return new RelayResult(RelayRetcode.Fail, []);
   }
 
   async query(priceId: PriceId) {
@@ -121,14 +171,11 @@ export class TerraRelay implements Relay {
 
     const wallet = lcdClient.wallet(mk);
 
-    const query_result = await lcdClient.wasm.contractQuery(
-      this.contractAddress,
-      {
-        price_info: {
-          price_id: encodedPriceId,
-        },
-      }
-    );
+    return await lcdClient.wasm.contractQuery(this.contractAddress, {
+      price_info: {
+        price_id: encodedPriceId,
+      },
+    });
   }
 
   async getPayerInfo(): Promise<{ address: string; balance: number }> {
@@ -169,65 +216,3 @@ export class TerraRelay implements Relay {
     return { address: wallet.key.accAddress, balance };
   }
 }
-
-// TODO(2021-03-17): Propagate the use of the interface into worker/listener logic.
-export type TerraConnectionData = TerraRelay;
-
-export function connectToTerra(): TerraConnectionData {
-  return new TerraRelay({
-    nodeUrl: envOrErr("TERRA_NODE_URL"),
-    terraChainId: envOrErr("TERRA_CHAIN_ID"),
-    walletPrivateKey: envOrErr("TERRA_PRIVATE_KEY"),
-    coin: envOrErr("TERRA_COIN"),
-    contractAddress: envOrErr("TERRA_PYTH_CONTRACT_ADDRESS"),
-  });
-}
-
-export async function relayTerra(
-  connectionData: TerraConnectionData,
-  signedVAAs: Array<string>
-) {
-  return connectionData.relay(signedVAAs);
-}
-
-export async function queryTerra(
-  connectionData: TerraConnectionData,
-  priceIdStr: string
-) {
-  let query_result = await connectionData.query(priceIdStr);
-  logger.debug("queryTerra: query returned: %o", query_result);
-  return query_result;
-}
-
-export async function queryBalanceOnTerra(connectionData: TerraConnectionData) {
-  return (await connectionData.getPayerInfo()).balance;
-}
-
-export async function setAccountNumOnTerra(
-  connectionData: TerraConnectionData
-) {
-  const lcdClient = new LCDClient(connectionData.lcdConfig);
-
-  const mk = new MnemonicKey({
-    mnemonic: process.env.TERRA_PRIVATE_KEY,
-  });
-
-  const wallet = lcdClient.wallet(mk);
-  logger.debug("getting wallet account num");
-  connectionData.walletAccountNum = await wallet.accountNumber();
-  logger.debug("wallet account num is " + connectionData.walletAccountNum);
-}
-
-export async function setSeqNumOnTerra(connectionData: TerraConnectionData) {
-  const lcdClient = new LCDClient(connectionData.lcdConfig);
-
-  const mk = new MnemonicKey({
-    mnemonic: process.env.TERRA_PRIVATE_KEY,
-  });
-
-  const wallet = lcdClient.wallet(mk);
-
-  logger.debug("getting wallet seq num");
-  connectionData.walletSeqNum = await wallet.sequence();
-  logger.debug("wallet seq num is " + connectionData.walletSeqNum);
-}

+ 59 - 148
third_party/pyth/p2w-terra-relay/src/worker.ts

@@ -3,10 +3,10 @@ let CondVar = require("condition-variable");
 
 import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
 import { uint8ArrayToHex } from "@certusone/wormhole-sdk";
+import { Relay, RelayResult, RelayRetcode } from "./relay/iface";
 
 import * as helpers from "./helpers";
 import { logger } from "./helpers";
-import * as main from "./relay/main";
 import { PromHelper } from "./promHelpers";
 
 const mutex = new Mutex();
@@ -37,7 +37,7 @@ type CurrentEntry = {
 
 let productMap = new Map<string, ProductData>(); // The key to this is hash of price_ids in the batch attestation.
 
-let connectionData: main.ConnectionData;
+let relayImpl: Relay;
 let metrics: PromHelper;
 let nextBalanceQueryTimeAsMs: number = 0;
 let balanceQueryInterval = 0;
@@ -46,15 +46,10 @@ let maxPerBatch: number = 1;
 let maxAttempts: number = 2;
 let retryDelayInMs: number = 0;
 
-export function init(runWorker: boolean): boolean {
+export function init(runWorker: boolean, relay: Relay): boolean {
   if (!runWorker) return true;
 
-  try {
-    connectionData = main.connectRelayer();
-  } catch (e) {
-    logger.error("failed to load connection config: %o", e);
-    return false;
-  }
+  relayImpl = relay;
 
   if (process.env.MAX_MSGS_PER_BATCH) {
     maxPerBatch = parseInt(process.env.MAX_MSGS_PER_BATCH);
@@ -121,35 +116,25 @@ export async function run(met: PromHelper) {
       balanceQueryInterval = parseInt(process.env.BAL_QUERY_INTERVAL);
     }
 
-    await main.setAccountNum(connectionData);
-    logger.info(
-      "wallet account number is " + connectionData.terraData.walletAccountNum
-    );
-
-    await main.setSeqNum(connectionData);
-    logger.info(
-      "initial wallet sequence number is " +
-        connectionData.terraData.walletSeqNum
-    );
-
-    let balance = await main.queryBalance(connectionData);
-    if (!isNaN(balance)) {
+    let { address: payerAddress, balance: payerBalance } =
+      await relayImpl.getPayerInfo();
+    if (!isNaN(payerBalance)) {
       walletTimeStamp = new Date();
     }
     if (balanceQueryInterval !== 0) {
       logger.info(
         "initial wallet balance is " +
-          balance +
+          payerBalance +
           ", will query every " +
           balanceQueryInterval +
           " milliseconds."
       );
-      metrics.setWalletBalance(balance);
+      metrics.setWalletBalance(payerBalance);
 
       nextBalanceQueryTimeAsMs = new Date().getTime() + balanceQueryInterval;
     } else {
-      logger.info("initial wallet balance is " + balance);
-      metrics.setWalletBalance(balance);
+      logger.info("initial wallet balance is " + payerBalance);
+      metrics.setWalletBalance(payerBalance);
     }
 
     await condition.wait(computeTimeout(), callBack);
@@ -190,18 +175,11 @@ async function callBack(err: any, result: any) {
     if (currObjs.length !== 0) {
       logger.debug("in callback, relaying " + currObjs.length + " events.");
       let sendTime = new Date();
-      let retVal: number;
-      let relayResult: any;
-      [retVal, relayResult] = await relayEventsNotLocked(messages);
+      let relayResult = await relayEventsNotLocked(messages);
 
       await mutex.runExclusive(async () => {
         logger.debug("in callback, finalizing " + currObjs.length + " events.");
-        await finalizeEventsAlreadyLocked(
-          currObjs,
-          retVal,
-          relayResult,
-          sendTime
-        );
+        await finalizeEventsAlreadyLocked(currObjs, relayResult, sendTime);
 
         await updateBalance();
 
@@ -241,7 +219,9 @@ async function getPendingEventsAlreadyLocked(
     const first = pendingMap.entries().next();
     logger.debug("processing event with key [" + first.value[0] + "]");
     const pendingValue: PendingPayload = first.value[1];
-    let pendingKey = helpers.getBatchAttestationHashKey(pendingValue.batchAttestation);
+    let pendingKey = helpers.getBatchAttestationHashKey(
+      pendingValue.batchAttestation
+    );
     let currObj = productMap.get(pendingKey);
     if (currObj) {
       currObj.lastBatchAttestation = pendingValue.batchAttestation;
@@ -293,113 +273,41 @@ const RELAY_INSUFFICIENT_FUNDS: number = 5;
 
 async function relayEventsNotLocked(
   messages: Array<string>
-): Promise<[number, any]> {
-  let retVal: number = RELAY_SUCCESS;
-  let relayResult: any;
+): Promise<RelayResult> {
+  let relayResult: RelayResult | null = null;
   let retry: boolean = false;
 
+  // CAUTION(2022-03-21): The retry logic is not very efficient at
+  // handling more than one messsage. It may attempt redundant
+  // transactions during retries for messasges that were successful on a
+  // previous attempt.
   for (let attempt = 0; attempt < maxAttempts; ++attempt) {
-    retVal = RELAY_SUCCESS;
     retry = false;
 
-    try {
-      relayResult = await main.relay(messages, connectionData);
-      if (relayResult.txhash) {
-        if (
-          relayResult.raw_log &&
-          relayResult.raw_log.search("VaaAlreadyExecuted") >= 0
-        ) {
-          relayResult = "Already Executed: " + relayResult.txhash;
-          retVal = RELAY_ALREADY_EXECUTED;
-        } else if (
-          relayResult.raw_log &&
-          relayResult.raw_log.search("insufficient funds") >= 0
-        ) {
-          logger.error(
-            "relay failed due to insufficient funds: %o",
-            relayResult
-          );
-          connectionData.terraData.walletSeqNum =
-            connectionData.terraData.walletSeqNum - 1;
-          retVal = RELAY_INSUFFICIENT_FUNDS;
-        } else if (
-          relayResult.raw_log &&
-          relayResult.raw_log.search("failed") >= 0
-        ) {
-          logger.error("relay seems to have failed: %o", relayResult);
-          retVal = RELAY_FAIL;
-          retry = true;
-        } else {
-          relayResult = relayResult.txhash;
-        }
-      } else {
-        retVal = RELAY_FAIL;
+    relayResult = await relayImpl.relay(messages);
+
+    switch (relayResult.code) {
+      case RelayRetcode.Success:
+      case RelayRetcode.AlreadyExecuted:
+      case RelayRetcode.InsufficientFunds:
+        logger.info(`Not retrying for relay retcode ${relayResult.code}`);
+        break;
+
+      case RelayRetcode.Fail:
+      case RelayRetcode.SeqNumMismatch:
+      case RelayRetcode.Timeout:
         retry = true;
-        if (relayResult.message) {
-          relayResult = relayResult.message;
-        } else {
-          logger.error("No txhash: %o", relayResult);
-          relayResult = "No txhash";
-        }
-      }
-    } catch (e: any) {
-      if (
-        e.message &&
-        e.message.search("timeout") >= 0 &&
-        e.message.search("exceeded") >= 0
-      ) {
-        logger.error("relay timed out: %o", e);
-        retVal = RELAY_TIMEOUT;
+        break;
+
+      default:
+        logger.warn(`Retrying for unknown relay retcode ${relayResult.code}`);
         retry = true;
-      } else {
-        logger.error("relay failed: %o", e);
-        if (e.response && e.response.data) {
-          if (
-            e.response.data.error &&
-            e.response.data.error.search("VaaAlreadyExecuted") >= 0
-          ) {
-            relayResult = "Already Executed";
-            retVal = RELAY_ALREADY_EXECUTED;
-          } else if (
-            e.response.data.message &&
-            e.response.data.message.search("account sequence mismatch") >= 0
-          ) {
-            relayResult = e.response.data.message;
-            retVal = RELAY_SEQ_NUM_MISMATCH;
-            retry = true;
-
-            logger.debug(
-              "wallet sequence number is out of sync, querying the current value"
-            );
-            await main.setSeqNum(connectionData);
-            logger.info(
-              "wallet seq number is now " +
-                connectionData.terraData.walletSeqNum
-            );
-          } else {
-            retVal = RELAY_FAIL;
-            retry = true;
-            if (e.message) {
-              relayResult = "Error: " + e.message;
-            } else {
-              relayResult = "Error: unexpected exception";
-            }
-          }
-        } else {
-          retVal = RELAY_FAIL;
-          retry = true;
-          if (e.message) {
-            relayResult = "Error: " + e.message;
-          } else {
-            relayResult = "Error: unexpected exception";
-          }
-        }
-      }
+        break;
     }
 
     logger.debug(
-      "relay attempt complete: retVal: " +
-        retVal +
+      "relay attempt complete: " +
+        JSON.stringify(relayResult) +
         ", retry: " +
         retry +
         ", attempt " +
@@ -420,19 +328,22 @@ async function relayEventsNotLocked(
       }
     }
   }
-
   if (retry) {
     logger.error("failed to relay batch, retry count exceeded!");
     metrics.incRetriesExceeded();
   }
 
-  return [retVal, relayResult];
+  if (!relayResult) {
+    logger.error("INTERNAL: worker failed to produce a relay result.");
+    relayResult = new RelayResult(RelayRetcode.Fail, []);
+  }
+
+  return relayResult;
 }
 
 async function finalizeEventsAlreadyLocked(
   currObjs: Array<CurrentEntry>,
-  retVal: number,
-  relayResult: any,
+  relayResult: RelayResult,
   sendTime: Date
 ) {
   for (let idx = 0; idx < currObjs.length; ++idx) {
@@ -440,17 +351,17 @@ async function finalizeEventsAlreadyLocked(
     let currEntry = currObjs[idx].pendingEntry;
     currObj.lastResult = relayResult;
     currObj.numTimesPublished = currObj.numTimesPublished + 1;
-    if (retVal == RELAY_SUCCESS) {
+    if (relayResult.code == RelayRetcode.Success) {
       metrics.incSuccesses();
-    } else if (retVal == RELAY_ALREADY_EXECUTED) {
+    } else if (relayResult.code == RelayRetcode.AlreadyExecuted) {
       metrics.incAlreadyExec();
-    } else if (retVal == RELAY_TIMEOUT) {
+    } else if (relayResult.code == RelayRetcode.Timeout) {
       metrics.incTransferTimeout();
       metrics.incFailures();
-    } else if (retVal == RELAY_SEQ_NUM_MISMATCH) {
+    } else if (relayResult.code == RelayRetcode.SeqNumMismatch) {
       metrics.incSeqNumMismatch();
       metrics.incFailures();
-    } else if (retVal == RELAY_INSUFFICIENT_FUNDS) {
+    } else if (relayResult.code == RelayRetcode.InsufficientFunds) {
       metrics.incInsufficentFunds();
       metrics.incFailures();
     } else {
@@ -485,7 +396,7 @@ async function finalizeEventsAlreadyLocked(
 async function updateBalance() {
   let now = new Date();
   if (balanceQueryInterval > 0 && now.getTime() >= nextBalanceQueryTimeAsMs) {
-    let balance = await main.queryBalance(connectionData);
+    let { address, balance } = await relayImpl.getPayerInfo();
     if (isNaN(balance)) {
       logger.error("failed to query wallet balance!");
     } else {
@@ -493,7 +404,9 @@ async function updateBalance() {
         walletTimeStamp = new Date();
       }
       logger.info(
-        "wallet balance: " +
+        "wallet " +
+          address +
+          " balance: " +
           balance +
           ", update time: " +
           walletTimeStamp.toISOString()
@@ -554,12 +467,10 @@ export async function getStatus() {
 }
 
 // Note that querying the contract does not update the sequence number, so we don't need to be locked.
-export async function getPriceData(
-  priceId: string
-): Promise<any> {
+export async function getPriceData(priceId: string): Promise<any> {
   let result: any;
   // await mutex.runExclusive(async () => {
-  result = await main.query(priceId);
+  result = await relayImpl.query(priceId);
   // });
 
   return result;