浏览代码

Spy relayer fixes (#1095)

* try catch pullAllEVMTokens

* remove some logs from walletMonitor

* redundant check fix, init s/f metrics

* fix unwrap for realz

* confirmed and rollback metrics

* fix terra balance fetching

* relayer: split out wallet monitor

* relayer: update tilt for wallet-monitor

* relayer: evm print tx hash before wait

* relayer: split out redis queue by source/target

* Update spy relayer example mainnet config files

* Includes Aurora bits in the emitter addresses, supported tokens,
  and supported chains.

Co-authored-by: Jeff Schroeder <jeffschroeder@computer.org>
Evan Gray 3 年之前
父节点
当前提交
50bb184522

+ 12 - 1
Tiltfile

@@ -349,13 +349,24 @@ if spy_relayer:
         "spy-relayer",
         resource_deps = ["proto-gen", "guardian", "redis"],
         port_forwards = [
-            port_forward(6063, container_port = 6060, name = "Debug/Status Server [:6063]", host = webHost),
             port_forward(8083, name = "Prometheus [:8083]", host = webHost),
         ],
         labels = ["spy-relayer"],
         trigger_mode = trigger_mode,
     )
 
+    k8s_yaml_with_ns("devnet/spy-wallet-monitor.yaml")
+
+    k8s_resource(
+        "spy-wallet-monitor",
+        resource_deps = ["proto-gen", "guardian", "redis"],
+        port_forwards = [
+            port_forward(8084, name = "Prometheus [:8084]", host = webHost),
+        ],
+        labels = ["spy-relayer"],
+        trigger_mode = trigger_mode,
+    )
+
 k8s_yaml_with_ns("devnet/eth-devnet.yaml")
 
 k8s_resource(

+ 4 - 0
devnet/spy-listener.yaml

@@ -9,6 +9,10 @@ spec:
   clusterIP: None
   selector:
     app: spy-listener
+  ports:
+    - port: 8082
+      name: prometheus
+      protocol: TCP
 ---
 apiVersion: apps/v1
 kind: StatefulSet

+ 8 - 0
devnet/spy-relayer.yaml

@@ -9,6 +9,10 @@ spec:
   clusterIP: None
   selector:
     app: spy-relayer
+  ports:
+    - port: 8083
+      name: prometheus
+      protocol: TCP
 ---
 apiVersion: apps/v1
 kind: StatefulSet
@@ -36,6 +40,10 @@ spec:
             - --prefix
             - /app/relayer/spy_relayer/
             - tilt_relayer
+          ports:
+            - containerPort: 8083
+              name: prometheus
+              protocol: TCP
           tty: true
           readinessProbe:
             tcpSocket:

+ 52 - 0
devnet/spy-wallet-monitor.yaml

@@ -0,0 +1,52 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: spy-wallet-monitor
+  labels:
+    app: spy-wallet-monitor
+spec:
+  clusterIP: None
+  selector:
+    app: spy-wallet-monitor
+  ports:
+    - port: 8084
+      name: prometheus
+      protocol: TCP
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: spy-wallet-monitor
+spec:
+  selector:
+    matchLabels:
+      app: spy-wallet-monitor
+  serviceName: spy-wallet-monitor
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        app: spy-wallet-monitor
+    spec:
+      restartPolicy: Always
+      terminationGracePeriodSeconds: 0
+      containers:
+        - name: spy-wallet-monitor
+          image: spy-relay-image
+          command:
+            - npm
+            - run
+            - --prefix
+            - /app/relayer/spy_relayer/
+            - tilt_wallet_monitor
+          ports:
+            - containerPort: 8084
+              name: prometheus
+              protocol: TCP
+          tty: true
+          readinessProbe:
+            tcpSocket:
+              port: 2000
+            periodSeconds: 1
+            failureThreshold: 300

+ 0 - 1
relayer/spy_relayer/.env.sample

@@ -6,7 +6,6 @@ READINESS_PORT=2000
 CLEAR_REDIS_ON_INIT=false
 DEMOTE_WORKING_ON_INIT=true
 LOG_LEVEL=debug
-SIMULATED_TERRA_WALLET_ADDRESS=terra1x46rqay4d3cssq8gxxvqz8xt6nwlz4td20k38v
 SUPPORTED_TOKENS=[{"chainId":1,"address":"So11111111111111111111111111111111111111112"}, {"chainId":2,"address":"0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"}, {"chainId":3,"address":"uluna"}, {"chainId":4,"address":"0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"}]
 PRIVATE_KEYS=[  {    "chainId": 1,    "privateKeys": [      [        14, 173, 153, 4, 176, 224, 201, 111, 32, 237, 183, 185, 159, 247, 22,        161, 89, 84, 215, 209, 212, 137, 10, 92, 157, 49, 29, 192, 101, 164,        152, 70, 87, 65, 8, 174, 214, 157, 175, 126, 98, 90, 54, 24, 100, 177,        247, 77, 19, 112, 47, 44, 165, 109, 233, 102, 14, 86, 109, 29, 134, 145,        132, 141      ]    ]  },  {    "chainId": 2,    "privateKeys": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ]  },  {    "chainId": 3,    "privateKeys": [      "notice oak worry limit wrap speak medal online prefer cluster roof addict wrist behave treat actual wasp year salad speed social layer crew genius"    ]  },  {    "chainId": 4,    "privateKeys": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ]  }]
 

+ 0 - 1
relayer/spy_relayer/.env.tilt.relayer

@@ -6,6 +6,5 @@ READINESS_PORT=2000
 CLEAR_REDIS_ON_INIT=false
 DEMOTE_WORKING_ON_INIT=true
 LOG_LEVEL=debug
-SIMULATED_TERRA_WALLET_ADDRESS=terra1x46rqay4d3cssq8gxxvqz8xt6nwlz4td20k38v
 SUPPORTED_TOKENS=[{"chainId":1,"address":"So11111111111111111111111111111111111111112"}, {"chainId":2,"address":"0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"}, {"chainId":3,"address":"uluna"}, {"chainId":4,"address":"0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"}]
 PRIVATE_KEYS=[  {    "chainId": 1,    "privateKeys": [      [        14, 173, 153, 4, 176, 224, 201, 111, 32, 237, 183, 185, 159, 247, 22,        161, 89, 84, 215, 209, 212, 137, 10, 92, 157, 49, 29, 192, 101, 164,        152, 70, 87, 65, 8, 174, 214, 157, 175, 126, 98, 90, 54, 24, 100, 177,        247, 77, 19, 112, 47, 44, 165, 109, 233, 102, 14, 86, 109, 29, 134, 145,        132, 141      ]    ]  },  {    "chainId": 2,    "privateKeys": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ]  },  {    "chainId": 3,    "privateKeys": [      "notice oak worry limit wrap speak medal online prefer cluster roof addict wrist behave treat actual wasp year salad speed social layer crew genius"    ]  },  {    "chainId": 4,    "privateKeys": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ]  }]

+ 10 - 0
relayer/spy_relayer/.env.tilt.wallet-monitor

@@ -0,0 +1,10 @@
+SUPPORTED_CHAINS=[  {    "chainId": 1,    "chainName": "Solana",  "nativeCurrencySymbol": "SOL",  "nodeUrl": "http://solana-devnet:8899",    "tokenBridgeAddress": "0x0290FB167208Af455bB137780163b7B7a9a10C16",    "bridgeAddress": "Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o",    "walletPrivateKey": [      [        14, 173, 153, 4, 176, 224, 201, 111, 32, 237, 183, 185, 159, 247, 22,        161, 89, 84, 215, 209, 212, 137, 10, 92, 157, 49, 29, 192, 101, 164,        152, 70, 87, 65, 8, 174, 214, 157, 175, 126, 98, 90, 54, 24, 100, 177,        247, 77, 19, 112, 47, 44, 165, 109, 233, 102, 14, 86, 109, 29, 134, 145,        132, 141      ]    ],    "wrappedAsset": "So11111111111111111111111111111111111111112"  },  {    "chainId": 2,    "chainName": "Ethereum",  "nativeCurrencySymbol": "ETH",  "nodeUrl": "http://eth-devnet:8545",    "tokenBridgeAddress": "0x0290FB167208Af455bB137780163b7B7a9a10C16",    "walletPrivateKey": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ],    "wrappedAsset": "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"  },  {    "chainId": 3,    "chainName": "Terra",  "nativeCurrencySymbol": "LUNA",  "nodeUrl": "http://terra-terrad:1317",    "tokenBridgeAddress": "terra10pyejy66429refv3g35g2t7am0was7ya7kz2a4",    "walletPrivateKey": [      "notice oak worry limit wrap speak medal online prefer cluster roof addict wrist behave treat actual wasp year salad speed social layer crew genius"    ],    "terraName": "localterra",    "terraChainId": "columbus-5",    "terraCoin": "uluna",    "terraGasPriceUrl": "http://terra-fcd:3060/v1/txs/gas_prices"  },  {    "chainId": 4,    "chainName": "Binance Smart Chain",  "nativeCurrencySymbol": "BNB",  "nodeUrl": "http://eth-devnet2:8545",    "tokenBridgeAddress": "0x0290FB167208Af455bB137780163b7B7a9a10C16",    "walletPrivateKey": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ],    "wrappedAsset": "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"  }]
+REDIS_HOST= redis
+REDIS_PORT=6379
+PROM_PORT=8084
+READINESS_PORT=2000
+CLEAR_REDIS_ON_INIT=false
+DEMOTE_WORKING_ON_INIT=true
+LOG_LEVEL=debug
+SUPPORTED_TOKENS=[{"chainId":1,"address":"So11111111111111111111111111111111111111112"}, {"chainId":2,"address":"0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"}, {"chainId":3,"address":"uluna"}, {"chainId":4,"address":"0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E"}]
+PRIVATE_KEYS=[  {    "chainId": 1,    "privateKeys": [      [        14, 173, 153, 4, 176, 224, 201, 111, 32, 237, 183, 185, 159, 247, 22,        161, 89, 84, 215, 209, 212, 137, 10, 92, 157, 49, 29, 192, 101, 164,        152, 70, 87, 65, 8, 174, 214, 157, 175, 126, 98, 90, 54, 24, 100, 177,        247, 77, 19, 112, 47, 44, 165, 109, 233, 102, 14, 86, 109, 29, 134, 145,        132, 141      ]    ]  },  {    "chainId": 2,    "privateKeys": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ]  },  {    "chainId": 3,    "privateKeys": [      "notice oak worry limit wrap speak medal online prefer cluster roof addict wrist behave treat actual wasp year salad speed social layer crew genius"    ]  },  {    "chainId": 4,    "privateKeys": [      "0x4f3edf983ac636a65a842ce7c78d9aa706d3b113bce9c46f30d7d21715b23b1d"    ]  }]

