فهرست منبع

Node: Change TxHash to TxID in MessagePublication (#4219)

* Node: Change TxHash to TxID in MessagePublication

* Code review rework

* More code review rework

* Limit TxID to 255 bytes and add test for it

* Code review rework
bruce-riley 10 ماه پیش
والد
کامیت
1ed88d113f
34فایلهای تغییر یافته به همراه745 افزوده شده و 292 حذف شده
  1. 82 7
      node/hack/accountant/send_obs.go
  2. 1 1
      node/hack/repair_terra/repair.go
  3. 9 9
      node/pkg/accountant/accountant_test.go
  4. 1 1
      node/pkg/accountant/audit.go
  5. 5 5
      node/pkg/accountant/ntt_test.go
  6. 2 2
      node/pkg/accountant/submit_obs.go
  7. 1 3
      node/pkg/accountant/watcher.go
  8. 1 1
      node/pkg/adminrpc/adminserver.go
  9. 31 16
      node/pkg/common/chainlock.go
  10. 181 3
      node/pkg/common/chainlock_test.go
  11. 121 50
      node/pkg/db/accountant.go
  12. 115 35
      node/pkg/db/accountant_test.go
  13. 5 5
      node/pkg/db/governor.go
  14. 65 30
      node/pkg/db/governor_test.go
  15. 10 10
      node/pkg/governor/governor.go
  16. 6 6
      node/pkg/governor/governor_db.go
  17. 2 2
      node/pkg/governor/governor_monitoring.go
  18. 44 44
      node/pkg/governor/governor_test.go
  19. 6 4
      node/pkg/node/node_test.go
  20. 3 3
      node/pkg/node/options.go
  21. 3 3
      node/pkg/processor/batch_obs_test.go
  22. 2 2
      node/pkg/processor/benchmark_test.go
  23. 5 5
      node/pkg/processor/message.go
  24. 1 1
      node/pkg/watchers/algorand/watcher.go
  25. 2 2
      node/pkg/watchers/aptos/watcher.go
  26. 1 1
      node/pkg/watchers/cosmwasm/watcher.go
  27. 1 1
      node/pkg/watchers/evm/by_transaction.go
  28. 18 18
      node/pkg/watchers/evm/watcher.go
  29. 6 6
      node/pkg/watchers/ibc/watcher.go
  30. 1 1
      node/pkg/watchers/ibc/watcher_test.go
  31. 1 1
      node/pkg/watchers/near/tx_processing.go
  32. 10 11
      node/pkg/watchers/near/watcher_test.go
  33. 1 1
      node/pkg/watchers/solana/client.go
  34. 2 2
      node/pkg/watchers/sui/watcher.go

+ 82 - 7
node/hack/accountant/send_obs.go

@@ -58,6 +58,7 @@ func main() {
 		return
 	}
 
+	// Don't increment the sequence number here.
 	if !testSubmit(ctx, logger, guardianSigner, wormchainConn, contract, "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16", timestamp, sequence, true, "Already commited should succeed") {
 		return
 	}
@@ -82,6 +83,11 @@ func main() {
 		return
 	}
 
+	sequence += 10
+	if !testBigBatch(ctx, logger, guardianSigner, wormchainConn, contract, "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16", timestamp, sequence, true, "Submit of big batch should succeed") {
+		return
+	}
+
 	logger.Info("Success! All tests passed!")
 }
 
