瀏覽代碼

node: Integrate Transfer Verifier into Sui Watcher (#4324)

* Clean up unused transfer verifier files

* Add Sui transfer verifier integration tests

* node: integrate Sui transfer verifier with Sui watcher

* fund token bridge in EVM tests with 100eth

* improve ci test robustness

* properly handle cases where no valid events were identified

* apply initial PR review changes

* fix tests and add telemetry

* add review suggestions

* sui txverifier improvements

* add verifyAndPublish watcher tests

* fix linting errors

* changes to address pr comments

* fix linting errors

* changes to address pr comments

* Fix lints from rebase

* fix yoda linting error

* fix event unmarshalling and apply multicall fix

* fix up tests for refactored code and udpate watcher

* fix linting errors

* Fix up comments

* remove constants from sdk

* add sui_getObject api call and state object constants

* use state objects to retrieve the token bridge package ID and emitter address

* fix transfer verifier devnet arguments

* fix transfer verifier tilt tests

* revert to using static token bridge package ids

* fix linting errors

---------

Co-authored-by: Dirk Brink <hello@dirk.tech>
Jason Matthyser 3 周之前
父節點
當前提交
1c2193432e

+ 18 - 1
Tiltfile

@@ -650,6 +650,14 @@ if ci_tests:
     )
     k8s_yaml_with_ns("devnet/tx-verifier-evm.yaml")
 