+ 0 - 1
relayer/spy_relayer/config/mainnet/.env.relayer.sample

@@ -6,6 +6,5 @@ READINESS_PORT=2000
 CLEAR_REDIS_ON_INIT=false
 DEMOTE_WORKING_ON_INIT=true
 LOG_LEVEL=debug
-SIMULATED_TERRA_WALLET_ADDRESS= Requires a terra public address which will always have enough funds to pay for transactions. one of the hot wallets will do.
 SUPPORTED_TOKENS= paste from supportedTokens.json. This must be the same as in the .env.listener file.
 PRIVATE_KEYS= paste from privateKeys.json

+ 10 - 0
relayer/spy_relayer/config/mainnet/.env.wallet-monitor.sample

@@ -0,0 +1,10 @@
+SUPPORTED_CHAINS= paste from supportedChains.json
+REDIS_HOST= change me
+REDIS_PORT= change me
+PROM_PORT=8084
+READINESS_PORT=2000
+CLEAR_REDIS_ON_INIT=false
+DEMOTE_WORKING_ON_INIT=true
+LOG_LEVEL=debug
+SUPPORTED_TOKENS= paste from supportedTokens.json. This must be the same as in the .env.listener file.
+PRIVATE_KEYS= paste from privateKeys.json

+ 5 - 4
relayer/spy_relayer/config/mainnet/emitterAddresses.json

@@ -1,10 +1,11 @@
 [
-    {"chainId":1,"emitterAddress":"wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb"}, 
-    {"chainId":2,"emitterAddress":"0x3ee18B2214AFF97000D974cf647E7C347E8fa585"}, 
-    {"chainId":3,"emitterAddress":"terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf"}, 
+    {"chainId":1,"emitterAddress":"wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb"},
+    {"chainId":2,"emitterAddress":"0x3ee18B2214AFF97000D974cf647E7C347E8fa585"},
+    {"chainId":3,"emitterAddress":"terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf"},
     {"chainId":4,"emitterAddress":"0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7"},
     {"chainId":5,"emitterAddress":"0x5a58505a96D1dbf8dF91cB21B54419FC36e93fdE"},
     {"chainId":6,"emitterAddress":"0x0e082F06FF657D94310cB8cE8B0D9a04541d8052"},
     {"chainId":7,"emitterAddress":"0x5848C791e09901b40A9Ef749f2a6735b418d7564"},
+    {"chainId":9,"emitterAddress":"0x51b5123a7b0f9b2ba265f9c4c8de7d78d52f510f"},
     {"chainId":10,"emitterAddress":"0x7C9Fc5741288cDFdD83CeB07f3ea7e22618D79D2"}
-]
+]

+ 8 - 0
relayer/spy_relayer/config/mainnet/supportedChains.json

@@ -59,6 +59,14 @@
     "tokenBridgeAddress": "0x5848C791e09901b40A9Ef749f2a6735b418d7564",
     "wrappedAsset": "0x21C718C22D52d0F3a789b752D4c2fD5908a8A733"
   },
+  {
+    "chainId": 9,
+    "chainName": "Aurora",
+    "nativeCurrencySymbol": "ETH",
+    "nodeUrl": "https://mainnet.aurora.dev",
+    "tokenBridgeAddress": "0x51b5123a7b0f9b2ba265f9c4c8de7d78d52f510f",
+    "wrappedAsset": "0xC9BdeEd33CD01541e1eeD10f90519d2C06Fe3feB"
+  },
   {
     "chainId": 10,
     "chainName": "Fantom",

+ 8 - 7
relayer/spy_relayer/config/mainnet/supportedTokens.json

@@ -1,15 +1,16 @@
 [
-    {"chainId":1,"address":"So11111111111111111111111111111111111111112"}, 
-    {"chainId":2,"address":"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"}, 
-    {"chainId":3,"address":"uluna"}, 
-    {"chainId":3,"address":"uusd"}, 
+    {"chainId":1,"address":"So11111111111111111111111111111111111111112"},
+    {"chainId":2,"address":"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"},
+    {"chainId":3,"address":"uluna"},
+    {"chainId":3,"address":"uusd"},
     {"chainId":4,"address":"0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c"},
     {"chainId":5,"address":"0x0d500b1d8e8ef31e21c99d1db9a6444d3adf1270"},
     {"chainId":6,"address":"0xb31f66aa3c1e785363f0875a1b74e27b85fd66c7"},
     {"chainId":7,"address":"0x21C718C22D52d0F3a789b752D4c2fD5908a8A733"},
+    {"chainId":9,"address":"0xC9BdeEd33CD01541e1eeD10f90519d2C06Fe3feB"},
     {"chainId":10,"address":"0x21be370D5312f44cB42ce377BC9b8a0cEF1A4C83"},
 
 
-    {"chainId":2,"address":"0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"}, 
-    {"chainId":2,"address":"0xdac17f958d2ee523a2206206994597c13d831ec7"} 
-]
+    {"chainId":2,"address":"0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"},
+    {"chainId":2,"address":"0xdac17f958d2ee523a2206206994597c13d831ec7"}
+]

+ 2 - 0
relayer/spy_relayer/package.json

@@ -8,8 +8,10 @@
     "spy_relay": "node lib/main.js",
     "tilt_listener": "SPY_RELAY_CONFIG=.env.tilt.listener node lib/main.js --listen_only",
     "tilt_relayer": "SPY_RELAY_CONFIG=.env.tilt.relayer node lib/main.js --relay_only",
+    "tilt_wallet_monitor": "SPY_RELAY_CONFIG=.env.tilt.wallet-monitor node lib/main.js --wallet_monitor_only",
     "listen_only": "node lib/main.js --listen_only",
     "relay_only": "node lib/main.js --relay_only",
+    "wallet_monitor_only": "node lib/main.js --wallet_monitor_only",
     "test": "jest --config jestconfig.json --verbose"
   },
   "author": "",

+ 53 - 22
relayer/spy_relayer/src/helpers/promHelpers.ts

@@ -1,7 +1,7 @@
 import { ChainId } from "@certusone/wormhole-sdk";
 import http = require("http");
 import client = require("prom-client");
-import { WalletBalance } from "../relayer/walletMonitor";
+import { WalletBalance } from "../monitor/walletMonitor";
 import { chainIDStrings } from "../utils/wormhole";
 import { getScopedLogger } from "./logHelper";
 import { RedisTables } from "./redisHelper";
@@ -15,7 +15,8 @@ const logger = getScopedLogger(["prometheusHelpers"]);
 export enum PromMode {
   Listen,
   Relay,
-  Both,
+  WalletMonitor,
+  All,
 }
 
 export class PromHelper {
@@ -29,11 +30,21 @@ export class PromHelper {
     help: "number of successful relays",
     labelNames: ["chain_name"],
   });
+  private confirmedCounter = new client.Counter({
+    name: "spy_relay_confirmed_successes",
+    help: "number of confirmed successful relays",
+    labelNames: ["chain_name"],
+  });
   private failureCounter = new client.Counter({
     name: "spy_relay_failures",
     help: "number of failed relays",
     labelNames: ["chain_name"],
   });