@@ -102,7 +108,7 @@ func testSubmit(
 	Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000")
 
 	msg := common.MessagePublication{
-		TxHash:           TxHash,
+		TxID:             TxHash.Bytes(),
 		Timestamp:        timestamp,
 		Nonce:            uint32(0),
 		Sequence:         sequence,
@@ -167,7 +173,7 @@ func testBatch(
 	msgs := []*common.MessagePublication{}
 
 	msg1 := common.MessagePublication{
-		TxHash:           TxHash,
+		TxID:             TxHash.Bytes(),
 		Timestamp:        timestamp,
 		Nonce:            nonce,
 		Sequence:         sequence,
@@ -181,7 +187,7 @@ func testBatch(
 	nonce = nonce + 1
 	sequence = sequence + 1
 	msg2 := common.MessagePublication{
-		TxHash:           TxHash,
+		TxID:             TxHash.Bytes(),
 		Timestamp:        time.Now(),
 		Nonce:            nonce,
 		Sequence:         sequence,
@@ -246,7 +252,7 @@ func testBatchWithcommitted(
 
 	logger.Info("submitting a single transfer that should work")
 	msg1 := common.MessagePublication{
-		TxHash:           TxHash,
+		TxID:             TxHash.Bytes(),
 		Timestamp:        timestamp,
 		Nonce:            nonce,
 		Sequence:         sequence,
@@ -269,7 +275,7 @@ func testBatchWithcommitted(
 	nonce = nonce + 1
 	sequence = sequence + 1
 	msg2 := common.MessagePublication{
-		TxHash:           TxHash,
+		TxID:             TxHash.Bytes(),
 		Timestamp:        time.Now(),
 		Nonce:            nonce,
 		Sequence:         sequence,
@@ -338,7 +344,7 @@ func testBatchWithDigestError(
 
 	logger.Info("submitting a single transfer that should work")
 	msg1 := common.MessagePublication{
-		TxHash:           TxHash,
+		TxID:             TxHash.Bytes(),
 		Timestamp:        timestamp,
 		Nonce:            nonce,
 		Sequence:         sequence,
@@ -361,7 +367,7 @@ func testBatchWithDigestError(
 	nonce = nonce + 1
 	sequence = sequence + 1
 	msg2 := common.MessagePublication{
-		TxHash:           TxHash,
+		TxID:             TxHash.Bytes(),
 		Timestamp:        time.Now(),
 		Nonce:            nonce,
 		Sequence:         sequence,
@@ -440,3 +446,72 @@ func submit(
 
 	return accountant.SubmitObservationsToContract(ctx, logger, guardianSigner, gsIndex, guardianIndex, wormchainConn, contract, accountant.SubmitObservationPrefix, msgs)
 }
+
+func testBigBatch(
+	ctx context.Context,
+	logger *zap.Logger,
+	guardianSigner guardiansigner.GuardianSigner,
+	wormchainConn *wormconn.ClientConn,
+	contract string,
+	emitterAddressStr string,
+	timestamp time.Time,
+	sequence uint64,
+	expectedResult bool,
+	tag string,
+) bool {
+	EmitterAddress, _ := vaa.StringToAddress(emitterAddressStr)
+	TxHash := []byte("0123456789012345678901234567890123456789012345678901234567890123") // 64 bytes, the size of a Solana signature.
+	Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000")
+
+	msgs := []*common.MessagePublication{}
+	for idx := 0; idx < 10; idx++ {
+		msg := common.MessagePublication{
+			TxID:             TxHash,
+			Timestamp:        timestamp,
+			Nonce:            uint32(0),
+			Sequence:         sequence,
+			EmitterChain:     vaa.ChainIDEthereum,
+			EmitterAddress:   EmitterAddress,
+			ConsistencyLevel: uint8(15),
+			Payload:          Payload,
+		}
+
+		msgs = append(msgs, &msg)
+		sequence += 1
+	}
+
+	txResp, err := submit(ctx, logger, guardianSigner, wormchainConn, contract, msgs)
+	if err != nil {
+		logger.Error("failed to broadcast Observation request", zap.String("test", tag), zap.Error(err))
+		return false
+	}
+
+	responses, err := accountant.GetObservationResponses(txResp)
+	if err != nil {
+		logger.Error("failed to get responses", zap.Error(err))
+		return false
+	}
+
+	if len(responses) != len(msgs) {
+		logger.Error("number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err))
+		return false
+	}
+
+	msgId := msgs[0].MessageIDString()
+	status, exists := responses[msgId]
+	if !exists {
+		logger.Info("test failed: did not receive an observation response for message", zap.String("test", tag), zap.String("msgId", msgId))
+		return false
+	}
+
+	committed := status.Type == "committed"
+
+	if committed != expectedResult {
+		logger.Info("test failed", zap.String("test", tag), zap.Uint64("seqNo", sequence), zap.Bool("committed", committed),
+			zap.String("response", wormchainConn.BroadcastTxResponseToString(txResp)))
+		return false
+	}
+
+	logger.Info("test of big batch succeeded", zap.String("test", tag))
+	return true
+}

+ 1 - 1
node/hack/repair_terra/repair.go

@@ -237,7 +237,7 @@ func EventsToMessagePublications(contract string, txHash string, events []gjson.
 			continue
 		}
 		messagePublication := &common.MessagePublication{
-			TxHash:           txHashValue,
+			TxID:             txHashValue.Bytes(),
 			Timestamp:        time.Unix(blockTimeInt, 0),
 			Nonce:            uint32(nonceInt),
 			Sequence:         sequenceInt,

+ 9 - 9
node/pkg/accountant/accountant_test.go

@@ -137,13 +137,13 @@ func newAccountantForTest(
 	return acct
 }
 
-// Converts a string into a go-ethereum Hash object used as test input.
-func hashFromString(str string) ethCommon.Hash { //nolint:unparam
+// Converts a TxHash string into a byte array to be used as a TxID.
+func hashToTxID(str string) []byte {
 	if (len(str) > 2) && (str[0] == '0') && (str[1] == 'x') {
 		str = str[2:]
 	}
 
-	return ethCommon.HexToHash(str)
+	return ethCommon.HexToHash(str).Bytes()
 }
 
 // Note this method assumes 18 decimals for the amount.
@@ -188,7 +188,7 @@ func TestVaaFromUninterestingEmitter(t *testing.T) {
 	var payload = []byte{1, 97, 97, 97, 97, 97}
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -216,7 +216,7 @@ func TestVaaForUninterestingPayloadType(t *testing.T) {
 	var payload = []byte{2, 97, 97, 97, 97, 97}
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -251,7 +251,7 @@ func TestInterestingTransferShouldNotBeBlockedWhenNotEnforcingAccountant(t *test
 	)
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -295,7 +295,7 @@ func TestInterestingTransferShouldBeBlockedWhenEnforcingAccountant(t *testing.T)
 	)
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -347,7 +347,7 @@ func TestForDeadlock(t *testing.T) {
 	)
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1683136244),
@@ -374,7 +374,7 @@ func TestForDeadlock(t *testing.T) {
 	assert.Equal(t, 1, len(acct.msgChan))
 
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1683136244),

+ 1 - 1
node/pkg/accountant/audit.go

@@ -108,7 +108,7 @@ func (mo *MissingObservation) makeAuditKey() string {
 
 // makeAuditKey creates an audit map key from a pending observation entry.
 func (pe *pendingEntry) makeAuditKey() string {
-	return fmt.Sprintf("%d-%s", pe.msg.EmitterChain, strings.TrimPrefix(pe.msg.TxHash.String(), "0x"))
+	return fmt.Sprintf("%d-%s", pe.msg.EmitterChain, strings.TrimPrefix(pe.msg.TxIDString(), "0x"))
 }
 
 // audit is the runnable that executes the audit each interval.

+ 5 - 5
node/pkg/accountant/ntt_test.go

@@ -50,7 +50,7 @@ func TestNttParseMsgSuccess(t *testing.T) {
 	}
 
 	msg := &common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(42),
 		Sequence:         uint64(123456),
@@ -77,7 +77,7 @@ func TestNttParseMsgWrongEmitterChain(t *testing.T) {
 	}
 
 	msg := &common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(42),
 		Sequence:         uint64(123456),
@@ -106,7 +106,7 @@ func TestNttParseMsgWrongEmitterAddress(t *testing.T) {
 	}
 
 	msg := &common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(42),
 		Sequence:         uint64(123456),
@@ -221,7 +221,7 @@ func TestNttParseArMsgSuccess(t *testing.T) {
 	require.NoError(t, err)
 
 	msg := &common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1708575745), 0),
 		Nonce:            uint32(0),
 		Sequence:         uint64(259),
@@ -258,7 +258,7 @@ func TestNttParseArMsgUnknownArEmitter(t *testing.T) {
 	require.NoError(t, err)
 
 	msg := &common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1708575745), 0),
 		Nonce:            uint32(0),
 		Sequence:         uint64(259),

+ 2 - 2
node/pkg/accountant/submit_obs.go

@@ -308,7 +308,7 @@ func SubmitObservationsToContract(
 	obs := make([]Observation, len(msgs))
 	for idx, msg := range msgs {
 		obs[idx] = Observation{
-			TxHash:           msg.TxHash.Bytes(),
+			TxHash:           msg.TxID,
 			Timestamp:        uint32(msg.Timestamp.Unix()),
 			Nonce:            msg.Nonce,
 			EmitterChain:     uint16(msg.EmitterChain),
@@ -321,7 +321,7 @@ func SubmitObservationsToContract(
 		logger.Debug("in SubmitObservationsToContract, encoding observation",
 			zap.String("contract", contract),
 			zap.Int("idx", idx),
-			zap.String("txHash", msg.TxHash.String()), zap.String("encTxHash", hex.EncodeToString(obs[idx].TxHash[:])),
+			zap.String("txHash", msg.TxIDString()), zap.String("encTxHash", hex.EncodeToString(obs[idx].TxHash[:])),
 			zap.Stringer("timeStamp", msg.Timestamp), zap.Uint32("encTimestamp", obs[idx].Timestamp),
 			zap.Uint32("nonce", msg.Nonce), zap.Uint32("encNonce", obs[idx].Nonce),
 			zap.Stringer("emitterChain", msg.EmitterChain), zap.Uint16("encEmitterChain", obs[idx].EmitterChain),

+ 1 - 3
node/pkg/accountant/watcher.go

@@ -11,8 +11,6 @@ import (
 	"github.com/certusone/wormhole/node/pkg/common"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 
-	ethCommon "github.com/ethereum/go-ethereum/common"
-
 	tmAbci "github.com/tendermint/tendermint/abci/types"
 	tmHttp "github.com/tendermint/tendermint/rpc/client/http"
 	tmCoreTypes "github.com/tendermint/tendermint/rpc/core/types"
@@ -181,7 +179,7 @@ func (acct *Accountant) processPendingTransfer(xfer *WasmObservation, tag string
 	)
 
 	msg := &common.MessagePublication{
-		TxHash:           ethCommon.BytesToHash(xfer.TxHash),
+		TxID:             xfer.TxHash,
 		Timestamp:        time.Unix(int64(xfer.Timestamp), 0),
 		Nonce:            xfer.Nonce,
 		Sequence:         xfer.Sequence,

+ 1 - 1
node/pkg/adminrpc/adminserver.go

@@ -798,7 +798,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
 		vaaInjectionsTotal.Inc()
 
 		s.injectC <- &common.MessagePublication{
-			TxHash:           ethcommon.Hash{},
+			TxID:             ethcommon.Hash{}.Bytes(),
 			Timestamp:        v.Timestamp,
 			Nonce:            v.Nonce,
 			Sequence:         v.Sequence,

+ 31 - 16
node/pkg/common/chainlock.go

@@ -7,19 +7,19 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
-
-	"github.com/ethereum/go-ethereum/common"
 )
 
 const HashLength = 32
 const AddressLength = 32
 
 type MessagePublication struct {
-	TxHash    common.Hash // TODO: rename to identifier? on Solana, this isn't actually the tx hash
+	TxID      []byte
 	Timestamp time.Time
 
 	Nonce            uint32
@@ -35,6 +35,10 @@ type MessagePublication struct {
 	Unreliable bool
 }
 
+func (msg *MessagePublication) TxIDString() string {
+	return "0x" + hex.EncodeToString(msg.TxID)
+}
+
 func (msg *MessagePublication) MessageID() []byte {
 	return []byte(msg.MessageIDString())
 }
@@ -48,7 +52,12 @@ const minMsgLength = 88 // Marshalled length with empty payload
 func (msg *MessagePublication) Marshal() ([]byte, error) {
 	buf := new(bytes.Buffer)
 
-	buf.Write(msg.TxHash[:])
+	if len(msg.TxID) > math.MaxUint8 {
+		return nil, errors.New("TxID too long")
+	}
+	vaa.MustWrite(buf, binary.BigEndian, uint8(len(msg.TxID)))
+	buf.Write(msg.TxID)
+
 	vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix()))
 	vaa.MustWrite(buf, binary.BigEndian, msg.Nonce)
 	vaa.MustWrite(buf, binary.BigEndian, msg.Sequence)
@@ -61,12 +70,10 @@ func (msg *MessagePublication) Marshal() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
-const oldMinMsgLength = 83 // Old marshalled length with empty payload
-
-// UnmarshalOldMessagePublicationBeforeIsReobservation deserializes a MessagePublication from prior to the addition of IsReobservation.
+// UnmarshalOldMessagePublicationWithTxHash deserializes a MessagePublication from when the TxHash was a fixed size ethCommon.Hash.
 // This function can be deleted once all guardians have been upgraded. That's why the code is just duplicated.
-func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessagePublication, error) {
-	if len(data) < oldMinMsgLength {
+func UnmarshalOldMessagePublicationWithTxHash(data []byte) (*MessagePublication, error) {
+	if len(data) < minMsgLength {
 		return nil, errors.New("message is too short")
 	}
 
@@ -78,7 +85,7 @@ func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessageP
 	if n, err := reader.Read(txHash[:]); err != nil || n != HashLength {
 		return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err)
 	}
-	msg.TxHash = txHash
+	msg.TxID = txHash.Bytes()
 
 	unixSeconds := uint32(0)
 	if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
@@ -108,6 +115,10 @@ func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessageP
 	}
 	msg.EmitterAddress = emitterAddress
 
+	if err := binary.Read(reader, binary.BigEndian, &msg.IsReobservation); err != nil {
+		return nil, fmt.Errorf("failed to read isReobservation: %w", err)
+	}
+
 	payload := make([]byte, reader.Len())
 	n, err := reader.Read(payload)
 	if err != nil || n == 0 {
@@ -121,18 +132,22 @@ func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessageP
 // UnmarshalMessagePublication deserializes a MessagePublication
 func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
 	if len(data) < minMsgLength {
-		return nil, fmt.Errorf("message is too short")
+		return nil, errors.New("message is too short")
 	}
 
 	msg := &MessagePublication{}
 
 	reader := bytes.NewReader(data[:])
 
-	txHash := common.Hash{}
-	if n, err := reader.Read(txHash[:]); err != nil || n != HashLength {
-		return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err)
+	txIdLen := uint8(0)
+	if err := binary.Read(reader, binary.BigEndian, &txIdLen); err != nil {
+		return nil, fmt.Errorf("failed to read TxID len: %w", err)
+	}
+
+	msg.TxID = make([]byte, txIdLen)
+	if n, err := reader.Read(msg.TxID[:]); err != nil || n != int(txIdLen) {
+		return nil, fmt.Errorf("failed to read TxID [%d]: %w", n, err)
 	}
-	msg.TxHash = txHash
 
 	unixSeconds := uint32(0)
 	if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
@@ -229,7 +244,7 @@ func (msg *MessagePublication) CreateDigest() string {
 // TODO refactor the codebase to use this function instead of manually logging the message with inconsistent fields
 func (msg *MessagePublication) ZapFields(fields ...zap.Field) []zap.Field {
 	return append(fields,
-		zap.Stringer("tx", msg.TxHash),
+		zap.String("tx", msg.TxIDString()),
 		zap.Time("timestamp", msg.Timestamp),
 		zap.Uint32("nonce", msg.Nonce),
 		zap.Uint8("consistency", msg.ConsistencyLevel),

+ 181 - 3
node/pkg/common/chainlock_test.go

@@ -51,7 +51,7 @@ func TestSerializeAndDeserializeOfMessagePublication(t *testing.T) {
 	payloadBytes1 := encodePayloadBytes(payload1)
 
 	msg1 := &MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -74,6 +74,118 @@ func TestSerializeAndDeserializeOfMessagePublication(t *testing.T) {
 	assert.Equal(t, payload1, payload2)
 }
 
+func TestSerializeAndDeserializeOfMessagePublicationWithEmptyTxID(t *testing.T) {
+	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec
+	require.NoError(t, err)
+
+	targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	payload1 := &vaa.TransferPayloadHdr{
+		Type:          0x01,
+		Amount:        big.NewInt(27000000000),
+		OriginAddress: originAddress,
+		OriginChain:   vaa.ChainIDEthereum,
+		TargetAddress: targetAddress,
+		TargetChain:   vaa.ChainIDPolygon,
+	}
+
+	payloadBytes1 := encodePayloadBytes(payload1)
+
+	msg1 := &MessagePublication{
+		TxID:             []byte{},
+		Timestamp:        time.Unix(int64(1654516425), 0),
+		Nonce:            123456,
+		Sequence:         789101112131415,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddress,
+		Payload:          payloadBytes1,
+		ConsistencyLevel: 32,
+	}
+
+	bytes, err := msg1.Marshal()
+	require.NoError(t, err)
+
+	msg2, err := UnmarshalMessagePublication(bytes)
+	require.NoError(t, err)
+	assert.Equal(t, msg1, msg2)
+
+	payload2, err := vaa.DecodeTransferPayloadHdr(msg2.Payload)
+	require.NoError(t, err)
+
+	assert.Equal(t, payload1, payload2)
+}
+
+func TestSerializeAndDeserializeOfMessagePublicationWithArbitraryTxID(t *testing.T) {
+	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec
+	require.NoError(t, err)
+
+	targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	payload1 := &vaa.TransferPayloadHdr{
+		Type:          0x01,
+		Amount:        big.NewInt(27000000000),
+		OriginAddress: originAddress,
+		OriginChain:   vaa.ChainIDEthereum,
+		TargetAddress: targetAddress,
+		TargetChain:   vaa.ChainIDPolygon,
+	}
+
+	payloadBytes1 := encodePayloadBytes(payload1)
+
+	msg1 := &MessagePublication{
+		TxID:             []byte("This is some arbitrary string with just some random junk in it. This is to prove that the TxID does not have to be a ethCommon.Hash"),
+		Timestamp:        time.Unix(int64(1654516425), 0),
+		Nonce:            123456,
+		Sequence:         789101112131415,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddress,
+		Payload:          payloadBytes1,
+		ConsistencyLevel: 32,
+	}
+
+	bytes, err := msg1.Marshal()
+	require.NoError(t, err)
+
+	msg2, err := UnmarshalMessagePublication(bytes)
+	require.NoError(t, err)
+	assert.Equal(t, msg1, msg2)
+
+	payload2, err := vaa.DecodeTransferPayloadHdr(msg2.Payload)
+	require.NoError(t, err)
+
+	assert.Equal(t, payload1, payload2)
+}
+
+func TestTxIDStringTooLongShouldFail(t *testing.T) {
+	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	// This is limited to 255. Make it 256 and the marshal should fail.
+	txID := []byte("0123456789012345678901234567890123456789012345678901234567890123012345678901234567890123456789012345678901234567890123456789012301234567890123456789012345678901234567890123456789012345678901230123456789012345678901234567890123456789012345678901234567890123")
+
+	msg := &MessagePublication{
+		TxID:             txID,
+		Timestamp:        time.Unix(int64(1654516425), 0),
+		Nonce:            123456,
+		Sequence:         789101112131415,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddress,
+		Payload:          []byte("Hello, World!"),
+		ConsistencyLevel: 32,
+	}
+
+	_, err = msg.Marshal()
+	assert.ErrorContains(t, err, "TxID too long")
+}
+
 func TestSerializeAndDeserializeOfMessagePublicationWithBigPayload(t *testing.T) {
 	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
 	require.NoError(t, err)
@@ -86,7 +198,7 @@ func TestSerializeAndDeserializeOfMessagePublicationWithBigPayload(t *testing.T)
 	}
 
 	msg1 := &MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -127,7 +239,53 @@ func TestMarshalUnmarshalJSONOfMessagePublication(t *testing.T) {
 	payloadBytes1 := encodePayloadBytes(payload1)
 
 	msg1 := &MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
+		Timestamp:        time.Unix(int64(1654516425), 0),
+		Nonce:            123456,
+		Sequence:         789101112131415,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddress,
+		Payload:          payloadBytes1,
+		ConsistencyLevel: 32,
+	}
+
+	bytes, err := msg1.MarshalJSON()
+	require.NoError(t, err)
+
+	var msg2 MessagePublication
+	err = msg2.UnmarshalJSON(bytes)
+	require.NoError(t, err)
+	assert.Equal(t, *msg1, msg2)
+
+	payload2, err := vaa.DecodeTransferPayloadHdr(msg2.Payload)
+	require.NoError(t, err)
+
+	assert.Equal(t, *payload1, *payload2)
+}
+
+func TestMarshalUnmarshalJSONOfMessagePublicationWithArbitraryTxID(t *testing.T) {
+	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec
+	require.NoError(t, err)
+
+	targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	payload1 := &vaa.TransferPayloadHdr{
+		Type:          0x01,
+		Amount:        big.NewInt(27000000000),
+		OriginAddress: originAddress,
+		OriginChain:   vaa.ChainIDEthereum,
+		TargetAddress: targetAddress,
+		TargetChain:   vaa.ChainIDPolygon,
+	}
+
+	payloadBytes1 := encodePayloadBytes(payload1)
+
+	msg1 := &MessagePublication{
+		TxID:             []byte("This is some arbitrary string with just some random junk in it. This is to prove that the TxID does not have to be a ethCommon.Hash"),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -220,3 +378,23 @@ func TestMessageID(t *testing.T) {
 		})
 	}
 }
+
+func TestTxIDStringMatchesHashToString(t *testing.T) {
+	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	expectedHashID := "0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"
+
+	msg := &MessagePublication{
+		TxID:             eth_common.HexToHash(expectedHashID).Bytes(),
+		Timestamp:        time.Unix(int64(1654516425), 0),
+		Nonce:            123456,
+		Sequence:         789101112131415,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddress,
+		Payload:          []byte("Hello, World!"),
+		ConsistencyLevel: 32,
+	}
+
+	assert.Equal(t, expectedHashID, msg.TxIDString())
+}

+ 121 - 50
node/pkg/db/accountant.go

@@ -3,10 +3,13 @@ package db
 import (
 	"encoding/json"
 	"fmt"
+	"time"
 
 	"github.com/certusone/wormhole/node/pkg/common"
 	"github.com/dgraph-io/badger/v3"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 
+	ethCommon "github.com/ethereum/go-ethereum/common"
 	"go.uber.org/zap"
 )
 
@@ -31,10 +34,10 @@ func (d *MockAccountantDB) AcctGetData(logger *zap.Logger) ([]*common.MessagePub
 	return nil, nil
 }
 
-const acctOldPendingTransfer = "ACCT:PXFER:"
+const acctOldPendingTransfer = "ACCT:PXFER2:"
 const acctOldPendingTransferLen = len(acctOldPendingTransfer)
 
-const acctPendingTransfer = "ACCT:PXFER2:"
+const acctPendingTransfer = "ACCT:PXFER3:"
 const acctPendingTransferLen = len(acctPendingTransfer)
 
 const acctMinMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")
@@ -59,6 +62,11 @@ func acctIsPendingTransfer(keyBytes []byte) bool {
 func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication, error) {
 	pendingTransfers := []*common.MessagePublication{}
 	var err error
+
+	if err = d.convertOldTransfersToNewFormat(logger); err != nil {
+		return pendingTransfers, fmt.Errorf("failed to convert old pending transfers to the new format: %w", err)
+	}
+
 	{
 		prefixBytes := []byte(acctPendingTransfer)
 		err = d.db.View(func(txn *badger.Txn) error {
@@ -84,7 +92,7 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication
 
 					pendingTransfers = append(pendingTransfers, &pt)
 				} else {
-					return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key))
+					return fmt.Errorf("failed to load accountant pending transfer, unexpected key '%s'", string(key))
 				}
 			}
 
@@ -92,53 +100,6 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication
 		})
 	}
 
-	// Any pending transfers in the old format are long since obsolete. Just delete them.
-	if err == nil {
-		oldPendingTransfers := []string{}
-		prefixBytes := []byte(acctOldPendingTransfer)
-		err = d.db.View(func(txn *badger.Txn) error {
-			opts := badger.DefaultIteratorOptions
-			opts.PrefetchSize = 10
-			it := txn.NewIterator(opts)
-			defer it.Close()
-			for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() {
-				item := it.Item()
-				key := item.Key()
-				val, err := item.ValueCopy(nil)
-				if err != nil {
-					return err
-				}
-
-				if acctIsOldPendingTransfer(key) {
-					pt, err := common.UnmarshalOldMessagePublicationBeforeIsReobservation(val)
-					if err != nil {
-						logger.Error("failed to unmarshal old pending transfer for key", zap.String("key", string(key[:])), zap.Error(err))
-						continue
-					}
-
-					oldPendingTransfers = append(oldPendingTransfers, pt.MessageIDString())
-				} else {
-					return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key))
-				}
-			}
-
-			return nil
-		})
-
-		if err == nil && len(oldPendingTransfers) != 0 {
-			for _, pt := range oldPendingTransfers {
-				key := acctOldPendingTransferMsgID(pt)
-				logger.Info("deleting obsolete pending transfer", zap.String("msgId", pt), zap.String("key", string(key)))
-				if err := d.db.Update(func(txn *badger.Txn) error {
-					err := txn.Delete(key)
-					return err
-				}); err != nil {
-					return pendingTransfers, fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pt, err)
-				}
-			}
-		}
-	}
-
 	return pendingTransfers, err
 }
 
@@ -170,3 +131,113 @@ func (d *Database) AcctDeletePendingTransfer(msgId string) error {
 
 	return nil
 }
+
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// The code below here is used to read and convert old Pending transfers. Once the db has been migrated away from those, this can be deleted.
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+// OldMessagePublication is used to unmarshal old JSON which has the TxHash rather than the TxID.
+type OldMessagePublication struct {
+	TxHash    ethCommon.Hash
+	Timestamp time.Time
+
+	Nonce            uint32
+	Sequence         uint64
+	ConsistencyLevel uint8
+	EmitterChain     vaa.ChainID
+	EmitterAddress   vaa.Address
+	Payload          []byte
+	IsReobservation  bool
+	Unreliable       bool
+}
+
+func (msg *OldMessagePublication) UnmarshalJSON(data []byte) error {
+	type Alias OldMessagePublication
+	aux := &struct {
+		Timestamp int64
+		*Alias
+	}{
+		Alias: (*Alias)(msg),
+	}
+	if err := json.Unmarshal(data, &aux); err != nil {
+		return err
+	}
+	msg.Timestamp = time.Unix(aux.Timestamp, 0)
+	return nil
+}
+
+// convertOldToNew converts an OldMessagePublication to a MessagePublication.
+func convertOldToNew(old *OldMessagePublication) *common.MessagePublication {
+	return &common.MessagePublication{
+		TxID:             old.TxHash.Bytes(),
+		Timestamp:        old.Timestamp,
+		Nonce:            old.Nonce,
+		Sequence:         old.Sequence,
+		EmitterChain:     old.EmitterChain,
+		EmitterAddress:   old.EmitterAddress,
+		Payload:          old.Payload,
+		ConsistencyLevel: old.ConsistencyLevel,
+		IsReobservation:  old.IsReobservation,
+		Unreliable:       old.Unreliable,
+	}
+}
+
+// convertOldTransfersToNewFormat loads any pending transfers in the old format, writes them in the new format and deletes the old ones.
+func (d *Database) convertOldTransfersToNewFormat(logger *zap.Logger) error {
+	pendingTransfers := []*common.MessagePublication{}
+	prefixBytes := []byte(acctOldPendingTransfer)
+	err := d.db.View(func(txn *badger.Txn) error {
+		opts := badger.DefaultIteratorOptions
+		opts.PrefetchSize = 10
+		it := txn.NewIterator(opts)
+		defer it.Close()
+		for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() {
+			item := it.Item()
+			key := item.Key()
+			val, err := item.ValueCopy(nil)
+			if err != nil {
+				return err
+			}
+
+			if acctIsOldPendingTransfer(key) {
+				var pt OldMessagePublication
+				err := json.Unmarshal(val, &pt)
+				if err != nil {
+					return fmt.Errorf("failed to unmarshal old pending transfer for key '%s': %w", string(key), err)
+				}
+
+				pendingTransfers = append(pendingTransfers, convertOldToNew(&pt))
+			} else {
+				return fmt.Errorf("failed to convert old accountant pending transfer, unexpected key '%s'", string(key))
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return err
+	}
+
+	if len(pendingTransfers) != 0 {
+		for _, pt := range pendingTransfers {
+			logger.Info("converting old pending transfer to new format", zap.String("msgId", pt.MessageIDString()))
+			if err := d.AcctStorePendingTransfer(pt); err != nil {
+				return fmt.Errorf("failed to convert old pending transfer for key [%v]: %w", pt, err)
+			}
+		}
+
+		for _, pt := range pendingTransfers {
+			key := acctOldPendingTransferMsgID(pt.MessageIDString())
+			logger.Info("deleting old pending transfer", zap.String("msgId", pt.MessageIDString()), zap.String("key", string(key)))
+			if err := d.db.Update(func(txn *badger.Txn) error {
+				err := txn.Delete(key)
+				return err
+			}); err != nil {
+				return fmt.Errorf("failed to delete old pending transfer for key [%v]: %w", pt, err)
+			}
+		}
+	}
+
+	return nil
+}

+ 115 - 35
node/pkg/db/accountant_test.go

@@ -1,9 +1,10 @@
 package db
 
 import (
-	"bytes"
-	"encoding/binary"
+	"encoding/json"
+	"fmt"
 	"os"
+	"sort"
 	"testing"
 	"time"
 
@@ -16,6 +17,9 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"go.uber.org/zap/zaptest"
+	"go.uber.org/zap/zaptest/observer"
 )
 
 func TestAcctPendingTransferMsgID(t *testing.T) {
@@ -23,7 +27,7 @@ func TestAcctPendingTransferMsgID(t *testing.T) {
 	require.NoError(t, err)
 
 	msg1 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -33,22 +37,22 @@ func TestAcctPendingTransferMsgID(t *testing.T) {
 		ConsistencyLevel: 16,
 	}
 
-	assert.Equal(t, []byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctOldPendingTransferMsgID(msg1.MessageIDString()))
-	assert.Equal(t, []byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctPendingTransferMsgID(msg1.MessageIDString()))
+	assert.Equal(t, []byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctOldPendingTransferMsgID(msg1.MessageIDString()))
+	assert.Equal(t, []byte("ACCT:PXFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctPendingTransferMsgID(msg1.MessageIDString()))
 }
 
 func TestAcctIsPendingTransfer(t *testing.T) {
-	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1/1/1")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
-	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
+	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:1")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:1/1/1")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
+	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
 	assert.Equal(t, false, acctIsPendingTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, acctIsPendingTransfer([]byte{0x01, 0x02, 0x03, 0x04}))
 	assert.Equal(t, false, acctIsPendingTransfer([]byte{}))
-	assert.Equal(t, true, acctIsOldPendingTransfer([]byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, acctIsOldPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, true, acctIsOldPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, acctIsOldPendingTransfer([]byte("ACCT:PXFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 }
 
 func TestAcctStoreAndDeletePendingTransfers(t *testing.T) {
@@ -60,7 +64,7 @@ func TestAcctStoreAndDeletePendingTransfers(t *testing.T) {
 	require.NoError(t, err)
 
 	msg1 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -71,7 +75,7 @@ func TestAcctStoreAndDeletePendingTransfers(t *testing.T) {
 	}
 
 	msg2 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123457,
 		Sequence:         789101112131416,
@@ -99,7 +103,7 @@ func TestAcctStoreAndDeletePendingTransfers(t *testing.T) {
 
 	// Delete something that doesn't exist.
 	msg3 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123457,
 		Sequence:         789101112131417,
@@ -146,7 +150,7 @@ func TestAcctGetData(t *testing.T) {
 	require.NoError(t, err)
 
 	msg1 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -157,7 +161,7 @@ func TestAcctGetData(t *testing.T) {
 	}
 
 	msg2 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123457,
 		Sequence:         789101112131416,
@@ -189,13 +193,13 @@ func TestAcctGetData(t *testing.T) {
 	assert.Equal(t, *msg2, *pendings[1])
 }
 
-func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) {
+func TestAcctLoadingWhereOldPendingGetsUpdated(t *testing.T) {
 	dbPath := t.TempDir()
 	db := OpenDb(zap.NewNop(), &dbPath)
 	defer db.Close()
 	defer os.Remove(dbPath)
 
-	logger := zap.NewNop()
+	logger, zapObserver := setupLogsCapture(t)
 
 	tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
 	require.NoError(t, err)
@@ -203,7 +207,7 @@ func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) {
 	now := time.Unix(time.Now().Unix(), 0)
 
 	// Write the first pending event in the old format.
-	pending1 := &common.MessagePublication{
+	pending1 := &OldMessagePublication{
 		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        now,
 		Nonce:            123456,
@@ -215,14 +219,14 @@ func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) {
 		// IsReobservation will not be serialized. It should be set to false on reload.
 	}
 
-	db.acctStoreOldPendingTransfer(t, pending1)
+	err = db.acctStoreOldPendingTransfer(pending1)
 	require.Nil(t, err)
 
 	now2 := now.Add(time.Second * 5)
 
 	// Write the second one in the new format.
 	pending2 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        now2,
 		Nonce:            123456,
 		Sequence:         789101112131418,
@@ -236,35 +240,111 @@ func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) {
 	err = db.AcctStorePendingTransfer(pending2)
 	require.Nil(t, err)
 
-	// When we reload the data, the first one should get dropped, so we should get back only one.
+	// When we reload the data, the first one should get converted and returned here.
 	pendings, err := db.AcctGetData(logger)
 	require.NoError(t, err)
-	require.Equal(t, 1, len(pendings))
+	require.Equal(t, 2, len(pendings))
+
+	// Verify that we converted and deleted the old one.
+	loggedEntries := zapObserver.FilterMessage("converting old pending transfer to new format").All()
+	require.Equal(t, 1, len(loggedEntries))
+	loggedEntries = zapObserver.FilterMessage("deleting old pending transfer").All()
+	require.Equal(t, 1, len(loggedEntries))
 
-	assert.Equal(t, *pending2, *pendings[0])
+	sort.SliceStable(pendings, func(i, j int) bool {
+		return pendings[i].Timestamp.Before(pendings[j].Timestamp)
+	})
+
+	assert.Equal(t, *convertOldToNew(pending1), *pendings[0])
+	assert.Equal(t, *pending2, *pendings[1])
 
-	// Make sure we can still reload things after deleting the old one.
+	// Make sure we can still reload things after updating the old one.
+	logger, zapObserver = setupLogsCapture(t)
 	pendings2, err := db.AcctGetData(logger)
 
 	require.Nil(t, err)
-	require.Equal(t, 1, len(pendings2))
+	require.Equal(t, 2, len(pendings2))
 
-	assert.Equal(t, pending2, pendings2[0])
-}
+	// Verify that we didn't do any conversions the second time.
+	loggedEntries = zapObserver.FilterMessage("converting old pending transfer to new format").All()
+	require.Equal(t, 0, len(loggedEntries))
+	loggedEntries = zapObserver.FilterMessage("deleting old pending transfer").All()
+	require.Equal(t, 0, len(loggedEntries))
 
-func (d *Database) acctStoreOldPendingTransfer(t *testing.T, msg *common.MessagePublication) {
-	buf := new(bytes.Buffer)
+	assert.Equal(t, *convertOldToNew(pending1), *pendings[0])
+	assert.Equal(t, *pending2, *pendings[1])
 
-	b := marshalOldMessagePublication(msg)
+	sort.SliceStable(pendings, func(i, j int) bool {
+		return pendings[i].Timestamp.Before(pendings[j].Timestamp)
+	})
 
-	vaa.MustWrite(buf, binary.BigEndian, b)
+	assert.Equal(t, *convertOldToNew(pending1), *pendings[0])
+	assert.Equal(t, *pending2, *pendings[1])
+}
+
+// setupLogsCapture is a helper function for making a zap logger/observer combination for testing that certain logs have been made
+func setupLogsCapture(t testing.TB) (*zap.Logger, *observer.ObservedLogs) {
+	t.Helper()
+	observedCore, observedLogs := observer.New(zap.InfoLevel)
+	consoleLogger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))
+	parentLogger := zap.New(zapcore.NewTee(observedCore, consoleLogger.Core()))
+	return parentLogger, observedLogs
+}
+
+func (d *Database) acctStoreOldPendingTransfer(msg *OldMessagePublication) error {
+	b, _ := json.Marshal(msg)
 
 	err := d.db.Update(func(txn *badger.Txn) error {
-		if err := txn.Set(acctOldPendingTransferMsgID(msg.MessageIDString()), buf.Bytes()); err != nil {
+		if err := txn.Set(acctOldPendingTransferMsgID(msg.MessageIDString()), b); err != nil {
 			return err
 		}
 		return nil
 	})
 
+	if err != nil {
+		return fmt.Errorf("failed to commit old accountant pending transfer for tx %s: %w", msg.MessageIDString(), err)
+	}
+
+	return nil
+}
+
+// The standard json Marshal / Unmarshal of time.Time gets confused between local and UTC time.
+func (msg *OldMessagePublication) MarshalJSON() ([]byte, error) {
+	type Alias OldMessagePublication
+	return json.Marshal(&struct {
+		Timestamp int64
+		*Alias
+	}{
+		Timestamp: msg.Timestamp.Unix(),
+		Alias:     (*Alias)(msg),
+	})
+}
+
+func (msg *OldMessagePublication) MessageIDString() string {
+	return fmt.Sprintf("%v/%v/%v", uint16(msg.EmitterChain), msg.EmitterAddress, msg.Sequence)
+}
+
+func TestUnmarshalOldJSON(t *testing.T) {
+	jsn := `
+	{
+	  "TxID": "SGVsbG8=",
+		"Timestamp": 1654516425,
+		"TxHash": "0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063",
+		"Nonce": 123456,
+		"Sequence": 789101112131415,
+		"ConsistencyLevel": 32,
+		"EmitterChain": 2,
+		"EmitterAddress": "000000000000000000000000707f9118e33a9b8998bea41dd0d46f38bb963fc8",
+		"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZJU04AAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAHB/kRjjOpuJmL6kHdDUbzi7lj/IAAU=",
+		"IsReobservation": false,
+		"Unreliable": false
+	}
+	`
+
+	var oldMsg OldMessagePublication
+	err := json.Unmarshal([]byte(jsn), &oldMsg)
 	require.NoError(t, err)
+
+	newMsg := convertOldToNew(&oldMsg)
+	assert.Equal(t, oldMsg.TxHash.String(), newMsg.TxIDString())
 }

+ 5 - 5
node/pkg/db/governor.go

@@ -268,7 +268,7 @@ func UnmarshalPendingTransfer(data []byte, isOld bool) (*PendingTransfer, error)
 
 	var msg *common.MessagePublication
 	if isOld {
-		msg, err = common.UnmarshalOldMessagePublicationBeforeIsReobservation(buf)
+		msg, err = common.UnmarshalOldMessagePublicationWithTxHash(buf)
 	} else {
 		msg, err = common.UnmarshalMessagePublication(buf)
 	}
@@ -287,13 +287,13 @@ const transfer = "GOV:XFER3:"
 const transferLen = len(transfer)
 
 // Since we are changing the DB format of pending entries, we will use a new tag in the pending key field.
-// The first time we run this new release, any existing entries with the "GOV:PENDING2" tag will get converted
-// to the new format and given the "GOV:PENDING3" format. In a future release, the "GOV:PENDING2" code can be deleted.
+// The first time we run this new release, any existing entries with the old tag will get converted
+// to the new format and the new tag. In a future release, code for the old format can be deleted.
 
-const oldPending = "GOV:PENDING2:"
+const oldPending = "GOV:PENDING3:"
 const oldPendingLen = len(oldPending)
 
-const pending = "GOV:PENDING3:"
+const pending = "GOV:PENDING4:"
 const pendingLen = len(pending)
 
 const minMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")

+ 65 - 30
node/pkg/db/governor_test.go

@@ -65,7 +65,7 @@ func TestPendingMsgID(t *testing.T) {
 	require.NoError(t, err)
 
 	msg1 := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -75,7 +75,7 @@ func TestPendingMsgID(t *testing.T) {
 		ConsistencyLevel: 16,
 	}
 
-	assert.Equal(t, []byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1))
+	assert.Equal(t, []byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1))
 }
 
 func TestTransferMsgID(t *testing.T) {
@@ -120,18 +120,18 @@ func TestIsTransfer(t *testing.T) {
 }
 
 func TestIsPendingMsg(t *testing.T) {
-	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/1/1")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
-	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/1/1")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
+	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, IsPendingMsg([]byte{0x01, 0x02, 0x03, 0x04}))
 	assert.Equal(t, false, IsPendingMsg([]byte{}))
-	assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 }
 
 func TestGetChainGovernorData(t *testing.T) {
@@ -228,7 +228,7 @@ func TestStorePendingMsg(t *testing.T) {
 	assert.NoError(t, err2)
 
 	msg := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -253,7 +253,7 @@ func TestDeletePendingMsg(t *testing.T) {
 	assert.NoError(t, err2)
 
 	msg := &common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -283,7 +283,7 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
 	require.NoError(t, err)
 
 	msg := common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -307,7 +307,7 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
 
 	assert.Equal(t, pending1, pending2)
 
-	expectedPendingKey := "GOV:PENDING3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
+	expectedPendingKey := "GOV:PENDING4:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
 	assert.Equal(t, expectedPendingKey, string(PendingMsgID(&pending2.Msg)))
 }
 
@@ -361,7 +361,7 @@ func TestStoreAndReloadTransfers(t *testing.T) {
 	pending1 := &PendingTransfer{
 		ReleaseTime: time.Unix(int64(1654516435+72*60*60), 0),
 		Msg: common.MessagePublication{
-			TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 			Timestamp:        time.Unix(int64(1654516435), 0),
 			Nonce:            123456,
 			Sequence:         789101112131417,
@@ -378,7 +378,7 @@ func TestStoreAndReloadTransfers(t *testing.T) {
 	pending2 := &PendingTransfer{
 		ReleaseTime: time.Unix(int64(1654516440+72*60*60), 0),
 		Msg: common.MessagePublication{
-			TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 			Timestamp:        time.Unix(int64(1654516440), 0),
 			Nonce:            123456,
 			Sequence:         789101112131418,
@@ -524,7 +524,7 @@ func TestUnmarshalPendingTransferFailures(t *testing.T) {
 	require.NoError(t, err)
 
 	msg := common.MessagePublication{
-		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 		Timestamp:        time.Unix(int64(1654516425), 0),
 		Nonce:            123456,
 		Sequence:         789101112131415,
@@ -585,13 +585,14 @@ func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) {
 func marshalOldMessagePublication(msg *common.MessagePublication) []byte {
 	buf := new(bytes.Buffer)
 
-	buf.Write(msg.TxHash[:])
+	buf.Write(msg.TxID[:])
 	vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix()))
 	vaa.MustWrite(buf, binary.BigEndian, msg.Nonce)
 	vaa.MustWrite(buf, binary.BigEndian, msg.Sequence)
 	vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel)
 	vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain)
 	buf.Write(msg.EmitterAddress[:])
+	vaa.MustWrite(buf, binary.BigEndian, msg.IsReobservation)
 	buf.Write(msg.Payload)
 
 	return buf.Bytes()
@@ -674,13 +675,12 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 	err = db.StoreTransfer(newXfer2)
 	require.NoError(t, err)
 
-	now := time.Unix(time.Now().Unix(), 0)
-
 	// Write the first pending event in the old format.
+	now := time.Unix(time.Now().Unix(), 0)
 	pending1 := &PendingTransfer{
 		ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.,
 		Msg: common.MessagePublication{
-			TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
 			Timestamp:        now,
 			Nonce:            123456,
 			Sequence:         789101112131417,
@@ -695,14 +695,13 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 	db.storeOldPendingMsg(t, pending1)
 	require.NoError(t, err)
 
-	now2 := now.Add(time.Second * 5)
-
 	// Write the second one in the new format.
+	now = now.Add(time.Second * 5)
 	pending2 := &PendingTransfer{
-		ReleaseTime: now2.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.
+		ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.
 		Msg: common.MessagePublication{
-			TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-			Timestamp:        now2,
+			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
+			Timestamp:        now,
 			Nonce:            123456,
 			Sequence:         789101112131418,
 			EmitterChain:     vaa.ChainIDEthereum,
@@ -716,12 +715,39 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 	err = db.StorePendingMsg(pending2)
 	require.NoError(t, err)
 
-	logger := zap.NewNop()
+	// Write the third pending event in the old format.
+	now = now.Add(time.Second * 5)
+	pending3 := &PendingTransfer{
+		ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.,
+		Msg: common.MessagePublication{
+			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(),
+			Timestamp:        now,
+			Nonce:            123456,
+			Sequence:         789101112131419,
+			EmitterChain:     vaa.ChainIDEthereum,
+			EmitterAddress:   ethereumTokenBridgeAddr,
+			Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+			ConsistencyLevel: 16,
+			// IsReobservation will not be serialized. It should be set to false on reload.
+		},
+	}
+
+	db.storeOldPendingMsg(t, pending3)
+	require.NoError(t, err)
+
+	logger, zapObserver := setupLogsCapture(t)
+
 	xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now)
 
 	require.NoError(t, err)
 	require.Equal(t, 4, len(xfers))
-	require.Equal(t, 2, len(pendings))
+	require.Equal(t, 3, len(pendings))
+
+	// Verify that we converted the two old pending transfers and the two old completed transfers.
+	loggedEntries := zapObserver.FilterMessage("updating format of database entry for pending vaa").All()
+	require.Equal(t, 2, len(loggedEntries))
+	loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All()
+	require.Equal(t, 2, len(loggedEntries))
 
 	sort.SliceStable(xfers, func(i, j int) bool {
 		return xfers[i].Timestamp.Before(xfers[j].Timestamp)
@@ -739,14 +765,23 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 
 	assert.Equal(t, pending1.Msg, pendings[0].Msg)
 	assert.Equal(t, pending2.Msg, pendings[1].Msg)
+	assert.Equal(t, pending3.Msg, pendings[2].Msg)
 
 	// Make sure we can reload the updated pendings.
 
+	logger, zapObserver = setupLogsCapture(t)
+
 	xfers2, pendings2, err := db.GetChainGovernorDataForTime(logger, now)
 
 	require.NoError(t, err)
 	require.Equal(t, 4, len(xfers2))
-	require.Equal(t, 2, len(pendings2))
+	require.Equal(t, 3, len(pendings2))
+
+	// This time we shouldn't have updated anything.
+	loggedEntries = zapObserver.FilterMessage("updating format of database entry for pending vaa").All()
+	require.Equal(t, 0, len(loggedEntries))
+	loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All()
+	require.Equal(t, 0, len(loggedEntries))
 
 	sort.SliceStable(xfers2, func(i, j int) bool {
 		return xfers2[i].Timestamp.Before(xfers2[j].Timestamp)

+ 10 - 10
node/pkg/governor/governor.go

@@ -464,7 +464,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			gov.logger.Info("ignoring duplicate vaa because it is enqueued",
 				zap.String("msgID", msg.MessageIDString()),
 				zap.String("hash", hash),
-				zap.Stringer("txHash", msg.TxHash),
+				zap.String("txID", msg.TxIDString()),
 			)
 			return false, nil
 		}
@@ -472,7 +472,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		gov.logger.Info("allowing duplicate vaa to be published again, but not adding it to the notional value",
 			zap.String("msgID", msg.MessageIDString()),
 			zap.String("hash", hash),
-			zap.Stringer("txHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 		)
 		return true, nil
 	}
@@ -484,7 +484,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		gov.logger.Error("Error when attempting to trim and sum transfers",
 			zap.String("msgID", msg.MessageIDString()),
 			zap.String("hash", hash),
-			zap.Stringer("txHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Error(err),
 		)
 		return false, err
@@ -496,7 +496,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		gov.logger.Error("failed to compute value of transfer",
 			zap.String("msgID", msg.MessageIDString()),
 			zap.String("hash", hash),
-			zap.Stringer("txHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Error(err),
 		)
 		return false, err
@@ -507,7 +507,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		gov.logger.Error("total value has overflowed",
 			zap.String("msgID", msg.MessageIDString()),
 			zap.String("hash", hash),
-			zap.Stringer("txHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Uint64("prevTotalValue", prevTotalValue),
 			zap.Uint64("newTotalValue", newTotalValue),
 		)
@@ -527,7 +527,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			zap.Stringer("releaseTime", releaseTime),
 			zap.Uint64("bigTransactionSize", emitterChainEntry.bigTransactionSize),
 			zap.String("hash", hash),
-			zap.Stringer("txHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 		)
 	} else if newTotalValue > emitterChainEntry.dailyLimit {
 		enqueueIt = true
@@ -539,7 +539,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			zap.Stringer("releaseTime", releaseTime),
 			zap.String("msgID", msg.MessageIDString()),
 			zap.String("hash", hash),
-			zap.Stringer("txHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 		)
 	}
 
@@ -550,7 +550,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			gov.logger.Error("failed to store pending vaa",
 				zap.String("msgID", msg.MessageIDString()),
 				zap.String("hash", hash),
-				zap.Stringer("txHash", msg.TxHash),
+				zap.String("txID", msg.TxIDString()),
 				zap.Error(err),
 			)
 			return false, err
@@ -570,7 +570,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		zap.Uint64("newTotalValue", newTotalValue),
 		zap.String("msgID", msg.MessageIDString()),
 		zap.String("hash", hash),
-		zap.Stringer("txHash", msg.TxHash),
+		zap.String("txID", msg.TxIDString()),
 	)
 
 	dbTransfer := db.Transfer{
@@ -591,7 +591,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		gov.logger.Error("failed to store transfer",
 			zap.String("msgID", msg.MessageIDString()),
 			zap.String("hash", hash), zap.Error(err),
-			zap.Stringer("txHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 		)
 		return false, err
 	}

+ 6 - 6
node/pkg/governor/governor_db.go

@@ -67,7 +67,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 	if !exists {
 		gov.logger.Error("reloaded pending transfer for unsupported chain, dropping it",
 			zap.String("MsgID", msg.MessageIDString()),
-			zap.Stringer("TxHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
 			zap.Uint64("Sequence", msg.Sequence),
@@ -81,7 +81,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 	if msg.EmitterAddress != ce.emitterAddr {
 		gov.logger.Error("reloaded pending transfer for unsupported emitter address, dropping it",
 			zap.String("MsgID", msg.MessageIDString()),
-			zap.Stringer("TxHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
 			zap.Uint64("Sequence", msg.Sequence),
@@ -96,7 +96,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 	if err != nil {
 		gov.logger.Error("failed to parse payload for reloaded pending transfer, dropping it",
 			zap.String("MsgID", msg.MessageIDString()),
-			zap.Stringer("TxHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
 			zap.Uint64("Sequence", msg.Sequence),
@@ -113,7 +113,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 	if !exists {
 		gov.logger.Error("reloaded pending transfer for unsupported token, dropping it",
 			zap.String("MsgID", msg.MessageIDString()),
-			zap.Stringer("TxHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
 			zap.Uint64("Sequence", msg.Sequence),
@@ -131,7 +131,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 	if _, alreadyExists := gov.msgsSeen[hash]; alreadyExists {
 		gov.logger.Error("not reloading pending transfer because it is a duplicate",
 			zap.String("MsgID", msg.MessageIDString()),
-			zap.Stringer("TxHash", msg.TxHash),
+			zap.String("txID", msg.TxIDString()),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
 			zap.Uint64("Sequence", msg.Sequence),
@@ -146,7 +146,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 
 	gov.logger.Info("reloaded pending transfer",
 		zap.String("MsgID", msg.MessageIDString()),
-		zap.Stringer("TxHash", msg.TxHash),
+		zap.String("txID", msg.TxIDString()),
 		zap.Stringer("Timestamp", msg.Timestamp),
 		zap.Uint32("Nonce", msg.Nonce),
 		zap.Uint64("Sequence", msg.Sequence),

+ 2 - 2
node/pkg/governor/governor_monitoring.go

@@ -381,7 +381,7 @@ func (gov *ChainGovernor) GetEnqueuedVAAs() []*publicrpcv1.GovernorGetEnqueuedVA
 				Sequence:       pe.dbData.Msg.Sequence,
 				ReleaseTime:    uint32(pe.dbData.ReleaseTime.Unix()),
 				NotionalValue:  value,
-				TxHash:         pe.dbData.Msg.TxHash.String(),
+				TxHash:         pe.dbData.Msg.TxIDString(),
 			})
 		}
 	}
@@ -649,7 +649,7 @@ func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartb
 					Sequence:      pe.dbData.Msg.Sequence,
 					ReleaseTime:   uint32(pe.dbData.ReleaseTime.Unix()),
 					NotionalValue: value,
-					TxHash:        pe.dbData.Msg.TxHash.String(),
+					TxHash:        pe.dbData.Msg.TxIDString(),
 				})
 			}
 		}

+ 44 - 44
node/pkg/governor/governor_test.go

@@ -697,13 +697,13 @@ func newChainGovernorForTestWithLogger(ctx context.Context, logger *zap.Logger)
 	return gov, nil
 }
 
-// Converts a string into a go-ethereum Hash object used as test input.
-func hashFromString(str string) eth_common.Hash {
+// Converts a TxHash string into a byte array to be used as a TxID.
+func hashToTxID(str string) []byte {
 	if (len(str) > 2) && (str[0] == '0') && (str[1] == 'x') {
 		str = str[2:]
 	}
 
-	return eth_common.HexToHash(str)
+	return eth_common.HexToHash(str).Bytes()
 }
 
 func TestVaaForUninterestingEmitterChain(t *testing.T) {
@@ -717,7 +717,7 @@ func TestVaaForUninterestingEmitterChain(t *testing.T) {
 	payload := []byte{1, 97, 97, 97, 97, 97}
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -749,7 +749,7 @@ func TestVaaForUninterestingEmitterAddress(t *testing.T) {
 	payload := []byte{1, 97, 97, 97, 97, 97}
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -782,7 +782,7 @@ func TestVaaForUninterestingPayloadType(t *testing.T) {
 	payload := []byte{2, 97, 97, 97, 97, 97}
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -886,7 +886,7 @@ func TestVaaForUninterestingToken(t *testing.T) {
 	tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -971,7 +971,7 @@ func TestFlowCancelProcessMsgForTimeFullCancel(t *testing.T) {
 
 	// Transfer from Ethereum to Sui via the token bridge
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        transferTime,
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -989,7 +989,7 @@ func TestFlowCancelProcessMsgForTimeFullCancel(t *testing.T) {
 
 	// Transfer from Sui to Ethereum via the token bridge
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
+		TxID:             hashToTxID("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
 		Timestamp:        transferTime,
 		Nonce:            uint32(2),
 		Sequence:         uint64(2),
@@ -1007,7 +1007,7 @@ func TestFlowCancelProcessMsgForTimeFullCancel(t *testing.T) {
 
 	// msg and asset that are NOT flow cancelable
 	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
+		TxID:             hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
 		Timestamp:        time.Unix(int64(transferTime.Unix()+1), 0),
 		Nonce:            uint32(3),
 		Sequence:         uint64(3),
@@ -1204,7 +1204,7 @@ func TestFlowCancelProcessMsgForTimePartialCancel(t *testing.T) {
 
 	// Transfer from Ethereum to Sui via the token bridge
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        transferTime,
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -1222,7 +1222,7 @@ func TestFlowCancelProcessMsgForTimePartialCancel(t *testing.T) {
 
 	// Transfer from Sui to Ethereum via the token bridge
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
+		TxID:             hashToTxID("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"),
 		Timestamp:        transferTime,
 		Nonce:            uint32(2),
 		Sequence:         uint64(2),
@@ -1240,7 +1240,7 @@ func TestFlowCancelProcessMsgForTimePartialCancel(t *testing.T) {
 
 	// msg and asset that are NOT flow cancelable
 	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
+		TxID:             hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
 		Timestamp:        time.Unix(int64(transferTime.Unix()+1), 0),
 		Nonce:            uint32(3),
 		Sequence:         uint64(3),
@@ -1391,7 +1391,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 
 	// The first two transfers should be accepted.
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -1402,7 +1402,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	}
 
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(2),
@@ -1444,7 +1444,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	)
 
 	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(3),
@@ -1467,7 +1467,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 
 	// But a small one should still go through.
 	msg4 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(4),
@@ -1517,7 +1517,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	)
 
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -1549,7 +1549,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	)
 
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -1581,7 +1581,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	)
 
 	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -1613,7 +1613,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	)
 
 	msg4 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -1744,7 +1744,7 @@ func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) {
 
 	// First message: consume most of the dailyLimit for the emitter chain
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
+		TxID:             hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
 		Timestamp:        time.Unix(int64(transferTime.Unix()+1), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -1762,7 +1762,7 @@ func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) {
 
 	// Second message: This transfer gets queued because the limit is exhausted
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
+		TxID:             hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
 		Timestamp:        time.Unix(int64(transferTime.Unix()+2), 0),
 		Nonce:            uint32(2),
 		Sequence:         uint64(2),
@@ -1781,7 +1781,7 @@ func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) {
 	// Third message: Incoming flow cancelling transfer to the emitter chain for the previous messages. This
 	// reduces the Governor usage for that chain.
 	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
+		TxID:             hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"),
 		Timestamp:        time.Unix(int64(transferTime.Unix()+3), 0),
 		Nonce:            uint32(3),
 		Sequence:         uint64(3),
@@ -1973,7 +1973,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 
 	// The first VAA should be accepted.
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2003,7 +2003,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 
 	// And so should the second.
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2033,7 +2033,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 
 	// But the third, big one should be queued up.
 	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2063,7 +2063,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 
 	// A fourth, smaller, but still too big one, should get enqueued.
 	msg4 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2093,7 +2093,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 
 	// A fifth, smaller, but still too big one, should also get enqueued.
 	msg5 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2123,7 +2123,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 
 	// A sixth, big one should also get enqueued.
 	msg6 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2223,7 +2223,7 @@ func TestNumDaysForReleaseTimerReset(t *testing.T) {
 
 	// message that, when processed, should exceed the big transfer size
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        messageTimestamp,
 		Nonce:            uint32(1),
 		Sequence:         uint64(3),
@@ -2284,7 +2284,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 
 	// The first small transfer should be accepted.
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2314,7 +2314,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 
 	// And so should the second.
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(2),
@@ -2344,7 +2344,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 
 	// But the third big one should get enqueued.
 	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(3),
@@ -2501,7 +2501,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
 
 	// Submit a small transfer that will get enqueued due to the low daily limit.
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2606,7 +2606,7 @@ func TestTransferPayloadTooShort(t *testing.T) {
 	payloadBytes1 = payloadBytes1[0 : len(payloadBytes1)-1]
 
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -2697,7 +2697,7 @@ func TestDontReloadDuplicates(t *testing.T) {
 	pending1 := &db.PendingTransfer{
 		ReleaseTime: now.Add(time.Hour * 24),
 		Msg: common.MessagePublication{
-			TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+			TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 			Timestamp:        time.Unix(int64(1654543099), 0),
 			Nonce:            uint32(1),
 			Sequence:         uint64(200),
@@ -2712,7 +2712,7 @@ func TestDontReloadDuplicates(t *testing.T) {
 	pending2 := &db.PendingTransfer{
 		ReleaseTime: now.Add(time.Hour * 24),
 		Msg: common.MessagePublication{
-			TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+			TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 			Timestamp:        time.Unix(int64(1654543099), 0),
 			Nonce:            uint32(1),
 			Sequence:         uint64(201),
@@ -2963,7 +2963,7 @@ func TestReobservationOfPublishedMsg(t *testing.T) {
 
 	// The first transfer should be accepted.
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -3026,7 +3026,7 @@ func TestReobservationOfEnqueued(t *testing.T) {
 
 	// A big transfer should get enqueued.
 	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -3088,7 +3088,7 @@ func TestReusedMsgIdWithDifferentPayloadGetsProcessed(t *testing.T) {
 
 	// The first transfer should be accepted.
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -3118,7 +3118,7 @@ func TestReusedMsgIdWithDifferentPayloadGetsProcessed(t *testing.T) {
 
 	// A second message with the same msgId but a different payload should also get published and apply to the notional value.
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -3291,7 +3291,7 @@ func TestPendingTransferWithBadPayloadGetsDroppedNotReleased(t *testing.T) {
 
 	// Create two big transactions.
 	msg1 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
 		Sequence:         uint64(1),
@@ -3308,7 +3308,7 @@ func TestPendingTransferWithBadPayloadGetsDroppedNotReleased(t *testing.T) {
 	}
 
 	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		TxID:             hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(2),
 		Sequence:         uint64(2),

+ 6 - 4
node/pkg/node/node_test.go

@@ -374,8 +374,9 @@ var someMsgEmitterChain vaa.ChainID = vaa.ChainIDSolana
 
 func someMessage() *common.MessagePublication {
 	someMsgSequenceCounter++
+	txID := [32]byte{byte(someMsgSequenceCounter % 8), byte(someMsgSequenceCounter / 8), 3}
 	return &common.MessagePublication{
-		TxHash:           [32]byte{byte(someMsgSequenceCounter % 8), byte(someMsgSequenceCounter / 8), 3},
+		TxID:             txID[:],
 		Timestamp:        randomTime(),
 		Nonce:            math_rand.Uint32(), //nolint
 		Sequence:         someMsgSequenceCounter,
@@ -439,8 +440,9 @@ func governedMsg(shouldBeDelayed bool) *common.MessagePublication {
 	)
 
 	tokenBridgeSequenceCounter++
+	txID := [32]byte{byte(tokenBridgeSequenceCounter % 8), byte(tokenBridgeSequenceCounter / 8), 3, 1, 10, 76}
 	return &common.MessagePublication{
-		TxHash:           [32]byte{byte(tokenBridgeSequenceCounter % 8), byte(tokenBridgeSequenceCounter / 8), 3, 1, 10, 76},
+		TxID:             txID[:],
 		Timestamp:        randomTime(),
 		Nonce:            math_rand.Uint32(), //nolint
 		Sequence:         tokenBridgeSequenceCounter,
@@ -458,7 +460,7 @@ func makeObsDb(tc []testCase) mock.ObservationDb {
 		if t.unavailableInReobservation {
 			continue
 		}
-		db[t.msg.TxHash] = t.msg
+		db[eth_common.BytesToHash(t.msg.TxID)] = t.msg
 	}
 	return db
 }
@@ -748,7 +750,7 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
 					_, err := adminCs[adminRpcGuardianIndex].SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{
 						ObservationRequest: &gossipv1.ObservationRequest{
 							ChainId: uint32(testCase.msg.EmitterChain),
-							TxHash:  testCase.msg.TxHash[:],
+							TxHash:  testCase.msg.TxID,
 						},
 					})
 					queryCancel()

+ 3 - 3
node/pkg/node/options.go

@@ -353,7 +353,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
 									level = zapcore.ErrorLevel
 								}
 								logger.Log(level, "SECURITY CRITICAL: Received observation from a chain that was not marked as originating from that chain",
-									zap.Stringer("tx", msg.TxHash),
+									zap.String("tx", msg.TxIDString()),
 									zap.Stringer("emitter_address", msg.EmitterAddress),
 									zap.Uint64("sequence", msg.Sequence),
 									zap.Stringer("msgChainId", msg.EmitterChain),
@@ -368,7 +368,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
 									level = zapcore.ErrorLevel
 								}
 								logger.Log(level, "SECURITY ERROR: Received observation with EmitterAddress == 0x00",
-									zap.Stringer("tx", msg.TxHash),
+									zap.String("tx", msg.TxIDString()),
 									zap.Stringer("emitter_address", msg.EmitterAddress),
 									zap.Uint64("sequence", msg.Sequence),
 									zap.Stringer("msgChainId", msg.EmitterChain),
@@ -380,7 +380,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
 									zap.Stringer("emitter_chain", msg.EmitterChain),
 									zap.Stringer("emitter_address", msg.EmitterAddress),
 									zap.Uint32("nonce", msg.Nonce),
-									zap.Stringer("txhash", msg.TxHash),
+									zap.String("txID", msg.TxIDString()),
 									zap.Time("timestamp", msg.Timestamp))
 							} else {
 								g.msgC.writeC <- msg

+ 3 - 3
node/pkg/processor/batch_obs_test.go

@@ -8,7 +8,6 @@ import (
 	"github.com/certusone/wormhole/node/pkg/devnet"
 	"github.com/certusone/wormhole/node/pkg/p2p"
 	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
-	ethcommon "github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/crypto"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/stretchr/testify/assert"
@@ -43,6 +42,7 @@ func TestMarshalSignedObservationBatch(t *testing.T) {
 
 	NumObservations := uint64(p2p.MaxObservationBatchSize)
 	observations := make([]*gossipv1.Observation, 0, NumObservations)
+	txHash := []byte("0123456789012345678901234567890123456789012345678901234567890123") // 64 bytes, the size of a Solana signature.
 	for seqNo := uint64(1); seqNo <= NumObservations; seqNo++ {
 		vaa := getUniqueVAA(seqNo)
 		digest := vaa.SigningDigest()
@@ -52,14 +52,14 @@ func TestMarshalSignedObservationBatch(t *testing.T) {
 		observations = append(observations, &gossipv1.Observation{
 			Hash:      digest.Bytes(),
 			Signature: sig,
-			TxHash:    ethcommon.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
+			TxHash:    txHash,
 			MessageId: vaa.MessageID(),
 		})
 	}
 
 	obsBuf, err := proto.Marshal(observations[0])
 	require.NoError(t, err)
-	assert.Equal(t, 205, len(obsBuf))
+	assert.Equal(t, (173 + len(txHash)), len(obsBuf))
 
 	batch := gossipv1.SignedObservationBatch{
 		Addr:         crypto.PubkeyToAddress(gk.PublicKey).Bytes(),

+ 2 - 2
node/pkg/processor/benchmark_test.go

@@ -194,7 +194,7 @@ func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db *
 func (pd *ProcessorData) createMessagePublication(b *testing.B, sequence uint64) *common.MessagePublication {
 	b.Helper()
 	return &common.MessagePublication{
-		TxHash:           ethCommon.HexToHash(fmt.Sprintf("%064x", sequence)),
+		TxID:             ethCommon.HexToHash(fmt.Sprintf("%064x", sequence)).Bytes(),
 		Timestamp:        time.Now(),
 		Nonce:            42,
 		Sequence:         sequence,
@@ -235,7 +235,7 @@ func (pd *ProcessorData) createObservation(b *testing.B, guardianIdx int, k *com
 	return &gossipv1.Observation{
 		Hash:      digest.Bytes(),
 		Signature: signature,
-		TxHash:    k.TxHash.Bytes(),
+		TxHash:    k.TxID,
 		MessageId: pd.messageID(k.Sequence),
 	}
 }

+ 5 - 5
node/pkg/processor/message.go

@@ -37,7 +37,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
 		p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
 			zap.String("message_id", k.MessageIDString()),
 			zap.Uint32("nonce", k.Nonce),
-			zap.Stringer("txhash", k.TxHash),
+			zap.String("txID", k.TxIDString()),
 			zap.Time("timestamp", k.Timestamp),
 		)
 		return
@@ -80,8 +80,8 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
 	if p.logger.Core().Enabled(zapcore.DebugLevel) {
 		p.logger.Debug("observed and signed confirmed message publication",
 			zap.String("message_id", k.MessageIDString()),
-			zap.Stringer("txhash", k.TxHash),
-			zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
+			zap.String("txID", k.TxIDString()),
+			zap.String("txID_b58", base58.Encode(k.TxID)),
 			zap.String("hash", hash),
 			zap.Uint32("nonce", k.Nonce),
 			zap.Time("timestamp", k.Timestamp),
@@ -93,7 +93,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
 	}
 
 	// Broadcast the signature.
-	ourObs, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature, shouldPublishImmediately)
+	ourObs, msg := p.broadcastSignature(v.MessageID(), k.TxID, digest, signature, shouldPublishImmediately)
 
 	// Indicate that we observed this one.
 	observationsReceivedTotal.Inc()
@@ -114,7 +114,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
 
 	// Update our state.
 	s.ourObservation = v
-	s.txHash = k.TxHash.Bytes()
+	s.txHash = k.TxID
 	s.source = v.GetEmitterChain().String()
 	s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
 	s.signatures[p.ourAddr] = signature

+ 1 - 1
node/pkg/watchers/algorand/watcher.go

@@ -157,7 +157,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.
 
 	for _, obs := range observations {
 		observation := &common.MessagePublication{
-			TxHash:           txHash,
+			TxID:             txHash.Bytes(),
 			Timestamp:        time.Unix(b.TimeStamp, 0),
 			Nonce:            obs.nonce,
 			Sequence:         obs.sequence,

+ 2 - 2
node/pkg/watchers/aptos/watcher.go

@@ -305,7 +305,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u
 	}
 
 	observation := &common.MessagePublication{
-		TxHash:           txHash,
+		TxID:             txHash.Bytes(),
 		Timestamp:        time.Unix(int64(ts.Uint()), 0),
 		Nonce:            uint32(nonce.Uint()), // uint32
 		Sequence:         sequence.Uint(),
@@ -319,7 +319,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u
 	aptosMessagesConfirmed.Inc()
 
 	logger.Info("message observed",
-		zap.Stringer("txHash", observation.TxHash),
+		zap.String("txHash", observation.TxIDString()),
 		zap.Time("timestamp", observation.Timestamp),
 		zap.Uint32("nonce", observation.Nonce),
 		zap.Uint64("sequence", observation.Sequence),

+ 1 - 1
node/pkg/watchers/cosmwasm/watcher.go

@@ -511,7 +511,7 @@ func EventsToMessagePublications(contract string, txHash string, events []gjson.
 			continue
 		}
 		messagePublication := &common.MessagePublication{
-			TxHash:           txHashValue,
+			TxID:             txHashValue.Bytes(),
 			Timestamp:        time.Unix(blockTimeInt, 0),
 			Nonce:            uint32(nonceInt),
 			Sequence:         sequenceInt,

+ 1 - 1
node/pkg/watchers/evm/by_transaction.go

@@ -75,7 +75,7 @@ func MessageEventsForTransaction(
 		}
 
 		message := &common.MessagePublication{
-			TxHash:           ev.Raw.TxHash,
+			TxID:             ev.Raw.TxHash.Bytes(),
 			Timestamp:        time.Unix(int64(blockTime), 0),
 			Nonce:            ev.Nonce,
 			Sequence:         ev.Sequence,

+ 18 - 18
node/pkg/watchers/evm/watcher.go

@@ -327,7 +327,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
 						logger.Info("re-observed message publication transaction, publishing it immediately",
 							zap.String("msgId", msg.MessageIDString()),
-							zap.Stringer("txHash", msg.TxHash),
+							zap.String("txHash", msg.TxIDString()),
 							zap.Uint64("current_block", blockNumberU),
 							zap.Uint64("observed_block", blockNumber),
 						)
@@ -339,7 +339,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 						if safeBlockNumberU == 0 {
 							logger.Error("no safe block number available, ignoring observation request",
 								zap.String("msgId", msg.MessageIDString()),
-								zap.Stringer("txHash", msg.TxHash),
+								zap.String("txHash", msg.TxIDString()),
 							)
 							continue
 						}
@@ -347,7 +347,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 						if blockNumber <= safeBlockNumberU {
 							logger.Info("re-observed message publication transaction",
 								zap.String("msgId", msg.MessageIDString()),
-								zap.Stringer("txHash", msg.TxHash),
+								zap.String("txHash", msg.TxIDString()),
 								zap.Uint64("current_safe_block", safeBlockNumberU),
 								zap.Uint64("observed_block", blockNumber),
 							)
@@ -355,7 +355,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 						} else {
 							logger.Info("ignoring re-observed message publication transaction",
 								zap.String("msgId", msg.MessageIDString()),
-								zap.Stringer("txHash", msg.TxHash),
+								zap.String("txHash", msg.TxIDString()),
 								zap.Uint64("current_safe_block", safeBlockNumberU),
 								zap.Uint64("observed_block", blockNumber),
 							)
@@ -367,7 +367,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					if blockNumberU == 0 {
 						logger.Error("no block number available, ignoring observation request",
 							zap.String("msgId", msg.MessageIDString()),
-							zap.Stringer("txHash", msg.TxHash),
+							zap.String("txHash", msg.TxIDString()),
 						)
 						continue
 					}
@@ -384,7 +384,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					if blockNumber <= blockNumberU {
 						logger.Info("re-observed message publication transaction",
 							zap.String("msgId", msg.MessageIDString()),
-							zap.Stringer("txHash", msg.TxHash),
+							zap.String("txHash", msg.TxIDString()),
 							zap.Uint64("current_block", blockNumberU),
 							zap.Uint64("observed_block", blockNumber),
 						)
@@ -392,7 +392,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					} else {
 						logger.Info("ignoring re-observed message publication transaction",
 							zap.String("msgId", msg.MessageIDString()),
-							zap.Stringer("txHash", msg.TxHash),
+							zap.String("txHash", msg.TxIDString()),
 							zap.Uint64("current_block", blockNumberU),
 							zap.Uint64("observed_block", blockNumber),
 						)
@@ -512,7 +512,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					if pLock.height <= blockNumberU {
 						msm := time.Now()
 						timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
-						tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash)
+						tx, err := w.ethConn.TransactionReceipt(timeout, eth_common.BytesToHash(pLock.message.TxID))
 						queryLatency.WithLabelValues(w.networkName, "transaction_receipt").Observe(time.Since(msm).Seconds())
 						cancel()
 
@@ -527,7 +527,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 						if tx == nil || err == rpc.ErrNoResult || (err != nil && err.Error() == "not found") {
 							logger.Warn("tx was orphaned",
 								zap.String("msgId", pLock.message.MessageIDString()),
-								zap.Stringer("txHash", pLock.message.TxHash),
+								zap.String("txHash", pLock.message.TxIDString()),
 								zap.Stringer("blockHash", key.BlockHash),
 								zap.Uint64("target_blockNum", pLock.height),
 								zap.Stringer("current_blockNum", ev.Number),
@@ -545,7 +545,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 						if tx.Status != 1 {
 							logger.Error("transaction receipt with non-success status",
 								zap.String("msgId", pLock.message.MessageIDString()),
-								zap.Stringer("txHash", pLock.message.TxHash),
+								zap.String("txHash", pLock.message.TxIDString()),
 								zap.Stringer("blockHash", key.BlockHash),
 								zap.Uint64("target_blockNum", pLock.height),
 								zap.Stringer("current_blockNum", ev.Number),
@@ -563,7 +563,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 								// An error from this "transient" case has persisted for more than MaxWaitConfirmations.
 								logger.Info("observation timed out",
 									zap.String("msgId", pLock.message.MessageIDString()),
-									zap.Stringer("txHash", pLock.message.TxHash),
+									zap.String("txHash", pLock.message.TxIDString()),
 									zap.Stringer("blockHash", key.BlockHash),
 									zap.Uint64("target_blockNum", pLock.height),
 									zap.Stringer("current_blockNum", ev.Number),
@@ -575,7 +575,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 							} else {
 								logger.Warn("transaction could not be fetched",
 									zap.String("msgId", pLock.message.MessageIDString()),
-									zap.Stringer("txHash", pLock.message.TxHash),
+									zap.String("txHash", pLock.message.TxIDString()),
 									zap.Stringer("blockHash", key.BlockHash),
 									zap.Uint64("target_blockNum", pLock.height),
 									zap.Stringer("current_blockNum", ev.Number),
@@ -592,7 +592,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 						if tx.BlockHash != key.BlockHash {
 							logger.Info("tx got dropped and mined in a different block; the message should have been reobserved",
 								zap.String("msgId", pLock.message.MessageIDString()),
-								zap.Stringer("txHash", pLock.message.TxHash),
+								zap.String("txHash", pLock.message.TxIDString()),
 								zap.Stringer("blockHash", key.BlockHash),
 								zap.Uint64("target_blockNum", pLock.height),
 								zap.Stringer("current_blockNum", ev.Number),
@@ -606,7 +606,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 
 						logger.Info("observation confirmed",
 							zap.String("msgId", pLock.message.MessageIDString()),
-							zap.Stringer("txHash", pLock.message.TxHash),
+							zap.String("txHash", pLock.message.TxIDString()),
 							zap.Stringer("blockHash", key.BlockHash),
 							zap.Uint64("target_blockNum", pLock.height),
 							zap.Stringer("current_blockNum", ev.Number),
@@ -832,7 +832,7 @@ func (w *Watcher) getBlockTime(ctx context.Context, blockHash eth_common.Hash) (
 // postMessage creates a message object from a log event and adds it to the pending list for processing.
 func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublished, blockTime uint64) {
 	message := &common.MessagePublication{
-		TxHash:           ev.Raw.TxHash,
+		TxID:             ev.Raw.TxHash.Bytes(),
 		Timestamp:        time.Unix(int64(blockTime), 0),
 		Nonce:            ev.Nonce,
 		Sequence:         ev.Sequence,
@@ -847,7 +847,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis
 	if message.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
 		logger.Info("found new message publication transaction, publishing it immediately",
 			zap.String("msgId", message.MessageIDString()),
-			zap.Stringer("txHash", message.TxHash),
+			zap.String("txHash", message.TxIDString()),
 			zap.Uint64("blockNum", ev.Raw.BlockNumber),
 			zap.Uint64("latestFinalizedBlock", atomic.LoadUint64(&w.latestFinalizedBlockNumber)),
 			zap.Stringer("blockHash", ev.Raw.BlockHash),
@@ -863,7 +863,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis
 
 	logger.Info("found new message publication transaction",
 		zap.String("msgId", message.MessageIDString()),
-		zap.Stringer("txHash", message.TxHash),
+		zap.String("txHash", message.TxIDString()),
 		zap.Uint64("blockNum", ev.Raw.BlockNumber),
 		zap.Uint64("latestFinalizedBlock", atomic.LoadUint64(&w.latestFinalizedBlockNumber)),
 		zap.Stringer("blockHash", ev.Raw.BlockHash),
@@ -873,7 +873,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis
 	)
 
 	key := pendingKey{
-		TxHash:         message.TxHash,
+		TxHash:         eth_common.BytesToHash(message.TxID),
 		BlockHash:      ev.Raw.BlockHash,
 		EmitterAddress: message.EmitterAddress,
 		Sequence:       message.Sequence,

+ 6 - 6
node/pkg/watchers/ibc/watcher.go

@@ -537,7 +537,7 @@ func parseIbcReceivePublishEvent(logger *zap.Logger, desiredContract string, eve
 
 	evt := new(ibcReceivePublishEvent)
 	evt.Msg = new(common.MessagePublication)
-	evt.Msg.TxHash = txHash
+	evt.Msg.TxID = txHash.Bytes()
 
 	evt.ChannelID, err = attributes.GetAsString("channel_id")
 	if err != nil {
@@ -595,7 +595,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs
 	if err != nil {
 		w.logger.Error("query for IBC channel ID failed",
 			zap.String("IbcChannelID", evt.ChannelID),
-			zap.Stringer("TxHash", evt.Msg.TxHash),
+			zap.String("TxID", evt.Msg.TxIDString()),
 			zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
 			zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
 			zap.Uint64("Sequence", evt.Msg.Sequence),
@@ -613,7 +613,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs
 		// Therefore we don't want to return an error here. Restarting won't help.
 		w.logger.Error(fmt.Sprintf("received %s message from unknown IBC channel, dropping observation", observationType),
 			zap.String("IbcChannelID", evt.ChannelID),
-			zap.Stringer("TxHash", evt.Msg.TxHash),
+			zap.String("TxID", evt.Msg.TxIDString()),
 			zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
 			zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
 			zap.Uint64("Sequence", evt.Msg.Sequence),
@@ -631,7 +631,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs
 		w.logger.Debug(fmt.Sprintf("received %s message from an unconfigured chain, dropping observation", observationType),
 			zap.String("IbcChannelID", evt.ChannelID),
 			zap.Stringer("ChainID", mappedChainID),
-			zap.Stringer("TxHash", evt.Msg.TxHash),
+			zap.String("TxID", evt.Msg.TxIDString()),
 			zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
 			zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
 			zap.Uint64("Sequence", evt.Msg.Sequence),
@@ -647,7 +647,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs
 			zap.String("IbcChannelID", evt.ChannelID),
 			zap.Uint16("MappedChainID", uint16(mappedChainID)),
 			zap.Uint16("ExpectedChainID", uint16(ce.chainID)),
-			zap.Stringer("TxHash", evt.Msg.TxHash),
+			zap.String("TxID", evt.Msg.TxIDString()),
 			zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
 			zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
 			zap.Uint64("Sequence", evt.Msg.Sequence),
@@ -662,7 +662,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs
 	w.logger.Info(fmt.Sprintf("%s message detected", observationType),
 		zap.String("IbcChannelID", evt.ChannelID),
 		zap.String("ChainName", ce.chainName),
-		zap.Stringer("TxHash", evt.Msg.TxHash),
+		zap.String("TxID", evt.Msg.TxIDString()),
 		zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
 		zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
 		zap.Uint64("Sequence", evt.Msg.Sequence),

+ 1 - 1
node/pkg/watchers/ibc/watcher_test.go

@@ -55,7 +55,7 @@ func TestParseIbcReceivePublishEvent(t *testing.T) {
 	expectedResult := ibcReceivePublishEvent{
 		ChannelID: "channel-0",
 		Msg: &common.MessagePublication{
-			TxHash:         txHash,
+			TxID:           txHash.Bytes(),
 			EmitterAddress: expectedSender,
 			EmitterChain:   vaa.ChainIDTerra2,
 			Nonce:          1,

+ 1 - 1
node/pkg/watchers/near/tx_processing.go

@@ -235,7 +235,7 @@ func (e *Watcher) processWormholeLog(logger *zap.Logger, _ context.Context, job
 	ts := outcomeBlockHeader.Timestamp
 
 	observation := &common.MessagePublication{
-		TxHash:           txHashEthFormat,
+		TxID:             txHashEthFormat.Bytes(),
 		Timestamp:        time.Unix(int64(ts), 0),
 		Nonce:            pubEvent.Nonce,
 		Sequence:         pubEvent.Seq,

+ 10 - 11
node/pkg/watchers/near/watcher_test.go

@@ -14,7 +14,6 @@ import (
 	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
 	"github.com/certusone/wormhole/node/pkg/supervisor"
 	mockserver "github.com/certusone/wormhole/node/pkg/watchers/near/nearapi/mock"
-	eth_common "github.com/ethereum/go-ethereum/common"
 	"github.com/stretchr/testify/assert"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
@@ -231,7 +230,7 @@ func TestWatcherSimple(t *testing.T) {
 		},
 		expectedMsgObserved: []*common.MessagePublication{
 			{
-				TxHash:           eth_common.BytesToHash(txHashBytes),
+				TxID:             txHashBytes,
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -273,7 +272,7 @@ func TestWatcherSimple2(t *testing.T) {
 		},
 		expectedMsgObserved: []*common.MessagePublication{
 			{
-				TxHash:           eth_common.BytesToHash(txHashBytes),
+				TxID:             txHashBytes,
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -309,7 +308,7 @@ func TestWatcherReobservation(t *testing.T) {
 		},
 		expectedMsgReObserved: []*common.MessagePublication{
 			{
-				TxHash:           eth_common.BytesToHash(txHashBytes),
+				TxID:             txHashBytes,
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -359,7 +358,7 @@ func TestWatcherDelayedFinal(t *testing.T) {
 		latestFinalBlocks: lfb,
 		expectedMsgObserved: []*common.MessagePublication{
 			{
-				TxHash:           eth_common.BytesToHash(txHashBytes),
+				TxID:             txHashBytes,
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -398,7 +397,7 @@ func TestWatcherDelayedFinalAndGaps(t *testing.T) {
 		},
 		expectedMsgObserved: []*common.MessagePublication{
 			{
-				TxHash:           eth_common.BytesToHash(txHashBytes),
+				TxID:             txHashBytes,
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -444,7 +443,7 @@ func TestWatcherSynthetic(t *testing.T) {
 		},
 		expectedMsgReObserved: []*common.MessagePublication{
 			{
-				TxHash:           eth_common.BytesToHash([]byte("_____________________________TX1")),
+				TxID:             []byte("_____________________________TX1"),
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -455,7 +454,7 @@ func TestWatcherSynthetic(t *testing.T) {
 				Unreliable:       false,
 			},
 			{
-				TxHash:           eth_common.BytesToHash([]byte("_____________________________TX2")),
+				TxID:             []byte("_____________________________TX2"),
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -466,7 +465,7 @@ func TestWatcherSynthetic(t *testing.T) {
 				Unreliable:       false,
 			},
 			{
-				TxHash:           eth_common.BytesToHash([]byte("_____________________________TX3")),
+				TxID:             []byte("_____________________________TX3"),
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -537,7 +536,7 @@ func TestWatcherUnfinalized(t *testing.T) {
 		},
 		expectedMsgReObserved: []*common.MessagePublication{
 			{
-				TxHash:           eth_common.BytesToHash([]byte("_____________________________TX1")),
+				TxID:             []byte("_____________________________TX1"),
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,
@@ -548,7 +547,7 @@ func TestWatcherUnfinalized(t *testing.T) {
 				Unreliable:       false,
 			},
 			{
-				TxHash:           eth_common.BytesToHash([]byte("_____________________________TX3")),
+				TxID:             []byte("_____________________________TX3"),
 				EmitterAddress:   portalEmitterAddress(),
 				ConsistencyLevel: 0,
 				EmitterChain:     vaa.ChainIDNear,

+ 1 - 1
node/pkg/watchers/solana/client.go

@@ -969,7 +969,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
 	}
 
 	observation := &common.MessagePublication{
-		TxHash:           txHash,
+		TxID:             txHash.Bytes(),
 		Timestamp:        time.Unix(int64(proposal.SubmissionTime), 0),
 		Nonce:            proposal.Nonce,
 		Sequence:         proposal.Sequence,

+ 2 - 2
node/pkg/watchers/sui/watcher.go

@@ -262,7 +262,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservatio
 	}
 
 	observation := &common.MessagePublication{
-		TxHash:           txHashEthFormat,
+		TxID:             txHashEthFormat.Bytes(),
 		Timestamp:        time.Unix(ts, 0),
 		Nonce:            uint32(*fields.Nonce),
 		Sequence:         seq,
@@ -276,7 +276,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservatio
 	suiMessagesConfirmed.Inc()
 
 	logger.Info("message observed",
-		zap.Stringer("txHash", observation.TxHash),
+		zap.String("txHash", observation.TxIDString()),
 		zap.Time("timestamp", observation.Timestamp),
 		zap.Uint32("nonce", observation.Nonce),
 		zap.Uint64("sequence", observation.Sequence),