|
|
@@ -3,10 +3,10 @@ let CondVar = require("condition-variable");
|
|
|
|
|
|
import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
|
|
|
import { uint8ArrayToHex } from "@certusone/wormhole-sdk";
|
|
|
+import { Relay, RelayResult, RelayRetcode } from "./relay/iface";
|
|
|
|
|
|
import * as helpers from "./helpers";
|
|
|
import { logger } from "./helpers";
|
|
|
-import * as main from "./relay/main";
|
|
|
import { PromHelper } from "./promHelpers";
|
|
|
|
|
|
const mutex = new Mutex();
|
|
|
@@ -37,7 +37,7 @@ type CurrentEntry = {
|
|
|
|
|
|
let productMap = new Map<string, ProductData>(); // The key to this is hash of price_ids in the batch attestation.
|
|
|
|
|
|
-let connectionData: main.ConnectionData;
|
|
|
+let relayImpl: Relay;
|
|
|
let metrics: PromHelper;
|
|
|
let nextBalanceQueryTimeAsMs: number = 0;
|
|
|
let balanceQueryInterval = 0;
|
|
|
@@ -46,15 +46,10 @@ let maxPerBatch: number = 1;
|
|
|
let maxAttempts: number = 2;
|
|
|
let retryDelayInMs: number = 0;
|
|
|
|
|
|
-export function init(runWorker: boolean): boolean {
|
|
|
+export function init(runWorker: boolean, relay: Relay): boolean {
|
|
|
if (!runWorker) return true;
|
|
|
|
|
|
- try {
|
|
|
- connectionData = main.connectRelayer();
|
|
|
- } catch (e) {
|
|
|
- logger.error("failed to load connection config: %o", e);
|
|
|
- return false;
|
|
|
- }
|
|
|
+ relayImpl = relay;
|
|
|
|
|
|
if (process.env.MAX_MSGS_PER_BATCH) {
|
|
|
maxPerBatch = parseInt(process.env.MAX_MSGS_PER_BATCH);
|
|
|
@@ -121,35 +116,25 @@ export async function run(met: PromHelper) {
|
|
|
balanceQueryInterval = parseInt(process.env.BAL_QUERY_INTERVAL);
|
|
|
}
|
|
|
|
|
|
- await main.setAccountNum(connectionData);
|
|
|
- logger.info(
|
|
|
- "wallet account number is " + connectionData.terraData.walletAccountNum
|
|
|
- );
|
|
|
-
|
|
|
- await main.setSeqNum(connectionData);
|
|
|
- logger.info(
|
|
|
- "initial wallet sequence number is " +
|
|
|
- connectionData.terraData.walletSeqNum
|
|
|
- );
|
|
|
-
|
|
|
- let balance = await main.queryBalance(connectionData);
|
|
|
- if (!isNaN(balance)) {
|
|
|
+ let { address: payerAddress, balance: payerBalance } =
|
|
|
+ await relayImpl.getPayerInfo();
|
|
|
+ if (!isNaN(payerBalance)) {
|
|
|
walletTimeStamp = new Date();
|
|
|
}
|
|
|
if (balanceQueryInterval !== 0) {
|
|
|
logger.info(
|
|
|
"initial wallet balance is " +
|
|
|
- balance +
|
|
|
+ payerBalance +
|
|
|
", will query every " +
|
|
|
balanceQueryInterval +
|
|
|
" milliseconds."
|
|
|
);
|
|
|
- metrics.setWalletBalance(balance);
|
|
|
+ metrics.setWalletBalance(payerBalance);
|
|
|
|
|
|
nextBalanceQueryTimeAsMs = new Date().getTime() + balanceQueryInterval;
|
|
|
} else {
|
|
|
- logger.info("initial wallet balance is " + balance);
|
|
|
- metrics.setWalletBalance(balance);
|
|
|
+ logger.info("initial wallet balance is " + payerBalance);
|
|
|
+ metrics.setWalletBalance(payerBalance);
|
|
|
}
|
|
|
|
|
|
await condition.wait(computeTimeout(), callBack);
|
|
|
@@ -187,18 +172,11 @@ async function callBack(err: any, result: any) {
|
|
|
if (currObjs.length !== 0) {
|
|
|
logger.debug("in callback, relaying " + currObjs.length + " events.");
|
|
|
let sendTime = new Date();
|
|
|
- let retVal: number;
|
|
|
- let relayResult: any;
|
|
|
- [retVal, relayResult] = await relayEventsNotLocked(messages);
|
|
|
+ let relayResult = await relayEventsNotLocked(messages);
|
|
|
|
|
|
await mutex.runExclusive(async () => {
|
|
|
logger.debug("in callback, finalizing " + currObjs.length + " events.");
|
|
|
- await finalizeEventsAlreadyLocked(
|
|
|
- currObjs,
|
|
|
- retVal,
|
|
|
- relayResult,
|
|
|
- sendTime
|
|
|
- );
|
|
|
+ await finalizeEventsAlreadyLocked(currObjs, relayResult, sendTime);
|
|
|
|
|
|
if (pendingMap.size === 0) {
|
|
|
logger.debug("in callback, rearming the condition.");
|
|
|
@@ -234,7 +212,9 @@ async function getPendingEventsAlreadyLocked(
|
|
|
const first = pendingMap.entries().next();
|
|
|
logger.debug("processing event with key [" + first.value[0] + "]");
|
|
|
const pendingValue: PendingPayload = first.value[1];
|
|
|
- let pendingKey = helpers.getBatchAttestationHashKey(pendingValue.batchAttestation);
|
|
|
+ let pendingKey = helpers.getBatchAttestationHashKey(
|
|
|
+ pendingValue.batchAttestation
|
|
|
+ );
|
|
|
let currObj = productMap.get(pendingKey);
|
|
|
if (currObj) {
|
|
|
currObj.lastBatchAttestation = pendingValue.batchAttestation;
|
|
|
@@ -286,113 +266,41 @@ const RELAY_INSUFFICIENT_FUNDS: number = 5;
|
|
|
|
|
|
async function relayEventsNotLocked(
|
|
|
messages: Array<string>
|
|
|
-): Promise<[number, any]> {
|
|
|
- let retVal: number = RELAY_SUCCESS;
|
|
|
- let relayResult: any;
|
|
|
+): Promise<RelayResult<any>> {
|
|
|
+ let relayResult: RelayResult<any> | null = null;
|
|
|
let retry: boolean = false;
|
|
|
|
|
|
+ // CAUTION(2022-03-21): The retry logic is not very efficient at
|
|
|
+ // handling more than one messsage. It may attempt redundant
|
|
|
+ // transactions during retries for messasges that were successful on a
|
|
|
+ // previous attempt.
|
|
|
for (let attempt = 0; attempt < maxAttempts; ++attempt) {
|
|
|
- retVal = RELAY_SUCCESS;
|
|
|
retry = false;
|
|
|
|
|
|
- try {
|
|
|
- relayResult = await main.relay(messages, connectionData);
|
|
|
- if (relayResult.txhash) {
|
|
|
- if (
|
|
|
- relayResult.raw_log &&
|
|
|
- relayResult.raw_log.search("VaaAlreadyExecuted") >= 0
|
|
|
- ) {
|
|
|
- relayResult = "Already Executed: " + relayResult.txhash;
|
|
|
- retVal = RELAY_ALREADY_EXECUTED;
|
|
|
- } else if (
|
|
|
- relayResult.raw_log &&
|
|
|
- relayResult.raw_log.search("insufficient funds") >= 0
|
|
|
- ) {
|
|
|
- logger.error(
|
|
|
- "relay failed due to insufficient funds: %o",
|
|
|
- relayResult
|
|
|
- );
|
|
|
- connectionData.terraData.walletSeqNum =
|
|
|
- connectionData.terraData.walletSeqNum - 1;
|
|
|
- retVal = RELAY_INSUFFICIENT_FUNDS;
|
|
|
- } else if (
|
|
|
- relayResult.raw_log &&
|
|
|
- relayResult.raw_log.search("failed") >= 0
|
|
|
- ) {
|
|
|
- logger.error("relay seems to have failed: %o", relayResult);
|
|
|
- retVal = RELAY_FAIL;
|
|
|
- retry = true;
|
|
|
- } else {
|
|
|
- relayResult = relayResult.txhash;
|
|
|
- }
|
|
|
- } else {
|
|
|
- retVal = RELAY_FAIL;
|
|
|
+ relayResult = await relayImpl.relay(messages);
|
|
|
+
|
|
|
+ switch (relayResult.code) {
|
|
|
+ case RelayRetcode.Success:
|
|
|
+ case RelayRetcode.AlreadyExecuted:
|
|
|
+ case RelayRetcode.InsufficientFunds:
|
|
|
+ logger.info(`Not retrying for relay retcode ${relayResult.code}`);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case RelayRetcode.Fail:
|
|
|
+ case RelayRetcode.SeqNumMismatch:
|
|
|
+ case RelayRetcode.Timeout:
|
|
|
retry = true;
|
|
|
- if (relayResult.message) {
|
|
|
- relayResult = relayResult.message;
|
|
|
- } else {
|
|
|
- logger.error("No txhash: %o", relayResult);
|
|
|
- relayResult = "No txhash";
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (e: any) {
|
|
|
- if (
|
|
|
- e.message &&
|
|
|
- e.message.search("timeout") >= 0 &&
|
|
|
- e.message.search("exceeded") >= 0
|
|
|
- ) {
|
|
|
- logger.error("relay timed out: %o", e);
|
|
|
- retVal = RELAY_TIMEOUT;
|
|
|
+ break;
|
|
|
+
|
|
|
+ default:
|
|
|
+ logger.warn(`Retrying for unknown relay retcode ${relayResult.code}`);
|
|
|
retry = true;
|
|
|
- } else {
|
|
|
- logger.error("relay failed: %o", e);
|
|
|
- if (e.response && e.response.data) {
|
|
|
- if (
|
|
|
- e.response.data.error &&
|
|
|
- e.response.data.error.search("VaaAlreadyExecuted") >= 0
|
|
|
- ) {
|
|
|
- relayResult = "Already Executed";
|
|
|
- retVal = RELAY_ALREADY_EXECUTED;
|
|
|
- } else if (
|
|
|
- e.response.data.message &&
|
|
|
- e.response.data.message.search("account sequence mismatch") >= 0
|
|
|
- ) {
|
|
|
- relayResult = e.response.data.message;
|
|
|
- retVal = RELAY_SEQ_NUM_MISMATCH;
|
|
|
- retry = true;
|
|
|
-
|
|
|
- logger.debug(
|
|
|
- "wallet sequence number is out of sync, querying the current value"
|
|
|
- );
|
|
|
- await main.setSeqNum(connectionData);
|
|
|
- logger.info(
|
|
|
- "wallet seq number is now " +
|
|
|
- connectionData.terraData.walletSeqNum
|
|
|
- );
|
|
|
- } else {
|
|
|
- retVal = RELAY_FAIL;
|
|
|
- retry = true;
|
|
|
- if (e.message) {
|
|
|
- relayResult = "Error: " + e.message;
|
|
|
- } else {
|
|
|
- relayResult = "Error: unexpected exception";
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- retVal = RELAY_FAIL;
|
|
|
- retry = true;
|
|
|
- if (e.message) {
|
|
|
- relayResult = "Error: " + e.message;
|
|
|
- } else {
|
|
|
- relayResult = "Error: unexpected exception";
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ break;
|
|
|
}
|
|
|
|
|
|
logger.debug(
|
|
|
- "relay attempt complete: retVal: " +
|
|
|
- retVal +
|
|
|
+ "relay attempt complete: " +
|
|
|
+ JSON.stringify(relayResult) +
|
|
|
", retry: " +
|
|
|
retry +
|
|
|
", attempt " +
|
|
|
@@ -413,19 +321,22 @@ async function relayEventsNotLocked(
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
if (retry) {
|
|
|
logger.error("failed to relay batch, retry count exceeded!");
|
|
|
metrics.incRetriesExceeded();
|
|
|
}
|
|
|
|
|
|
- return [retVal, relayResult];
|
|
|
+ if (!relayResult) {
|
|
|
+ logger.error("INTERNAL: worker failed to produce a relay result.");
|
|
|
+ relayResult = new RelayResult(RelayRetcode.Fail, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ return relayResult;
|
|
|
}
|
|
|
|
|
|
async function finalizeEventsAlreadyLocked(
|
|
|
currObjs: Array<CurrentEntry>,
|
|
|
- retVal: number,
|
|
|
- relayResult: any,
|
|
|
+ relayResult: RelayResult<any>,
|
|
|
sendTime: Date
|
|
|
) {
|
|
|
for (let idx = 0; idx < currObjs.length; ++idx) {
|
|
|
@@ -433,17 +344,17 @@ async function finalizeEventsAlreadyLocked(
|
|
|
let currEntry = currObjs[idx].pendingEntry;
|
|
|
currObj.lastResult = relayResult;
|
|
|
currObj.numTimesPublished = currObj.numTimesPublished + 1;
|
|
|
- if (retVal == RELAY_SUCCESS) {
|
|
|
+ if (relayResult.code == RelayRetcode.Success) {
|
|
|
metrics.incSuccesses();
|
|
|
- } else if (retVal == RELAY_ALREADY_EXECUTED) {
|
|
|
+ } else if (relayResult.code == RelayRetcode.AlreadyExecuted) {
|
|
|
metrics.incAlreadyExec();
|
|
|
- } else if (retVal == RELAY_TIMEOUT) {
|
|
|
+ } else if (relayResult.code == RelayRetcode.Timeout) {
|
|
|
metrics.incTransferTimeout();
|
|
|
metrics.incFailures();
|
|
|
- } else if (retVal == RELAY_SEQ_NUM_MISMATCH) {
|
|
|
+ } else if (relayResult.code == RelayRetcode.SeqNumMismatch) {
|
|
|
metrics.incSeqNumMismatch();
|
|
|
metrics.incFailures();
|
|
|
- } else if (retVal == RELAY_INSUFFICIENT_FUNDS) {
|
|
|
+ } else if (relayResult.code == RelayRetcode.InsufficientFunds) {
|
|
|
metrics.incInsufficentFunds();
|
|
|
metrics.incFailures();
|
|
|
} else {
|
|
|
@@ -476,7 +387,7 @@ async function finalizeEventsAlreadyLocked(
|
|
|
|
|
|
let now = new Date();
|
|
|
if (balanceQueryInterval > 0 && now.getTime() >= nextBalanceQueryTimeAsMs) {
|
|
|
- let balance = await main.queryBalance(connectionData);
|
|
|
+ let { address, balance } = await relayImpl.getPayerInfo();
|
|
|
if (isNaN(balance)) {
|
|
|
logger.error("failed to query wallet balance!");
|
|
|
} else {
|
|
|
@@ -484,7 +395,9 @@ async function finalizeEventsAlreadyLocked(
|
|
|
walletTimeStamp = new Date();
|
|
|
}
|
|
|
logger.info(
|
|
|
- "wallet balance: " +
|
|
|
+ "wallet " +
|
|
|
+ address +
|
|
|
+ " balance: " +
|
|
|
balance +
|
|
|
", update time: " +
|
|
|
walletTimeStamp.toISOString()
|
|
|
@@ -545,12 +458,10 @@ export async function getStatus() {
|
|
|
}
|
|
|
|
|
|
// Note that querying the contract does not update the sequence number, so we don't need to be locked.
|
|
|
-export async function getPriceData(
|
|
|
- priceId: string
|
|
|
-): Promise<any> {
|
|
|
+export async function getPriceData(priceId: string): Promise<any> {
|
|
|
let result: any;
|
|
|
// await mutex.runExclusive(async () => {
|
|
|
- result = await main.query(priceId);
|
|
|
+ result = await relayImpl.query(priceId);
|
|
|
// });
|
|
|
|
|
|
return result;
|