+  private rollbackCounter = new client.Counter({
+    name: "spy_relay_rollback",
+    help: "number of rolled back relays",
+    labelNames: ["chain_name"],
+  });
   private completeTime = new client.Histogram({
     name: "spy_relay_complete_time",
     help: "Time is took to complete transfer",
@@ -54,7 +65,7 @@ export class PromHelper {
   private redisQueue = new client.Gauge({
     name: "spy_relay_redis_queue_length",
     help: "number of items in the pending queue.",
-    labelNames: ["queue"],
+    labelNames: ["queue", "source_chain_name", "target_chain_name"],
   });
 
   // Wallet metrics
@@ -83,17 +94,12 @@ export class PromHelper {
     } else if (
       req.url === "/metrics" ||
       req.url === "/relayer" ||
-      req.url === "/listener"
+      req.url === "/listener" ||
+      req.url === "/wallet-monitor"
     ) {
       // Return all metrics in the Prometheus exposition format
-      if (this._mode === PromMode.Listen || this._mode == PromMode.Both) {
-        res.setHeader("Content-Type", this._register.contentType);
-        res.end(await this._register.metrics());
-      }
-      if (this._mode === PromMode.Relay || this._mode == PromMode.Both) {
-        res.setHeader("Content-Type", this._register.contentType);
-        res.end(await this._register.metrics());
-      }
+      res.setHeader("Content-Type", this._register.contentType);
+      res.end(await this._register.metrics());
     } else {
       res.writeHead(404, { "Content-Type": "text/plain" });
       res.write("404 Not Found - " + req.url + "\n");
@@ -108,8 +114,10 @@ export class PromHelper {
       mode_name = "listener";
     } else if (mode === PromMode.Relay) {
       mode_name = "relayer";
-    } else if (mode === PromMode.Both) {
-      mode_name = "both";
+    } else if (mode === PromMode.WalletMonitor) {
+      mode_name = "wallet-monitor";
+    } else if (mode === PromMode.All) {
+      mode_name = "all";
     }
 
     this._register.setDefaultLabels({
@@ -121,14 +129,18 @@ export class PromHelper {
 
     this._mode = mode;
     // Register each metric
-    if (this._mode === PromMode.Listen || this._mode == PromMode.Both) {
+    if (this._mode === PromMode.Listen || this._mode === PromMode.All) {
       this._register.registerMetric(this.listenCounter);
     }
-    if (this._mode === PromMode.Relay || this._mode == PromMode.Both) {
+    if (this._mode === PromMode.Relay || this._mode === PromMode.All) {
       this._register.registerMetric(this.successCounter);
+      this._register.registerMetric(this.confirmedCounter);
       this._register.registerMetric(this.failureCounter);
+      this._register.registerMetric(this.rollbackCounter);
       this._register.registerMetric(this.alreadyExecutedCounter);
       this._register.registerMetric(this.redisQueue);
+    }
+    if (this._mode === PromMode.WalletMonitor || this._mode === PromMode.All) {
       this._register.registerMetric(this.walletBalance);
     }
     // End registering metric
@@ -137,15 +149,25 @@ export class PromHelper {
   }
 
   // These are the accessor methods for the metrics
-  incSuccesses(chainId: ChainId) {
+  incSuccesses(chainId: ChainId, value?: number) {
     this.successCounter
       .labels({ chain_name: chainIDStrings[chainId] || "Unknown" })
-      .inc();
+      .inc(value);
+  }
+  incConfirmed(chainId: ChainId, value?: number) {
+    this.confirmedCounter
+      .labels({ chain_name: chainIDStrings[chainId] || "Unknown" })
+      .inc(value);
   }
-  incFailures(chainId: ChainId) {
+  incFailures(chainId: ChainId, value?: number) {
     this.failureCounter
       .labels({ chain_name: chainIDStrings[chainId] || "Unknown" })
-      .inc();
+      .inc(value);
+  }
+  incRollback(chainId: ChainId, value?: number) {
+    this.rollbackCounter
+      .labels({ chain_name: chainIDStrings[chainId] || "Unknown" })
+      .inc(value);
   }
   addCompleteTime(val: number) {
     this.completeTime.observe(val);
@@ -160,9 +182,18 @@ export class PromHelper {
   handleListenerMemqueue(size: number) {
     this.listenerMemqueue.set(size);
   }
-  setRedisQueue(queue: RedisTables, size: number) {
+  setRedisQueue(
+    queue: RedisTables,
+    sourceChainId: ChainId,
+    targetChainId: ChainId,
+    size: number
+  ) {
     this.redisQueue
-      .labels({ queue: RedisTables[queue].toLowerCase() })
+      .labels({
+        queue: RedisTables[queue].toLowerCase(),
+        source_chain_name: chainIDStrings[sourceChainId],
+        target_chain_name: chainIDStrings[targetChainId],
+      })
       .set(size);
   }
 

文件差异内容过多而无法显示
+ 17 - 0
relayer/spy_relayer/src/helpers/redisHelper.test.ts


+ 104 - 7
relayer/spy_relayer/src/helpers/redisHelper.ts

@@ -1,8 +1,15 @@
-import { ChainId, uint8ArrayToHex } from "@certusone/wormhole-sdk";
+import {
+  ChainId,
+  hexToUint8Array,
+  importCoreWasm,
+  parseTransferPayload,
+  uint8ArrayToHex,
+} from "@certusone/wormhole-sdk";
 import { Mutex } from "async-mutex";
-import { createClient } from "redis";
+import { createClient, RedisClientType } from "redis";
 import { getCommonEnvironment } from "../configureEnv";
 import { ParsedTransferPayload, ParsedVaa } from "../listener/validation";
+import { chainIDStrings } from "../utils/wormhole";
 import { getScopedLogger } from "./logHelper";
 import { PromHelper } from "./promHelpers";
 import { sleep } from "./utils";
@@ -224,6 +231,10 @@ export function storePayloadFromJson(json: string): StorePayload {
   return JSON.parse(json);
 }
 
+export function resetPayload(storePayload: StorePayload): StorePayload {
+  return initPayloadWithVAA(storePayload.vaa_bytes);
+}
+
 export async function pushVaaToRedis(
   parsedVAA: ParsedVaa<ParsedTransferPayload>,
   hexVaa: string
@@ -299,18 +310,67 @@ export async function demoteWorkingRedis() {
     await redisClient.select(RedisTables.INCOMING);
     await redisClient.set(
       si_key,
-      storePayloadToJson(
-        initPayloadWithVAA(storePayloadFromJson(si_value).vaa_bytes)
-      )
+      storePayloadToJson(resetPayload(storePayloadFromJson(si_value)))
     );
     await redisClient.select(RedisTables.WORKING);
   }
   redisClient.quit();
 }
 
+type SourceToTargetMap = {
+  [key in ChainId]: {
+    [key in ChainId]: number;
+  };
+};
+
+export function createSourceToTargetMap(
+  knownChainIds: ChainId[]
+): SourceToTargetMap {
+  const sourceToTargetMap: SourceToTargetMap = {} as SourceToTargetMap;
+  for (const sourceKey of knownChainIds) {
+    sourceToTargetMap[sourceKey] = {} as { [key in ChainId]: number };
+    for (const targetKey of knownChainIds) {
+      sourceToTargetMap[sourceKey][targetKey] = 0;
+    }
+  }
+  return sourceToTargetMap;
+}
+
+export async function incrementSourceToTargetMap(
+  key: string,
+  redisClient: RedisClientType<any>,
+  parse_vaa: Function,
+  sourceToTargetMap: SourceToTargetMap
+): Promise<void> {
+  const parsedKey = storeKeyFromJson(key);
+  const si_value = await redisClient.get(key);
+  if (!si_value) {
+    return;
+  }
+  const parsedPayload = parseTransferPayload(
+    Buffer.from(
+      parse_vaa(hexToUint8Array(storePayloadFromJson(si_value).vaa_bytes))
+        .payload
+    )
+  );
+  if (
+    sourceToTargetMap[parsedKey.chain_id as ChainId]?.[
+      parsedPayload.targetChain
+    ] !== undefined
+  ) {
+    sourceToTargetMap[parsedKey.chain_id as ChainId][
+      parsedPayload.targetChain
+    ]++;
+  }
+}
+
 export async function monitorRedis(metrics: PromHelper) {
   const scopedLogger = getScopedLogger(["monitorRedis"], logger);
   const TEN_SECONDS: number = 10000;
+  const { parse_vaa } = await importCoreWasm();
+  const knownChainIds = Object.keys(chainIDStrings).map(
+    (c) => Number(c) as ChainId
+  );
   while (true) {
     const redisClient = await connectToRedis();
     if (!redisClient) {
@@ -318,9 +378,46 @@ export async function monitorRedis(metrics: PromHelper) {
     } else {
       try {
         await redisClient.select(RedisTables.INCOMING);
-        metrics.setRedisQueue(RedisTables.INCOMING, await redisClient.dbSize());
+        const incomingSourceToTargetMap =
+          createSourceToTargetMap(knownChainIds);
+        for await (const si_key of redisClient.scanIterator()) {
+          incrementSourceToTargetMap(
+            si_key,
+            redisClient,
+            parse_vaa,
+            incomingSourceToTargetMap
+          );
+        }
+        for (const sourceKey of knownChainIds) {
+          for (const targetKey of knownChainIds) {
+            metrics.setRedisQueue(
+              RedisTables.INCOMING,
+              sourceKey,
+              targetKey,
+              incomingSourceToTargetMap[sourceKey][targetKey]
+            );
+          }
+        }
         await redisClient.select(RedisTables.WORKING);
-        metrics.setRedisQueue(RedisTables.WORKING, await redisClient.dbSize());
+        const workingSourceToTargetMap = createSourceToTargetMap(knownChainIds);
+        for await (const si_key of redisClient.scanIterator()) {
+          incrementSourceToTargetMap(
+            si_key,
+            redisClient,
+            parse_vaa,
+            workingSourceToTargetMap
+          );
+        }
+        for (const sourceKey of knownChainIds) {
+          for (const targetKey of knownChainIds) {
+            metrics.setRedisQueue(
+              RedisTables.WORKING,
+              sourceKey,
+              targetKey,
+              workingSourceToTargetMap[sourceKey][targetKey]
+            );
+          }
+        }
       } catch (e) {
         scopedLogger.error("Failed to get dbSize and set metrics!");
       }

+ 38 - 18
relayer/spy_relayer/src/main.ts

@@ -9,12 +9,13 @@ import * as redisHelper from "./helpers/redisHelper";
 import * as restListener from "./listener/rest_listen";
 import * as spyListener from "./listener/spy_listen";
 import * as relayWorker from "./relayer/relay_worker";
+import * as walletMonitor from "./monitor";
 
-export enum ProcessType {
-  LISTEN_ONLY = "--listen_only",
-  RELAY_ONLY = "--relay_only",
-  SPY_AND_RELAY = "spy and relay",
-}
+const ARG_LISTEN_ONLY = "--listen_only";
+const ARG_RELAY_ONLY = "--relay_only";
+const ARG_WALLET_MONITOR_ONLY = "--wallet_monitor_only";
+const ONLY_ONE_ARG_ERROR_MSG = `May only specify one of ${ARG_LISTEN_ONLY}, ${ARG_RELAY_ONLY}, or ${ARG_WALLET_MONITOR_ONLY}`;
+const ONLY_ONE_ARG_ERROR_RESULT = `Multiple args found of ${ARG_LISTEN_ONLY}, ${ARG_RELAY_ONLY}, ${ARG_WALLET_MONITOR_ONLY}`;
 
 setDefaultWasm("node");
 const logger = getLogger();
@@ -23,34 +24,49 @@ const logger = getLogger();
 let runListen: boolean = true;
 let runWorker: boolean = true;
 let runRest: boolean = true;
+let runWalletMonitor: boolean = true;
 let foundOne: boolean = false;
 let error: string = "";
 
 for (let idx = 0; idx < process.argv.length; ++idx) {
-  if (process.argv[idx] === "--listen_only") {
+  if (process.argv[idx] === ARG_LISTEN_ONLY) {
     if (foundOne) {
-      logger.error('May only specify one of "--listen_only" or "--relay_only"');
-      error = "Multiple args found of --listen_only and --relay_only";
+      logger.error(ONLY_ONE_ARG_ERROR_MSG);
+      error = ONLY_ONE_ARG_ERROR_RESULT;
       break;
     }
 
     logger.info("spy_relay is running in listen only mode");
     runWorker = false;
+    runWalletMonitor = false;
     foundOne = true;
   }
 
-  if (process.argv[idx] === "--relay_only") {
+  if (process.argv[idx] === ARG_RELAY_ONLY) {
     if (foundOne) {
-      logger.error(
-        'May only specify one of "--listen_only", "--relay_only" or "--rest_only"'
-      );
-      error = "Multiple args found of --listen_only and --relay_only";
+      logger.error(ONLY_ONE_ARG_ERROR_MSG);
+      error = ONLY_ONE_ARG_ERROR_RESULT;
       break;
     }
 
     logger.info("spy_relay is running in relay only mode");
     runListen = false;
     runRest = false;
+    runWalletMonitor = false;
+    foundOne = true;
+  }
+
+  if (process.argv[idx] === ARG_WALLET_MONITOR_ONLY) {
+    if (foundOne) {
+      logger.error(ONLY_ONE_ARG_ERROR_MSG);
+      error = ONLY_ONE_ARG_ERROR_RESULT;
+      break;
+    }
+
+    logger.info("spy_relay is running in wallet monitor only mode");
+    runListen = false;
+    runRest = false;
+    runWorker = false;
     foundOne = true;
   }
 }
@@ -63,22 +79,25 @@ if (
   !error &&
   spyListener.init(runListen) &&
   relayWorker.init(runWorker) &&
-  restListener.init(runRest)
+  restListener.init(runRest) &&
+  walletMonitor.init(runWalletMonitor)
 ) {
   const commonEnv = getCommonEnvironment();
   const { promPort, readinessPort } = commonEnv;
   logger.info("prometheus client listening on port " + promPort);
   let promClient: PromHelper;
-  const runBoth: boolean = runListen && runWorker;
-  if (runBoth) {
-    promClient = new PromHelper("spy_relay", promPort, PromMode.Both);
+  const runAll: boolean = runListen && runWorker && runWalletMonitor;
+  if (runAll) {
+    promClient = new PromHelper("spy_relay", promPort, PromMode.All);
   } else if (runListen) {
     promClient = new PromHelper("spy_relay", promPort, PromMode.Listen);
   } else if (runWorker) {
     promClient = new PromHelper("spy_relay", promPort, PromMode.Relay);
+  } else if (runWalletMonitor) {
+    promClient = new PromHelper("spy_relay", promPort, PromMode.WalletMonitor);
   } else {
     logger.error("Invalid run mode for Prometheus");
-    promClient = new PromHelper("spy_relay", promPort, PromMode.Both);
+    promClient = new PromHelper("spy_relay", promPort, PromMode.All);
   }
 
   redisHelper.init(promClient);
@@ -86,6 +105,7 @@ if (
   if (runListen) spyListener.run(promClient);
   if (runWorker) relayWorker.run(promClient);
   if (runRest) restListener.run();
+  if (runWalletMonitor) walletMonitor.run(promClient);
 
   if (readinessPort) {
     const Net = require("net");

+ 34 - 0
relayer/spy_relayer/src/monitor/index.ts

@@ -0,0 +1,34 @@
+import { getRelayerEnvironment, RelayerEnvironment } from "../configureEnv";
+import { getLogger } from "../helpers/logHelper";
+import { PromHelper } from "../helpers/promHelpers";
+import { collectWallets } from "./walletMonitor";
+
+let metrics: PromHelper;
+
+const logger = getLogger();
+let relayerEnv: RelayerEnvironment;
+
+export function init(runWorker: boolean): boolean {
+  if (!runWorker) return true;
+
+  try {
+    relayerEnv = getRelayerEnvironment();
+  } catch (e) {
+    logger.error(
+      "Encountered error while initiating the monitor environment: " + e
+    );
+    return false;
+  }
+
+  return true;
+}
+
+export async function run(ph: PromHelper) {
+  metrics = ph;
+
+  try {
+    collectWallets(metrics);
+  } catch (e) {
+    logger.error("Failed to kick off collectWallets: " + e);
+  }
+}

+ 90 - 0
relayer/spy_relayer/src/monitor/walletMonitor.test.ts

@@ -0,0 +1,90 @@
+require("../helpers/loadConfig");
+process.env.LOG_DIR = ".";
+
+import { CHAIN_ID_TERRA } from "@certusone/wormhole-sdk";
+import { jest, test } from "@jest/globals";
+import { LCDClient } from "@terra-money/terra.js";
+import { ChainConfigInfo } from "../configureEnv";
+import { calcLocalAddressesTerra, pullTerraBalance } from "./walletMonitor";
+// import { pullEVMBalance } from "./walletMonitor";
+
+jest.setTimeout(300000);
+
+// const bscChainConfig: ChainConfigInfo = {
+//   chainId: CHAIN_ID_BSC,
+//   chainName: "BSC",
+//   nativeCurrencySymbol: "BNB",
+//   nodeUrl: "https://bsc-dataseed.binance.org",
+//   tokenBridgeAddress: "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7",
+//   wrappedAsset: "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c",
+// };
+// const bscPublicKey = "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7"; // Token Bridge
+// const bscTokens = [
+//   "0xfA54fF1a158B5189Ebba6ae130CEd6bbd3aEA76e", // SOL
+//   "0x4DB5a66E937A9F4473fA95b1cAF1d1E1D62E29EA", // WETH
+//   "0x156ab3346823B651294766e23e6Cf87254d68962", // LUNA
+//   "0x3d4350cD54aeF9f9b2C29435e0fa809957B3F30a", // UST
+//   "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c", // WBNB
+//   "0xc836d8dC361E44DbE64c4862D55BA041F88Ddd39", // WMATIC
+//   "0x96412902aa9aFf61E13f085e70D3152C6ef2a817", // WAVAX
+//   "0x6c6D604D3f07aBE287C1A3dF0281e999A83495C0", // wROSE
+//   "0xbF8413EE8612E0E4f66Aa63B5ebE27f3C5883d47", // WFTM
+//   "0xB04906e95AB5D797aDA81508115611fee694c2b3", // USDC
+//   "0x524bC91Dc82d6b90EF29F76A3ECAaBAffFD490Bc", // USDT
+// ];
+
+// test("should pull EVM token balances", async () => {
+//   for (let address of bscTokens) {
+//     const balance = await pullEVMBalance(bscChainConfig, bscPublicKey, address);
+//     console.log(balance);
+//     expect(balance).toBeTruthy();
+//   }
+// });
+
+const terraChainConfig: ChainConfigInfo = {
+  chainId: CHAIN_ID_TERRA,
+  chainName: "Terra",
+  nativeCurrencySymbol: "UST",
+  nodeUrl: "https://fcd.terra.dev",
+  tokenBridgeAddress: "terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf",
+  terraName: "mainnet",
+  terraChainId: "columbus-5",
+  terraCoin: "uluna",
+  terraGasPriceUrl: "https://fcd.terra.dev/v1/txs/gas_prices",
+};
+
+const supportedTokens = require("../../config/mainnet/supportedTokens.json");
+
+test("should pull Terra token balances", async () => {
+  if (
+    !(
+      terraChainConfig.terraChainId &&
+      terraChainConfig.terraCoin &&
+      terraChainConfig.terraGasPriceUrl &&
+      terraChainConfig.terraName
+    )
+  ) {
+    throw new Error("Terra relay was called without proper instantiation.");
+  }
+  const lcdConfig = {
+    URL: terraChainConfig.nodeUrl,
+    chainID: terraChainConfig.terraChainId,
+    name: terraChainConfig.terraName,
+  };
+  const lcd = new LCDClient(lcdConfig);
+  const localAddresses = await calcLocalAddressesTerra(
+    lcd,
+    supportedTokens,
+    terraChainConfig
+  );
+  expect(localAddresses.length).toBeGreaterThan(0);
+  for (const tokenAddress of localAddresses) {
+    const balance = await pullTerraBalance(
+      lcd,
+      terraChainConfig.tokenBridgeAddress,
+      tokenAddress
+    );
+    console.log(balance);
+    expect(balance).toBeDefined();
+  }
+});

+ 190 - 251
relayer/spy_relayer/src/relayer/walletMonitor.ts → relayer/spy_relayer/src/monitor/walletMonitor.ts

@@ -26,7 +26,7 @@ import { getMetaplexData, sleep } from "../helpers/utils";
 import { getEthereumToken } from "../utils/ethereum";
 import { getMultipleAccountsRPC } from "../utils/solana";
 import { formatNativeDenom } from "../utils/terra";
-import { newProvider } from "./evm";
+import { newProvider } from "../relayer/evm";
 
 let env: RelayerEnvironment;
 const logger = getScopedLogger(["walletMonitor"]);
@@ -55,8 +55,6 @@ function init() {
 
 async function pullBalances(metrics: PromHelper): Promise<WalletBalance[]> {
   //TODO loop through all the chain configs, calc the public keys, pull their balances, and push to a combo of the loggers and prmometheus
-
-  logger.debug("pulling balances...");
   if (!env) {
     logger.error("pullBalances() - no env");
     return [];
@@ -67,63 +65,50 @@ async function pullBalances(metrics: PromHelper): Promise<WalletBalance[]> {
   }
   const balancePromises: Promise<WalletBalance[]>[] = [];
   for (const chainInfo of env.supportedChains) {
-    if (!chainInfo) break;
-    for (const privateKey of chainInfo.walletPrivateKey || []) {
-      try {
-        if (!privateKey) break;
-        logger.debug(
-          "Attempting to pull native balance for chainId: " + chainInfo.chainId
-        );
-        if (isEVMChain(chainInfo.chainId)) {
-          logger.info("Attempting to pull EVM native balance...");
+    if (!chainInfo) continue;
+    try {
+      if (chainInfo.chainId === CHAIN_ID_SOLANA) {
+        for (const solanaPrivateKey of chainInfo.solanaPrivateKey || []) {
+          try {
+            balancePromises.push(
+              pullSolanaNativeBalance(chainInfo, solanaPrivateKey)
+            );
+            balancePromises.push(
+              pullSolanaTokenBalances(chainInfo, solanaPrivateKey)
+            );
+          } catch (e: any) {
+            logger.error(
+              "pulling balances failed failed for chain: " + chainInfo.chainName
+            );
+            if (e && e.stack) {
+              logger.error(e.stack);
+            }
+          }
+        }
+      } else if (isEVMChain(chainInfo.chainId)) {
+        for (const privateKey of chainInfo.walletPrivateKey || []) {
           try {
             balancePromises.push(pullEVMNativeBalance(chainInfo, privateKey));
           } catch (e) {
             logger.error("pullEVMNativeBalance() failed: " + e);
           }
-          logger.info("Attempting to pull EVM non-native balance...");
-          pullAllEVMTokens(env.supportedTokens, chainInfo, metrics);
-        } else if (chainInfo.chainId === CHAIN_ID_TERRA) {
-          logger.info("Attempting to pull TERRA native balance...");
-          balancePromises.push(pullTerraNativeBalance(chainInfo, privateKey));
-          logger.info("Attempting to pull TERRA non-native balance...");
-          balancePromises.push(
-            pullAllTerraTokens(env.supportedTokens, chainInfo)
-          );
-        } else {
-          logger.error(
-            "Invalid chain ID in wallet monitor " + chainInfo.chainId
-          );
-        }
-      } catch (e: any) {
-        logger.error(
-          "pulling balances failed failed for chain: " + chainInfo.chainName
-        );
-        if (e && e.stack) {
-          logger.error(e.stack);
         }
+        // TODO one day this will spin up independent watchers that time themselves
+        // purposefully not awaited
+        pullAllEVMTokens(env.supportedTokens, chainInfo, metrics);
+      } else if (chainInfo.chainId === CHAIN_ID_TERRA) {
+        // TODO one day this will spin up independent watchers that time themselves
+        // purposefully not awaited
+        pullAllTerraBalances(env.supportedTokens, chainInfo, metrics);
+      } else {
+        logger.error("Invalid chain ID in wallet monitor " + chainInfo.chainId);
       }
-    }
-
-    for (const solanaPrivateKey of chainInfo.solanaPrivateKey || []) {
-      try {
-        if (chainInfo.chainId === CHAIN_ID_SOLANA) {
-          logger.info("pullBalances() - calling pullSolanaNativeBalance...");
-          balancePromises.push(
-            pullSolanaNativeBalance(chainInfo, solanaPrivateKey)
-          );
-          logger.info("pullBalances() - calling pullSolanaTokenBalances...");
-          balancePromises.push(
-            pullSolanaTokenBalances(chainInfo, solanaPrivateKey)
-          );
-        }
-      } catch (e: any) {
-        logger.error(
-          "pulling balances failed failed for chain: " + chainInfo.chainName
-        );
-        if (e && e.stack) {
-          logger.error(e.stack);
-        }
+    } catch (e: any) {
+      logger.error(
+        "pulling balances failed failed for chain: " + chainInfo.chainName
+      );
+      if (e && e.stack) {
+        logger.error(e.stack);
       }
     }
   }
@@ -137,83 +122,40 @@ async function pullBalances(metrics: PromHelper): Promise<WalletBalance[]> {
   return balances;
 }
 
-export async function pullEVMBalance(
-  chainInfo: ChainConfigInfo,
-  publicAddress: string,
-  tokenAddress: string
-): Promise<WalletBalance> {
-  let provider = newProvider(chainInfo.nodeUrl);
-
-  const token = await getEthereumToken(tokenAddress, provider);
-  const decimals = await token.decimals();
-  const balance = await token.balanceOf(publicAddress);
-  const symbol = await token.symbol();
-  const balanceFormatted = formatUnits(balance, decimals);
-
-  return {
-    chainId: chainInfo.chainId,
-    balanceAbs: balance.toString(),
-    balanceFormatted: balanceFormatted,
-    currencyName: symbol,
-    currencyAddressNative: tokenAddress,
-    isNative: false,
-    walletAddress: publicAddress,
-  };
-}
-
-async function pullTerraBalance(
-  chainInfo: ChainConfigInfo,
-  walletPrivateKey: string,
+export async function pullTerraBalance(
+  lcd: LCDClient,
+  walletAddress: string,
   tokenAddress: string
 ): Promise<WalletBalance | undefined> {
-  if (
-    !(
-      chainInfo.terraChainId &&
-      chainInfo.terraCoin &&
-      chainInfo.terraGasPriceUrl &&
-      chainInfo.terraName
-    )
-  ) {
-    logger.error("Terra relay was called without proper instantiation.");
-    throw new Error("Terra relay was called without proper instantiation.");
-  }
-  const lcdConfig = {
-    URL: chainInfo.nodeUrl,
-    chainID: chainInfo.terraChainId,
-    name: chainInfo.terraName,
-  };
-  const lcd = new LCDClient(lcdConfig);
-  const mk = new MnemonicKey({
-    mnemonic: walletPrivateKey,
-  });
-  const wallet = lcd.wallet(mk);
-  const walletAddress = wallet.key.accAddress;
+  try {
+    const tokenInfo: any = await lcd.wasm.contractQuery(tokenAddress, {
+      token_info: {},
+    });
+    const balanceInfo: any = await lcd.wasm.contractQuery(tokenAddress, {
+      balance: {
+        address: walletAddress,
+      },
+    });
 
-  const tokenInfo: any = await lcd.wasm.contractQuery(tokenAddress, {
-    token_info: {},
-  });
-  const balanceInfo: any = lcd.wasm.contractQuery(tokenAddress, {
-    balance: {
-      address: walletAddress,
-    },
-  });
+    if (!tokenInfo || !balanceInfo) {
+      return undefined;
+    }
 
-  if (!tokenInfo || !balanceInfo) {
-    return undefined;
+    return {
+      chainId: CHAIN_ID_TERRA,
+      balanceAbs: balanceInfo?.balance?.toString() || "0",
+      balanceFormatted: formatUnits(
+        balanceInfo?.balance?.toString() || "0",
+        tokenInfo.decimals
+      ),
+      currencyName: tokenInfo.symbol,
+      currencyAddressNative: tokenAddress,
+      isNative: false,
+      walletAddress: walletAddress,
+    };
+  } catch (e) {
+    logger.error("Failed to fetch terra balance for %s", tokenAddress);
   }
-
-  return {
-    chainId: CHAIN_ID_TERRA,
-    balanceAbs: balanceInfo?.balance?.toString() || "0",
-    balanceFormatted: formatUnits(
-      balanceInfo?.balance?.toString() || "0",
-      tokenInfo.decimals
-    ),
-    currencyName: tokenInfo.symbol,
-    currencyAddressNative: tokenAddress,
-    isNative: false,
-    walletAddress: walletAddress,
-  };
 }
 
 async function pullSolanaTokenBalances(
@@ -294,56 +236,38 @@ async function pullEVMNativeBalance(
 }
 
 async function pullTerraNativeBalance(
+  lcd: LCDClient,
   chainInfo: ChainConfigInfo,
-  privateKey: string
+  walletAddress: string
 ): Promise<WalletBalance[]> {
-  const output: WalletBalance[] = [];
-  if (
-    !(
-      chainInfo.terraChainId &&
-      chainInfo.terraCoin &&
-      chainInfo.terraGasPriceUrl &&
-      chainInfo.terraName
-    )
-  ) {
+  try {
+    const output: WalletBalance[] = [];
+    const [coins] = await lcd.bank.balance(walletAddress);
+    // coins doesn't support reduce
+    const balancePairs = coins.map(({ amount, denom }) => [denom, amount]);
+    const balance = balancePairs.reduce((obj, current) => {
+      obj[current[0].toString()] = current[1].toString();
+      return obj;
+    }, {} as TerraNativeBalances);
+    Object.keys(balance).forEach((key) => {
+      output.push({
+        chainId: chainInfo.chainId,
+        balanceAbs: balance[key],
+        balanceFormatted: formatUnits(balance[key], 6).toString(),
+        currencyName: formatNativeDenom(key),
+        currencyAddressNative: key,
+        isNative: true,
+        walletAddress: walletAddress,
+      });
+    });
+    return output;
+  } catch (e) {
     logger.error(
-      "Terra wallet balance was called without proper instantiation."
-    );
-    throw new Error(
-      "Terra wallet balance was called without proper instantiation."
+      "Failed to fetch terra native balances for wallet %s",
+      walletAddress
     );
+    return [];
   }
-  const lcdConfig = {
-    URL: chainInfo.nodeUrl,
-    chainID: chainInfo.terraChainId,
-    name: chainInfo.terraName,
-  };
-  const lcd = new LCDClient(lcdConfig);
-  const mk = new MnemonicKey({
-    mnemonic: privateKey,
-  });
-  const wallet = lcd.wallet(mk);
-  const walletAddress = wallet.key.accAddress;
-
-  const [coins] = await lcd.bank.balance(walletAddress);
-  // coins doesn't support reduce
-  const balancePairs = coins.map(({ amount, denom }) => [denom, amount]);
-  const balance = balancePairs.reduce((obj, current) => {
-    obj[current[0].toString()] = current[1].toString();
-    return obj;
-  }, {} as TerraNativeBalances);
-  Object.keys(balance).forEach((key) => {
-    output.push({
-      chainId: chainInfo.chainId,
-      balanceAbs: balance[key],
-      balanceFormatted: formatUnits(balance[key], 6).toString(),
-      currencyName: formatNativeDenom(key),
-      currencyAddressNative: key,
-      isNative: true,
-      walletAddress: walletAddress,
-    });
-  });
-  return output;
 }
 
 async function pullSolanaNativeBalance(
@@ -450,32 +374,11 @@ async function calcLocalAddressesEVM(
   );
 }
 
-async function calcLocalAddressesTerra(
+export async function calcLocalAddressesTerra(
+  lcd: LCDClient,
   supportedTokens: SupportedToken[],
   chainConfigInfo: ChainConfigInfo
 ) {
-  if (
-    !(
-      chainConfigInfo.terraChainId &&
-      chainConfigInfo.terraCoin &&
-      chainConfigInfo.terraGasPriceUrl &&
-      chainConfigInfo.terraName
-    )
-  ) {
-    logger.error(
-      "Terra wallet balance was called without proper instantiation."
-    );
-    throw new Error(
-      "Terra wallet balance was called without proper instantiation."
-    );
-  }
-  const lcdConfig = {
-    URL: chainConfigInfo.nodeUrl,
-    chainID: chainConfigInfo.terraChainId,
-    name: chainConfigInfo.terraName,
-  };
-  const lcd = new LCDClient(lcdConfig);
-
   const output: string[] = [];
   for (const supportedToken of supportedTokens) {
     if (supportedToken.chainId === chainConfigInfo.chainId) {
@@ -519,79 +422,115 @@ async function pullAllEVMTokens(
   chainConfig: ChainConfigInfo,
   metrics: PromHelper
 ) {
-  let provider = newProvider(
-    chainConfig.nodeUrl,
-    true
-  ) as ethers.providers.JsonRpcBatchProvider;
-  const localAddresses = await calcLocalAddressesEVM(
-    provider,
-    supportedTokens,
-    chainConfig
-  );
-  if (!chainConfig.walletPrivateKey) {
-    return;
-  }
-  for (const privateKey of chainConfig.walletPrivateKey) {
-    try {
-      const publicAddress = await new ethers.Wallet(privateKey).getAddress();
-      const tokens = await Promise.all(
-        localAddresses.map((tokenAddress) =>
-          getEthereumToken(tokenAddress, provider)
-        )
-      );
-      const tokenInfos = await Promise.all(
-        tokens.map((token) =>
-          Promise.all([
-            token.decimals(),
-            token.balanceOf(publicAddress),
-            token.symbol(),
-          ])
-        )
-      );
-      const balances = tokenInfos.map(([decimals, balance, symbol], idx) => ({
-        chainId: chainConfig.chainId,
-        balanceAbs: balance.toString(),
-        balanceFormatted: formatUnits(balance, decimals),
-        currencyName: symbol,
-        currencyAddressNative: localAddresses[idx],
-        isNative: false,
-        walletAddress: publicAddress,
-      }));
-      metrics.handleWalletBalances(balances);
-    } catch (e) {
-      logger.error(
-        "pollEVMBalance failed: for tokens " +
-          JSON.stringify(localAddresses) +
-          " on chain " +
-          chainConfig.chainId +
-          ", error: " +
-          e
-      );
+  try {
+    let provider = newProvider(
+      chainConfig.nodeUrl,
+      true
+    ) as ethers.providers.JsonRpcBatchProvider;
+    const localAddresses = await calcLocalAddressesEVM(
+      provider,
+      supportedTokens,
+      chainConfig
+    );
+    if (!chainConfig.walletPrivateKey) {
+      return;
+    }
+    for (const privateKey of chainConfig.walletPrivateKey) {
+      try {
+        const publicAddress = await new ethers.Wallet(privateKey).getAddress();
+        const tokens = await Promise.all(
+          localAddresses.map((tokenAddress) =>
+            getEthereumToken(tokenAddress, provider)
+          )
+        );
+        const tokenInfos = await Promise.all(
+          tokens.map((token) =>
+            Promise.all([
+              token.decimals(),
+              token.balanceOf(publicAddress),
+              token.symbol(),
+            ])
+          )
+        );
+        const balances = tokenInfos.map(([decimals, balance, symbol], idx) => ({
+          chainId: chainConfig.chainId,
+          balanceAbs: balance.toString(),
+          balanceFormatted: formatUnits(balance, decimals),
+          currencyName: symbol,
+          currencyAddressNative: localAddresses[idx],
+          isNative: false,
+          walletAddress: publicAddress,
+        }));
+        metrics.handleWalletBalances(balances);
+      } catch (e) {
+        logger.error(
+          "pullAllEVMTokens failed: for tokens " +
+            JSON.stringify(localAddresses) +
+            " on chain " +
+            chainConfig.chainId +
+            ", error: " +
+            e
+        );
+      }
     }
+  } catch (e) {
+    logger.error(
+      "pullAllEVMTokens failed: for chain " +
+        chainConfig.chainId +
+        ", error: " +
+        e
+    );
   }
 }
 
-async function pullAllTerraTokens(
+async function pullAllTerraBalances(
   supportedTokens: SupportedToken[],
-  chainConfig: ChainConfigInfo
+  chainConfig: ChainConfigInfo,
+  metrics: PromHelper
 ) {
+  let balances: WalletBalance[] = [];
+  if (!chainConfig.walletPrivateKey) {
+    return balances;
+  }
+  if (
+    !(
+      chainConfig.terraChainId &&
+      chainConfig.terraCoin &&
+      chainConfig.terraGasPriceUrl &&
+      chainConfig.terraName
+    )
+  ) {
+    logger.error("Terra relay was called without proper instantiation.");
+    throw new Error("Terra relay was called without proper instantiation.");
+  }
+  const lcdConfig = {
+    URL: chainConfig.nodeUrl,
+    chainID: chainConfig.terraChainId,
+    name: chainConfig.terraName,
+  };
+  const lcd = new LCDClient(lcdConfig);
   const localAddresses = await calcLocalAddressesTerra(
+    lcd,
     supportedTokens,
     chainConfig
   );
-  const output: WalletBalance[] = [];
-  if (!chainConfig.walletPrivateKey) {
-    return output;
-  }
   for (const privateKey of chainConfig.walletPrivateKey) {
+    const mk = new MnemonicKey({
+      mnemonic: privateKey,
+    });
+    const wallet = lcd.wallet(mk);
+    const walletAddress = wallet.key.accAddress;
+    balances = [
+      ...balances,
+      ...(await pullTerraNativeBalance(lcd, chainConfig, walletAddress)),
+    ];
     for (const address of localAddresses) {
-      const balance = await pullTerraBalance(chainConfig, privateKey, address);
+      const balance = await pullTerraBalance(lcd, walletAddress, address);
       if (balance) {
-        output.push(balance);
+        balances.push(balance);
       }
     }
   }
-  // logger.debug("pullAllTerraTokens() - returning %o", output);
 
-  return output;
+  metrics.handleWalletBalances(balances);
 }

+ 28 - 30
relayer/spy_relayer/src/relayer/evm.ts

@@ -1,4 +1,5 @@
 import {
+  Bridge__factory,
   CHAIN_ID_POLYGON,
   getIsTransferCompletedEth,
   hexToUint8Array,
@@ -9,6 +10,7 @@ import { Signer } from "@ethersproject/abstract-signer";
 import { ethers } from "ethers";
 import { ChainConfigInfo } from "../configureEnv";
 import { getScopedLogger, ScopedLogger } from "../helpers/logHelper";
+import { PromHelper } from "../helpers/promHelpers";
 
 export function newProvider(
   url: string,
@@ -31,7 +33,8 @@ export async function relayEVM(
   unwrapNative: boolean,
   checkOnly: boolean,
   walletPrivateKey: string,
-  relayLogger: ScopedLogger
+  relayLogger: ScopedLogger,
+  metrics: PromHelper
 ) {
   const logger = getScopedLogger(
     ["evm", chainConfigInfo.chainName],
@@ -41,15 +44,6 @@ export async function relayEVM(
   let provider = newProvider(chainConfigInfo.nodeUrl);
   const signer: Signer = new ethers.Wallet(walletPrivateKey, provider);
 
-  if (unwrapNative) {
-    logger.info(
-      "Will redeem and unwrap using pubkey: %s",
-      await signer.getAddress()
-    );
-  } else {
-    logger.info("Will redeem using pubkey: %s", await signer.getAddress());
-  }
-
   logger.debug("Checking to see if vaa has already been redeemed.");
   const alreadyRedeemed = await getIsTransferCompletedEth(
     chainConfigInfo.tokenBridgeAddress,
@@ -65,9 +59,18 @@ export async function relayEVM(
     return { redeemed: false, result: "not redeemed" };
   }
 
+  if (unwrapNative) {
+    logger.info(
+      "Will redeem and unwrap using pubkey: %s",
+      await signer.getAddress()
+    );
+  } else {
+    logger.info("Will redeem using pubkey: %s", await signer.getAddress());
+  }
+
   logger.debug("Redeeming.");
   // look, there's something janky with Polygon + ethers + EIP-1559
-  let overrides;
+  let overrides = {};
   if (chainConfigInfo.chainId === CHAIN_ID_POLYGON) {
     let feeData = await provider.getFeeData();
     overrides = {
@@ -75,32 +78,27 @@ export async function relayEVM(
       maxPriorityFeePerGas: feeData.maxPriorityFeePerGas?.mul(50) || undefined,
     };
   }
-  const receipt = unwrapNative
-    ? await redeemOnEthNative(
-        chainConfigInfo.tokenBridgeAddress,
-        signer,
-        signedVaaArray,
-        overrides
-      )
-    : await redeemOnEth(
-        chainConfigInfo.tokenBridgeAddress,
-        signer,
-        signedVaaArray,
-        overrides
-      );
-
-  logger.debug("Checking to see if the transaction is complete.");
-
-  const success = await getIsTransferCompletedEth(
+  const bridge = Bridge__factory.connect(
     chainConfigInfo.tokenBridgeAddress,
-    provider,
-    signedVaaArray
+    signer
   );
+  const contractMethod = unwrapNative
+    ? bridge.completeTransferAndUnwrapETH
+    : bridge.completeTransfer;
+  const tx = await contractMethod(signedVaaArray, overrides);
+  logger.info("waiting for tx hash: %s", tx.hash);
+  const receipt = await tx.wait();
+
+  // Checking getIsTransferCompletedEth can be problematic if we get
+  // load balanced to a node that is behind the block of our accepted tx
+  // The auditor worker should confirm that our tx was successful
+  const success = true;
 
   if (provider instanceof ethers.providers.WebSocketProvider) {
     await provider.destroy();
   }
 
   logger.info("success: %s tx hash: %s", success, receipt.transactionHash);
+  metrics.incSuccesses(chainConfigInfo.chainId);
   return { redeemed: success, result: receipt };
 }

+ 15 - 6
relayer/spy_relayer/src/relayer/relay.ts

@@ -4,6 +4,7 @@ import {
   ChainId,
   CHAIN_ID_SOLANA,
   CHAIN_ID_TERRA,
+  hexToNativeString,
   hexToUint8Array,
   isEVMChain,
   parseTransferPayload,
@@ -15,6 +16,7 @@ import { relayTerra } from "./terra";
 import { getRelayerEnvironment } from "../configureEnv";
 import { RelayResult, Status } from "../helpers/redisHelper";
 import { getLogger, getScopedLogger, ScopedLogger } from "../helpers/logHelper";
+import { PromHelper } from "../helpers/promHelpers";
 
 const logger = getLogger();
 
@@ -27,7 +29,8 @@ export async function relay(
   signedVAA: string,
   checkOnly: boolean,
   walletPrivateKey: any,
-  relayLogger: ScopedLogger
+  relayLogger: ScopedLogger,
+  metrics: PromHelper
 ): Promise<RelayResult> {
   const logger = getScopedLogger(["relay"], relayLogger);
   const { parse_vaa } = await importCoreWasm();
@@ -51,8 +54,11 @@ export async function relay(
 
     if (isEVMChain(transferPayload.targetChain)) {
       const unwrapNative =
-        transferPayload.originAddress.toLowerCase() ===
-        chainConfigInfo.wrappedAsset?.toLowerCase();
+        transferPayload.originChain === transferPayload.targetChain &&
+        hexToNativeString(
+          transferPayload.originAddress,
+          transferPayload.originChain
+        )?.toLowerCase() === chainConfigInfo.wrappedAsset?.toLowerCase();
       logger.debug(
         "isEVMChain: originAddress: [" +
           transferPayload.originAddress +
@@ -67,7 +73,8 @@ export async function relay(
         unwrapNative,
         checkOnly,
         walletPrivateKey,
-        logger
+        logger,
+        metrics
       );
       return {
         status: evmResult.redeemed ? Status.Completed : Status.Error,
@@ -82,7 +89,8 @@ export async function relay(
         signedVAA,
         checkOnly,
         walletPrivateKey,
-        logger
+        logger,
+        metrics
       );
       if (retVal.redeemed) {
         rResult.status = Status.Completed;
@@ -98,7 +106,8 @@ export async function relay(
         signedVAA,
         checkOnly,
         walletPrivateKey,
-        logger
+        logger,
+        metrics
       );
       if (retVal.redeemed) {
         rResult.status = Status.Completed;

+ 26 - 15
relayer/spy_relayer/src/relayer/relay_worker.ts

@@ -10,6 +10,7 @@ import {
   monitorRedis,
   RedisTables,
   RelayResult,
+  resetPayload,
   Status,
   StorePayload,
   storePayloadFromJson,
@@ -18,7 +19,6 @@ import {
 } from "../helpers/redisHelper";
 import { sleep } from "../helpers/utils";
 import { relay } from "./relay";
-import { collectWallets } from "./walletMonitor";
 
 const WORKER_THREAD_RESTART_MS = 10 * 1000;
 const AUDITOR_THREAD_RESTART_MS = 10 * 1000;
@@ -51,10 +51,15 @@ export function init(runWorker: boolean): boolean {
   return true;
 }
 
-function createWorkerInfos() {
+function createWorkerInfos(metrics: PromHelper) {
   let workerArray: WorkerInfo[] = new Array();
   let index = 0;
   relayerEnv.supportedChains.forEach((chain) => {
+    // initialize per chain metrics
+    metrics.incSuccesses(chain.chainId, 0);
+    metrics.incConfirmed(chain.chainId, 0);
+    metrics.incFailures(chain.chainId, 0);
+    metrics.incRollback(chain.chainId, 0);
     chain.walletPrivateKey?.forEach((key) => {
       workerArray.push({
         walletPrivateKey: key,
@@ -188,15 +193,22 @@ async function doAuditorThread(workerInfo: WorkerInfo) {
               storePayload.vaa_bytes,
               true,
               workerInfo.walletPrivateKey,
-              auditLogger
+              auditLogger,
+              metrics
             );
 
             await redisClient.del(si_key);
-            if (rr.status !== Status.Completed) {
+            if (rr.status === Status.Completed) {
+              metrics.incConfirmed(workerInfo.targetChainId);
+            } else {
               auditLogger.info("Detected a rollback on " + si_key);
+              metrics.incRollback(workerInfo.targetChainId);
               // Remove this item from the WORKING table and move it to INCOMING
               await redisClient.select(RedisTables.INCOMING);
-              await redisClient.set(si_key, si_value);
+              await redisClient.set(
+                si_key,
+                storePayloadToJson(resetPayload(storePayloadFromJson(si_value)))
+              );
               await redisClient.select(RedisTables.WORKING);
             }
           } else if (storePayload.status === Status.Error) {
@@ -233,14 +245,9 @@ export async function run(ph: PromHelper) {
     logger.info("NOT clearing REDIS.");
   }
 
-  let workerArray: WorkerInfo[] = createWorkerInfos();
+  let workerArray: WorkerInfo[] = createWorkerInfos(metrics);
 
   spawnWorkerThreads(workerArray);
-  try {
-    collectWallets(metrics);
-  } catch (e) {
-    logger.error("Failed to kick off collectWallets: " + e);
-  }
   try {
     monitorRedis(metrics);
   } catch (e) {
@@ -285,7 +292,13 @@ async function processRequest(
       } else {
         logger.info("Calling with vaa_bytes %s", payload.vaa_bytes);
       }
-      relayResult = await relay(payload.vaa_bytes, false, myPrivateKey, logger);
+      relayResult = await relay(
+        payload.vaa_bytes,
+        false,
+        myPrivateKey,
+        logger,
+        metrics
+      );
       logger.info("Relay returned: %o", Status[relayResult.status]);
     } catch (e: any) {
       if (e.message) {
@@ -314,9 +327,7 @@ async function processRequest(
       targetChain = transferPayload.targetChain;
     } catch (e) {}
     let retry: boolean = false;
-    if (relayResult.status === Status.Completed) {
-      metrics.incSuccesses(targetChain);
-    } else {
+    if (relayResult.status !== Status.Completed) {
       metrics.incFailures(targetChain);
       if (payload.retries >= MAX_RETRIES) {
         relayResult.status = Status.FatalError;

+ 4 - 1
relayer/spy_relayer/src/relayer/solana.ts

@@ -17,6 +17,7 @@ import {
 import { Connection, Keypair, PublicKey, Transaction } from "@solana/web3.js";
 import { ChainConfigInfo } from "../configureEnv";
 import { getScopedLogger, ScopedLogger } from "../helpers/logHelper";
+import { PromHelper } from "../helpers/promHelpers";
 
 const MAX_VAA_UPLOAD_RETRIES_SOLANA = 5;
 
@@ -25,7 +26,8 @@ export async function relaySolana(
   signedVAAString: string,
   checkOnly: boolean,
   walletPrivateKey: Uint8Array,
-  relayLogger: ScopedLogger
+  relayLogger: ScopedLogger,
+  metrics: PromHelper
 ) {
   const logger = getScopedLogger(["solana"], relayLogger);
   //TODO native transfer & create associated token account
@@ -164,5 +166,6 @@ export async function relaySolana(
   );
 
   logger.info("success: %s, tx hash: %s", success, txid);
+  metrics.incSuccesses(chainConfigInfo.chainId);
   return { redeemed: success, result: txid };
 }

+ 5 - 2
relayer/spy_relayer/src/relayer/terra.ts

@@ -7,13 +7,15 @@ import { LCDClient, MnemonicKey } from "@terra-money/terra.js";
 import axios from "axios";
 import { ChainConfigInfo } from "../configureEnv";
 import { getScopedLogger, ScopedLogger } from "../helpers/logHelper";
+import { PromHelper } from "../helpers/promHelpers";
 
 export async function relayTerra(
   chainConfigInfo: ChainConfigInfo,
   signedVAA: string,
   checkOnly: boolean,
   walletPrivateKey: any,
-  relayLogger: ScopedLogger
+  relayLogger: ScopedLogger,
+  metrics: PromHelper
 ) {
   const logger = getScopedLogger(["terra"], relayLogger);
   if (
@@ -40,7 +42,7 @@ export async function relayTerra(
   const wallet = lcd.wallet(mk);
 
   logger.info(
-    "terraChainId: %s, tokenBridgeAddress: %s, accAddress: %s, signedVAA: $s",
+    "terraChainId: %s, tokenBridgeAddress: %s, accAddress: %s, signedVAA: %s",
     chainConfigInfo.terraChainId,
     chainConfigInfo.tokenBridgeAddress,
     wallet.key.accAddress,
@@ -113,5 +115,6 @@ export async function relayTerra(
   );
 
   logger.info("success: %s, tx hash: %s", success, receipt.txhash);
+  metrics.incSuccesses(chainConfigInfo.chainId);
   return { redeemed: success, result: receipt.txhash };
 }

+ 0 - 40
relayer/spy_relayer/src/relayer/walletMonitor.test.ts

@@ -1,40 +0,0 @@
-require("../helpers/loadConfig");
-process.env.LOG_DIR = ".";
-
-import { CHAIN_ID_BSC } from "@certusone/wormhole-sdk";
-import { jest, test } from "@jest/globals";
-import { ChainConfigInfo } from "../configureEnv";
-import { pullEVMBalance } from "./walletMonitor";
-
-jest.setTimeout(300000);
-
-const bscChainConfig: ChainConfigInfo = {
-  chainId: CHAIN_ID_BSC,
-  chainName: "BSC",
-  nativeCurrencySymbol: "BNB",
-  nodeUrl: "https://bsc-dataseed.binance.org",
-  tokenBridgeAddress: "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7",
-  wrappedAsset: "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c",
-};
-const bscPublicKey = "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7"; // Token Bridge
-const bscTokens = [
-  "0xfA54fF1a158B5189Ebba6ae130CEd6bbd3aEA76e", // SOL
-  "0x4DB5a66E937A9F4473fA95b1cAF1d1E1D62E29EA", // WETH
-  "0x156ab3346823B651294766e23e6Cf87254d68962", // LUNA
-  "0x3d4350cD54aeF9f9b2C29435e0fa809957B3F30a", // UST
-  "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c", // WBNB
-  "0xc836d8dC361E44DbE64c4862D55BA041F88Ddd39", // WMATIC
-  "0x96412902aa9aFf61E13f085e70D3152C6ef2a817", // WAVAX
-  "0x6c6D604D3f07aBE287C1A3dF0281e999A83495C0", // wROSE
-  "0xbF8413EE8612E0E4f66Aa63B5ebE27f3C5883d47", // WFTM
-  "0xB04906e95AB5D797aDA81508115611fee694c2b3", // USDC
-  "0x524bC91Dc82d6b90EF29F76A3ECAaBAffFD490Bc", // USDT
-];
-
-test("should pull EVM token balances", async () => {
-  for (let address of bscTokens) {
-    const balance = await pullEVMBalance(bscChainConfig, bscPublicKey, address);
-    console.log(balance);
-    expect(balance).toBeTruthy();
-  }
-});

部分文件因为文件数量过多而无法显示