+    if sui:
+        docker_build(
+            ref = "tx-verifier-sui",
+            context = "./devnet/tx-verifier/",
+            dockerfile = "./devnet/tx-verifier/Dockerfile.tx-verifier-sui"
+        )
+        k8s_yaml_with_ns("devnet/tx-verifier-sui.yaml")
+
     k8s_yaml_with_ns(
         encode_yaml_stream(
             set_env_in_jobs(
@@ -690,10 +698,11 @@ if ci_tests:
         trigger_mode = trigger_mode,
         resource_deps = [], # testing/querysdk.sh handles waiting for query-server, not having deps gets the build earlier
     )
+
     # launches Transfer Verifier binary and sets up monitoring script
     k8s_resource(
         "tx-verifier-evm",
-        labels = ["tx-verifier-evm"],
+        labels = ["tx-verifier"],
         trigger_mode = trigger_mode,
         resource_deps = ["eth-devnet"],
     )
@@ -704,6 +713,14 @@ if ci_tests:
         resource_deps = [], # uses devnet-consts.json, buttesting/contract-integrations/custom_consistency_level/test_custom_consistency_level.sh handles waiting for guardian, not having deps gets the build earlier
     )
 
+    if sui:
+        k8s_resource(
+            "tx-verifier-sui",
+            labels = ["tx-verifier"],
+            trigger_mode = trigger_mode,
+            resource_deps = ["sui"]
+        )
+
 if terra_classic:
     docker_build(
         ref = "terra-image",

+ 2 - 3
devnet/node.yaml

@@ -148,9 +148,8 @@ spec:
             - --ccqEnabled=true
             - --ccqAllowedRequesters=beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe,25021A4FCAf61F2EADC8202D3833Df48B2Fa0D54
             - --ccqAllowedPeers=12D3KooWSnju8zhywCYVi2JwTqky1sySPnmtYLsHHzc4WerMnDQH,12D3KooWM6WqedfR6ehtTd1y6rJu3ZUrEkTjcJJnJZYesjd89zj8
-            - --transferVerifierEnabledChainIDs=2
-            - --notaryEnabled=true
-            - --logLevel=warn
+            - --transferVerifierEnabledChainIDs=2,21
+            # - --logLevel=debug
           securityContext:
             capabilities:
               add:

+ 29 - 0
devnet/tx-verifier-sui.yaml

@@ -0,0 +1,29 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: tx-verifier-sui
+spec:
+  backoffLimit: 0
+  template:
+    spec:
+      restartPolicy: Never
+      # required, as the guardian cannot run as root
+      securityContext:
+        runAsUser: 1000
+        runAsGroup: 1000
+        fsGroup: 1000
+      containers:
+        - name: tx-verifier-sui
+          image: tx-verifier-sui
+          command:
+            - /bin/bash
+            - -c
+            - "bash /tx-verifier-sui-runner.sh"
+          readinessProbe:
+            exec:
+              command:
+                - test
+                - -e
+                - "/tmp/success"
+            initialDelaySeconds: 5
+            periodSeconds: 5

+ 0 - 8
devnet/tx-verifier/Dockerfile

@@ -1,8 +0,0 @@
-# There's nothing special about this version, it is simply the `latest` as of
-# the creation date of this file.
-FROM alpine:3.20.3@sha256:1e42bbe2508154c9126d48c2b8a75420c3544343bf86fd041fb7527e017a4b4a
-
-RUN apk add --no-cache inotify-tools
-
-COPY monitor.sh /monitor.sh
-RUN chmod +x /monitor.sh

+ 0 - 13
devnet/tx-verifier/Dockerfile.cast

@@ -1,13 +0,0 @@
-# These versions are pinned to match the Dockerfile in the `ethereum/`
-# directory. Otherwise, there is nothing special about them and they can be
-# updated alongside the other Dockerfile.
-FROM --platform=linux/amd64 ghcr.io/foundry-rs/foundry:v1.0.0@sha256:d12a373ec950de170d5461014ef9320ba0fb6e0db6f87835999d0fcf3820370e as foundry
-# node is required to install Foundry
-FROM node:22.16-bullseye-slim@sha256:550b434f7edc3a1875860657a3e306752358029c957280809ae6395ab296faeb
-
-COPY --from=foundry /usr/local/bin/cast /bin/cast
-
-COPY transfer-verifier-test.sh /transfer-verifier-test.sh
-RUN chmod +x /transfer-verifier-test.sh
-
-CMD ["/transfer-verifier-test.sh"]

+ 46 - 0
devnet/tx-verifier/Dockerfile.tx-verifier-sui

@@ -0,0 +1,46 @@
+# Apply guardiand-image
+FROM guardiand-image as base
+
+FROM cli-gen AS cli-export
+FROM const-gen AS const-export
+FROM sui-node AS sui-node-base
+
+COPY --from=const-export .env .env
+COPY --from=cli-export clients/js /cli
+
+# prepare guardiand
+COPY --from=base /guardiand /guardiand
+COPY --from=base /usr/lib/libwasmvm.*.so /usr/lib/
+
+WORKDIR /tmp
+
+# Link `worm`
+WORKDIR /cli
+RUN npm link
+
+WORKDIR /
+
+# install jq
+RUN apt update && apt install -y jq
+
+# prepare test scripts
+COPY sui/tx-verifier-sui-runner.sh /tx-verifier-sui-runner.sh
+RUN chmod +x /tx-verifier-sui-runner.sh
+
+# This is necessary to modify/build/deploy Sui contracts from this container
+# The container is running with UID 1000, and all files copied into the sui-node image
+# are owned by root, stored under /tmp.
+RUN chown -R 1000:1000 /tmp
+
+# Necessary to store any external dependency artifacts for Sui
+RUN mkdir /.move
+RUN chown 1000:1000 /.move
+
+# Prepare sui configuration to allow using the sui cli utility
+RUN mkdir -p /.sui
+COPY sui/sui_config /.sui/sui_config
+RUN chown -R 1000:1000 /.sui
+
+
+# Add the unsafe implementations of `prepare_message` and `transfer_tokens`
+COPY sui/transfer_tokens_unsafe.move /tmp/transfer_tokens_unsafe.move

+ 21 - 32
devnet/tx-verifier/README.md

@@ -4,7 +4,7 @@
 
 ### Overview
 
-The Transfer Verifier tests involve interacting with the local Ethereum devnet defined by the Tilt set-up in this repository.
+The Transfer Verifier tests involve interacting with the local Ethereum devnet defined by the Tilt setup in this repository.
 
 The basic idea is as follows:
 * Interact with the local Ethereum testnet. This should already have important pieces such as the Token Bridge and Core Bridge deployed.
@@ -14,42 +14,31 @@ The basic idea is as follows:
 * A "monitor" script is used to detect the expected error message, waiting until the file is written to
 * If the monitor script sees the expected error message in the error log, it terminates
 
-## Components
+### Scripts/Components
 
-### Scripts
+| Script/Component | Description |
+|------------------|-------------|
+| `Dockerfile.tx-verifier-evm` | The dockerfile representing the image used for EVM transfer verifier tests. |
+| `tx-verifier-evm-runner.sh` | Runs the guardiand binary which contains the transfer verifier tool, which monitors the local Ethereum network for events. |
+| `tx-verifier-evm-tests.sh` | Contains the `cast` commands that simulate malicious interactions with the Token Bridge and Core Bridge. It is able to broadcast transactions to the `anvil` instance that powers the Ethereum testnet while being able to impersonate arbitrary senders. <br>This allows performing actions that otherwise should be impossible, like causing a Publish Message event to be emitted from the Core Bridge without a corresponding deposit or transfer into the Token Bridge. <br>The current integration test sends exactly two messages, each one corresponding to a different Token Bridge endpoint (Transfer and Transfer With Payload). |
 
-#### tx-verifier-evm-tests.sh
+## Sui Integration Tests
 
-Contains the `cast` commands that simulate malicious interactions with the Token Bridge and Core Bridge. It is able to broadcast
-transactions to the `anvil` instance that powers the Ethereum testnet while being able to impersonate arbitrary senders.
-
-This lets us perform actions that otherwise should be impossible, like causing a Publish Message event to be emitted from the Core Bridge
-without a corresponding deposit or transfer into the Token Bridge.
-
-The current integration test sends exactly two messages, each one corresponding to a different Token Bridge endpoint
-(Transfer and Transfer With Payload).
-
-#### tx-verifier-evm-runner.sh
-
-Runs the guardiand binary which contains the transfer verifier tool. This tool monitors the local Ethereum network for events.
-
-A bash script that monitors the error log file for a specific error pattern. It runs in an infinite loop so it will
-not exit until the error pattern is detected.
-
-The error pattern is defined in the YAML file and matches an error string in the Transfer Verifier package.
-
-The integration test is considered successful as soon as two instances of the error pattern are detected, one for
-each message type sent by the `transfer-verifier-evm-test.sh`.
+### Overview
 
-## Further Work
+The transfer verifier integration tests for Sui also involve interacting with the Sui node deployed as part of the larger Tilt setup. However, owing to the complexities of Sui, injecting the invariants transfer verifier tests for is a lot more intrusive:
 
-The tests cover the case where the Transfer Verifier should report when a Message Publication receipt from the 
-Token Bridge with a transfer type does not contain any deposits or transfers.
+* The Sui transfer verifier container is set up such that it can build and deploy a fresh core and token bridge to the Sui node.
+* The token bridge's `transfer_tokens` module is modified by the Sui runner script to include two unsafe functions that bypass important security checks.
+* The CI tests rely on the `worm` cli for deployment, and the `sui` cli for interacting with the node.
 
-However, the Transfer Verifier can do more than this. It also reports cases where the incoming funds to the Token
-Bridge within one receipt are less than the amount encoded in the payload that it sends to the Core Bridge. This
-is something like the transfer not being solvent at the resolution of one Ethereum Receipt.
+Similar to EVM, the errors are logged to a file and subsequently read to determine whether or not the malicious actions were detected.
 
-Adding this test would be a good improvement but requires a more complicated test pattern, perhaps combining
-multiple transactions into a single call to `cast`.
+### Scripts/Components
 
+| Script/Component | Description |
+|------------------|-------------|
+| `Dockerfile.tx-verifier-sui` | The dockerfile representing the image used for Sui transfer verifier tests |
+| `sui/tx-verifier-sui-runner.sh` | The runner script that prepares the core and token bridge, launches the transfer verifier for Sui, and initiates and monitors for malicious actions. |
+| `sui/sui_config` | The Sui client configuration, copied from the Sui devnet environment in the monorepo. This was done to avoid expanding the scope/context of the dockerfile to additional directories in the monorepo. |
+| `sui/transfer_tokens_unsafe.move` | Unsafe variations of the `prepare_transfer` and `transfer_tokens` functions in the Sui `token_bridge` package's `transfer_tokens` module. This file is used within the container to patch the token bridge prior to deployment, and allows injecting the invariants transfer verifier monitors.|

+ 0 - 32
devnet/tx-verifier/monitor.sh

@@ -1,32 +0,0 @@
-#!/bin/sh
-log_file="${ERROR_LOG_PATH:-/logs/error.log}"
-error_pattern="${ERROR_PATTERN:-ERROR}"
-poll_interval=5
-TARGET=2
-
-# Wait for log file to exist and be non-empty
-while [ ! -s "${log_file}" ]; do
-    echo "Waiting for ${log_file} to be created and contain data..."
-    sleep 5
-done
-
-echo "Monitoring file '${log_file}' for ${TARGET} total instance(s) of error pattern: '${error_pattern}'"
-
-# Poll until we find the target number of instances
-while true; do
-    current_count=$(grep -c "$error_pattern" "$log_file")
-    
-    echo "Found ${current_count} of ${TARGET} instances so far."
-    
-    if [ $current_count -eq $TARGET ]; then 
-        echo "SUCCESS: Found ${TARGET} instances of error pattern. Exiting."
-        exit 0
-    fi
-    
-    if [ $current_count -gt $TARGET ]; then 
-        echo "Wanted ${TARGET} instances of error pattern but got ${current_count}. This is probably a bug."
-        exit 1
-    fi
-    
-    sleep $poll_interval
-done

+ 8 - 0
devnet/tx-verifier/sui/sui_config/client.yaml

@@ -0,0 +1,8 @@
+keystore:
+  File: /.sui/sui_config/sui.keystore
+envs:
+  - alias: sui-devnet
+    rpc: "http://sui:9000"
+    ws: ~
+active_env: sui-devnet
+active_address: "0xed867315e3f7c83ae82e6d5858b6a6cc57c291fd84f7509646ebc8162169cf96"

+ 3 - 0
devnet/tx-verifier/sui/sui_config/sui.keystore

@@ -0,0 +1,3 @@
+[
+  "AGA20wtGcwbcNAG4nwapbQ5wIuXwkYQEWFUoSVAxctHb"
+]

+ 89 - 0
devnet/tx-verifier/sui/transfer_tokens_unsafe.move

@@ -0,0 +1,89 @@
+    use sui::tx_context::TxContext;
+    // Highly unsafe token transfer method, that accepts an `amount_to_bridge`
+    // that is actually passed to the message publication.
+    public fun prepare_transfer_unsafe<CoinType>(
+        asset_info: VerifiedAsset<CoinType>,
+        funded: Coin<CoinType>,
+        amount_to_bridge: u64,
+        recipient_chain: u16,
+        recipient: vector<u8>,
+        relayer_fee: u64,
+        nonce: u32
+    ): (
+        TransferTicket<CoinType>,
+        Coin<CoinType>
+    ) {
+        let (
+            bridged_in,
+            _
+        ) = take_truncated_amount(&asset_info, &mut funded);
+
+        let decimals = token_registry::coin_decimals(&asset_info);
+        let norm_amount = normalized_amount::from_raw(amount_to_bridge, decimals);
+
+        let ticket =
+            TransferTicket {
+                asset_info,
+                bridged_in,
+                norm_amount,
+                relayer_fee,
+                recipient_chain,
+                recipient,
+                nonce
+            };
+
+        // The remaining amount of funded may have dust depending on the
+        // decimals of this asset.
+        (ticket, funded)
+    }
+
+    public fun transfer_tokens_unsafe<CoinType>(
+        token_bridge_state: &mut State,
+        ticket: TransferTicket<CoinType>,
+        ctx: &mut TxContext
+    ): (
+        MessageTicket, 
+        Coin<CoinType>
+    ) {
+        // This capability ensures that the current build version is used.
+        let latest_only = state::assert_latest_only(token_bridge_state);
+
+        let TransferTicket {
+            asset_info,
+            bridged_in,
+            norm_amount,
+            recipient_chain,
+            recipient,
+            relayer_fee,
+            nonce
+        } = ticket;
+
+        // Ensure that the recipient is a 32-byte address.
+        let recipient = external_address::new(bytes32::from_bytes(recipient));
+
+        let token_chain = token_registry::token_chain(&asset_info);
+        let token_address = token_registry::token_address(&asset_info);
+
+        let encoded_transfer =
+            transfer::serialize(
+                transfer::new(
+                    norm_amount,
+                    token_address,
+                    token_chain,
+                    recipient,
+                    recipient_chain,
+                    normalized_amount::from_raw(
+                        relayer_fee,
+                        token_registry::coin_decimals(&asset_info)
+                    )
+                )
+            );
+
+        // Prepare Wormhole message with encoded `Transfer`.
+        (state::prepare_wormhole_message(
+            &latest_only,
+            token_bridge_state,
+            nonce,
+            encoded_transfer
+        ), coin::from_balance(bridged_in, ctx))
+    }

+ 229 - 0
devnet/tx-verifier/sui/tx-verifier-sui-runner.sh

@@ -0,0 +1,229 @@
+#!/bin/bash
+
+# Add the unsafe functions to the transfer_tokens module. Using sed, the `transfer_tokens_unsafe.move` content
+# can be added right after the module definition. Module imports seem to be hoisted, so there's no need to add
+# the unsafe code after module imports.
+sed -i '/module token_bridge::transfer_tokens/r /tmp/transfer_tokens_unsafe.move' /tmp/token_bridge/sources/transfer_tokens.move
+
+# Configurations, such as the RPC endpoint and output files for various pieces of information collected for the
+# tests to be performed.
+RPC=http://sui:9000
+SETUP_DEVNET_OUTPUT_PATH=/tmp/setup-devnet-output.txt
+EXAMPLE_COIN_TX_OUTPUT_PATH=/tmp/setup-example-coin-tx-output.txt
+
+# Deploy Sui packages to devnet node. Even though the Sui node already has a wormhole deployment, in order to
+# test Sui transfer verification it is necessary to add code that will allow forcefully triggering the invariant
+# being monitored.
+echo "[*] setting up a devnet deployment"
+cd /tmp && worm sui setup-devnet --rpc=$RPC > $SETUP_DEVNET_OUTPUT_PATH && cd ..
+
+# Get the package IDs that are produced by the `worm sui setup-devnet` command, and convert the IDs to an array.
+package_object_ids=`grep -A5 "Summary:" /tmp/setup-devnet-output.txt | grep -oE "0x[a-f0-9]{64}"`
+
+package_ids=()
+for i in $package_object_ids; do
+    package_ids+=($i)
+done
+
+# These are the package IDs and state object IDs that are relevant for interacting with the token bridge.
+core_bridge_package_id=${package_ids[0]}
+core_bridge_state=${package_ids[1]}
+token_bridge_package_id=${package_ids[2]}
+token_bridge_state=${package_ids[3]}
+token_bridge_emitter_cap=${package_ids[4]}
+
+echo " - core_bridge_package_id    = $core_bridge_package_id"
+echo " - core_bridge_state         = $core_bridge_state"
+echo " - token_bridge_package_id   = $token_bridge_package_id"
+echo " - token_bridge_state        = $token_bridge_state"
+echo " - token_bridge_emitter_cap  = $token_bridge_emitter_cap"
+
+# This is a helper function to parse out different pieces of information from the transaction block during which
+# the example coins were deployed.
+get_coin_information() {
+    local filename=$1
+    local coin_name=$2
+    local coin_info_type=$3
+    local treasury_cap_object_id=`
+        jq -r --arg COIN $coin_name --arg INFO_TYPE $coin_info_type '.objectChanges[] |
+            select(
+                .objectType != null and 
+                (.objectType | contains($INFO_TYPE)) and 
+                (.objectType | contains($COIN))
+            ) | 
+            .objectId' $filename`
+
+    echo $treasury_cap_object_id
+}
+
+# Get the transaction digest of the example coin deployment.
+coin_deployment_digest=`cat $SETUP_DEVNET_OUTPUT_PATH | grep "Deploying example coins" -A1 | grep "digest" | cut -d' ' -f3`
+# Retrieve the transaction block.
+sui client tx-block $coin_deployment_digest --json > $EXAMPLE_COIN_TX_OUTPUT_PATH
+# Read the package ID of the coin package that holds the coins.
+coin_package_id=`jq -r '.objectChanges[] | select(.type != null and .type == "published") | .packageId' $EXAMPLE_COIN_TX_OUTPUT_PATH`
+# Get the TreasuryCap and CoinMetadata object IDs
+treasury_cap_10=`get_coin_information $EXAMPLE_COIN_TX_OUTPUT_PATH COIN_10 TreasuryCap`
+coin_metadata_10=`get_coin_information $EXAMPLE_COIN_TX_OUTPUT_PATH COIN_10 CoinMetadata`
+
+# A helper function to attest a native token on the token bridge. This effectively registers a coin on Sui as a
+# native asset on the token bridge.
+attest_token() {
+    local coin_type=$1
+    local coin_metadata=$2
+    local sequence=$3
+    sui client ptb \
+        --move-call $token_bridge_package_id::attest_token::attest_token "<$coin_type>" @$token_bridge_state @$coin_metadata $sequence \
+        --assign message_ticket \
+        --split-coins gas [0] \
+        --assign empty_coin \
+        --move-call $core_bridge_package_id::publish_message::publish_message @$core_bridge_state empty_coin message_ticket @0x06 \
+        --gas-budget 10000000
+}
+
+echo "[*] adding the COIN_10 coin to the token bridge."
+res=`attest_token $coin_package_id::coin_10::COIN_10 $coin_metadata_10 1u32`
+
+# mint_and_transfer_token is a helper function that mints an `amount` of tokens to the caller, and then transfers
+# those tokens out via the token bridge. 
+# This is used for legitimate token bridge transfers.
+mint_and_transfer_token() {
+    local treasury_cap=$1
+    local coin_type=$2
+    local amount=$3
+    sui client ptb \
+        --move-call sui::tx_context::sender \
+        --assign sender \
+        --move-call sui::coin::mint "<$coin_type>" @$treasury_cap $amount \
+        --assign minted_coin \
+        --move-call $token_bridge_package_id::state::verified_asset "<$coin_type>" @$token_bridge_state \
+        --assign verified_asset \
+        --make-move-vec "<u8>" [] \
+        --assign recipient \
+        --split-coins gas [0] \
+        --assign empty_gas_coin \
+        --move-call $token_bridge_package_id::transfer_tokens::prepare_transfer "<$coin_type>" verified_asset minted_coin 1u16 recipient 0u64 1u32 \
+        --assign prepare_transfer_result \
+        --move-call $token_bridge_package_id::transfer_tokens::transfer_tokens "<$coin_type>" @$token_bridge_state prepare_transfer_result.0 \
+        --assign message_ticket \
+        --move-call $core_bridge_package_id::publish_message::publish_message @$core_bridge_state empty_gas_coin message_ticket @0x06 \
+        --transfer-objects sender [prepare_transfer_result.1] \
+        --gas-budget 10000000
+}
+
+echo "[*] mint_and_transfer 100 COIN_10 tokens"
+res=`mint_and_transfer_token $treasury_cap_10 $coin_package_id::coin_10::COIN_10 100_0000000000`
+
+# mint_and_transfer_unsafe_imbalanced is a helper function that uses the unsafe `prepare_transfer_unsafe` function
+# to send X amount of tokens to the token bridge, but request Y amount of tokens out of it.
+# This is used to trigger the invariant where less tokens are deposited into the bridge than requested out.
+mint_and_transfer_unsafe_imbalanced() {
+    local treasury_cap=$1
+    local coin_type=$2
+    local amount=$3
+    local amount_to_bridge=$4
+    sui client ptb \
+        --move-call sui::tx_context::sender \
+        --assign sender \
+        --move-call sui::coin::mint "<$coin_type>" @$treasury_cap $amount \
+        --assign minted_coin \
+        --move-call $token_bridge_package_id::state::verified_asset "<$coin_type>" @$token_bridge_state \
+        --assign verified_asset \
+        --make-move-vec "<u8>" [] \
+        --assign recipient \
+        --split-coins gas [0] \
+        --assign empty_gas_coin \
+        --move-call $token_bridge_package_id::transfer_tokens::prepare_transfer_unsafe "<$coin_type>" verified_asset minted_coin $amount_to_bridge 1u16 recipient 0u64 1u32 \
+        --assign prepare_transfer_result \
+        --move-call $token_bridge_package_id::transfer_tokens::transfer_tokens "<$coin_type>" @$token_bridge_state prepare_transfer_result.0 \
+        --assign message_ticket \
+        --move-call $core_bridge_package_id::publish_message::publish_message @$core_bridge_state empty_gas_coin message_ticket @0x06 \
+        --transfer-objects sender [prepare_transfer_result.1] \
+        --gas-budget 10000000
+}
+
+# transfer_without_deposit_unsafe is a helper function that uses the unsafe `transfer_tokens_unsafe` function to
+# transfer funds out of the bridge without actually making the deposit.
+# This is used to trigger the invariant where no tokens are deposited into the bridge, but an amount is requested out.
+transfer_without_deposit_unsafe() {
+    local treasury_cap=$1
+    local coin_type=$2
+    local amount=0
+    local amount_to_bridge=$3
+    sui client ptb \
+        --move-call sui::tx_context::sender \
+        --assign sender \
+        --move-call sui::coin::mint "<$coin_type>" @$treasury_cap $amount \
+        --assign minted_coin \
+        --move-call $token_bridge_package_id::state::verified_asset "<$coin_type>" @$token_bridge_state \
+        --assign verified_asset \
+        --make-move-vec "<u8>" [] \
+        --assign recipient \
+        --split-coins gas [0] \
+        --assign empty_gas_coin \
+        --move-call $token_bridge_package_id::transfer_tokens::prepare_transfer_unsafe "<$coin_type>" verified_asset minted_coin $amount_to_bridge 1u16 recipient 0u64 1u32 \
+        --assign prepare_transfer_result \
+        --move-call $token_bridge_package_id::transfer_tokens::transfer_tokens_unsafe "<$coin_type>" @$token_bridge_state prepare_transfer_result.0 \
+        --assign message_ticket \
+        --move-call $core_bridge_package_id::publish_message::publish_message @$core_bridge_state empty_gas_coin message_ticket.0 @0x06 \
+        --transfer-objects sender [prepare_transfer_result.1] \
+        --transfer-objects sender [message_ticket.1] \
+        --gas-budget 10000000
+}
+
+echo "[*] running testcases pre-start" # these tests are aimed at the suiProcessInitialEvents flag
+# testcase 1 - do a normal token bridge transfer
+res=`mint_and_transfer_token $treasury_cap_10 $coin_package_id::coin_10::COIN_10 100_0000000000`
+sleep 1
+
+# testcase 2 - do a token bridge transfer where the deposited amount does not cover the full bridge amount
+res=`mint_and_transfer_unsafe_imbalanced $treasury_cap_10 $coin_package_id::coin_10::COIN_10 100_0000000000 200_0000000000`
+sleep 1
+
+# testcase 3 - do a token bridge transfer where the token is not deposited at all
+resp=`transfer_without_deposit_unsafe $treasury_cap_10 $coin_package_id::coin_10::COIN_10 200_0000000000`
+sleep 1
+
+echo "[*] starting the sui transfer verifier"
+/guardiand transfer-verifier \
+    sui \
+    --suiRPC "${RPC}" \
+    --suiEnvironment=devnet \
+    --suiCoreBridgePackageId $core_bridge_package_id \
+    --suiTokenBridgeEmitter $token_bridge_emitter_cap \
+    --suiTokenBridgePackageId $token_bridge_package_id \
+    --logLevel=debug \
+    --suiProcessOnChainEvents \
+    2> /tmp/error.log &
+
+echo "[*] running testcases post-start"
+
+# testcase 1 - do a normal token bridge transfer
+res=`mint_and_transfer_token $treasury_cap_10 $coin_package_id::coin_10::COIN_10 100_0000000000`
+sleep 1
+
+# testcase 2 - do a token bridge transfer where the deposited amount does not cover the full bridge amount
+res=`mint_and_transfer_unsafe_imbalanced $treasury_cap_10 $coin_package_id::coin_10::COIN_10 100_0000000000 200_0000000000`
+sleep 1
+
+# testcase 3 - do a token bridge transfer where the token is not deposited at all
+res=`transfer_without_deposit_unsafe $treasury_cap_10 $coin_package_id::coin_10::COIN_10 200_0000000000`
+sleep 10
+
+# There should be two of each test - one for pre-start and one for post-start.
+echo "[*] verifying that tests succeeded"
+
+cat /tmp/error.log
+
+if [ $(cat /tmp/error.log | grep "bridge transfer requested for more tokens than were deposited" | wc -l) -ne 2 ]; then
+    echo " [-] amount out > amount in test failed"
+    exit 1
+fi
+
+if [ $(cat /tmp/error.log | grep "bridge transfer requested for tokens that were never deposited" | wc -l) -ne 2 ]; then
+    echo " [-] amount in == 0 test failed"
+    exit 1
+fi
+
+echo "[+] tests passed"
+touch /tmp/success

+ 5 - 4
node/cmd/guardiand/node.go

@@ -1685,10 +1685,11 @@ func runNode(cmd *cobra.Command, args []string) {
 
 	if shouldStart(suiRPC) {
 		wc := &sui.WatcherConfig{
-			NetworkID:        "sui",
-			ChainID:          vaa.ChainIDSui,
-			Rpc:              *suiRPC,
-			SuiMoveEventType: *suiMoveEventType,
+			NetworkID:         "sui",
+			ChainID:           vaa.ChainIDSui,
+			Rpc:               *suiRPC,
+			SuiMoveEventType:  *suiMoveEventType,
+			TxVerifierEnabled: slices.Contains(txVerifierChains, vaa.ChainIDSui),
 		}
 		watcherConfigs = append(watcherConfigs, wc)
 	}

+ 156 - 41
node/cmd/txverifier/sui.go

@@ -2,14 +2,20 @@ package txverifier
 
 import (
 	"context"
+	"encoding/hex"
+	"encoding/json"
 	"fmt"
+	"net/http"
 	"os"
 	"strconv"
 	"time"
 
+	"github.com/certusone/wormhole/node/pkg/common"
 	"github.com/certusone/wormhole/node/pkg/telemetry"
 	txverifier "github.com/certusone/wormhole/node/pkg/txverifier"
 	"github.com/certusone/wormhole/node/pkg/version"
+	"github.com/wormhole-foundation/wormhole/sdk"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 
 	ipfslog "github.com/ipfs/go-log/v2"
 	"github.com/spf13/cobra"
@@ -17,16 +23,22 @@ import (
 )
 
 const (
-	INITIAL_EVENT_FETCH_LIMIT = 25
+	InitialEventFetchLimit = 25
+	EventQueryInterval     = 2 * time.Second
 )
 
 // CLI args
 var (
-	suiRPC                  *string
-	suiCoreContract         *string
+	suiRPC                       *string
+	suiProcessOnChainEvents      *bool
+	suiProcessWormholeScanEvents *bool
+	suiEnvironment               *string
+	suiDigest                    *string
+
+	// Sui package IDs and emitter addresses
+	suiCoreBridgePackageId  *string
 	suiTokenBridgeEmitter   *string
-	suiTokenBridgeContract  *string
-	suiProcessInitialEvents *bool
+	suiTokenBridgePackageId *string
 )
 
 var TransferVerifierCmdSui = &cobra.Command{
@@ -36,27 +48,52 @@ var TransferVerifierCmdSui = &cobra.Command{
 }
 
 // CLI parameters
-//
-//nolint:errcheck // The MarkFlagRequired calls will cause the script to fail on their own. No need to handle the errors manually.
 func init() {
 	suiRPC = TransferVerifierCmdSui.Flags().String("suiRPC", "", "Sui RPC url")
-	suiCoreContract = TransferVerifierCmdSui.Flags().String("suiCoreContract", "", "Sui core contract address")
-	suiTokenBridgeEmitter = TransferVerifierCmdSui.Flags().String("suiTokenBridgeEmitter", "", "Token bridge emitter on Sui")
-	suiTokenBridgeContract = TransferVerifierCmdSui.Flags().String("suiTokenBridgeContract", "", "Token bridge contract on Sui")
-	suiProcessInitialEvents = TransferVerifierCmdSui.Flags().Bool("suiProcessInitialEvents", false, "Indicate whether the Sui transfer verifier should process the initial events it fetches")
-
-	TransferVerifierCmd.MarkFlagRequired("suiRPC")
-	TransferVerifierCmd.MarkFlagRequired("suiCoreContract")
-	TransferVerifierCmd.MarkFlagRequired("suiTokenBridgeEmitter")
-	TransferVerifierCmd.MarkFlagRequired("suiTokenBridgeContract")
+	suiProcessOnChainEvents = TransferVerifierCmdSui.Flags().Bool("suiProcessOnChainEvents", false, "Indicate whether the Sui transfer verifier should process on-chain events")
+	suiProcessWormholeScanEvents = TransferVerifierCmdSui.Flags().Bool("suiProcessWormholeScanEvents", false, "Indicate whether the Sui transfer verifier should process WormholeScan events")
+	suiDigest = TransferVerifierCmdSui.Flags().String("suiDigest", "", "If provided, perform transaction verification on this single digest")
+	suiEnvironment = TransferVerifierCmdSui.Flags().String("suiEnvironment", "mainnet", "The Sui environment to connect to. Supported values: mainnet, testnet and devnet")
+
+	suiCoreBridgePackageId = TransferVerifierCmdSui.Flags().String("suiCoreBridgePackageId", "", "The Sui Core Bridge package ID. If not provided, the default for the selected environment will be used.")
+	suiTokenBridgeEmitter = TransferVerifierCmdSui.Flags().String("suiTokenBridgeEmitter", "", "The Sui Token Bridge emitter address. If not provided, the default for the selected environment will be used.")
+	suiTokenBridgePackageId = TransferVerifierCmdSui.Flags().String("suiTokenBridgePackageId", "", "The Sui Token Bridge package ID. If not provided, the default for the selected environment will be used.")
+}
+
+func setIfEmpty(param *string, value string) {
+	if *param == "" {
+		*param = value
+	}
+}
+
+// Analyse the commandline arguments and prepare the net effect of package and object IDs
+func resolveSuiConfiguration() {
+
+	// Set the package IDs and emitter address based on the environment, if they are not provided
+	// as CLI args.
+	switch *suiEnvironment {
+	case "mainnet":
+		setIfEmpty(suiCoreBridgePackageId, "0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a")
+		setIfEmpty(suiTokenBridgePackageId, txverifier.SuiOriginalTokenBridgePackageIds[common.MainNet])
+		setIfEmpty(suiTokenBridgeEmitter, "0x"+hex.EncodeToString(sdk.KnownTokenbridgeEmitters[vaa.ChainIDSui]))
+	case "testnet":
+		setIfEmpty(suiCoreBridgePackageId, "0xf47329f4344f3bf0f8e436e2f7b485466cff300f12a166563995d3888c296a94")
+		setIfEmpty(suiTokenBridgePackageId, txverifier.SuiOriginalTokenBridgePackageIds[common.TestNet])
+		setIfEmpty(suiTokenBridgeEmitter, "0x"+hex.EncodeToString(sdk.KnownTestnetTokenbridgeEmitters[vaa.ChainIDSui]))
+	case "devnet":
+		setIfEmpty(suiCoreBridgePackageId, "0x320a40bff834b5ffa12d7f5cc2220dd733dd9e8e91c425800203d06fb2b1fee8")
+		setIfEmpty(suiTokenBridgePackageId, txverifier.SuiOriginalTokenBridgePackageIds[common.UnsafeDevNet])
+		setIfEmpty(suiTokenBridgeEmitter, "0x"+hex.EncodeToString(sdk.KnownDevnetTokenbridgeEmitters[vaa.ChainIDSui]))
+	}
 }
 
 func runTransferVerifierSui(cmd *cobra.Command, args []string) {
+	resolveSuiConfiguration()
+
 	ctx := context.Background()
 
 	// Setup logging
-	// lvl, err := ipfslog.LevelFromString(*logLevel)
-	lvl, err := ipfslog.LevelFromString("info")
+	lvl, err := ipfslog.LevelFromString(*logLevel)
 	if err != nil {
 		fmt.Println("Invalid log level")
 		os.Exit(1)
@@ -69,8 +106,6 @@ func runTransferVerifierSui(cmd *cobra.Command, args []string) {
 	// Setup logging to Loki if configured
 	if *telemetryLokiUrl != "" && *telemetryNodeName != "" {
 		labels := map[string]string{
-			// Is this required?
-			// "network":   *p2pNetworkID,
 			"node_name": *telemetryNodeName,
 			"version":   version.Version(),
 		}
@@ -92,32 +127,62 @@ func runTransferVerifierSui(cmd *cobra.Command, args []string) {
 		logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger
 	}
 
-	logger.Info("Starting Sui transfer verifier")
-	logger.Debug("Sui rpc connection", zap.String("url", *suiRPC))
-	logger.Debug("Sui core contract", zap.String("address", *suiCoreContract))
-	logger.Debug("Sui token bridge contract", zap.String("address", *suiTokenBridgeContract))
-	logger.Debug("token bridge event emitter", zap.String("object id", *suiTokenBridgeEmitter))
-	logger.Debug("process initial events", zap.Bool("processInitialEvents", *suiProcessInitialEvents))
-
 	// Verify CLI parameters
-	if *suiRPC == "" || *suiCoreContract == "" || *suiTokenBridgeEmitter == "" || *suiTokenBridgeContract == "" {
+	if *suiRPC == "" || *suiCoreBridgePackageId == "" || *suiTokenBridgeEmitter == "" || *suiTokenBridgePackageId == "" {
 		logger.Fatal("One or more CLI parameters are empty",
 			zap.String("suiRPC", *suiRPC),
-			zap.String("suiCoreContract", *suiCoreContract),
+			zap.String("suiCoreBridgePackageId", *suiCoreBridgePackageId),
 			zap.String("suiTokenBridgeEmitter", *suiTokenBridgeEmitter),
-			zap.String("suiTokenBridgeContract", *suiTokenBridgeContract))
+			zap.String("suiTokenBridgePackageId", *suiTokenBridgePackageId))
 	}
 
+	logger.Info("Starting Sui transfer verifier")
+	logger.Debug("Sui rpc connection", zap.String("url", *suiRPC))
+	logger.Debug("Sui core bridge package ID", zap.String("packageId", *suiCoreBridgePackageId))
+	logger.Debug("Sui token bridge package ID", zap.String("packageId", *suiTokenBridgePackageId))
+	logger.Debug("Sui token bridge emitter", zap.String("address", *suiTokenBridgeEmitter))
+	logger.Debug("process on-chain events", zap.Bool("processOnChainEvents", *suiProcessOnChainEvents))
+	logger.Debug("process WormholeScan events", zap.Bool("processWormholeScanEvents", *suiProcessWormholeScanEvents))
+
+	suiApiConnection := txverifier.NewSuiApiConnection(*suiRPC)
+
 	// Create a new SuiTransferVerifier
-	suiTransferVerifier := txverifier.NewSuiTransferVerifier(*suiCoreContract, *suiTokenBridgeEmitter, *suiTokenBridgeContract)
+	suiTransferVerifier := txverifier.NewSuiTransferVerifier(*suiCoreBridgePackageId, *suiTokenBridgeEmitter, *suiTokenBridgePackageId, suiApiConnection)
+
+	// Process a single digest and exit
+	if *suiDigest != "" {
+		logger.Info("Processing single digest", zap.String("txDigeset", *suiDigest))
+		valid, err := suiTransferVerifier.ProcessDigest(ctx, *suiDigest, "", logger)
+
+		if err != nil {
+			logger.Error("Error validating the digest", zap.Error(err))
+		}
+
+		logger.Info("Validation completed", zap.Bool("valid", valid))
+
+		return
+	}
+
+	if *suiProcessWormholeScanEvents {
+		digests, err := pullDigestsFromWormholeScan(ctx, logger)
+		if err != nil {
+			logger.Fatal("Error pulling digests from WormholeScan", zap.Error(err))
+		}
+		// TODO: check the result of each digest against an expected outcome. Some digests
+		// link to token attestations, which the transfer verifier doesn't handle.
+		for _, digest := range digests {
+			_, err := suiTransferVerifier.ProcessDigest(ctx, digest, "", logger)
+			if err != nil {
+				logger.Error(err.Error())
+			}
+		}
+	}
 
 	// Get the event filter
 	eventFilter := suiTransferVerifier.GetEventFilter()
 
-	suiApiConnection := txverifier.NewSuiApiConnection(*suiRPC)
-
 	// Initial event fetching
-	resp, err := suiApiConnection.QueryEvents(eventFilter, "null", INITIAL_EVENT_FETCH_LIMIT, true)
+	resp, err := suiApiConnection.QueryEvents(ctx, eventFilter, "null", InitialEventFetchLimit, true)
 	if err != nil {
 		logger.Fatal("Error in querying initial events", zap.Error(err))
 	}
@@ -142,20 +207,21 @@ func runTransferVerifierSui(cmd *cobra.Command, args []string) {
 
 	// If specified, process the initial events. This is useful for running a number of digests
 	// through the verifier before starting live processing.
-	if *suiProcessInitialEvents {
-		logger.Info("Processing initial events")
+	if *suiProcessOnChainEvents {
+		logger.Info("Processing on-chain events")
 		for _, event := range initialEvents {
 			if event.ID.TxDigest != nil {
-				_, err = suiTransferVerifier.ProcessDigest(*event.ID.TxDigest, suiApiConnection, logger)
+				_, err = suiTransferVerifier.ProcessDigest(ctx, *event.ID.TxDigest, "", logger)
 				if err != nil {
 					logger.Error(err.Error())
 				}
 			}
 		}
+		logger.Info("Finished processing initial events")
 	}
 
 	// Ticker for live processing
-	ticker := time.NewTicker(5 * time.Second)
+	ticker := time.NewTicker(EventQueryInterval)
 	defer ticker.Stop()
 
 	for {
@@ -164,7 +230,7 @@ func runTransferVerifierSui(cmd *cobra.Command, args []string) {
 			logger.Info("Context cancelled")
 		case <-ticker.C:
 			// Fetch new events
-			resp, err := suiApiConnection.QueryEvents(eventFilter, "null", 25, true)
+			resp, err := suiApiConnection.QueryEvents(ctx, eventFilter, "null", InitialEventFetchLimit, true)
 			if err != nil {
 				logger.Error("Error in querying new events", zap.Error(err))
 				continue
@@ -195,14 +261,63 @@ func runTransferVerifierSui(cmd *cobra.Command, args []string) {
 			}
 
 			for _, txDigest := range txDigests {
-				_, err := suiTransferVerifier.ProcessDigest(txDigest, suiApiConnection, logger)
+				_, err := suiTransferVerifier.ProcessDigest(ctx, txDigest, "", logger)
 				if err != nil {
 					logger.Error(err.Error())
 				}
+				logger.Info("Processed new event", zap.String("txDigest", txDigest))
 			}
 
-			logger.Info("New events processed", zap.Int("latestTimestamp", latestTimestamp), zap.Int("txDigestCount", len(txDigests)))
+			if len(txDigests) > 0 {
+				logger.Info("New events processed", zap.Int("latestTimestamp", latestTimestamp), zap.Int("txDigestCount", len(txDigests)))
+			}
 
 		}
 	}
 }
+
+type WormholeScanResponse struct {
+	Operation []struct {
+		SourceChain struct {
+			Transaction struct {
+				TxHash string `json:"txHash"`
+			} `json:"transaction"`
+		} `json:"sourceChain"`
+	} `json:"operations"`
+}
+
+// Pulls a bunch of transaction digests from Wormholescan to run through the transfer verifier.
+// https://api.wormholescan.io/api/v1/operations?sourceChain=21&appId=PORTAL_TOKEN_BRIDGE
+func pullDigestsFromWormholeScan(ctx context.Context, logger *zap.Logger) ([]string, error) {
+
+	req, err := http.NewRequestWithContext(ctx, "GET", "https://api.wormholescan.io/api/v1/operations?sourceChain=21&appId=PORTAL_TOKEN_BRIDGE", nil)
+
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("Accept", "application/json")
+	resp, err := http.DefaultClient.Do(req)
+
+	if err != nil {
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+
+	body, _ := common.SafeRead(resp.Body)
+
+	var wsResp WormholeScanResponse
+	err = json.Unmarshal(body, &wsResp)
+	if err != nil {
+		return nil, err
+	}
+
+	digests := make([]string, 0, len(wsResp.Operation))
+	for _, operation := range wsResp.Operation {
+		digests = append(digests, operation.SourceChain.Transaction.TxHash)
+	}
+
+	logger.Info("Pulled digests from WormholeScan", zap.Int("count", len(digests)))
+	return digests, nil
+}

+ 0 - 9
node/pkg/txverifier/evmtypes.go

@@ -721,15 +721,6 @@ func (s *ReceiptSummary) isMsgSafe(msgID msgID) bool {
 	return s.msgPubResult[msgID]
 }
 
-// Custom error type used to signal that a core invariant of the token bridge has been violated.
-type InvariantError struct {
-	Msg string
-}
-
-func (i InvariantError) Error() string {
-	return fmt.Sprintf("invariant violated: %s", i.Msg)
-}
-
 // transferOut is a struct that contains the token ID and amount of a token that was requested to be transferred out of the bridge.
 type transferOut struct {
 	// The token ID of the token that was transferred out.

+ 226 - 100
node/pkg/txverifier/sui.go

@@ -1,9 +1,7 @@
 package txverifier
 
-// TODOs:
-//	* balances on Sui are stored as u64's. Consider using uint64 instead of big.Int
-
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -16,31 +14,56 @@ import (
 	"go.uber.org/zap"
 )
 
+// Errors
+var (
+	// Internal errors that can occur in the verifier.
+	ErrFailedToRetrieveTxBlock = errors.New("failed to retrieve transaction block")
+)
+
 // Global variables
 var (
 	suiModule    = "publish_message"
 	suiEventName = "WormholeMessage"
+
+	// The Sui transfer verifier needs the original token bridge package Id to address the token registry correctly.
+	// The token registry holds the balances for all assets, wrapped and native. If the token registry on chain is
+	// ever moved/upgraded, these values will need to be updated.
+	SuiOriginalTokenBridgePackageIds = map[common.Environment]string{
+		// Obtained from the mainnet state object at 0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9
+		common.MainNet: "0x26efee2b51c911237888e5dc6702868abca3c7ac12c53f76ef8eba0697695e3d",
+		// Obtained from the testnet state object at 0x6fb10cdb7aa299e9a4308752dadecb049ff55a892de92992a1edbd7912b3d6da
+		common.TestNet: "0x562760fc51d90d4ae1835bac3e91e0e6987d3497b06f066941d3e51f6e8d76d0",
+		// Obtained from tilt output when deploying the token bridge to devnet
+		common.UnsafeDevNet:   "0xa6a3da85bbe05da5bfd953708d56f1a3a023e7fb58e5a824a3d4de3791e8f690",
+		common.GoTest:         "0xa6a3da85bbe05da5bfd953708d56f1a3a023e7fb58e5a824a3d4de3791e8f690",
+		common.AccountantMock: "0xa6a3da85bbe05da5bfd953708d56f1a3a023e7fb58e5a824a3d4de3791e8f690",
+	}
 )
 
 type SuiTransferVerifier struct {
-	suiCoreContract        string
-	suiTokenBridgeEmitter  string
-	suiTokenBridgeContract string
-	suiEventType           string
+	// Used to create the event filter.
+	suiCoreBridgePackageId string
+	// Used to check the emitter of the `WormholeMessage` event.
+	suiTokenBridgeEmitter string
+	// Used to match the owning package of native and wrapped asset types.
+	suiTokenBridgePackageId string
+	suiEventType            string
+	suiApiConnection        SuiApiInterface
 }
 
-func NewSuiTransferVerifier(suiCoreContract, suiTokenBridgeEmitter, suiTokenBridgeContract string) *SuiTransferVerifier {
+func NewSuiTransferVerifier(suiCoreBridgePackageId, suiTokenBridgeEmitter, suiTokenBridgePackageId string, suiApiConnection SuiApiInterface) *SuiTransferVerifier {
 	return &SuiTransferVerifier{
-		suiCoreContract:        suiCoreContract,
-		suiTokenBridgeEmitter:  suiTokenBridgeEmitter,
-		suiTokenBridgeContract: suiTokenBridgeContract,
-		suiEventType:           fmt.Sprintf("%s::%s::%s", suiCoreContract, suiModule, suiEventName),
+		suiCoreBridgePackageId:  suiCoreBridgePackageId,
+		suiTokenBridgeEmitter:   suiTokenBridgeEmitter,
+		suiTokenBridgePackageId: suiTokenBridgePackageId,
+		suiEventType:            fmt.Sprintf("%s::%s::%s", suiCoreBridgePackageId, suiModule, suiEventName),
+		suiApiConnection:        suiApiConnection,
 	}
 }
 
-// func (s *SuiTransferVerifier) GetSuiEventType() string {
-// 	return s.suiEventType
-// }
+func (s *SuiTransferVerifier) GetTokenBridgeEmitter() string {
+	return s.suiTokenBridgeEmitter
+}
 
 // Filter to be used for querying events
 // The `MoveEventType` filter doesn't seem to be available in the documentation. However, there is an example
@@ -54,174 +77,277 @@ func (s *SuiTransferVerifier) GetEventFilter() string {
 			"module":"%s",
 			"type":"%s"
 		}
-	}`, s.suiCoreContract, suiModule, s.suiEventType)
+	}`, s.suiCoreBridgePackageId, suiModule, s.suiEventType)
 }
 
-// processEvents takes a list of events and processes them to determine the amount requested out of the bridge. It returns a mapping
-// that maps the token address and chain ID to the amount requested out of the bridge. It does not return an error, because any faulty
-// events can be skipped, since they would likely fail being processed by the guardian as well. Debug level logging can be used to
-// reveal any potential locations where errors are occurring.
-func (s *SuiTransferVerifier) processEvents(events []SuiEvent, logger *zap.Logger) (requestedOutOfBridge map[string]*big.Int, numEventsProcessed uint) {
-	// Initialize the map to store the amount requested out of the bridge
-	requestedOutOfBridge = make(map[string]*big.Int)
+// extractBridgeRequestsFromEvents iterates through all events, and tries to identify `WormholeMessage` events emitted by the token bridge.
+// These events are parsed and collected in a `MsgIdToRequestOutOfBridge` object, mapping message IDs to requests out of the bridge. This
+// function does not return errors, as any issues encountered during processing of individual events result in those events being skipped.
+func (s *SuiTransferVerifier) extractBridgeRequestsFromEvents(events []SuiEvent, logger *zap.Logger) MsgIdToRequestOutOfBridge {
+	requests := make(MsgIdToRequestOutOfBridge)
 
-	// Filter events that have the sui token bridge emitter as the sender in the message. The events indicate
-	// how much is going to leave the network.
 	for _, event := range events {
+		var wormholeMessage WormholeMessage
+
+		// Parse the ParsedJson field into a WormholeMessage. This is done explicitly to avoid any unnecessary
+		// error logging for events that can't be deserialized into `WormholeMessage` instances. If an event's
+		// ParsedJson cannot be unmarshaled into a WormholeMessage, it is simply skipped.
+		if event.ParsedJson != nil {
+			err := json.Unmarshal(*event.ParsedJson, &wormholeMessage)
+			if err != nil {
+				// If an error ocurrs, the ParsedJson is rejected as an event that is not emitted by the bridge
+				continue
+			}
+		}
 
 		// If any of these event parameters are nil, skip the event
-		if event.Message == nil || event.Message.Sender == nil || event.Type == nil {
+		if wormholeMessage.Sender == nil || wormholeMessage.Sequence == nil || event.Type == nil {
 			continue
 		}
 
 		// Only process the event if it is a WormholeMessage event from the token bridge emitter
-		if *event.Type == s.suiEventType && *event.Message.Sender == s.suiTokenBridgeEmitter {
+		if *event.Type == s.suiEventType && *wormholeMessage.Sender == s.suiTokenBridgeEmitter {
 
 			// Parse the wormhole message. vaa.IsTransfer can be omitted, since this is done
 			// inside `DecodeTransferPayloadHdr` already.
-			hdr, err := vaa.DecodeTransferPayloadHdr(event.Message.Payload)
+			hdr, err := vaa.DecodeTransferPayloadHdr(wormholeMessage.Payload)
 
-			// If there is an error decoding the payload, skip the event
+			// If there is an error decoding the payload, skip the event. One reason for a potential
+			// failure in decoding is that an attestation of a token was requested.
 			if err != nil {
 				logger.Debug("Error decoding payload", zap.Error(err))
 				continue
 			}
 
-			// Add the key if it does not exist yet
-			key := fmt.Sprintf(KEY_FORMAT, hdr.OriginAddress.String(), hdr.OriginChain)
-			if _, exists := requestedOutOfBridge[key]; !exists {
-				requestedOutOfBridge[key] = big.NewInt(0)
-			}
+			// The sender address is prefixed with "0x", but the message ID format does not include that prefix.
+			senderWithout0x := strings.TrimPrefix(*wormholeMessage.Sender, "0x")
 
-			// Add the amount requested out of the bridge
-			requestedOutOfBridge[key] = new(big.Int).Add(requestedOutOfBridge[key], hdr.Amount)
+			msgIDStr := fmt.Sprintf("%d/%s/%s", vaa.ChainIDSui, senderWithout0x, *wormholeMessage.Sequence)
+			assetKey := fmt.Sprintf(KEY_FORMAT, hdr.OriginAddress.String(), hdr.OriginChain)
 
-			numEventsProcessed++
+			logger.Debug("Found request out of bridge",
+				zap.String("msgID", msgIDStr),
+				zap.String("assetKey", assetKey),
+				zap.String("amount", hdr.Amount.String()),
+			)
+
+			requests[msgIDStr] = &RequestOutOfBridge{
+				AssetKey:       assetKey,
+				Amount:         hdr.Amount,
+				DepositMade:    false,
+				DepositSolvent: false,
+			}
 		} else {
-			logger.Debug("Event does not match the criteria", zap.String("event type", *event.Type), zap.String("event sender", *event.Message.Sender))
+			logger.Debug("Event does not match the criteria",
+				zap.String("event type", *event.Type),
+				zap.String("event sender", *wormholeMessage.Sender),
+				zap.String("expected event type", s.suiEventType),
+				zap.String("expected event sender", s.suiTokenBridgeEmitter),
+			)
 		}
 	}
 
-	return requestedOutOfBridge, numEventsProcessed
+	return requests
 }
 
-func (s *SuiTransferVerifier) processObjectUpdates(objectChanges []ObjectChange, suiApiConnection SuiApiInterface, logger *zap.Logger) (transferredIntoBridge map[string]*big.Int, numChangesProcessed uint) {
-	transferredIntoBridge = make(map[string]*big.Int)
+// extractTransfersIntoBridgeFromChanges iterates through all object changes, and tries to identify token transfers into the bridge.
+// These transfers are accumulated in an `AssetKeyToTransferIntoBridge` object, which is returned to the caller. The default behaviour
+// of this function is to fail-close, meaning that any errors that occur during processing result in the offending object change being ignored.
+func (s *SuiTransferVerifier) extractTransfersIntoBridgeFromObjectChanges(ctx context.Context, objectChanges []ObjectChange, logger *zap.Logger) AssetKeyToTransferIntoBridge {
+	transfers := make(AssetKeyToTransferIntoBridge)
 
 	for _, objectChange := range objectChanges {
-		// Check that the type information is correct.
-		if !objectChange.ValidateTypeInformation(s.suiTokenBridgeContract) {
+		// Check that the type information is correct. Doing it here means it's not necessary to do it
+		// again, even in the case where `GetObject` is used for a single object instead of getting past
+		// objects.
+		if !objectChange.ValidateTypeInformation(s.suiTokenBridgePackageId) {
 			continue
 		}
 
-		// Get the past objects
-		resp, err := suiApiConnection.TryMultiGetPastObjects(objectChange.ObjectId, objectChange.Version, objectChange.PreviousVersion)
+		if objectChange.PreviousVersion == "" {
+			logger.Warn("No previous version of asset available",
+				zap.String("objectId", objectChange.ObjectId),
+				zap.String("currentVersion", objectChange.Version))
+			continue
+		}
 
+		// Get the previous version of the object. This makes a call to the Sui API.
+		resp, err := s.suiApiConnection.TryMultiGetPastObjects(ctx, objectChange.ObjectId, objectChange.Version, objectChange.PreviousVersion)
 		if err != nil {
-			logger.Error("Error in getting past objects", zap.Error(err))
+			logger.Error("Error getting past objects",
+				zap.String("objectId", objectChange.ObjectId),
+				zap.String("currentVersion", objectChange.Version),
+				zap.String("previousVersion", objectChange.PreviousVersion),
+				zap.Error(err))
 			continue
 		}
 
 		decimals, err := resp.GetDecimals()
 		if err != nil {
-			logger.Error("Error in getting decimals", zap.Error(err))
+			logger.Error("Error getting decimals", zap.Error(err))
 			continue
 		}
 
 		address, err := resp.GetTokenAddress()
 		if err != nil {
-			logger.Error("Error in getting token address", zap.Error(err))
+			logger.Error("Error getting token address", zap.Error(err))
 			continue
 		}
 
 		chain, err := resp.GetTokenChain()
 		if err != nil {
-			logger.Error("Error in getting token chain", zap.Error(err))
+			logger.Error("Error getting token chain", zap.Error(err))
 			continue
 		}
 
-		// Get the balance difference
-		balanceDiff, err := resp.GetBalanceDiff()
+		// Get the change in balance
+		balanceChange, err := resp.GetBalanceChange()
 		if err != nil {
-			logger.Error("Error in getting balance difference", zap.Error(err))
+			logger.Error("Error getting balance difference", zap.Error(err))
 			continue
 		}
 
-		normalized := normalize(balanceDiff, decimals)
+		normalized := normalize(balanceChange, decimals)
 
 		// Add the key if it does not exist yet
-		key := fmt.Sprintf(KEY_FORMAT, address, chain)
+		assetKey := fmt.Sprintf(KEY_FORMAT, address, chain)
 
-		// Add the normalized amount to the transferredIntoBridge map
-		// Intentionally use 'Set' instead of 'Add' because there should only be a single objectChange per token
-		var amount big.Int
-		transferredIntoBridge[key] = amount.Set(normalized)
+		if _, exists := transfers[assetKey]; !exists {
+			// logger.Debug("First time seeing transfer into bridge for asset", zap.String("assetKey", assetKey))
+			transfers[assetKey] = &TransferIntoBridge{
+				Amount:  big.NewInt(0),
+				Solvent: false,
+			}
+		}
 
-		// Increment the number of changes processed
-		numChangesProcessed++
+		// Add the amount transferred into the bridge
+		logger.Debug("Adding transfer into bridge", zap.String("assetKey", assetKey), zap.String("amount", normalized.String()))
+		transfers[assetKey].Amount = new(big.Int).Add(transfers[assetKey].Amount, normalized)
 	}
 
-	return transferredIntoBridge, numChangesProcessed
+	return transfers
 }
 
-func (s *SuiTransferVerifier) ProcessDigest(digest string, suiApiConnection SuiApiInterface, logger *zap.Logger) (uint, error) {
+func (s *SuiTransferVerifier) ProcessDigest(ctx context.Context, digest string, msgIdStr string, logger *zap.Logger) (bool, error) {
+	verified, err := s.processDigestInternal(ctx, digest, msgIdStr, logger)
+
+	// check if the error is an invariant violation
+	var invariantError *InvariantError
+	if errors.As(err, &invariantError) {
+		logger.Error("Sui txverifier invariant violated", zap.String("txdigest", digest), zap.String("invariant", invariantError.Msg))
+		return false, nil
+	} else {
+		return verified, err
+	}
+}
+
+// Return conditions:
+//
+//	true, nil - verification succeeded
+//	false, nil - verification failed
+//	false, err - verification failed due to an internal error or invariant violation
+//
+// NOTE: it is up to the caller to check if the error is an invariant violation, and handle it accordingly.
+func (s *SuiTransferVerifier) processDigestInternal(ctx context.Context, digest string, msgIdStr string, logger *zap.Logger) (bool, error) {
+	logger.Debug("processing digest", zap.String("txDigest", digest), zap.String("msgId", msgIdStr))
+
 	// Get the transaction block
-	txBlock, err := suiApiConnection.GetTransactionBlock(digest)
+	txBlock, err := s.suiApiConnection.GetTransactionBlock(ctx, digest)
 
 	if err != nil {
-		logger.Fatal("Error in getting transaction block", zap.Error(err))
+		logger.Error("failed to retrieve transaction block",
+			zap.String("txDigest", digest),
+			zap.Error(err),
+		)
+		return false, ErrFailedToRetrieveTxBlock
 	}
 
-	// process all events, indicating funds that are leaving the chain
-	requestedOutOfBridge, numEventsProcessed := s.processEvents(txBlock.Result.Events, logger)
+	// Extract bridge requests from events
+	bridgeOutRequests := s.extractBridgeRequestsFromEvents(txBlock.Result.Events, logger)
+
+	if len(bridgeOutRequests) == 0 {
+		logger.Debug("No relevant events found in transaction block", zap.String("txDigest", digest))
+		// No valid events were identified, so the digest does not require further processing.
+		return true, nil
+	}
+
+	// Process all object changes, specifically looking for transfers into the token bridge
+	transfersIntoBridge := s.extractTransfersIntoBridgeFromObjectChanges(ctx, txBlock.Result.ObjectChanges, logger)
+
+	// Validate solvency using the requests out of the bridge vs the transfers into the bridge.
+	resolved, err := validateSolvency(bridgeOutRequests, transfersIntoBridge)
 
-	// process all object changes, indicating funds that are entering the chain
-	transferredIntoBridge, numChangesProcessed := s.processObjectUpdates(txBlock.Result.ObjectChanges, suiApiConnection, logger)
+	if err != nil {
+		logger.Error("Error validating solvency", zap.Error(err))
+		return false, err
+	}
 
-	// TODO: Using `Warn` for testing purposes. Update to Fatal? when ready to go into PR.
-	// TODO: Revisit error handling here.
-	for key, amountOut := range requestedOutOfBridge {
+	// If msgIdStr is found in the resolved map, check only that request. Otherwise, check all requests.
+	if request, exists := resolved[msgIdStr]; exists {
 
-		if _, exists := transferredIntoBridge[key]; !exists {
-			logger.Warn("transfer-out request for tokens that were never deposited",
-				zap.String("tokenAddress", key))
-			// TODO: Is it better to return or continue here?
-			return 0, errors.New("transfer-out request for tokens that were never deposited")
-			// continue
+		// Checking for nil, since the map value is a pointer.
+		if request == nil {
+			logger.Debug("No matching request found for message ID", zap.String("msgId", msgIdStr))
+			// No matching request was found for the given message ID.
+			return false, fmt.Errorf("no matching request found for message ID %s", msgIdStr)
 		}
 
-		amountIn := transferredIntoBridge[key]
+		if !request.DepositMade {
+			logger.Debug("No deposit made for request out of bridge",
+				zap.String("msgId", msgIdStr),
+				zap.String("assetKey", request.AssetKey),
+				zap.String("amount", request.Amount.String()))
+			// A deposit was not made for the given message ID.
+			return false, &InvariantError{Msg: INVARIANT_NO_DEPOSIT}
+		}
 
-		if amountOut.Cmp(amountIn) > 0 {
-			logger.Warn("requested amount out is larger than amount in")
-			return 0, errors.New("requested amount out is larger than amount in")
+		if !request.DepositSolvent {
+			logger.Debug("Deposit for request out of bridge was insolvent",
+				zap.String("msgId", msgIdStr),
+				zap.String("assetKey", request.AssetKey),
+				zap.String("amount", request.Amount.String()))
+			// A deposit was not solvent for the given message ID.
+			return false, &InvariantError{Msg: INVARIANT_INSUFFICIENT_DEPOSIT}
 		}
 
-		keyParts := strings.Split(key, "-")
-		logger.Info("bridge request processed",
-			zap.String("tokenAddress", keyParts[0]),
-			zap.String("chain", keyParts[1]),
-			zap.String("amountOut", amountOut.String()),
-			zap.String("amountIn", amountIn.String()))
-	}
+		logger.Debug("Request for message ID is valid", zap.String("msgId", msgIdStr))
+	} else {
+		// Any request that is not valid causes the entire transaction to be considered invalid.
+		for msgIdStrLoc, request := range resolved {
+			if !request.DepositMade {
+				logger.Debug("No deposit made for request out of bridge",
+					zap.String("assetKey", request.AssetKey),
+					zap.String("amount", request.Amount.String()))
+				// A request was not fulfilled by a deposit into the bridge.
+				return false, &InvariantError{Msg: INVARIANT_NO_DEPOSIT}
+			}
 
-	logger.Info("Digest processed", zap.String("txDigest", digest), zap.Uint("numEventsProcessed", numEventsProcessed), zap.Uint("numChangesProcessed", numChangesProcessed))
+			if !request.DepositSolvent {
+				logger.Debug("Deposit for request out of bridge was insolvent",
+					zap.String("assetKey", request.AssetKey),
+					zap.String("amount", request.Amount.String()))
+				// A request was not solvent.
+				return false, &InvariantError{Msg: INVARIANT_INSUFFICIENT_DEPOSIT}
+			}
+
+			logger.Debug("Request for message ID is valid", zap.String("msgId", msgIdStrLoc))
+		}
+	}
 
-	return numEventsProcessed, nil
+	return true, nil
 }
 
 type SuiApiResponse interface {
 	GetError() error
 }
 
-func suiApiRequest[T SuiApiResponse](rpc string, method string, params string) (T, error) {
+func suiApiRequest[T SuiApiResponse](ctx context.Context, rpc string, method string, params string) (T, error) {
 	var defaultT T
 
 	// Create the request
 	requestBody := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "%s", "params": %s}`, method, params)
 
-	//nolint:noctx // TODO: this function should use a context
-	req, err := http.NewRequest("POST", rpc, strings.NewReader(requestBody))
+	req, err := http.NewRequestWithContext(ctx, "POST", rpc, strings.NewReader(requestBody))
 	if err != nil {
 		return defaultT, fmt.Errorf("cannot create request: %w", err)
 	}
@@ -250,7 +376,7 @@ func suiApiRequest[T SuiApiResponse](rpc string, method string, params string) (
 		return defaultT, fmt.Errorf("cannot parse response: %w", err)
 	}
 
-	// Check if an error message exists
+	// Check if the API returned an error
 	if res.GetError() != nil {
 		return defaultT, fmt.Errorf("error from Sui RPC: %w", res.GetError())
 	}
@@ -266,7 +392,7 @@ func NewSuiApiConnection(rpc string) SuiApiInterface {
 	return &SuiApiConnection{rpc: rpc}
 }
 
-func (s *SuiApiConnection) GetTransactionBlock(txDigest string) (SuiGetTransactionBlockResponse, error) {
+func (s *SuiApiConnection) GetTransactionBlock(ctx context.Context, txDigest string) (SuiGetTransactionBlockResponse, error) {
 	method := "sui_getTransactionBlock"
 	params := fmt.Sprintf(`[
 				"%s", 
@@ -276,17 +402,17 @@ func (s *SuiApiConnection) GetTransactionBlock(txDigest string) (SuiGetTransacti
 				}
 			]`, txDigest)
 
-	return suiApiRequest[SuiGetTransactionBlockResponse](s.rpc, method, params)
+	return suiApiRequest[SuiGetTransactionBlockResponse](ctx, s.rpc, method, params)
 }
 
-func (s *SuiApiConnection) QueryEvents(filter string, cursor string, limit int, descending bool) (SuiQueryEventsResponse, error) {
+func (s *SuiApiConnection) QueryEvents(ctx context.Context, filter string, cursor string, limit int, descending bool) (SuiQueryEventsResponse, error) {
 	method := "suix_queryEvents"
 	params := fmt.Sprintf(`[%s, %s, %d, %t]`, filter, cursor, limit, descending)
 
-	return suiApiRequest[SuiQueryEventsResponse](s.rpc, method, params)
+	return suiApiRequest[SuiQueryEventsResponse](ctx, s.rpc, method, params)
 }
 
-func (s *SuiApiConnection) TryMultiGetPastObjects(objectId string, version string, previousVersion string) (SuiTryMultiGetPastObjectsResponse, error) {
+func (s *SuiApiConnection) TryMultiGetPastObjects(ctx context.Context, objectId string, version string, previousVersion string) (SuiTryMultiGetPastObjectsResponse, error) {
 	method := "sui_tryMultiGetPastObjects"
 	params := fmt.Sprintf(`[
 			[
@@ -296,5 +422,5 @@ func (s *SuiApiConnection) TryMultiGetPastObjects(objectId string, version strin
 			{"showContent": true}
 		]`, objectId, version, objectId, previousVersion)
 
-	return suiApiRequest[SuiTryMultiGetPastObjectsResponse](s.rpc, method, params)
+	return suiApiRequest[SuiTryMultiGetPastObjectsResponse](ctx, s.rpc, method, params)
 }

+ 197 - 128
node/pkg/txverifier/sui_test.go

@@ -1,6 +1,7 @@
 package txverifier
 
 import (
+	"context"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
@@ -18,16 +19,12 @@ const (
 	SuiUsdcAddress      = "5d4b302506645c37ff133b98c4b50a5ae14841659738d6d733d59d0d217a93bf"
 )
 
-// func initGlobals() {
-// 	suiEventType = fmt.Sprintf("%s::%s::%s", *suiCoreContract, suiModule, suiEventName)
-// }
-
-func newTestSuiTransferVerifier() *SuiTransferVerifier {
+func newTestSuiTransferVerifier(connection SuiApiInterface) *SuiTransferVerifier {
 	suiCoreContract := "0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a"
 	suiTokenBridgeContract := "0x26efee2b51c911237888e5dc6702868abca3c7ac12c53f76ef8eba0697695e3d"
 	suiTokenBridgeEmitter := "0xccceeb29348f71bdd22ffef43a2a19c1f5b5e17c5cca5411529120182672ade5"
 
-	return NewSuiTransferVerifier(suiCoreContract, suiTokenBridgeEmitter, suiTokenBridgeContract)
+	return NewSuiTransferVerifier(suiCoreContract, suiTokenBridgeEmitter, suiTokenBridgeContract, connection)
 }
 
 type MockSuiApiConnection struct {
@@ -61,11 +58,15 @@ func (mock *MockSuiApiConnection) SetObjectsResponse(ObjectResponse SuiTryMultiG
 	mock.ObjectsResponses = append(mock.ObjectsResponses, ObjectResponse)
 }
 
-func (mock *MockSuiApiConnection) QueryEvents(filter string, cursor string, limit int, descending bool) (SuiQueryEventsResponse, error) {
+func (mock *MockSuiApiConnection) ClearObjectResponses() {
+	mock.ObjectsResponses = []SuiTryMultiGetPastObjectsResponse{}
+}
+
+func (mock *MockSuiApiConnection) QueryEvents(ctx context.Context, filter string, cursor string, limit int, descending bool) (SuiQueryEventsResponse, error) {
 	return SuiQueryEventsResponse{}, nil
 }
 
-func (mock *MockSuiApiConnection) GetTransactionBlock(txDigest string) (SuiGetTransactionBlockResponse, error) {
+func (mock *MockSuiApiConnection) GetTransactionBlock(ctx context.Context, txDigest string) (SuiGetTransactionBlockResponse, error) {
 
 	objectChanges := []ObjectChange{}
 
@@ -87,7 +88,7 @@ func (mock *MockSuiApiConnection) GetTransactionBlock(txDigest string) (SuiGetTr
 
 	return SuiGetTransactionBlockResponse{Result: SuiGetTransactionBlockResult{Events: mock.Events, ObjectChanges: objectChanges}}, nil
 }
-func (mock *MockSuiApiConnection) TryMultiGetPastObjects(objectId string, version string, previousVersion string) (SuiTryMultiGetPastObjectsResponse, error) {
+func (mock *MockSuiApiConnection) TryMultiGetPastObjects(ctx context.Context, objectId string, version string, previousVersion string) (SuiTryMultiGetPastObjectsResponse, error) {
 
 	for _, response := range mock.ObjectsResponses {
 		keyIn := fmt.Sprintf("%s-%s-%s", objectId, version, previousVersion)
@@ -118,12 +119,21 @@ func TestNewSuiApiConnection(t *testing.T) {
 	}
 }
 
+func nextSequenceNumber(seq *uint64) *string {
+	(*seq)++
+	seqStr := fmt.Sprintf("%d", *seq)
+	return &seqStr
+}
+
 func TestProcessEvents(t *testing.T) {
-	suiTxVerifier := newTestSuiTransferVerifier()
+	connection := NewMockSuiApiConnection([]SuiEvent{})
+	suiTxVerifier := newTestSuiTransferVerifier(connection)
 
 	arbitraryEventType := "arbitrary::EventType"
 	arbitraryEmitter := "0x3117"
 
+	sequenceNumber := uint64(0)
+
 	logger := zap.NewNop()
 
 	// Constants used throughout the tests
@@ -135,7 +145,7 @@ func TestProcessEvents(t *testing.T) {
 		name           string
 		events         []SuiEvent
 		expectedResult map[string]*big.Int
-		expectedCount  uint
+		expectedCount  int
 	}{
 		{
 			name:           "TestNoEvents",
@@ -148,10 +158,11 @@ func TestProcessEvents(t *testing.T) {
 			events: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100), EthereumUsdcAddress, 2),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), EthereumUsdcAddress, 2),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
 			expectedResult: map[string]*big.Int{
@@ -164,17 +175,19 @@ func TestProcessEvents(t *testing.T) {
 			events: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
 			expectedResult: map[string]*big.Int{
@@ -187,17 +200,19 @@ func TestProcessEvents(t *testing.T) {
 			events: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
 			expectedResult: map[string]*big.Int{
@@ -211,10 +226,11 @@ func TestProcessEvents(t *testing.T) {
 			events: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &arbitraryEmitter,
-						Payload: generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &arbitraryEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
 			expectedResult: map[string]*big.Int{},
@@ -225,17 +241,19 @@ func TestProcessEvents(t *testing.T) {
 			events: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 				{
 					Type: &arbitraryEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
 			expectedResult: map[string]*big.Int{
@@ -248,17 +266,19 @@ func TestProcessEvents(t *testing.T) {
 			events: []SuiEvent{
 				{ // Invalid payload type
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(0, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(0, big.NewInt(100), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 				{ // Empty payload
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: []byte{},
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  []byte{},
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
 			expectedResult: map[string]*big.Int{},
@@ -269,16 +289,18 @@ func TestProcessEvents(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 
-			result, count := suiTxVerifier.processEvents(tt.events, logger)
+			requests := suiTxVerifier.extractBridgeRequestsFromEvents(tt.events, logger)
 
-			assert.Equal(t, tt.expectedResult, result)
-			assert.Equal(t, tt.expectedCount, count)
+			assert.Equal(t, tt.expectedCount, len(requests))
+			// assert.Equal(t, tt.expectedResult, result)
+			// assert.Equal(t, tt.expectedCount, count)
 		})
 	}
 }
 
 func TestProcessObjectUpdates(t *testing.T) {
-	suiTxVerifier := newTestSuiTransferVerifier()
+	suiTxVerifier := newTestSuiTransferVerifier(nil)
+	ctx := context.TODO()
 
 	logger := zap.NewNop() // zap.Must(zap.NewDevelopment())
 
@@ -303,7 +325,7 @@ func TestProcessObjectUpdates(t *testing.T) {
 		name           string
 		objectChanges  []ObjectChange
 		resultList     []ResultTestCase
-		expectedResult map[string]*big.Int
+		expectedResult map[string]TransferIntoBridge
 		expectedCount  uint
 	}{
 		{
@@ -326,8 +348,10 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": big.NewInt(990)},
-			expectedCount:  1,
+			expectedResult: map[string]TransferIntoBridge{
+				"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": {Amount: big.NewInt(990)},
+			},
+			expectedCount: 1,
 		},
 		{
 			name: "TestProcessObjectForeignBase",
@@ -349,8 +373,10 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2": big.NewInt(990)},
-			expectedCount:  1,
+			expectedResult: map[string]TransferIntoBridge{
+				"000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2": {Amount: big.NewInt(990)},
+			},
+			expectedCount: 1,
 		},
 		{
 			name: "TestProcessObjectNativeNegative",
@@ -372,8 +398,10 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": big.NewInt(-990)},
-			expectedCount:  1,
+			expectedResult: map[string]TransferIntoBridge{
+				"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": {Amount: big.NewInt(-990)},
+			},
+			expectedCount: 1,
 		},
 		{
 			name: "TestProcessObjectForeignNegative", // Unsure if this test case is possible from Sui API
@@ -395,8 +423,10 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2": big.NewInt(-990)},
-			expectedCount:  1,
+			expectedResult: map[string]TransferIntoBridge{
+				"000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2": {Amount: big.NewInt(-990)},
+			},
+			expectedCount: 1,
 		},
 		{
 			name: "TestProcessObjectNativeMultiple",
@@ -432,8 +462,11 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": big.NewInt(990), "5075594c01d46f3bcbc4a7ef1462058273bece7793eebd0464963597c9fd0935-21": big.NewInt(4950)},
-			expectedCount:  2,
+			expectedResult: map[string]TransferIntoBridge{
+				"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": {Amount: big.NewInt(990)},
+				"5075594c01d46f3bcbc4a7ef1462058273bece7793eebd0464963597c9fd0935-21": {Amount: big.NewInt(4950)},
+			},
+			expectedCount: 2,
 		},
 		{
 			name: "TestProcessObjectNativeAndForeign",
@@ -469,8 +502,11 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": big.NewInt(990), "000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2": big.NewInt(4950)},
-			expectedCount:  2,
+			expectedResult: map[string]TransferIntoBridge{
+				"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": {Amount: big.NewInt(990)},
+				"000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2":  {Amount: big.NewInt(4950)},
+			},
+			expectedCount: 2,
 		},
 		{
 			name: "TestProcessObjectWrongPackageIdType",
@@ -492,7 +528,7 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{},
+			expectedResult: map[string]TransferIntoBridge{},
 			expectedCount:  0,
 		},
 		{
@@ -515,7 +551,7 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{},
+			expectedResult: map[string]TransferIntoBridge{},
 			expectedCount:  0,
 		},
 		{
@@ -538,7 +574,7 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{},
+			expectedResult: map[string]TransferIntoBridge{},
 			expectedCount:  0,
 		},
 		{
@@ -561,7 +597,7 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{},
+			expectedResult: map[string]TransferIntoBridge{},
 			expectedCount:  0,
 		},
 		{
@@ -598,8 +634,10 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2": big.NewInt(990)},
-			expectedCount:  1,
+			expectedResult: map[string]TransferIntoBridge{
+				"000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48-2": {Amount: big.NewInt(990)},
+			},
+			expectedCount: 1,
 		},
 		{
 			name: "TestProcessObjectRealNumbers",
@@ -621,8 +659,10 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     8,
 				},
 			},
-			expectedResult: map[string]*big.Int{"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": big.NewInt(1000000000)},
-			expectedCount:  1,
+			expectedResult: map[string]TransferIntoBridge{
+				"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": {Amount: big.NewInt(1000000000)},
+			},
+			expectedCount: 1,
 		},
 		{
 			name: "TestProcessObjectNormalize",
@@ -644,8 +684,10 @@ func TestProcessObjectUpdates(t *testing.T) {
 					decimals:     18,
 				},
 			},
-			expectedResult: map[string]*big.Int{"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": big.NewInt(100000000)},
-			expectedCount:  1,
+			expectedResult: map[string]TransferIntoBridge{
+				"9258181f5ceac8dbffb7030890243caed69a9599d2886d957a9cb7656af3bdb3-21": {Amount: big.NewInt(100000000)},
+			},
+			expectedCount: 1,
 		},
 		{
 			name: "TestProcessObjectMissingVersion",
@@ -668,7 +710,7 @@ func TestProcessObjectUpdates(t *testing.T) {
 					drop:         true,
 				},
 			},
-			expectedResult: map[string]*big.Int{},
+			expectedResult: map[string]TransferIntoBridge{},
 			expectedCount:  0,
 		},
 	}
@@ -676,6 +718,7 @@ func TestProcessObjectUpdates(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			connection := NewMockSuiApiConnection([]SuiEvent{})
+			suiTxVerifier.suiApiConnection = connection
 
 			assert.Equal(t, len(tt.objectChanges), len(tt.resultList))
 
@@ -691,16 +734,26 @@ func TestProcessObjectUpdates(t *testing.T) {
 			}
 
 			// Run function and check results
-			transferredIntoBridge, numEventsProcessed := suiTxVerifier.processObjectUpdates(tt.objectChanges, connection, logger)
-			assert.Equal(t, tt.expectedResult, transferredIntoBridge)
-			assert.Equal(t, tt.expectedCount, numEventsProcessed)
+			transfers := suiTxVerifier.extractTransfersIntoBridgeFromObjectChanges(ctx, tt.objectChanges, logger)
+
+			// Check that expectedResult and transfers have same number of keys
+			assert.Equal(t, uint(len(tt.expectedResult)), uint(len(transfers)))
+
+			// Check that each key in expectedResult exists in transfers and has the expected amount
+			for key, expectedValue := range tt.expectedResult {
+				actualValue, exists := transfers[key]
+				if !exists {
+					t.Errorf("Expected key %s not found in result", key)
+				} else if actualValue.Amount.Cmp(expectedValue.Amount) != 0 {
+					t.Errorf("For key %s, expected amount %s but got %s", key, expectedValue.Amount.String(), actualValue.Amount.String())
+				}
+			}
 		})
 	}
 }
 
-// TODO
 func TestProcessDigest(t *testing.T) {
-	suiTxVerifier := newTestSuiTransferVerifier()
+	suiTxVerifier := newTestSuiTransferVerifier(nil)
 
 	// Constants used throughout the tests
 	normalObjectNativeType := "0x2::dynamic_field::Field<0x26efee2b51c911237888e5dc6702868abca3c7ac12c53f76ef8eba0697695e3d::token_registry::Key<0x2::sui::SUI>, 0x26efee2b51c911237888e5dc6702868abca3c7ac12c53f76ef8eba0697695e3d::native_asset::NativeAsset<0x2::sui::SUI>>"
@@ -718,6 +771,8 @@ func TestProcessDigest(t *testing.T) {
 	suiEventType := suiTxVerifier.suiEventType
 	suiTokenBridgeEmitter := suiTxVerifier.suiTokenBridgeEmitter
 
+	sequenceNumber := uint64(0)
+
 	logger := zap.Must(zap.NewDevelopment())
 
 	// func processDigest(digest string, suiApiConnection SuiApiInterface, logger *zap.Logger) error {
@@ -727,7 +782,7 @@ func TestProcessDigest(t *testing.T) {
 		objectChanges []ObjectChange
 		resultList    []ResultTestCase
 		suiEvents     []SuiEvent
-		expectedError string
+		expectedError error
 		expectedCount uint
 	}{
 		{
@@ -753,13 +808,13 @@ func TestProcessDigest(t *testing.T) {
 			suiEvents: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
 						Sender:  &suiTokenBridgeEmitter,
 						Payload: generatePayload(1, big.NewInt(990), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
-					},
+					}),
 				},
 			},
-			expectedError: "",
+			expectedError: nil,
 			expectedCount: 1,
 		},
 		{
@@ -785,13 +840,14 @@ func TestProcessDigest(t *testing.T) {
 			suiEvents: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100000), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100000), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
-			expectedError: "requested amount out is larger than amount in",
+			expectedError: &InvariantError{Msg: INVARIANT_INSUFFICIENT_DEPOSIT},
 			expectedCount: 0,
 		},
 		{
@@ -801,13 +857,14 @@ func TestProcessDigest(t *testing.T) {
 			suiEvents: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100000), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100000), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
-			expectedError: "transfer-out request for tokens that were never deposited",
+			expectedError: &InvariantError{Msg: INVARIANT_NO_DEPOSIT},
 			expectedCount: 0,
 		},
 		{
@@ -833,13 +890,14 @@ func TestProcessDigest(t *testing.T) {
 			suiEvents: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(990), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(990), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
-			expectedError: "",
+			expectedError: nil,
 			expectedCount: 1,
 		},
 		{
@@ -849,13 +907,14 @@ func TestProcessDigest(t *testing.T) {
 			suiEvents: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(100000), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(100000), SuiUsdcAddress, uint16(vaa.ChainIDSui)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
-			expectedError: "transfer-out request for tokens that were never deposited",
+			expectedError: &InvariantError{Msg: INVARIANT_NO_DEPOSIT},
 			expectedCount: 0,
 		},
 		{
@@ -881,20 +940,21 @@ func TestProcessDigest(t *testing.T) {
 			suiEvents: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(990), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(990), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
 						Sender:  &suiTokenBridgeEmitter,
 						Payload: generatePayload(1, big.NewInt(1000), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					}),
 				},
 			},
-			expectedError: "",
+			expectedError: nil,
 			expectedCount: 2,
 		},
 		{
@@ -920,29 +980,32 @@ func TestProcessDigest(t *testing.T) {
 			suiEvents: []SuiEvent{
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(990), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(990), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 				{
 					Type: &suiEventType,
-					Message: &WormholeMessage{
-						Sender:  &suiTokenBridgeEmitter,
-						Payload: generatePayload(1, big.NewInt(1001), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
-					},
+					ParsedJson: uncheckedJsonMarshal(&WormholeMessage{
+						Sender:   &suiTokenBridgeEmitter,
+						Payload:  generatePayload(1, big.NewInt(1001), EthereumUsdcAddress, uint16(vaa.ChainIDEthereum)),
+						Sequence: nextSequenceNumber(&sequenceNumber),
+					}),
 				},
 			},
-			expectedError: "requested amount out is larger than amount in",
+			expectedError: &InvariantError{Msg: INVARIANT_INSUFFICIENT_DEPOSIT},
 			expectedCount: 0,
 		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-
+			ctx := context.TODO()
 			assert.Equal(t, len(tt.objectChanges), len(tt.resultList))
 			connection := NewMockSuiApiConnection(tt.suiEvents) // Set events for connection
+			suiTxVerifier.suiApiConnection = connection
 
 			// Add Object Response data for Sui connections
 			for index := 0; index < len(tt.objectChanges); index++ {
@@ -954,14 +1017,20 @@ func TestProcessDigest(t *testing.T) {
 				connection.SetObjectsResponse(responseObject)
 			}
 
-			numProcessed, err := suiTxVerifier.ProcessDigest("HASH", connection, logger)
+			_, err := suiTxVerifier.processDigestInternal(ctx, "HASH", "", logger)
 
-			assert.Equal(t, true, tt.expectedError == "" && err == nil || err != nil && err.Error() == tt.expectedError)
-			assert.Equal(t, tt.expectedCount, numProcessed)
+			assert.Equal(t, true, tt.expectedError == nil && err == nil || err != nil && err.Error() == tt.expectedError.Error())
+			// assert.Equal(t, tt.expectedCount, numProcessed)
 		})
 	}
 }
 
+// Marshal the input to a json.RawMessage, and ignore the error message.
+func uncheckedJsonMarshal(v any) *json.RawMessage {
+	data, _ := json.Marshal(v)
+	return (*json.RawMessage)(&data)
+}
+
 // Generate WormholeMessage payload.
 //
 //	Payload type: payload[0]

+ 71 - 56
node/pkg/txverifier/suitypes.go

@@ -1,21 +1,22 @@
 package txverifier
 
 import (
+	"context"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
 	"math/big"
 	"regexp"
 	"strings"
-)
 
-const SUI_CHAIN_ID = 21
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+)
 
 // The SuiApi interface defines the functions that are required to interact with the Sui RPC.
 type SuiApiInterface interface {
-	QueryEvents(filter string, cursor string, limit int, descending bool) (SuiQueryEventsResponse, error)
-	GetTransactionBlock(txDigest string) (SuiGetTransactionBlockResponse, error)
-	TryMultiGetPastObjects(objectId string, version string, previousVersion string) (SuiTryMultiGetPastObjectsResponse, error)
+	QueryEvents(ctx context.Context, filter string, cursor string, limit int, descending bool) (SuiQueryEventsResponse, error)
+	GetTransactionBlock(ctx context.Context, txDigest string) (SuiGetTransactionBlockResponse, error)
+	TryMultiGetPastObjects(ctx context.Context, objectId string, version string, previousVersion string) (SuiTryMultiGetPastObjectsResponse, error)
 }
 
 // This struct defines the standard properties that get returned from the RPC.
@@ -66,8 +67,8 @@ type SuiEvent struct {
 	Sender            *string `json:"sender"`
 	Type              *string `json:"type"`
 	// Bcs               *string          `json:"bcs"`
-	Timestamp *string          `json:"timestampMs"`
-	Message   *WormholeMessage `json:"parsedJson"`
+	Timestamp  *string          `json:"timestampMs"`
+	ParsedJson *json.RawMessage `json:"parsedJson"`
 }
 
 // The response object for sui_GetTransactionBlock
@@ -130,37 +131,41 @@ func (o ObjectChange) ValidateTypeInformation(expectedPackageId string) (success
 // The response object for suix_tryMultiGetPastObjects
 type SuiTryMultiGetPastObjectsResponse struct {
 	SuiApiStandardResponse
-	Result []SuiTryMultiGetPastObjectsResult `json:"result"`
+	Result [2]SuiTryMultiGetPastObjectsResult `json:"result"`
 }
 
 // Gets the balance difference of the two result objects.
-func (r SuiTryMultiGetPastObjectsResponse) GetBalanceDiff() (*big.Int, error) {
+func (r SuiTryMultiGetPastObjectsResponse) GetBalanceChange() (*big.Int, error) {
 
 	if len(r.Result) != 2 {
-		return big.NewInt(0), fmt.Errorf("incorrect number of results received")
+		return nil, fmt.Errorf("incorrect number of objects in Result")
 	}
 
-	// Determine if the asset is wrapped or native
+	// Determine if the asset is wrapped or native. It's only necessary to check if one asset
+	// is wrapped, since `TryMultiGetPastObjects` queries only a single object ID.
 	isWrapped, err := r.Result[0].IsWrapped()
 	if err != nil {
-		return big.NewInt(0), fmt.Errorf("error in checking if object is wrapped: %w", err)
+		return nil, fmt.Errorf("Error checking if object is wrapped: %w", err)
 	}
 
-	// TODO: Should we check that the other asset is also wrapped?
-	newBalance, err := r.Result[0].GetVersionBalance(isWrapped)
+	oldBalance, err := r.Result[1].GetVersionBalance(isWrapped)
 	if err != nil {
-		return big.NewInt(0), fmt.Errorf("error in getting new balance: %w", err)
+		return nil, fmt.Errorf("Error getting old balance: %w", err)
 	}
 
-	oldBalance, err := r.Result[1].GetVersionBalance(isWrapped)
+	newBalance, err := r.Result[0].GetVersionBalance(isWrapped)
 	if err != nil {
-		return big.NewInt(0), fmt.Errorf("error in getting old balance: %w", err)
+		return nil, fmt.Errorf("Error getting new balance: %w", err)
 	}
 
+	// NOTE: newBalance is also modified by this operation, so care should be taken if future changes
+	// relies on the original value of newBalance.
 	difference := newBalance.Sub(newBalance, oldBalance)
-	// If the asset is wrapped, it means that the balance was burned, so the difference should be negative.
+
+	// If the asset is wrapped, it means that the balance should have been burned, implying a reduction
+	// in total supply. Hence, the difference is negative and the sign needs to be inverted.
 	if isWrapped {
-		difference = difference.Mul(difference, big.NewInt(-1))
+		difference.Neg(difference)
 	}
 
 	return difference, nil
@@ -172,9 +177,9 @@ func (r SuiTryMultiGetPastObjectsResponse) GetDecimals() (uint8, error) {
 	decimals1, err1 := r.Result[1].GetDecimals()
 
 	if err0 != nil {
-		return 0, fmt.Errorf("error in getting decimals: %w", err0)
+		return 0, fmt.Errorf("Error getting decimals: %w", err0)
 	} else if err1 != nil {
-		return 0, fmt.Errorf("error in getting decimals: %w", err1)
+		return 0, fmt.Errorf("Error getting decimals: %w", err1)
 	} else if decimals0 != decimals1 {
 		return 0, fmt.Errorf("decimals do not match")
 	}
@@ -187,9 +192,9 @@ func (r SuiTryMultiGetPastObjectsResponse) GetTokenAddress() (string, error) {
 	tokenAddress1, err1 := r.Result[1].GetTokenAddress()
 
 	if err0 != nil {
-		return "", fmt.Errorf("error in getting token address: %w", err0)
+		return "", fmt.Errorf("Error getting token address: %w", err0)
 	} else if err1 != nil {
-		return "", fmt.Errorf("error in getting token address: %w", err1)
+		return "", fmt.Errorf("Error getting token address: %w", err1)
 	} else if tokenAddress0 != tokenAddress1 {
 		return "", fmt.Errorf("token addresses do not match")
 	}
@@ -197,14 +202,14 @@ func (r SuiTryMultiGetPastObjectsResponse) GetTokenAddress() (string, error) {
 	return tokenAddress0, nil
 }
 
-func (r SuiTryMultiGetPastObjectsResponse) GetTokenChain() (uint16, error) {
+func (r SuiTryMultiGetPastObjectsResponse) GetTokenChain() (vaa.ChainID, error) {
 	chain0, err0 := r.Result[0].GetTokenChain()
 	chain1, err1 := r.Result[1].GetTokenChain()
 
 	if err0 != nil {
-		return 0, fmt.Errorf("error in getting token chain: %w", err0)
+		return 0, fmt.Errorf("Error getting token chain: %w", err0)
 	} else if err1 != nil {
-		return 0, fmt.Errorf("error in getting token chain: %w", err1)
+		return 0, fmt.Errorf("Error getting token chain: %w", err1)
 	} else if chain0 != chain1 {
 		return 0, fmt.Errorf("token chain ids do not match")
 	}
@@ -244,9 +249,9 @@ func (r SuiTryMultiGetPastObjectsResponse) GetObjectType() (string, error) {
 	type1, err1 := r.Result[1].GetObjectType()
 
 	if err0 != nil {
-		return "", fmt.Errorf("error in getting token chain: %w", err0)
+		return "", fmt.Errorf("Error getting token chain: %w", err0)
 	} else if err1 != nil {
-		return "", fmt.Errorf("error in getting token chain: %w", err1)
+		return "", fmt.Errorf("Error getting token chain: %w", err1)
 	} else if type0 != type1 {
 		return "", fmt.Errorf("token chain ids do not match")
 	}
@@ -256,17 +261,17 @@ func (r SuiTryMultiGetPastObjectsResponse) GetObjectType() (string, error) {
 
 // The result object for suix_tryMultiGetPastObjects.
 type SuiTryMultiGetPastObjectsResult struct {
-	Status  string           `json:"status"`
-	Details *json.RawMessage `json:"details"`
+	Status  string          `json:"status"`
+	Details json.RawMessage `json:"details"`
 }
 
 // Check if the result object is wrapped.
 func (r SuiTryMultiGetPastObjectsResult) IsWrapped() (bool, error) {
 	path := "content.type"
-	objectType, err := extractFromJsonPath[string](*r.Details, path)
+	objectType, err := extractFromJsonPath[string](r.Details, path)
 
 	if err != nil {
-		return false, fmt.Errorf("error in extracting object type: %w", err)
+		return false, fmt.Errorf("Error extracting object type: %w", err)
 	}
 
 	return strings.Contains(objectType, "wrapped_asset::WrappedAsset"), nil
@@ -290,10 +295,10 @@ func (r SuiTryMultiGetPastObjectsResult) GetVersionBalance(isWrapped bool) (*big
 		path = pathNative
 	}
 
-	supply, err := extractFromJsonPath[string](*r.Details, path)
+	supply, err := extractFromJsonPath[string](r.Details, path)
 
 	if err != nil {
-		return supplyInt, fmt.Errorf("error in extracting wormhole balance: %w", err)
+		return supplyInt, fmt.Errorf("Error extracting wormhole balance: %w", err)
 	}
 
 	supplyInt, success := supplyInt.SetString(supply, 10)
@@ -311,17 +316,17 @@ func (r SuiTryMultiGetPastObjectsResult) GetDecimals() (uint8, error) {
 	// both store the decimals used for truncation in the NativeAsset or WrappedAsset's `decimals()` field
 	path := "content.fields.value.fields.decimals"
 
-	decimals, err := extractFromJsonPath[float64](*r.Details, path)
+	decimals, err := extractFromJsonPath[float64](r.Details, path)
 
 	if err != nil {
-		return 0, fmt.Errorf("error in extracting decimals: %w", err)
+		return 0, fmt.Errorf("Error extracting decimals: %w", err)
 	}
 
 	return uint8(decimals), nil
 }
 
 // Get the result object's token address. This will be the address of the token
-// on it's chain of origin.
+// on its origin chain.
 func (r SuiTryMultiGetPastObjectsResult) GetTokenAddress() (tokenAddress string, err error) {
 	var path string
 
@@ -334,7 +339,7 @@ func (r SuiTryMultiGetPastObjectsResult) GetTokenAddress() (tokenAddress string,
 	wrapped, err := r.IsWrapped()
 
 	if err != nil {
-		return "", fmt.Errorf("error in checking if object is wrapped: %w", err)
+		return "", fmt.Errorf("Error checking if object is wrapped: %w", err)
 	}
 
 	if wrapped {
@@ -343,21 +348,21 @@ func (r SuiTryMultiGetPastObjectsResult) GetTokenAddress() (tokenAddress string,
 		path = pathNative
 	}
 
-	data, err := extractFromJsonPath[[]interface{}](*r.Details, path)
+	data, err := extractFromJsonPath[[]interface{}](r.Details, path)
 
 	if err != nil {
-		return "", fmt.Errorf("error in extracting token address: %w", err)
+		return "", fmt.Errorf("Error extracting token address: %w", err)
 	}
 
 	// data is of type []interface{}, and each element is of type float64.
-	// We need to covnert each element to a byte, and then convert the []byte to a hex string.
+	// We need to convert each element to a byte, and then convert the []byte to a hex string.
 	addrBytes := make([]byte, len(data))
 
 	for i, v := range data {
 		if f, ok := v.(float64); ok {
 			addrBytes[i] = byte(f)
 		} else {
-			return "", fmt.Errorf("error in converting token data to float type")
+			return "", fmt.Errorf("Error converting token data to float type")
 		}
 	}
 
@@ -366,36 +371,46 @@ func (r SuiTryMultiGetPastObjectsResult) GetTokenAddress() (tokenAddress string,
 
 // Get the token's chain ID. This will be the chain ID of the network the token
 // originated from.
-func (r SuiTryMultiGetPastObjectsResult) GetTokenChain() (uint16, error) {
+func (r SuiTryMultiGetPastObjectsResult) GetTokenChain() (vaa.ChainID, error) {
 
 	wrapped, err := r.IsWrapped()
 
 	if err != nil {
-		return 0, fmt.Errorf("error in checking if object is wrapped: %w", err)
+		return 0, fmt.Errorf("Error checking if object is wrapped: %w", err)
 	}
 
 	if !wrapped {
-		return SUI_CHAIN_ID, nil
+		return vaa.ChainIDSui, nil
 	}
 
 	path := "content.fields.value.fields.info.fields.token_chain"
 
-	chain, err := extractFromJsonPath[float64](*r.Details, path)
+	chain, err := extractFromJsonPath[float64](r.Details, path)
 
 	if err != nil {
-		return 0, fmt.Errorf("error in extracting chain: %w", err)
+		return 0, fmt.Errorf("Error extracting chain: %w", err)
 	}
 
-	return uint16(chain), nil
+	if float64(uint16(chain)) != chain {
+		return 0, fmt.Errorf("failed to cast chain ID: %f", chain)
+	}
+
+	chainId, err := vaa.KnownChainIDFromNumber(uint16(chain))
+
+	if err != nil {
+		return 0, fmt.Errorf("Error converting chain to known chain id: %w", err)
+	}
+
+	return chainId, nil
 }
 
 func (r SuiTryMultiGetPastObjectsResult) GetObjectId() (string, error) {
 	path := "objectId"
 
-	objectId, err := extractFromJsonPath[string](*r.Details, path)
+	objectId, err := extractFromJsonPath[string](r.Details, path)
 
 	if err != nil {
-		return "", fmt.Errorf("error in extracting objectId: %w", err)
+		return "", fmt.Errorf("Error extracting objectId: %w", err)
 	}
 
 	return objectId, nil
@@ -404,10 +419,10 @@ func (r SuiTryMultiGetPastObjectsResult) GetObjectId() (string, error) {
 func (r SuiTryMultiGetPastObjectsResult) GetVersion() (string, error) {
 	path := "version"
 
-	version, err := extractFromJsonPath[string](*r.Details, path)
+	version, err := extractFromJsonPath[string](r.Details, path)
 
 	if err != nil {
-		return "", fmt.Errorf("error in extracting version: %w", err)
+		return "", fmt.Errorf("Error extracting version: %w", err)
 	}
 
 	return version, nil
@@ -416,19 +431,19 @@ func (r SuiTryMultiGetPastObjectsResult) GetVersion() (string, error) {
 func (r SuiTryMultiGetPastObjectsResult) GetObjectType() (string, error) {
 	path := "type"
 
-	version, err := extractFromJsonPath[string](*r.Details, path)
+	objectType, err := extractFromJsonPath[string](r.Details, path)
 
 	if err != nil {
-		return "", fmt.Errorf("error in extracting version: %w", err)
+		return "", fmt.Errorf("Error extracting type: %w", err)
 	}
 
-	return version, nil
+	return objectType, nil
 }
 
 // Definition of the WormholeMessage event
 type WormholeMessage struct {
 	ConsistencyLevel *uint8  `json:"consistency_level"`
-	Nonce            *uint64 `json:"nonce"`
+	Nonce            *uint32 `json:"nonce"`
 	Payload          []byte  `json:"payload"`
 	Sender           *string `json:"sender"`
 	Sequence         *string `json:"sequence"`

+ 115 - 1
node/pkg/txverifier/utils.go

@@ -25,6 +25,10 @@ const (
 	// CacheDeleteCount specifies the number of entries to delete from a cache once it reaches CacheMaxSize.
 	// Must be less than CacheMaxSize.
 	CacheDeleteCount = 10
+
+	// Invariant violation messages. There technically only exists two violations.
+	INVARIANT_NO_DEPOSIT           = "bridge transfer requested for tokens that were never deposited"
+	INVARIANT_INSUFFICIENT_DEPOSIT = "bridge transfer requested for more tokens than were deposited"
 )
 
 // msgID is a unique identifier the corresponds to a VAA's "message ID".
@@ -88,6 +92,16 @@ func NewMsgID(in string) (msgID, error) {
 		EmitterAddress: emitterAddress,
 		Sequence:       sequence,
 	}, nil
+
+}
+
+// Custom error type used to signal that a core invariant of the token bridge has been violated.
+type InvariantError struct {
+	Msg string
+}
+
+func (i InvariantError) Error() string {
+	return fmt.Sprintf("invariant violated: %s", i.Msg)
 }
 
 // Extracts the value at the given path from the JSON object, and casts it to
@@ -95,6 +109,10 @@ func NewMsgID(in string) (msgID, error) {
 func extractFromJsonPath[T any](data json.RawMessage, path string) (T, error) {
 	var defaultT T
 
+	if data == nil {
+		return defaultT, fmt.Errorf("supplied JSON data is nil")
+	}
+
 	var obj map[string]interface{}
 	err := json.Unmarshal(data, &obj)
 	if err != nil {
@@ -123,7 +141,7 @@ func extractFromJsonPath[T any](data json.RawMessage, path string) (T, error) {
 		if v, ok := value.(T); ok {
 			return v, nil
 		} else {
-			return defaultT, fmt.Errorf("can't convert to type T")
+			return defaultT, fmt.Errorf("can't convert to type %T", *new(T))
 		}
 	} else {
 		return defaultT, fmt.Errorf("key %s not found", keys[len(keys)-1])
@@ -179,6 +197,7 @@ func SupportedChains() []vaa.ChainID {
 	return []vaa.ChainID{
 		// Mainnets
 		vaa.ChainIDEthereum,
+		vaa.ChainIDSui,
 		// Testnets
 		vaa.ChainIDSepolia,
 		vaa.ChainIDHolesky,
@@ -289,3 +308,98 @@ func upsert(
 	}
 	return nil
 }
+
+type MsgIdToRequestOutOfBridge map[string]*RequestOutOfBridge
+
+// Represents a request to move assets out of the Sui token bridge
+type RequestOutOfBridge struct {
+	AssetKey string
+	Amount   *big.Int
+
+	// Validation parameters
+	DepositMade    bool // True if `assetKey` was deposited into the bridge
+	DepositSolvent bool // True if `assetKey` is considered solvent
+}
+
+type AssetKeyToTransferIntoBridge map[string]*TransferIntoBridge
+
+// Represents a transfer of assets into the Sui token bridge
+type TransferIntoBridge struct {
+	Amount  *big.Int
+	Solvent bool
+}
+
+// validateSolvency checks whether or not the transfers into the brigde are sufficient to cover the
+// requests to transfer assets out of the bridge.
+//   - if an asset is considered insolvent, all requests out of the bridge for that asset are marked
+//     as invalid.
+//   - if an asset is considered solvent, all requests out of the bridge for that asset are marked
+//     as valid.
+//
+// The typical case where this would be important is when multiple bridge transfers are requested in
+// the same transaction. If the transfers are of different assets, it's important that one asset's
+// insolvency does not affect the validity of other bridge transfers.
+func validateSolvency(
+	requests MsgIdToRequestOutOfBridge,
+	transfers AssetKeyToTransferIntoBridge,
+) (MsgIdToRequestOutOfBridge, error) {
+	resolved := make(MsgIdToRequestOutOfBridge)
+
+	insolventAssetKeys := []string{}
+
+	// Check that all requests and transfers have non-nil amounts.
+	// This function assumes that amounts are non-nil, so an error is returned if
+	// any amount is nil. An empty resolution map is also returned.
+	for msgIdStr, request := range requests {
+		if request.Amount == nil {
+			return resolved, fmt.Errorf("nil amount in request out of bridge for msgID %s", msgIdStr)
+		}
+	}
+
+	for assetKey, transfer := range transfers {
+		if transfer.Amount == nil {
+			return resolved, fmt.Errorf("nil amount in transfer into bridge for assetKey %s", assetKey)
+		}
+	}
+
+	// First pass: check if assets were deposited for the requests out of the bridge,
+	// and subtract the requested amounts from the transfers into the bridge.
+	for _, request := range requests {
+		transfer, exists := transfers[request.AssetKey]
+		if !exists {
+			// No transfer into the bridge for this asset. Set depositMade to false.
+			request.DepositMade = false
+			continue
+		}
+
+		// Mark that a deposit was made for this asset.
+		request.DepositMade = true
+
+		// Subtract the requested amount from the transfer amount
+		transfer.Amount = new(big.Int).Sub(transfer.Amount, request.Amount)
+
+		// The moment the transfer amount goes negative, the asset is insolvent.
+		if transfer.Amount.Sign() < 0 {
+			insolventAssetKeys = append(insolventAssetKeys, request.AssetKey)
+		}
+	}
+
+	// Second pass: check each request's assetKey against the list of insolvent assets.
+	// If the assetKey is not in the list, then mark the request as solvent.
+	for msgIdStr, request := range requests {
+		if !request.DepositMade {
+			// No deposit was made for this asset. Mark as insolvent.
+			request.DepositSolvent = false
+		} else if slices.Contains(insolventAssetKeys, request.AssetKey) {
+			// Asset is insolvent.
+			request.DepositSolvent = false
+		} else {
+			// Asset is solvent.
+			request.DepositSolvent = true
+		}
+
+		resolved[msgIdStr] = request
+	}
+
+	return resolved, nil
+}

+ 190 - 0
node/pkg/txverifier/utils_test.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math/big"
 	"reflect"
+	"slices"
 	"testing"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -76,6 +77,14 @@ func TestExtractFromJsonPath(t *testing.T) {
 			wantErr:  true,
 			typ:      "string",
 		},
+		{
+			name:     "DataIsNil",
+			data:     nil,
+			path:     "test",
+			expected: nil,
+			wantErr:  true,
+			typ:      "string",
+		},
 	}
 
 	for _, tt := range testcases {
@@ -529,3 +538,184 @@ func Test_deleteEntries_AddressKeys(t *testing.T) {
 		})
 	}
 }
+
+func Test_validateSolvency(t *testing.T) {
+	tests := []struct {
+		name             string
+		requests         MsgIdToRequestOutOfBridge
+		transfers        AssetKeyToTransferIntoBridge
+		wantErr          bool
+		invalidTransfers []string
+	}{
+		{
+			name: "SingleRequest_SingleAsset_Solvent",
+			requests: MsgIdToRequestOutOfBridge{
+				"msg1": {
+					AssetKey:       "asset1",
+					Amount:         big.NewInt(100),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+			},
+			transfers: AssetKeyToTransferIntoBridge{
+				"asset1": {
+					Amount: big.NewInt(200),
+				},
+			},
+			wantErr:          false,
+			invalidTransfers: []string{},
+		},
+		{
+			name: "SingleRequest_SingleAsset_Insolvent",
+			requests: MsgIdToRequestOutOfBridge{
+				"msg1": {
+					AssetKey:       "asset1",
+					Amount:         big.NewInt(300),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+			},
+			transfers: AssetKeyToTransferIntoBridge{
+				"asset1": {
+					Amount: big.NewInt(200),
+				},
+			},
+			wantErr:          false,
+			invalidTransfers: []string{"msg1"},
+		},
+		{
+			name: "MultipleRequests_MultipleAssets_MixedSolvency",
+			requests: MsgIdToRequestOutOfBridge{
+				"msg1": {
+					AssetKey:       "asset1",
+					Amount:         big.NewInt(100),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+				"msg2": {
+					AssetKey:       "asset2",
+					Amount:         big.NewInt(150),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+				"msg3": {
+					AssetKey:       "asset1",
+					Amount:         big.NewInt(50),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+			},
+			transfers: AssetKeyToTransferIntoBridge{
+				"asset1": {
+					Amount: big.NewInt(120),
+				},
+				"asset2": {
+					Amount: big.NewInt(200),
+				},
+			},
+			wantErr: false,
+			// asset1 is insolvent because of msg1 and msg3
+			invalidTransfers: []string{"msg1", "msg3"},
+		},
+		{
+			name: "RequestWithNoMatchingTransfer",
+			requests: MsgIdToRequestOutOfBridge{
+				"msg1": {
+					AssetKey:       "asset1",
+					Amount:         big.NewInt(100),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+			},
+			transfers:        AssetKeyToTransferIntoBridge{},
+			wantErr:          false,
+			invalidTransfers: []string{"msg1"}, // no transfer for asset1
+		},
+		{
+			name:             "EmptyRequests",
+			requests:         MsgIdToRequestOutOfBridge{},
+			transfers:        AssetKeyToTransferIntoBridge{},
+			wantErr:          false,
+			invalidTransfers: []string{},
+		},
+		{
+			name: "EmptyTransfers",
+			requests: MsgIdToRequestOutOfBridge{
+				"msg1": {
+					AssetKey:       "asset1",
+					Amount:         big.NewInt(100),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+			},
+			transfers:        AssetKeyToTransferIntoBridge{},
+			wantErr:          false,
+			invalidTransfers: []string{"msg1"}, // no transfer for asset1
+		},
+		{
+			name:             "EmptyRequests",
+			requests:         MsgIdToRequestOutOfBridge{},
+			transfers:        AssetKeyToTransferIntoBridge{},
+			wantErr:          false,
+			invalidTransfers: []string{},
+		},
+		{
+			name: "RequestWithNilAmount",
+			requests: MsgIdToRequestOutOfBridge{
+				"msg1": {
+					AssetKey:       "asset1",
+					Amount:         nil,
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+			},
+			transfers:        AssetKeyToTransferIntoBridge{},
+			wantErr:          true,
+			invalidTransfers: []string{},
+		},
+		{
+			name: "TransferWithNilAmount",
+			requests: MsgIdToRequestOutOfBridge{
+				"msg1": {
+					AssetKey:       "asset1",
+					Amount:         big.NewInt(100),
+					DepositMade:    false,
+					DepositSolvent: false, // will be updated by validateSolvency
+				},
+			},
+			transfers: AssetKeyToTransferIntoBridge{
+				"asset1": {
+					Amount: nil,
+				},
+			},
+			wantErr:          true,
+			invalidTransfers: []string{},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			resolved, err := validateSolvency(tt.requests, tt.transfers)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("validateSolvency() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+
+			for msgIdStr, req := range resolved {
+				reqIsValid := req.DepositSolvent && req.DepositMade
+				shouldBeInvalid := slices.Contains(tt.invalidTransfers, msgIdStr)
+
+				if reqIsValid && !shouldBeInvalid {
+					// Valid and should be valid, all good.
+				} else if reqIsValid && shouldBeInvalid {
+					// Request was marked as valid, but should be invalid
+					t.Errorf("Expected message ID %s to be marked as invalid, but it was marked as valid", msgIdStr)
+				} else if !reqIsValid && !shouldBeInvalid {
+					// Request was marked as invalid, but should be valid
+					t.Errorf("Expected message ID %s to be marked as valid, but it was marked as invalid", msgIdStr)
+				}
+			}
+		})
+	}
+}

+ 16 - 5
node/pkg/watchers/sui/config.go

@@ -11,10 +11,11 @@ import (
 )
 
 type WatcherConfig struct {
-	NetworkID        watchers.NetworkID // human readable name
-	ChainID          vaa.ChainID        // ChainID
-	Rpc              string
-	SuiMoveEventType string
+	NetworkID         watchers.NetworkID // human readable name
+	ChainID           vaa.ChainID
+	Rpc               string
+	SuiMoveEventType  string
+	TxVerifierEnabled bool
 }
 
 func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
@@ -36,5 +37,15 @@ func (wc *WatcherConfig) Create(
 ) (supervisor.Runnable, interfaces.Reobserver, error) {
 	var devMode = (env == common.UnsafeDevNet)
 
-	return NewWatcher(wc.Rpc, wc.SuiMoveEventType, devMode, msgC, obsvReqC).Run, nil, nil
+	watcher, err := NewWatcher(
+		wc.Rpc,
+		wc.SuiMoveEventType,
+		devMode,
+		msgC,
+		obsvReqC,
+		env,
+		wc.TxVerifierEnabled,
+	)
+
+	return watcher.Run, nil, err
 }

+ 64 - 0
node/pkg/watchers/sui/msg_verifier.go

@@ -0,0 +1,64 @@
+package sui
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+	"go.uber.org/zap"
+)
+
+func (e *Watcher) verify(
+	ctx context.Context,
+	msg *common.MessagePublication,
+	txDigest string,
+	logger *zap.Logger,
+) (common.MessagePublication, error) {
+
+	if msg == nil {
+		return common.MessagePublication{}, fmt.Errorf("MessagePublication is nil")
+	}
+
+	if msg.VerificationState() != common.NotVerified {
+		return common.MessagePublication{}, fmt.Errorf("MessagePublication already has a non-default verification state")
+	}
+
+	if e.suiTxVerifier == nil {
+		return common.MessagePublication{}, fmt.Errorf("transfer verifier is nil")
+	}
+
+	localMsg := *msg
+
+	var verificationState common.VerificationState
+
+	// If the payload does not represent a transfer, or if the emitter address of the message does
+	// not match the token bridge emitter, mark the message's verification state as NotApplicable.
+	if !vaa.IsTransfer(msg.Payload) || "0x"+localMsg.EmitterAddress.String() != e.suiTxVerifier.GetTokenBridgeEmitter() {
+		verificationState = common.NotApplicable
+	} else {
+		// Validate the transfers in the transaction block associated with the
+		// transaction digest.
+		valid, err := e.suiTxVerifier.ProcessDigest(ctx, txDigest, localMsg.MessageIDString(), logger)
+
+		if err != nil {
+			logger.Error("an internal Sui tx verifier error occurred: ", zap.Error(err))
+			verificationState = common.CouldNotVerify
+		} else if valid {
+			verificationState = common.Valid
+		} else {
+			// If no error and validation failed, mark as Anomalous. For Sui transfers, Anomalous is used, since there are
+			// more edge cases that need to be considered and covered before outright rejecting a transfer.
+			verificationState = common.Anomalous
+		}
+	}
+
+	// Update the state of the message.
+	updateErr := localMsg.SetVerificationState(verificationState)
+	if updateErr != nil {
+		errMsg := fmt.Sprintf("could not set verification state for message with txID %s", localMsg.TxIDString())
+		return common.MessagePublication{}, fmt.Errorf("%s %w", errMsg, updateErr)
+	}
+
+	return localMsg, nil
+}

+ 111 - 17
node/pkg/watchers/sui/watcher.go

@@ -10,6 +10,7 @@ import (
 	"strings"
 	"time"
 
+	"encoding/hex"
 	"encoding/json"
 
 	"github.com/certusone/wormhole/node/pkg/common"
@@ -24,8 +25,11 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 
 	"github.com/mr-tron/base58"
+	"github.com/wormhole-foundation/wormhole/sdk"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
+
+	txverifier "github.com/certusone/wormhole/node/pkg/txverifier"
 )
 
 type (
@@ -45,6 +49,10 @@ type (
 		loopDelay                 time.Duration
 		queryEventsCmd            string
 		postTimeout               time.Duration
+
+		// Sui transaction verifier
+		suiTxVerifier     *txverifier.SuiTransferVerifier
+		txVerifierEnabled bool
 	}
 
 	SuiEventResponse struct {
@@ -166,6 +174,11 @@ var (
 			Name: "wormhole_sui_current_height",
 			Help: "Current Sui block height",
 		})
+	suiTransferVerifierFailures = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "wormhole_sui_txverifier_failures",
+			Help: "Total number of messages that failed transfer verification",
+		})
 )
 
 // NewWatcher creates a new Sui appid watcher
@@ -175,9 +188,61 @@ func NewWatcher(
 	unsafeDevMode bool,
 	messageEvents chan<- *common.MessagePublication,
 	obsvReqC <-chan *gossipv1.ObservationRequest,
-) *Watcher {
+	env common.Environment,
+	txVerifierEnabled bool,
+) (*Watcher, error) {
 	maxBatchSize := 10
 	descOrder := true
+
+	var suiTxVerifier *txverifier.SuiTransferVerifier
+
+	if txVerifierEnabled {
+
+		// Extracted from the suiMoveEventType passed to guardiand as CLI arg
+		var suiCoreBridgePackageId string
+
+		//	Read from a hardcoded map in the txverifier package, based on the environment
+		//	NOTE: this is the original package ID, not the current one. The original package ID is used to address
+		//	the token registry that holds the wrapped and native assets of the bridge.
+		var suiTokenBridgePackageId string
+
+		// Read from the sdk, based on the environment
+		var suiTokenBridgeEmitter string
+
+		// Split the suiMoveEventType into its components. If the format is incorrect, return an error.
+		eventTypeComponents := strings.Split(suiMoveEventType, "::")
+		if len(eventTypeComponents) != 3 {
+			return nil, fmt.Errorf("suiMoveEventType is not in the correct format, expected <package_id>::<module_name>::<event_name>, got: %s", suiMoveEventType)
+		}
+
+		suiCoreBridgePackageId = eventTypeComponents[0]
+
+		// Retrieve the token bridge package ID and token bridge emitter address, based on the environment
+		switch env {
+		case common.MainNet:
+			suiTokenBridgePackageId = txverifier.SuiOriginalTokenBridgePackageIds[common.MainNet]
+			suiTokenBridgeEmitter = "0x" + hex.EncodeToString(sdk.KnownTokenbridgeEmitters[vaa.ChainIDSui])
+		case common.TestNet:
+			suiTokenBridgePackageId = txverifier.SuiOriginalTokenBridgePackageIds[common.TestNet]
+			suiTokenBridgeEmitter = "0x" + hex.EncodeToString(sdk.KnownTestnetTokenbridgeEmitters[vaa.ChainIDSui])
+		case common.UnsafeDevNet, common.AccountantMock, common.GoTest:
+			suiTokenBridgePackageId = txverifier.SuiOriginalTokenBridgePackageIds[common.UnsafeDevNet]
+			suiTokenBridgeEmitter = "0x" + hex.EncodeToString(sdk.KnownTokenbridgeEmitters[vaa.ChainIDSui])
+		}
+
+		// Create the Sui Api connection, and query the state object to get the token bridge address and emitter.
+		suiApiConnection := txverifier.NewSuiApiConnection(suiRPC)
+
+		// Create the new suiTxVerifier
+		suiTxVerifier = txverifier.NewSuiTransferVerifier(
+			suiCoreBridgePackageId,
+			suiTokenBridgeEmitter,
+			suiTokenBridgePackageId,
+			suiApiConnection,
+		)
+
+	}
+
 	return &Watcher{
 		suiRPC:                    suiRPC,
 		suiMoveEventType:          suiMoveEventType,
@@ -191,11 +256,13 @@ func NewWatcher(
 		loopDelay:                 time.Second, // SUI produces a checkpoint every ~3 seconds
 		queryEventsCmd: fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "suix_queryEvents", "params": [{ "MoveEventType": "%s" }, null, %d, %t]}`,
 			suiMoveEventType, maxBatchSize, descOrder),
-		postTimeout: time.Second * 5,
-	}
+		postTimeout:       time.Second * 5,
+		suiTxVerifier:     suiTxVerifier,
+		txVerifierEnabled: txVerifierEnabled,
+	}, nil
 }
 
-func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservation bool) error {
+func (e *Watcher) inspectBody(ctx context.Context, logger *zap.Logger, body SuiResult, isReobservation bool) error {
 	if body.ID.TxDigest == nil {
 		return errors.New("missing TxDigest field")
 	}
@@ -274,24 +341,51 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservatio
 		IsReobservation:  isReobservation,
 	}
 
+	// Verifies the observation through the Sui transaction verifier, if enabled, followed
+	// by publishing the observation to the message channel.
+	err = e.verifyAndPublish(ctx, observation, *body.ID.TxDigest, logger)
+
+	if err != nil {
+		suiTransferVerifierFailures.Inc()
+		logger.Error("Message publication error",
+			zap.String("TxDigest", *body.ID.TxDigest),
+			zap.Error(err))
+	}
+
+	return nil
+}
+
+func (e *Watcher) verifyAndPublish(
+	ctx context.Context,
+	msg *common.MessagePublication,
+	txDigest string,
+	logger *zap.Logger,
+) error {
+	if msg == nil {
+		return errors.New("MessagePublication is nil")
+	}
+
+	if e.suiTxVerifier != nil {
+		verifiedMsg, err := e.verify(ctx, msg, txDigest, logger)
+
+		if err != nil {
+			return err
+		}
+
+		msg = &verifiedMsg
+	}
+
+	e.msgChan <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
+
 	suiMessagesConfirmed.Inc()
-	if isReobservation {
+	if msg.IsReobservation {
 		watchers.ReobservationsByChain.WithLabelValues("sui", "std").Inc()
 	}
 
 	logger.Info("message observed",
-		zap.String("txHash", observation.TxIDString()),
-		zap.Time("timestamp", observation.Timestamp),
-		zap.Uint32("nonce", observation.Nonce),
-		zap.Uint64("sequence", observation.Sequence),
-		zap.Stringer("emitter_chain", observation.EmitterChain),
-		zap.Stringer("emitter_address", observation.EmitterAddress),
-		zap.Binary("payload", observation.Payload),
-		zap.Uint8("consistencyLevel", observation.ConsistencyLevel),
+		msg.ZapFields()...,
 	)
 
-	e.msgChan <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
-
 	return nil
 }
 
@@ -346,7 +440,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				if len(dataWithEvents) > 0 {
 					for idx := len(dataWithEvents) - 1; idx >= 0; idx-- {
 						event := dataWithEvents[idx]
-						err = e.inspectBody(logger, event.result, false)
+						err = e.inspectBody(ctx, logger, event.result, false)
 						if err != nil {
 							logger.Error("inspectBody Error", zap.Error(err))
 							continue
@@ -439,7 +533,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				}
 
 				for i, chunk := range res.Result {
-					err := e.inspectBody(logger, chunk, true)
+					err := e.inspectBody(ctx, logger, chunk, true)
 					if err != nil {
 						logger.Info("sui_fetch_obvs_req skipping event data in result", zap.String("txhash", tx58), zap.Int("index", i), zap.Error(err))
 					}

File diff suppressed because it is too large
+ 75 - 2
node/pkg/watchers/sui/watcher_test.go


+ 0 - 1
sdk/devnet_consts.go

@@ -50,6 +50,5 @@ var KnownDevnetWrappedNativeAddresses = map[vaa.ChainID]string{
 
 // KnownDevnetCoreContracts is a map of known core contract addresses used during development.
 var KnownDevnetCoreContracts = map[vaa.ChainID]string{
-	vaa.ChainIDSui:      "5a5160ca3c2037f4b4051344096ef7a48ebf4400b3f385e57ea90e1628a8bde0",
 	vaa.ChainIDEthereum: "0xC89Ce4735882C9F0f0FE26686c53074E09B0D550",
 }

Some files were not shown because too many files changed in this diff