Pārlūkot izejas kodu

node: Fix marshaling for MessagePublications (#4428)

- Correct marshaling for MessagePublication to include the newer fields
  added to this struct (Unreliable, VerificationState)
- Implement marshal/unmarshal in new methods on MessagePublication that
  conform to the standard Go marshaling interfaces
- Upgrades the Governor's database methods to use the new format
- Add instructions on how to do a Governor database migration

Previously, these fields were not marshaled but this had little effect
as the MessagePublication was only unmarshaled by the Governor which did
not makes use of the new fields.
However, the Verification State will be required by the Transfer
Verifier and Notary, so it's now necessary to fix marshaling.

Chainlock
- Remove deprecated UnmarshalOldMessagePublicationWithTxHash function
- Replace basic TestUnmarshalError with comprehensive table-driven TestMessagePublicationUnmarshalBinaryErrors
- Add test cases covering all error conditions for new Unmarshal
  function
- Add deprecation comments to legacy test functions

Governor
- Update PendingTransfer to use new MarshalBinary/UnmarshalBinary methods
- Increment version prefixes: GOV:XFER3→XFER4, GOV:PENDING4→PENDING5
- Remove unmarshalOldTransfer function, consolidate to UnmarshalTransfer
- Add comprehensive documentation for Transfer and PendingTransfer types
- Update test expectations for new version prefixes
John Saigle 3 mēneši atpakaļ
vecāks
revīzija
a708838a9d

+ 3 - 0
cspell-custom-words.txt

@@ -113,6 +113,7 @@ Linea
 localnet
 localterra
 lockfiles
+marshal
 merkle
 Metaplex
 mezo
@@ -207,6 +208,7 @@ undercollateralization
 unforgeable
 unichain
 Unichain
+unmarshal
 unnormalize
 untampered
 utest
@@ -229,5 +231,6 @@ wormscan
 wormscanurl
 xlayer
 xpla
+XFER
 XPLA
 Zellic

+ 1 - 0
node/Makefile

@@ -5,3 +5,4 @@ lint:
 .PHONY: test
 test: 
 	go test -v ./...
+	timeout 10s go test -fuzz=FuzzMessagePublicationUnmarshalBinary -fuzztime=5s ./pkg/common || true

+ 315 - 56
node/pkg/common/chainlock.go

@@ -10,13 +10,73 @@ import (
 	"math"
 	"time"
 
-	"github.com/ethereum/go-ethereum/common"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
 )
 
-const HashLength = 32
-const AddressLength = 32
+const (
+	// TxIDLenMin is the minimum length of a txID.
+	TxIDLenMin = 32
+	// AddressLength is the length of a normalized Wormhole address in bytes.
+	AddressLength = 32
+
+	// Wormhole supports arbitrary payloads due to the variance in transaction and block sizes between chains.
+	// However, during serialization, payload lengths are limited by Go slice length constraints and violations
+	// of these limits can cause panics.
+	// (https://go.dev/src/runtime/slice.go)
+	// This limit is chosen to be large enough to prevent such panics but it should comfortably handle all payloads.
+	// If not, the limit can be increased.
+	PayloadLenMax = 1024 * 1024 * 1024 * 10 // 10 GB
+
+	// The minimum size of a marshaled message publication. It is the sum of the sizes of each of
+	// the fields plus length information for fields with variable lengths (TxID and Payload).
+	marshaledMsgLenMin = 1 + // TxID length (uint8)
+		TxIDLenMin + // TxID ([]byte), minimum length of 32 bytes (but may be longer)
+		8 + // Timestamp (int64)
+		4 + // Nonce (uint32)
+		8 + // Sequence (uint64)
+		1 + // ConsistencyLevel (uint8)
+		2 + // EmitterChain (uint16)
+		32 + // EmitterAddress (32 bytes)
+		1 + // IsReobservation (bool)
+		1 + // Unreliable (bool)
+		1 + // verificationState (uint8)
+		8 // Payload length (int64), may be zero
+
+	// Deprecated: represents the minimum message length for a marshaled message publication
+	// before the Unreliable and verificationState fields were added.
+	// Use [marshaledMsgSizeMin] instead.
+	minMsgLength = 88
+
+	// minMsgIdLen is the minimum length of a message ID. It is used to uniquely identify
+	// messages in the case of a duplicate message ID and is stored in the database.
+	MinMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")
+)
+
+var (
+	ErrBinaryWrite              = errors.New("failed to write binary data")
+	ErrInvalidBinaryBool        = errors.New("invalid binary bool (neither 0x00 nor 0x01)")
+	ErrInvalidVerificationState = errors.New("invalid verification state")
+)
+
+type ErrUnexpectedEndOfRead struct {
+	expected int
+	got      int
+}
+
+func (e ErrUnexpectedEndOfRead) Error() string {
+	return fmt.Sprintf("data position after unmarshal does not match data length. expected: %d got: %d", e.expected, e.got)
+}
+
+// ErrInputSize is returned when the input size is not the expected size during marshaling.
+type ErrInputSize struct {
+	msg string
+	got int
+}
+
+func (e ErrInputSize) Error() string {
+	return fmt.Sprintf("wrong size: %s. expected >= %d bytes, got %d", e.msg, marshaledMsgLenMin, e.got)
+}
 
 // The `VerificationState` is the result of applying transfer verification to the transaction associated with the `MessagePublication`.
 // While this could likely be extended to additional security controls in the future, it is only used for `txverifier` at present.
@@ -38,6 +98,10 @@ const (
 	CouldNotVerify
 )
 
+// NumVariantsVerificationState is the number of variants in the VerificationState enum.
+// Used to validate deserialization.
+const NumVariantsVerificationState = 6
+
 func (v VerificationState) String() string {
 	switch v {
 	case NotVerified:
@@ -66,12 +130,14 @@ type MessagePublication struct {
 	ConsistencyLevel uint8
 	EmitterChain     vaa.ChainID
 	EmitterAddress   vaa.Address
-	Payload          []byte
-	IsReobservation  bool
+	// NOTE: there is no upper bound on the size of the payload. Wormhole supports arbitrary payloads
+	// due to the variance in transaction and block sizes between chains. However, during deserialization,
+	// payload lengths are bounds-checked against [PayloadLenMax] to prevent makeslice panics from malformed input.
+	Payload         []byte
+	IsReobservation bool
 
 	// Unreliable indicates if this message can be reobserved. If a message is considered unreliable it cannot be
 	// reobserved.
-	// This field is not marshalled/serialized.
 	Unreliable bool
 
 	// The `VerificationState` is the result of applying transfer
@@ -84,10 +150,10 @@ type MessagePublication struct {
 	// This field is intentionally private so that it must be
 	// updated using the setter, which performs verification on the new
 	// state.
-	// This field is not marshalled/serialized.
 	verificationState VerificationState
 }
 
+// TxIDString returns a hex-encoded representation of the TxID field, prefixed with '0x'.
 func (msg *MessagePublication) TxIDString() string {
 	return "0x" + hex.EncodeToString(msg.TxID)
 }
@@ -123,8 +189,8 @@ func (msg *MessagePublication) SetVerificationState(s VerificationState) error {
 	return nil
 }
 
-const minMsgLength = 88 // Marshalled length with empty payload
-
+// Deprecated: This function does not unmarshal the Unreliable or verificationState fields.
+// Use [MessagePublication.MarshalBinary] instead.
 func (msg *MessagePublication) Marshal() ([]byte, error) {
 	buf := new(bytes.Buffer)
 
@@ -148,66 +214,108 @@ func (msg *MessagePublication) Marshal() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
-// UnmarshalOldMessagePublicationWithTxHash deserializes a MessagePublication from when the TxHash was a fixed size ethCommon.Hash.
-// This function can be deleted once all guardians have been upgraded. That's why the code is just duplicated.
-func UnmarshalOldMessagePublicationWithTxHash(data []byte) (*MessagePublication, error) {
-	if len(data) < minMsgLength {
-		return nil, errors.New("message is too short")
-	}
-
-	msg := &MessagePublication{}
-
-	reader := bytes.NewReader(data[:])
-
-	txHash := common.Hash{}
-	if n, err := reader.Read(txHash[:]); err != nil || n != HashLength {
-		return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err)
+// MarshalBinary implements the BinaryMarshaler interface for MessagePublication.
+func (msg *MessagePublication) MarshalBinary() ([]byte, error) {
+	// Marshalled Message Publication layout:
+	//
+	// - TxID length
+	// - TxID
+	// - Timestamp
+	// - Nonce
+	// - Sequence
+	// - ConsistencyLevel
+	// - EmitterChain
+	// - EmitterAddress
+	// - IsReobservation
+	// - Unreliable
+	// - verificationState
+	// - Payload length
+	// - Payload
+
+	// TxID is an alias for []byte.
+	const TxIDSizeMax = math.MaxUint8
+
+	// Check preconditions
+	txIDLen := len(msg.TxID)
+	if txIDLen > TxIDSizeMax {
+		return nil, ErrInputSize{msg: "TxID too long"}
 	}
-	msg.TxID = txHash.Bytes()
 
-	unixSeconds := uint32(0)
-	if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
-		return nil, fmt.Errorf("failed to read timestamp: %w", err)
+	if txIDLen < TxIDLenMin {
+		return nil, ErrInputSize{msg: "TxID too short"}
 	}
-	msg.Timestamp = time.Unix(int64(unixSeconds), 0)
 
-	if err := binary.Read(reader, binary.BigEndian, &msg.Nonce); err != nil {
-		return nil, fmt.Errorf("failed to read nonce: %w", err)
-	}
+	payloadLen := len(msg.Payload)
+	// Set up for serialization
+	var (
+		be = binary.BigEndian
+		// Size of the buffer needed to hold the serialized message.
+		// TxIDLenMin is already accounted for in the marshaledMsgLenMin calculation.
+		bufSize = (marshaledMsgLenMin - TxIDLenMin) + txIDLen + payloadLen
+		buf     = make([]byte, 0, bufSize)
+	)
 
-	if err := binary.Read(reader, binary.BigEndian, &msg.Sequence); err != nil {
-		return nil, fmt.Errorf("failed to read sequence: %w", err)
+	// TxID (and length)
+	buf = append(buf, uint8(txIDLen))
+	buf = append(buf, msg.TxID...)
+
+	// Timestamp
+	tsBytes := make([]byte, 8)
+	// #nosec G115  -- int64 and uint64 have the same number of bytes, and Unix time won't be negative.
+	be.PutUint64(tsBytes, uint64(msg.Timestamp.Unix()))
+	buf = append(buf, tsBytes...)
+
+	// Nonce
+	nonceBytes := make([]byte, 4)
+	be.PutUint32(nonceBytes, msg.Nonce)
+	buf = append(buf, nonceBytes...)
+
+	// Sequence
+	seqBytes := make([]byte, 8)
+	be.PutUint64(seqBytes, msg.Sequence)
+	buf = append(buf, seqBytes...)
+
+	// ConsistencyLevel
+	buf = append(buf, msg.ConsistencyLevel)
+
+	// EmitterChain
+	chainBytes := make([]byte, 2)
+	be.PutUint16(chainBytes, uint16(msg.EmitterChain))
+	buf = append(buf, chainBytes...)
+
+	// EmitterAddress
+	buf = append(buf, msg.EmitterAddress.Bytes()...)
+
+	// IsReobservation
+	if msg.IsReobservation {
+		buf = append(buf, byte(1))
+	} else {
+		buf = append(buf, byte(0))
 	}
 
-	if err := binary.Read(reader, binary.BigEndian, &msg.ConsistencyLevel); err != nil {
-		return nil, fmt.Errorf("failed to read consistency level: %w", err)
+	// Unreliable
+	if msg.Unreliable {
+		buf = append(buf, byte(1))
+	} else {
+		buf = append(buf, byte(0))
 	}
 
-	if err := binary.Read(reader, binary.BigEndian, &msg.EmitterChain); err != nil {
-		return nil, fmt.Errorf("failed to read emitter chain: %w", err)
-	}
+	// verificationState
+	buf = append(buf, uint8(msg.verificationState))
 
-	emitterAddress := vaa.Address{}
-	if n, err := reader.Read(emitterAddress[:]); err != nil || n != AddressLength {
-		return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
-	}
-	msg.EmitterAddress = emitterAddress
+	// Payload (and length)
+	// There is no upper bound on the size of the payload as Wormhole supports arbitrary payloads. A uint64 should suffice to hold the length of the payload.
+	payloadLenBytes := make([]byte, 8)
+	be.PutUint64(payloadLenBytes, uint64(payloadLen))
+	buf = append(buf, payloadLenBytes...)
+	buf = append(buf, msg.Payload...)
 
-	if err := binary.Read(reader, binary.BigEndian, &msg.IsReobservation); err != nil {
-		return nil, fmt.Errorf("failed to read isReobservation: %w", err)
-	}
-
-	payload := make([]byte, reader.Len())
-	n, err := reader.Read(payload)
-	if err != nil || n == 0 {
-		return nil, fmt.Errorf("failed to read payload [%d]: %w", n, err)
-	}
-	msg.Payload = payload[:n]
-
-	return msg, nil
+	return buf, nil
 }
 
-// UnmarshalMessagePublication deserializes a MessagePublication
+// Deprecated: UnmarshalMessagePublication deserializes a MessagePublication.
+// This function does not unmarshal the Unreliable or verificationState fields.
+// Use [MessagePublication.UnmarshalBinary] instead.
 func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
 	if len(data) < minMsgLength {
 		return nil, errors.New("message is too short")
@@ -272,6 +380,141 @@ func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
 	return msg, nil
 }
 
+// UnmarshalBinary implements the BinaryUnmarshaler interface for MessagePublication.
+func (m *MessagePublication) UnmarshalBinary(data []byte) error {
+
+	// fixedFieldsLen is the minimum length of the fixed portion of a message publication.
+	// It is the sum of the sizes of each of the fields plus length information for the Payload.
+	// This is used to check that the data is long enough for the rest of the message after reading the TxID.
+	const fixedFieldsLen = 8 + // Timestamp (int64)
+		4 + // Nonce (uint32)
+		8 + // Sequence (uint64)
+		1 + // ConsistencyLevel (uint8)
+		2 + // EmitterChain (uint16)
+		32 + // EmitterAddress (32 bytes)
+		1 + // IsReobservation (bool)
+		1 + // Unreliable (bool)
+		8 // Payload length (uint64)
+
+	// Calculate minimum required length for the fixed portion
+	// (excluding variable-length fields: TxID and Payload)
+	if len(data) < marshaledMsgLenMin {
+		return ErrInputSize{msg: "data too short", got: len(data)}
+	}
+
+	mp := &MessagePublication{}
+
+	// Set up deserialization
+	be := binary.BigEndian
+	pos := 0
+
+	// TxID length
+	txIDLen := uint8(data[pos])
+	pos++
+
+	// Bounds checks. TxID length should be at least TxIDLenMin, but not larger than the length of the data.
+	// The second check is to avoid panics.
+	if int(txIDLen) < TxIDLenMin || int(txIDLen) > len(data) {
+		return ErrInputSize{msg: "TxID length is invalid"}
+	}
+
+	// Read TxID
+	mp.TxID = make([]byte, txIDLen)
+	copy(mp.TxID, data[pos:pos+int(txIDLen)])
+	pos += int(txIDLen)
+
+	// TxID has a dynamic length, so now that we've read it, check that the remaining data is long enough for the rest of the message. This means that all fixed-length fields can be parsed with a payload of 0 or more bytes.
+	// Concretely, we're checking that the data is at least long enough to contain information for all of
+	// the fields except for the Payload itself.
+	if len(data)-pos < fixedFieldsLen {
+		return ErrInputSize{msg: "data too short after reading TxID", got: len(data)}
+	}
+
+	// Timestamp
+	timestamp := be.Uint64(data[pos : pos+8])
+	// Nanoseconds are not serialized as they are not used in Wormhole, so set them to zero.
+	// #nosec G115  -- int64 and uint64 have the same number of bytes, and Unix time won't be negative.
+	mp.Timestamp = time.Unix(int64(timestamp), 0)
+	pos += 8
+
+	// Nonce
+	mp.Nonce = be.Uint32(data[pos : pos+4])
+	pos += 4
+
+	// Sequence
+	mp.Sequence = be.Uint64(data[pos : pos+8])
+	pos += 8
+
+	// ConsistencyLevel
+	// TODO: This could be validated against the valid range of values for ConsistencyLevel.
+	mp.ConsistencyLevel = data[pos]
+	pos++
+
+	// EmitterChain
+	mp.EmitterChain = vaa.ChainID(be.Uint16(data[pos : pos+2]))
+	pos += 2
+
+	// EmitterAddress
+	copy(mp.EmitterAddress[:], data[pos:pos+32])
+	pos += 32
+
+	// IsReobservation
+	if !validBinaryBool(data[pos]) {
+		return ErrInvalidBinaryBool
+	}
+	mp.IsReobservation = data[pos] != 0
+	pos++
+
+	// Unreliable
+	if !validBinaryBool(data[pos]) {
+		return ErrInvalidBinaryBool
+	}
+	mp.Unreliable = data[pos] != 0
+	pos++
+
+	// verificationState. NumVariantsVerificationState is the number of variants of the enum,
+	// which begins at 0. This means the valid range is [0, NumVariantsVerificationState-1].
+	if data[pos] > NumVariantsVerificationState-1 {
+		return ErrInvalidVerificationState
+	}
+	mp.verificationState = VerificationState(data[pos])
+	pos++
+
+	// Payload length
+	payloadLen := be.Uint64(data[pos : pos+8])
+	pos += 8
+
+	// Check if payload length is within reasonable bounds to prevent makeslice panic.
+	// Since payloadLen is read as uint64 from untrusted input, it could potentially
+	// exceed this limit and cause a runtime panic when passed to make([]byte, payloadLen).
+	// This bounds check prevents such panics by rejecting oversized payload lengths early.
+	if payloadLen > PayloadLenMax {
+		return ErrInputSize{msg: "payload length too large", got: len(data)}
+	}
+
+	// Check if we have enough data for the payload
+	// #nosec G115 -- payloadLen is read from data, bounds checked above
+	if len(data) < pos+int(payloadLen) {
+		return ErrInputSize{msg: "invalid payload length"}
+	}
+
+	// Read payload
+	mp.Payload = make([]byte, payloadLen)
+	// #nosec G115 -- payloadLen is read from data, bounds checked above
+	copy(mp.Payload, data[pos:pos+int(payloadLen)])
+	// #nosec G115 -- payloadLen is read from data, bounds checked above
+	pos += int(payloadLen)
+
+	// Check that exactly the correct number of bytes was read.
+	if pos != len(data) {
+		return ErrUnexpectedEndOfRead{expected: len(data), got: pos}
+	}
+
+	// Overwrite the receiver with the deserialized message.
+	*m = *mp
+	return nil
+}
+
 // The standard json Marshal / Unmarshal of time.Time gets confused between local and UTC time.
 func (msg *MessagePublication) MarshalJSON() ([]byte, error) {
 	type Alias MessagePublication
@@ -336,3 +579,19 @@ func (msg *MessagePublication) ZapFields(fields ...zap.Field) []zap.Field {
 		zap.String("verificationState", msg.verificationState.String()),
 	)
 }
+
+// VAAHash returns a hash corresponding to the fields of the Message Publication that are ultimately
+// encoded in a VAA. This is a helper function used to uniquely identify a Message Publication.
+func (msg *MessagePublication) VAAHash() string {
+	v := msg.CreateVAA(0) // We can pass zero in as the guardian set index because it is not part of the digest.
+	digest := v.SigningDigest()
+	return hex.EncodeToString(digest.Bytes())
+}
+
+// validBinaryBool returns true if the byte is either 0x00 or 0x01.
+// Go marshals booleans as strictly 0x00 or 0x01, so this function is used to validate
+// that a given byte is a valid boolean. When reading, any non-zero value is considered true,
+// but here we want to validate that the value is strictly either 0x00 or 0x01.
+func validBinaryBool(b byte) bool {
+	return b == 0x00 || b == 0x01
+}

+ 346 - 1
node/pkg/common/chainlock_test.go

@@ -2,6 +2,7 @@ package common
 
 import (
 	"encoding/binary"
+	"math"
 	"math/big"
 	"testing"
 	"time"
@@ -12,6 +13,24 @@ import (
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 )
 
+// The following constants are used to calculate the offset of each field in the serialized message publication.
+const (
+	offsetTxIDLength = 0
+	// Assumes a length of 32 bytes for the TxID.
+	offsetTxID              = offsetTxIDLength + 1
+	offsetTimestamp         = offsetTxID + 32
+	offsetNonce             = offsetTimestamp + 8
+	offsetSequence          = offsetNonce + 4
+	offsetConsistencyLevel  = offsetSequence + 8
+	offsetEmitterChain      = offsetConsistencyLevel + 1
+	offsetEmitterAddress    = offsetEmitterChain + 2
+	offsetIsReobservation   = offsetEmitterAddress + 32
+	offsetUnreliable        = offsetIsReobservation + 1
+	offsetVerificationState = offsetUnreliable + 1
+	offsetPayloadLength     = offsetVerificationState + 1
+	offsetPayload           = offsetPayloadLength + 8
+)
+
 func encodePayloadBytes(payload *vaa.TransferPayloadHdr) []byte {
 	bytes := make([]byte, 101)
 	bytes[0] = payload.Type
@@ -29,7 +48,325 @@ func encodePayloadBytes(payload *vaa.TransferPayloadHdr) []byte {
 	return bytes
 }
 
-func TestSerializeAndDeserializeOfMessagePublication(t *testing.T) {
+// makeTestMsgPub is a helper function that generates a Message Publication.
+func makeTestMsgPub(t *testing.T) *MessagePublication {
+	t.Helper()
+	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E")
+	require.NoError(t, err)
+
+	targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	payload := &vaa.TransferPayloadHdr{
+		Type:          0x01,
+		Amount:        big.NewInt(27000000000),
+		OriginAddress: originAddress,
+		OriginChain:   vaa.ChainIDEthereum,
+		TargetAddress: targetAddress,
+		TargetChain:   vaa.ChainIDPolygon,
+	}
+
+	payloadBytes := encodePayloadBytes(payload)
+
+	// Use a non-default value for each field to ensure that the unmarshalled values are represented correctly.
+	return &MessagePublication{
+		TxID:              eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
+		Timestamp:         time.Unix(int64(1654516425), 0),
+		Nonce:             123456,
+		Sequence:          789101112131415,
+		EmitterChain:      vaa.ChainIDEthereum,
+		EmitterAddress:    tokenBridgeAddress,
+		Payload:           payloadBytes,
+		ConsistencyLevel:  32,
+		Unreliable:        true,
+		IsReobservation:   true,
+		verificationState: Anomalous,
+	}
+}
+
+func TestRoundTripMarshal(t *testing.T) {
+	orig := makeTestMsgPub(t)
+	var loaded MessagePublication
+
+	bytes, writeErr := orig.MarshalBinary()
+	require.NoError(t, writeErr)
+	t.Logf("marshaled bytes: %x", bytes)
+
+	readErr := loaded.UnmarshalBinary(bytes)
+	require.NoError(t, readErr)
+
+	require.Equal(t, *orig, loaded)
+}
+
+func TestMessagePublicationUnmarshalBinaryErrors(t *testing.T) {
+	orig := makeTestMsgPub(t)
+	validBytes, err := orig.MarshalBinary()
+	require.Greater(t, len(validBytes), marshaledMsgLenMin)
+	require.NoError(t, err)
+
+	tests := []struct {
+		name         string
+		data         []byte
+		expectedErr  error
+		errorChecker func(t *testing.T, err error)
+		setupData    func() []byte
+	}{
+		{
+			name: "data too short - empty data",
+			data: []byte{},
+			errorChecker: func(t *testing.T, err error) {
+				var inputSizeErr ErrInputSize
+				require.ErrorAs(t, err, &inputSizeErr)
+				assert.Contains(t, inputSizeErr.Error(), "data too short")
+			},
+		},
+		{
+			name: "data too short - less than minimum size",
+			data: make([]byte, marshaledMsgLenMin-1),
+			errorChecker: func(t *testing.T, err error) {
+				var inputSizeErr ErrInputSize
+				require.ErrorAs(t, err, &inputSizeErr)
+				assert.Contains(t, inputSizeErr.Error(), "data too short")
+			},
+		},
+		{
+			name:        "invalid IsReobservation boolean - value 0x02",
+			expectedErr: ErrInvalidBinaryBool,
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				data[offsetIsReobservation] = 0x02
+				return data
+			},
+		},
+		{
+			name:        "invalid IsReobservation boolean - value 0xFF",
+			expectedErr: ErrInvalidBinaryBool,
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				data[offsetIsReobservation] = 0xFF
+				return data
+			},
+		},
+		{
+			name:        "invalid Unreliable boolean - value 0x02",
+			expectedErr: ErrInvalidBinaryBool,
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				data[offsetUnreliable] = 0x02
+				return data
+			},
+		},
+		{
+			name:        "invalid Unreliable boolean - value 0xFF",
+			expectedErr: ErrInvalidBinaryBool,
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				data[offsetUnreliable] = 0xFF
+				return data
+			},
+		},
+		{
+			name:        "invalid verification state - at boundary",
+			expectedErr: ErrInvalidVerificationState,
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				data[offsetVerificationState] = NumVariantsVerificationState
+				return data
+			},
+		},
+		{
+			name:        "invalid verification state - above boundary",
+			expectedErr: ErrInvalidVerificationState,
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				data[offsetVerificationState] = NumVariantsVerificationState + 1
+				return data
+			},
+		},
+		{
+			name: "invalid payload length - truncated at payload length",
+			errorChecker: func(t *testing.T, err error) {
+				var inputSizeErr ErrInputSize
+				require.ErrorAs(t, err, &inputSizeErr)
+				assert.Contains(t, inputSizeErr.Error(), "invalid payload length")
+			},
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				// Set payload length to be larger than remaining data
+				// #nosec G115 -- intentionally creating invalid data for testing
+				binary.BigEndian.PutUint64(data[offsetPayloadLength:offsetPayloadLength+8], uint64(len(data)-offsetPayload+1))
+				return data
+			},
+		},
+		{
+			name: "invalid payload length - no payload data",
+			errorChecker: func(t *testing.T, err error) {
+				var inputSizeErr ErrInputSize
+				require.ErrorAs(t, err, &inputSizeErr)
+				assert.Contains(t, inputSizeErr.Error(), "invalid payload length")
+			},
+			setupData: func() []byte {
+				// Create data that ends right before payload
+				data := make([]byte, offsetPayload)
+				copy(data, validBytes[:offsetPayload])
+				// Set payload length to 1 but provide no payload data
+				binary.BigEndian.PutUint64(data[offsetPayloadLength:offsetPayloadLength+8], 1)
+				return data
+			},
+		},
+		{
+			name: "unexpected end of read - extra bytes",
+			errorChecker: func(t *testing.T, err error) {
+				var endOfReadErr ErrUnexpectedEndOfRead
+				require.ErrorAs(t, err, &endOfReadErr)
+				assert.Greater(t, endOfReadErr.expected, endOfReadErr.got)
+			},
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes)+1)
+				copy(data, validBytes)
+				data[len(validBytes)] = 0xFF // extra byte
+				return data
+			},
+		},
+		{
+			name: "unexpected end of read - missing bytes",
+			errorChecker: func(t *testing.T, err error) {
+				// This case actually triggers invalid payload length error first
+				var inputSizeErr ErrInputSize
+				require.ErrorAs(t, err, &inputSizeErr)
+				assert.Contains(t, inputSizeErr.Error(), "invalid payload length")
+			},
+			setupData: func() []byte {
+				// Create data that's shorter than expected but has valid payload length
+				data := make([]byte, len(validBytes)-1)
+				copy(data, validBytes[:len(validBytes)-1])
+				return data
+			},
+		},
+		{
+			name: "payload length overflow - makeslice panic",
+			errorChecker: func(t *testing.T, err error) {
+				var inputSizeErr ErrInputSize
+				require.ErrorAs(t, err, &inputSizeErr)
+				assert.Contains(t, inputSizeErr.Error(), "payload length too large")
+			},
+			setupData: func() []byte {
+				data := make([]byte, len(validBytes))
+				copy(data, validBytes)
+				// Set payload length to maximum uint64 value which would cause makeslice to panic
+				binary.BigEndian.PutUint64(data[offsetPayloadLength:offsetPayloadLength+8], math.MaxUint64)
+				return data
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var data []byte
+			if tt.setupData != nil {
+				data = tt.setupData()
+			} else {
+				data = tt.data
+			}
+
+			var mp MessagePublication
+			err := mp.UnmarshalBinary(data)
+
+			require.Error(t, err, "expected error for test case: %s", tt.name)
+
+			if tt.errorChecker != nil {
+				tt.errorChecker(t, err)
+			} else if tt.expectedErr != nil {
+				require.ErrorIs(t, err, tt.expectedErr, "expected specific error type for test case: %s", tt.name)
+			}
+		})
+	}
+}
+
+func FuzzMessagePublicationUnmarshalBinary(f *testing.F) {
+	// Create a valid message publication for seeding
+	orig := &MessagePublication{
+		TxID:              make([]byte, TxIDLenMin), // Use minimum valid TxID length
+		Timestamp:         time.Unix(int64(1654516425), 0),
+		Nonce:             123456,
+		Sequence:          789101112131415,
+		EmitterChain:      vaa.ChainIDEthereum,
+		EmitterAddress:    vaa.Address{0x07, 0x07, 0xf9, 0x11, 0x8e, 0x33, 0xa9, 0xb8, 0x99, 0x8b, 0xea, 0x41, 0xdd, 0x0d, 0x46, 0xf3, 0x8b, 0xb9, 0x63, 0xfc, 0x80},
+		Payload:           []byte("test payload"),
+		ConsistencyLevel:  32,
+		IsReobservation:   true,
+		Unreliable:        true,
+		verificationState: Valid,
+	}
+
+	// Seed with valid marshaled data
+	validBytes, err := orig.MarshalBinary()
+	if err == nil {
+		f.Add(validBytes)
+	}
+
+	// Seed with some edge cases
+	f.Add([]byte{})                           // empty data
+	f.Add(make([]byte, marshaledMsgLenMin-1)) // too short
+	f.Add(make([]byte, marshaledMsgLenMin))   // minimum size
+	f.Add(make([]byte, 1000))                 // larger data
+	// Previous inputs that caused panics
+	f.Add([]byte(" 000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\x01\x01\x00\x7f\xff\xff\xff\xff\xff\xff\xed"))
+	f.Add([]byte("x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
+	f.Add([]byte(" 000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\x00\x00\x00\xec0000000"))
+
+	f.Add([]byte("\x000000000000000000000000000000000000000000000000000000000\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 00000000000000000000000000000000"))
+
+	f.Fuzz(func(t *testing.T, data []byte) {
+		// Catch panics and report them as test failures
+		defer func() {
+			if r := recover(); r != nil {
+				t.Errorf("UnmarshalBinary panicked with input length %d: %v", len(data), r)
+			}
+		}()
+
+		var mp MessagePublication
+		err := mp.UnmarshalBinary(data)
+
+		// The function should never panic, but may return an error
+		// We don't assert anything about the error - just that it doesn't crash
+		if err == nil {
+			// If unmarshaling succeeded, the result should be valid
+			// Basic sanity checks on the unmarshaled data
+			if len(mp.TxID) > 255 {
+				t.Errorf("TxID length %d exceeds maximum of 255", len(mp.TxID))
+			}
+			if len(mp.TxID) < TxIDLenMin && len(mp.TxID) > 0 {
+				t.Errorf("TxID length %d is less than minimum of %d (unless empty)", len(mp.TxID), TxIDLenMin)
+			}
+
+			// Verify that a successful unmarshal can be marshaled back
+			_, marshalErr := mp.MarshalBinary()
+			if marshalErr != nil {
+				t.Errorf("Successfully unmarshaled data cannot be marshaled back: %v", marshalErr)
+			}
+
+			// Additional invariant checks
+			if mp.verificationState >= NumVariantsVerificationState {
+				t.Errorf("Invalid verification state %d >= %d", mp.verificationState, NumVariantsVerificationState)
+			}
+		}
+	})
+}
+
+// This tests a message publication using the deprecated [Marshal] and [UnmarshalMessagePublication] functions.
+// The test and these functions can be removed once the message publication upgrade is complete.
+func TestDeprecatedSerializeAndDeserializeOfMessagePublication(t *testing.T) {
 	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E")
 	require.NoError(t, err)
 
@@ -89,6 +426,8 @@ func TestSerializeAndDeserializeOfMessagePublication(t *testing.T) {
 	assert.Equal(t, payload1, payload2)
 }
 
+// This tests a message publication using the deprecated [Marshal] and [UnmarshalMessagePublication] functions.
+// The test and these functions can be removed once the message publication upgrade is complete.
 func TestSerializeAndDeserializeOfMessagePublicationWithEmptyTxID(t *testing.T) {
 	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E")
 	require.NoError(t, err)
@@ -134,6 +473,8 @@ func TestSerializeAndDeserializeOfMessagePublicationWithEmptyTxID(t *testing.T)
 	assert.Equal(t, payload1, payload2)
 }
 
+// This tests a message publication using the deprecated [Marshal] and [UnmarshalMessagePublication] functions.
+// The test and these functions can be removed once the message publication upgrade is complete.
 func TestSerializeAndDeserializeOfMessagePublicationWithArbitraryTxID(t *testing.T) {
 	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E")
 	require.NoError(t, err)
@@ -179,6 +520,8 @@ func TestSerializeAndDeserializeOfMessagePublicationWithArbitraryTxID(t *testing
 	assert.Equal(t, payload1, payload2)
 }
 
+// This tests a message publication using the deprecated [Marshal] and [UnmarshalMessagePublication] functions.
+// The test and these functions can be removed once the message publication upgrade is complete.
 func TestTxIDStringTooLongShouldFail(t *testing.T) {
 	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
 	require.NoError(t, err)
@@ -201,6 +544,8 @@ func TestTxIDStringTooLongShouldFail(t *testing.T) {
 	assert.ErrorContains(t, err, "TxID too long")
 }
 
+// This tests a message publication using the deprecated [Marshal] and [UnmarshalMessagePublication] functions.
+// The test and these functions can be removed once the message publication upgrade is complete.
 func TestSerializeAndDeserializeOfMessagePublicationWithBigPayload(t *testing.T) {
 	tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
 	require.NoError(t, err)

+ 59 - 100
node/pkg/db/governor.go

@@ -45,6 +45,7 @@ func (d *MockGovernorDB) GetChainGovernorData(logger *zap.Logger) (transfers []*
 	return nil, nil, nil
 }
 
+// Transfer represents a completed transfer that has been processed by the Governor during its sliding window.
 type Transfer struct {
 	// This value is generated by the Governor. It is not read from the blockchain transaction. It represents the
 	// time at which it was observed and evaluated by the Governor.
@@ -63,6 +64,7 @@ type Transfer struct {
 	TargetChain    vaa.ChainID
 }
 
+// Marshal serializes a Transfer. TODO: This function could be rewritten to use the BinaryMarshaler interface.
 func (t *Transfer) Marshal() ([]byte, error) {
 	buf := new(bytes.Buffer)
 
@@ -91,6 +93,7 @@ func (t *Transfer) Marshal() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
+// UnmarshalTransfer deserializes a Transfer. TODO: This function could be rewritten to use the BinaryUnmarshaler interface.
 func UnmarshalTransfer(data []byte) (*Transfer, error) {
 	t := &Transfer{}
 
@@ -167,94 +170,31 @@ func UnmarshalTransfer(data []byte) (*Transfer, error) {
 	return t, nil
 }
 
-func unmarshalOldTransfer(data []byte) (*Transfer, error) {
-	t := &Transfer{}
-
-	reader := bytes.NewReader(data[:])
-
-	unixSeconds := uint32(0)
-	if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
-		return nil, fmt.Errorf("failed to read timestamp: %w", err)
-	}
-	t.Timestamp = time.Unix(int64(unixSeconds), 0)
-
-	if err := binary.Read(reader, binary.BigEndian, &t.Value); err != nil {
-		return nil, fmt.Errorf("failed to read value: %w", err)
-	}
-
-	if err := binary.Read(reader, binary.BigEndian, &t.OriginChain); err != nil {
-		return nil, fmt.Errorf("failed to read origin chain id: %w", err)
-	}
-
-	originAddress := vaa.Address{}
-	if n, err := reader.Read(originAddress[:]); err != nil || n != 32 {
-		return nil, fmt.Errorf("failed to read origin address [%d]: %w", n, err)
-	}
-	t.OriginAddress = originAddress
-
-	if err := binary.Read(reader, binary.BigEndian, &t.EmitterChain); err != nil {
-		return nil, fmt.Errorf("failed to read emitter chain id: %w", err)
-	}
-
-	emitterAddress := vaa.Address{}
-	if n, err := reader.Read(emitterAddress[:]); err != nil || n != 32 {
-		return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
-	}
-	t.EmitterAddress = emitterAddress
-
-	msgIdLen := uint16(0)
-	if err := binary.Read(reader, binary.BigEndian, &msgIdLen); err != nil {
-		return nil, fmt.Errorf("failed to read msgID length: %w", err)
-	}
-
-	if msgIdLen > 0 {
-		msgID := make([]byte, msgIdLen)
-		n, err := reader.Read(msgID)
-		if err != nil || n == 0 {
-			return nil, fmt.Errorf("failed to read vaa id [%d]: %w", n, err)
-		}
-		t.MsgID = string(msgID[:n])
-	}
-
-	hashLen := uint16(0)
-	if err := binary.Read(reader, binary.BigEndian, &hashLen); err != nil {
-		return nil, fmt.Errorf("failed to read hash length: %w", err)
-	}
-
-	if hashLen > 0 {
-		hash := make([]byte, hashLen)
-		n, err := reader.Read(hash)
-		if err != nil || n == 0 {
-			return nil, fmt.Errorf("failed to read hash [%d]: %w", n, err)
-		}
-		t.Hash = string(hash[:n])
-	}
-
-	// Do not include the target chain or address.
-
-	return t, nil
-}
-
+// PendingTransfer represent a pending transfer that is waiting to be released by the Governor.
+// It is the same as a [common.MessagePublication], but with a timestamp indicating when it can be released.
+// Upon release, it is converted to a [Transfer].
+// It is also referred to as a "pending message" in the codebase.
 type PendingTransfer struct {
 	ReleaseTime time.Time
 	Msg         common.MessagePublication
 }
 
+// Marshal returns the pending transfer serialized. TODO: This function could be rewritten to use the BinaryMarshaler interface.
 func (p *PendingTransfer) Marshal() ([]byte, error) {
-	buf := new(bytes.Buffer)
-
-	vaa.MustWrite(buf, binary.BigEndian, uint32(p.ReleaseTime.Unix())) // #nosec G115 -- This conversion is safe until year 2106
 
-	b, err := p.Msg.Marshal()
+	buf := new(bytes.Buffer)
+	b, err := p.Msg.MarshalBinary()
 	if err != nil {
 		return buf.Bytes(), fmt.Errorf("failed to marshal pending transfer: %w", err)
 	}
 
+	vaa.MustWrite(buf, binary.BigEndian, uint32(p.ReleaseTime.Unix())) // #nosec G115 -- This conversion is safe until year 2106
 	vaa.MustWrite(buf, binary.BigEndian, b)
 
 	return buf.Bytes(), nil
 }
 
+// UnmarshalPendingTransfer deserializes a PendingTransfer. TODO: This function could be rewritten to use the BinaryUnmarshaler interface.
 func UnmarshalPendingTransfer(data []byte, isOld bool) (*PendingTransfer, error) {
 	p := &PendingTransfer{}
 
@@ -273,68 +213,82 @@ func UnmarshalPendingTransfer(data []byte, isOld bool) (*PendingTransfer, error)
 		return nil, fmt.Errorf("failed to read pending transfer msg [%d]: %w", n, err)
 	}
 
-	var msg *common.MessagePublication
+	var (
+		// msg is modified in-place to conform to the BinaryUnmarshaler interface, but oldMsg uses
+		// a pointer.
+		oldMsg       *common.MessagePublication
+		msg          common.MessagePublication
+		unmarshalErr error
+	)
 	if isOld {
-		msg, err = common.UnmarshalOldMessagePublicationWithTxHash(buf)
+		//nolint: staticcheck // NOTE: the deprecated function is used only for the Governor upgrade.
+		oldMsg, unmarshalErr = common.UnmarshalMessagePublication(buf)
+		msg = *oldMsg
 	} else {
-		msg, err = common.UnmarshalMessagePublication(buf)
+		unmarshalErr = msg.UnmarshalBinary(buf)
 	}
-	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshal pending transfer msg, isOld: %t: %w", isOld, err)
+	if unmarshalErr != nil {
+		return nil, fmt.Errorf("failed to unmarshal pending transfer msg, isOld: %t: %w", isOld, unmarshalErr)
 	}
 
-	p.Msg = *msg
+	p.Msg = msg
 	return p, nil
 }
 
-const oldTransfer = "GOV:XFER2:"
-const oldTransferLen = len(oldTransfer)
-
-const transfer = "GOV:XFER3:"
-const transferLen = len(transfer)
+// These constants are used when the message format changes. It allows the governor to support both formats.
+// This is important because when the message format changes and the Guardians are restarted, the existing
+// messages will be stored in the old format. Both message formats need to be supported for duration of the
+// Governor's sliding window.
+const (
+	// Transfers.
+	oldTransferPrefix = "GOV:XFER3:"
+	oldTransferLen    = len(oldTransferPrefix)
+	transferPrefix    = "GOV:XFER4:"
+	transferLen       = len(transferPrefix)
+
+	// Pending messages.
+	oldPendingPrefix = "GOV:PENDING4:"
+	oldPendingLen    = len(oldPendingPrefix)
+	pendingPrefix    = "GOV:PENDING5:"
+	pendingLen       = len(pendingPrefix)
+)
 
 // Since we are changing the DB format of pending entries, we will use a new tag in the pending key field.
 // The first time we run this new release, any existing entries with the old tag will get converted
 // to the new format and the new tag. In a future release, code for the old format can be deleted.
 
-const oldPending = "GOV:PENDING3:"
-const oldPendingLen = len(oldPending)
-
-const pending = "GOV:PENDING4:"
-const pendingLen = len(pending)
-
-const minMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")
-
 func TransferMsgID(t *Transfer) []byte {
-	return []byte(fmt.Sprintf("%v%v", transfer, t.MsgID))
+	return []byte(fmt.Sprintf("%v%v", transferPrefix, t.MsgID))
 }
 
+// Used only to delete old transfers.
 func oldTransferMsgID(t *Transfer) []byte {
-	return []byte(fmt.Sprintf("%v%v", oldTransfer, t.MsgID))
+	return []byte(fmt.Sprintf("%v%v", oldTransferPrefix, t.MsgID))
 }
 
 func PendingMsgID(k *common.MessagePublication) []byte {
-	return []byte(fmt.Sprintf("%v%v", pending, k.MessageIDString()))
+	return []byte(fmt.Sprintf("%v%v", pendingPrefix, k.MessageIDString()))
 }
 
+// Used only to delete old pending transfers.
 func oldPendingMsgID(k *common.MessagePublication) []byte {
-	return []byte(fmt.Sprintf("%v%v", oldPending, k.MessageIDString()))
+	return []byte(fmt.Sprintf("%v%v", oldPendingPrefix, k.MessageIDString()))
 }
 
 func IsTransfer(keyBytes []byte) bool {
-	return (len(keyBytes) >= transferLen+minMsgIdLen) && (string(keyBytes[0:transferLen]) == transfer)
+	return (len(keyBytes) >= transferLen+common.MinMsgIdLen) && (string(keyBytes[0:transferLen]) == transferPrefix)
 }
 
 func isOldTransfer(keyBytes []byte) bool {
-	return (len(keyBytes) >= oldTransferLen+minMsgIdLen) && (string(keyBytes[0:oldTransferLen]) == oldTransfer)
+	return (len(keyBytes) >= oldTransferLen+common.MinMsgIdLen) && (string(keyBytes[0:oldTransferLen]) == oldTransferPrefix)
 }
 
 func IsPendingMsg(keyBytes []byte) bool {
-	return (len(keyBytes) >= pendingLen+minMsgIdLen) && (string(keyBytes[0:pendingLen]) == pending)
+	return (len(keyBytes) >= pendingLen+common.MinMsgIdLen) && (string(keyBytes[0:pendingLen]) == pendingPrefix)
 }
 
 func isOldPendingMsg(keyBytes []byte) bool {
-	return (len(keyBytes) >= oldPendingLen+minMsgIdLen) && (string(keyBytes[0:oldPendingLen]) == oldPending)
+	return (len(keyBytes) >= oldPendingLen+common.MinMsgIdLen) && (string(keyBytes[0:oldPendingLen]) == oldPendingPrefix)
 }
 
 // This is called by the chain governor on start up to reload status.
@@ -359,6 +313,9 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
 				return err
 			}
 
+			// Provide separate handling for pending messages and transfers depending on the current
+			// serialization format. These are convenience functions that can be used during upgrades
+			// to the Governor.
 			if IsPendingMsg(key) {
 				p, err := UnmarshalPendingTransfer(val, false)
 				if err != nil {
@@ -383,7 +340,9 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
 				transfers = append(transfers, v)
 
 			} else if isOldTransfer(key) {
-				v, err := unmarshalOldTransfer(val)
+				// NOTE: This is intentionally the same as IsTransfer branch
+				// for the current upgrade but this branch is left here for convenience for future upgrades.
+				v, err := UnmarshalTransfer(val)
 				if err != nil {
 					return err
 				}

+ 295 - 432
node/pkg/db/governor_test.go

@@ -1,12 +1,8 @@
 package db
 
 import (
-	"bytes"
-	"encoding/binary"
 	"fmt"
-	"math"
 	"os"
-	"sort"
 	"testing"
 	"time"
 
@@ -57,11 +53,11 @@ func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
 
 	assert.Equal(t, xfer1, xfer2)
 
-	expectedTransferKey := "GOV:XFER3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
+	expectedTransferKey := "GOV:XFER4:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
 	assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
 }
 
-func TestPendingMsgID(t *testing.T) {
+func TestPendingMsgIDV5(t *testing.T) {
 	ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
 	require.NoError(t, err)
 
@@ -76,10 +72,10 @@ func TestPendingMsgID(t *testing.T) {
 		ConsistencyLevel: 16,
 	}
 
-	assert.Equal(t, []byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1))
+	assert.Equal(t, []byte("GOV:PENDING5:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1))
 }
 
-func TestTransferMsgID(t *testing.T) {
+func TestTransferMsgIDV4(t *testing.T) {
 	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
 	require.NoError(t, err)
 
@@ -102,37 +98,296 @@ func TestTransferMsgID(t *testing.T) {
 		Hash:           "Hash1",
 	}
 
-	assert.Equal(t, []byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), TransferMsgID(xfer))
+	assert.Equal(t, []byte("GOV:XFER4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), TransferMsgID(xfer))
 }
 
-func TestIsTransfer(t *testing.T) {
-	assert.Equal(t, true, IsTransfer([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:1")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:1/1/1")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
-	assert.Equal(t, true, IsTransfer([]byte("GOV:XFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsTransfer([]byte{0x01, 0x02, 0x03, 0x04}))
-	assert.Equal(t, false, IsTransfer([]byte{}))
-	assert.Equal(t, true, isOldTransfer([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, isOldTransfer([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+// Deprecated: This function does not unmarshal the Unreliable or verificationState fields.
+func TestTransferMsgIDV3(t *testing.T) {
+	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
+	require.NoError(t, err)
+
+	bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
+	require.NoError(t, err)
+
+	xfer := &Transfer{
+		Timestamp:      time.Unix(int64(1654516425), 0),
+		Value:          125000,
+		OriginChain:    vaa.ChainIDEthereum,
+		OriginAddress:  tokenAddr,
+		EmitterChain:   vaa.ChainIDEthereum,
+		EmitterAddress: ethereumTokenBridgeAddr,
+		TargetChain:    vaa.ChainIDBSC,
+		TargetAddress:  bscTokenBridgeAddr,
+		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		Hash:           "Hash1",
+	}
+
+	assert.Equal(t, []byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), oldTransferMsgID(xfer))
+}
+
+// TestIsTransferV4 tests the IsTransfer function for the current transfer format.
+// The V4 suffix matches the "GOV:XFER4:" prefix used by the current transfer implementation.
+func TestIsTransferV4(t *testing.T) {
+	tests := []struct {
+		name     string
+		input    []byte
+		expected bool
+	}{
+		{
+			name:     "valid transfer message",
+			input:    []byte("GOV:XFER4:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: true,
+		},
+		{
+			name:     "previous message format",
+			input:    []byte("GOV:XFER3:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: false,
+		},
+		{
+			name:     "transfer prefix only",
+			input:    []byte("GOV:XFER4:"),
+			expected: false,
+		},
+		{
+			name:     "transfer with single digit",
+			input:    []byte("GOV:XFER4:1"),
+			expected: false,
+		},
+		{
+			name:     "transfer with a msgID that is too small",
+			input:    []byte("GOV:XFER4:1/1/1"),
+			expected: false,
+		},
+		{
+			name:     "transfer with missing sequence",
+			input:    []byte("GOV:XFER4:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"),
+			expected: false,
+		},
+		{
+			name:     "valid transfer with sequence 0",
+			input:    []byte("GOV:XFER4:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"),
+			expected: true,
+		},
+		{
+			name:     "pending message (not transfer)",
+			input:    []byte("GOV:PENDING:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: false,
+		},
+		{
+			name:     "binary data",
+			input:    []byte{0x01, 0x02, 0x03, 0x04},
+			expected: false,
+		},
+		{
+			name:     "empty input",
+			input:    []byte{},
+			expected: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := IsTransfer(tt.input)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+// TestIsTransferV3 tests the isOldTransfer function for the legacy transfer format.
+// The V3 suffix matches the "GOV:XFER3:" prefix used by the legacy transfer implementation.
+func TestIsTransferV3(t *testing.T) {
+	tests := []struct {
+		name     string
+		input    []byte
+		expected bool
+	}{
+		{
+			name:     "old transfer message",
+			input:    []byte("GOV:XFER3:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: true,
+		},
+		{
+			name:     "new transfer message",
+			input:    []byte("GOV:XFER4:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: false,
+		},
+		{
+			name:     "old transfer prefix only",
+			input:    []byte("GOV:XFER3:"),
+			expected: false,
+		},
+		{
+			name:     "old transfer with single digit",
+			input:    []byte("GOV:XFER3:1"),
+			expected: false,
+		},
+		{
+			name:     "old transfer with a msgID that is too small",
+			input:    []byte("GOV:XFER3:1/1/1"),
+			expected: false,
+		},
+		{
+			name:     "old transfer with missing sequence",
+			input:    []byte("GOV:XFER3:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"),
+			expected: false,
+		},
+		{
+			name:     "old transfer with sequence 0",
+			input:    []byte("GOV:XFER3:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"),
+			expected: true,
+		},
+		{
+			name:     "binary data",
+			input:    []byte{0x01, 0x02, 0x03, 0x04},
+			expected: false,
+		},
+		{
+			name:     "empty input",
+			input:    []byte{},
+			expected: false,
+		},
+	}
 
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := isOldTransfer(tt.input)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
 }
 
-func TestIsPendingMsg(t *testing.T) {
-	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/1/1")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
-	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsPendingMsg([]byte{0x01, 0x02, 0x03, 0x04}))
-	assert.Equal(t, false, IsPendingMsg([]byte{}))
-	assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+// TestIsPendingMsgV5 tests the IsPendingMsg function for the current pending message format.
+// The V5 suffix matches the "GOV:PENDING5:" prefix used by the current pending message implementation.
+func TestIsPendingMsgV5(t *testing.T) {
+	tests := []struct {
+		name     string
+		input    []byte
+		expected bool
+	}{
+		{
+			name:     "valid pending message",
+			input:    []byte("GOV:PENDING5:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: true,
+		},
+		{
+			name:     "transfer message (not pending)",
+			input:    []byte("GOV:XFER4:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: false,
+		},
+		{
+			name:     "pending prefix only",
+			input:    []byte("GOV:PENDING5:"),
+			expected: false,
+		},
+		{
+			name:     "pending with single digit",
+			input:    []byte("GOV:PENDING5:" + "1"),
+			expected: false,
+		},
+		{
+			name:     "pending with a msgID that is too small",
+			input:    []byte("GOV:PENDING5:" + "1/1/1"),
+			expected: false,
+		},
+		{
+			name:     "pending with missing sequence",
+			input:    []byte("GOV:PENDING5:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"),
+			expected: false,
+		},
+		{
+			name:     "valid pending with sequence 0",
+			input:    []byte("GOV:PENDING5:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"),
+			expected: true,
+		},
+		{
+			name:     "old pending version",
+			input:    []byte("GOV:PENDING4:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: false,
+		},
+		{
+			name:     "binary data",
+			input:    []byte{0x01, 0x02, 0x03, 0x04},
+			expected: false,
+		},
+		{
+			name:     "empty input",
+			input:    []byte{},
+			expected: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := IsPendingMsg(tt.input)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+// TestIsPendingMsgV4 tests the isOldPendingMsg function for the legacy pending message format.
+// The V4 suffix matches the "GOV:PENDING4:" prefix used by the legacy pending message implementation.
+func TestIsPendingMsgV4(t *testing.T) {
+	tests := []struct {
+		name     string
+		input    []byte
+		expected bool
+	}{
+		{
+			name:     "old pending message",
+			input:    []byte("GOV:PENDING4:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: true,
+		},
+		{
+			name:     "new pending message",
+			input:    []byte("GOV:PENDING5:" + "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+			expected: false,
+		},
+		{
+			name:     "old pending prefix only",
+			input:    []byte("GOV:PENDING4:"),
+			expected: false,
+		},
+		{
+			name:     "old pending with single digit",
+			input:    []byte("GOV:PENDING4:" + "1"),
+			expected: false,
+		},
+		{
+			name:     "old pending with a msgID that is too small",
+			input:    []byte("GOV:PENDING4:" + "1/1/1"),
+			expected: false,
+		},
+		{
+			name:     "old pending with missing sequence",
+			input:    []byte("GOV:PENDING4:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"),
+			expected: false,
+		},
+		{
+			name:     "old pending with sequence 0",
+			input:    []byte("GOV:PENDING4:" + "1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"),
+			expected: true,
+		},
+		{
+			name:     "binary data",
+			input:    []byte{0x01, 0x02, 0x03, 0x04},
+			expected: false,
+		},
+		{
+			name:     "empty input",
+			input:    []byte{},
+			expected: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := isOldPendingMsg(tt.input)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
 }
 
 func TestGetChainGovernorData(t *testing.T) {
@@ -294,6 +549,8 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
 		ConsistencyLevel: 16,
 		IsReobservation:  true,
 	}
+	vStateErr := msg.SetVerificationState(common.Valid)
+	require.NoError(t, vStateErr)
 
 	pending1 := &PendingTransfer{
 		ReleaseTime: time.Unix(int64(1654516425+72*60*60), 0),
@@ -301,18 +558,18 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
 	}
 
 	pending1Bytes, err := pending1.Marshal()
-	require.NoError(t, err)
+	require.NoError(t, err, fmt.Sprintf("Failed to marshal pending transfer: %v", err))
 
 	pending2, err := UnmarshalPendingTransfer(pending1Bytes, false)
-	require.NoError(t, err)
+	require.NoError(t, err, fmt.Sprintf("Failed to unmarshal pending transfer: %v", err))
 
 	assert.Equal(t, pending1, pending2)
 
-	expectedPendingKey := "GOV:PENDING4:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
+	expectedPendingKey := "GOV:PENDING5:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
 	assert.Equal(t, expectedPendingKey, string(PendingMsgID(&pending2.Msg)))
 }
 
-func TestStoreAndReloadTransfers(t *testing.T) {
+func TestStoreAndReloadTransfersAndPendingMessages(t *testing.T) {
 	dbPath := t.TempDir()
 	db := OpenDb(zap.NewNop(), &dbPath)
 	defer db.Close()
@@ -563,397 +820,3 @@ func TestUnmarshalPendingTransferFailures(t *testing.T) {
 	_, err = UnmarshalPendingTransfer(pending1Bytes[0:len(pending1Bytes)-10], false)
 	assert.ErrorContains(t, err, "failed to unmarshal pending transfer msg")
 }
-
-func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) {
-	buf := new(bytes.Buffer)
-
-	vaa.MustWrite(buf, binary.BigEndian, uint32(p.ReleaseTime.Unix())) // #nosec G115 -- This conversion is safe until year 2106
-
-	b := marshalOldMessagePublication(&p.Msg)
-
-	vaa.MustWrite(buf, binary.BigEndian, b)
-
-	err := d.db.Update(func(txn *badger.Txn) error {
-		if err := txn.Set(oldPendingMsgID(&p.Msg), buf.Bytes()); err != nil {
-			return err
-		}
-		return nil
-	})
-
-	require.NoError(t, err)
-}
-
-func marshalOldMessagePublication(msg *common.MessagePublication) []byte {
-	buf := new(bytes.Buffer)
-
-	buf.Write(msg.TxID[:])
-	vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix())) // #nosec G115 -- This conversion is safe until year 2106
-	vaa.MustWrite(buf, binary.BigEndian, msg.Nonce)
-	vaa.MustWrite(buf, binary.BigEndian, msg.Sequence)
-	vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel)
-	vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain)
-	buf.Write(msg.EmitterAddress[:])
-	vaa.MustWrite(buf, binary.BigEndian, msg.IsReobservation)
-	buf.Write(msg.Payload)
-
-	return buf.Bytes()
-}
-
-func TestLoadingOldPendingTransfers(t *testing.T) {
-	dbPath := t.TempDir()
-	db := OpenDb(zap.NewNop(), &dbPath)
-	defer db.Close()
-	defer os.Remove(dbPath)
-
-	ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
-	require.NoError(t, err)
-
-	bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
-	require.NoError(t, err)
-
-	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
-	require.NoError(t, err)
-
-	oldXfer1 := &Transfer{
-		Timestamp:      time.Unix(int64(1654516425), 0),
-		Value:          125000,
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: ethereumTokenBridgeAddr,
-		// Don't set TargetChain or TargetAddress.
-		MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
-		Hash:  "Hash1",
-	}
-
-	err = db.storeOldTransfer(oldXfer1)
-	require.NoError(t, err)
-
-	newXfer1 := &Transfer{
-		Timestamp:      time.Unix(int64(1654516426), 0),
-		Value:          125000,
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: ethereumTokenBridgeAddr,
-		TargetChain:    vaa.ChainIDBSC,
-		TargetAddress:  bscTokenBridgeAddr,
-		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
-		Hash:           "Hash1",
-	}
-
-	err = db.StoreTransfer(newXfer1)
-	require.NoError(t, err)
-
-	oldXfer2 := &Transfer{
-		Timestamp:      time.Unix(int64(1654516427), 0),
-		Value:          125000,
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: ethereumTokenBridgeAddr,
-		// Don't set TargetChain or TargetAddress.
-		MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131417",
-		Hash:  "Hash2",
-	}
-
-	err = db.storeOldTransfer(oldXfer2)
-	require.NoError(t, err)
-
-	newXfer2 := &Transfer{
-		Timestamp:      time.Unix(int64(1654516428), 0),
-		Value:          125000,
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: ethereumTokenBridgeAddr,
-		TargetChain:    vaa.ChainIDBSC,
-		TargetAddress:  bscTokenBridgeAddr,
-		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131418",
-		Hash:           "Hash2",
-	}
-
-	err = db.StoreTransfer(newXfer2)
-	require.NoError(t, err)
-
-	// Write the first pending event in the old format.
-	now := time.Unix(time.Now().Unix(), 0)
-	pending1 := &PendingTransfer{
-		ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.,
-		Msg: common.MessagePublication{
-			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
-			Timestamp:        now,
-			Nonce:            123456,
-			Sequence:         789101112131417,
-			EmitterChain:     vaa.ChainIDEthereum,
-			EmitterAddress:   ethereumTokenBridgeAddr,
-			Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
-			ConsistencyLevel: 16,
-			// IsReobservation will not be serialized. It should be set to false on reload.
-		},
-	}
-
-	db.storeOldPendingMsg(t, pending1)
-	require.NoError(t, err)
-
-	// Write the second one in the new format.
-	now = now.Add(time.Second * 5)
-	pending2 := &PendingTransfer{
-		ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.
-		Msg: common.MessagePublication{
-			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
-			Timestamp:        now,
-			Nonce:            123456,
-			Sequence:         789101112131418,
-			EmitterChain:     vaa.ChainIDEthereum,
-			EmitterAddress:   ethereumTokenBridgeAddr,
-			Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
-			ConsistencyLevel: 16,
-			IsReobservation:  true,
-		},
-	}
-
-	err = db.StorePendingMsg(pending2)
-	require.NoError(t, err)
-
-	// Write the third pending event in the old format.
-	now = now.Add(time.Second * 5)
-	pending3 := &PendingTransfer{
-		ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.,
-		Msg: common.MessagePublication{
-			TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(),
-			Timestamp:        now,
-			Nonce:            123456,
-			Sequence:         789101112131419,
-			EmitterChain:     vaa.ChainIDEthereum,
-			EmitterAddress:   ethereumTokenBridgeAddr,
-			Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
-			ConsistencyLevel: 16,
-			// IsReobservation will not be serialized. It should be set to false on reload.
-		},
-	}
-
-	db.storeOldPendingMsg(t, pending3)
-	require.NoError(t, err)
-
-	logger, zapObserver := setupLogsCapture(t)
-
-	xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now)
-
-	require.NoError(t, err)
-	require.Equal(t, 4, len(xfers))
-	require.Equal(t, 3, len(pendings))
-
-	// Verify that we converted the two old pending transfers and the two old completed transfers.
-	loggedEntries := zapObserver.FilterMessage("updating format of database entry for pending vaa").All()
-	require.Equal(t, 2, len(loggedEntries))
-	loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All()
-	require.Equal(t, 2, len(loggedEntries))
-
-	sort.SliceStable(xfers, func(i, j int) bool {
-		return xfers[i].Timestamp.Before(xfers[j].Timestamp)
-	})
-
-	assert.Equal(t, oldXfer1, xfers[0])
-	assert.Equal(t, newXfer1, xfers[1])
-	assert.Equal(t, oldXfer2, xfers[2])
-	assert.Equal(t, newXfer2, xfers[3])
-
-	// Updated old pending events get placed at the end, so we need to sort into timestamp order.
-	sort.SliceStable(pendings, func(i, j int) bool {
-		return pendings[i].Msg.Timestamp.Before(pendings[j].Msg.Timestamp)
-	})
-
-	assert.Equal(t, pending1.Msg, pendings[0].Msg)
-	assert.Equal(t, pending2.Msg, pendings[1].Msg)
-	assert.Equal(t, pending3.Msg, pendings[2].Msg)
-
-	// Make sure we can reload the updated pendings.
-
-	logger, zapObserver = setupLogsCapture(t)
-
-	xfers2, pendings2, err := db.GetChainGovernorDataForTime(logger, now)
-
-	require.NoError(t, err)
-	require.Equal(t, 4, len(xfers2))
-	require.Equal(t, 3, len(pendings2))
-
-	// This time we shouldn't have updated anything.
-	loggedEntries = zapObserver.FilterMessage("updating format of database entry for pending vaa").All()
-	require.Equal(t, 0, len(loggedEntries))
-	loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All()
-	require.Equal(t, 0, len(loggedEntries))
-
-	sort.SliceStable(xfers2, func(i, j int) bool {
-		return xfers2[i].Timestamp.Before(xfers2[j].Timestamp)
-	})
-
-	assert.Equal(t, oldXfer1, xfers2[0])
-	assert.Equal(t, newXfer1, xfers2[1])
-	assert.Equal(t, oldXfer2, xfers2[2])
-	assert.Equal(t, newXfer2, xfers2[3])
-
-	assert.Equal(t, pending1.Msg, pendings2[0].Msg)
-	assert.Equal(t, pending2.Msg, pendings2[1].Msg)
-}
-
-func marshalOldTransfer(xfer *Transfer) ([]byte, error) {
-	buf := new(bytes.Buffer)
-
-	vaa.MustWrite(buf, binary.BigEndian, uint32(xfer.Timestamp.Unix())) // #nosec G115 -- This conversion is safe until year 2106
-	vaa.MustWrite(buf, binary.BigEndian, xfer.Value)
-	vaa.MustWrite(buf, binary.BigEndian, xfer.OriginChain)
-	buf.Write(xfer.OriginAddress[:])
-	vaa.MustWrite(buf, binary.BigEndian, xfer.EmitterChain)
-	buf.Write(xfer.EmitterAddress[:])
-	if len(xfer.MsgID) > math.MaxUint16 {
-		return nil, fmt.Errorf("failed to marshal MsgID, length too long: %d", len(xfer.MsgID))
-	}
-	vaa.MustWrite(buf, binary.BigEndian, uint16(len(xfer.MsgID))) // #nosec G115 -- This conversion is checked above
-	if len(xfer.MsgID) > 0 {
-		buf.Write([]byte(xfer.MsgID))
-	}
-	if len(xfer.Hash) > math.MaxUint16 {
-		return nil, fmt.Errorf("failed to marshal Hash, length too long: %d", len(xfer.Hash))
-	}
-	vaa.MustWrite(buf, binary.BigEndian, uint16(len(xfer.Hash))) // #nosec G115 -- This conversion is checked above
-	if len(xfer.Hash) > 0 {
-		buf.Write([]byte(xfer.Hash))
-	}
-	return buf.Bytes(), nil
-}
-
-func (d *Database) storeOldTransfer(xfer *Transfer) error {
-	key := []byte(fmt.Sprintf("%v%v", oldTransfer, xfer.MsgID))
-	b, err := marshalOldTransfer(xfer)
-
-	if err != nil {
-		return err
-	}
-
-	return d.db.Update(func(txn *badger.Txn) error {
-		if err := txn.Set(key, b); err != nil {
-			return err
-		}
-		return nil
-	})
-}
-
-func TestDeserializeOfOldTransfer(t *testing.T) {
-	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
-	require.NoError(t, err)
-
-	ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
-	require.NoError(t, err)
-
-	xfer1 := &Transfer{
-		Timestamp:      time.Unix(int64(1654516425), 0),
-		Value:          125000,
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: ethereumTokenBridgeAddr,
-		// Don't set TargetChain or TargetAddress.
-		MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
-		Hash:  "Hash1",
-	}
-
-	xfer1Bytes, err := marshalOldTransfer(xfer1)
-	require.NoError(t, err)
-
-	xfer2, err := unmarshalOldTransfer(xfer1Bytes)
-	require.NoError(t, err)
-
-	assert.Equal(t, xfer1, xfer2)
-
-	expectedTransferKey := "GOV:XFER3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
-	assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
-}
-
-func TestOldTransfersUpdatedWhenReloading(t *testing.T) {
-	dbPath := t.TempDir()
-	db := OpenDb(zap.NewNop(), &dbPath)
-	defer db.Close()
-	defer os.Remove(dbPath)
-
-	ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
-	require.NoError(t, err)
-
-	bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
-	require.NoError(t, err)
-
-	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
-	require.NoError(t, err)
-
-	// Write the first transfer in the old format.
-	xfer1 := &Transfer{
-		Timestamp:      time.Unix(int64(1654516425), 0),
-		Value:          125000,
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: ethereumTokenBridgeAddr,
-		// Don't set TargetChain or TargetAddress.
-		MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
-		// Do not set the Hash.
-	}
-
-	err = db.storeOldTransfer(xfer1)
-	require.NoError(t, err)
-
-	// Write the second one in the new format.
-	xfer2 := &Transfer{
-		Timestamp:      time.Unix(int64(1654516430), 0),
-		Value:          125000,
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: ethereumTokenBridgeAddr,
-		TargetChain:    vaa.ChainIDBSC,
-		TargetAddress:  bscTokenBridgeAddr,
-		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
-		Hash:           "Hash2",
-	}
-
-	err = db.StoreTransfer(xfer2)
-	require.NoError(t, err)
-
-	now := time.Unix(time.Now().Unix(), 0)
-
-	logger := zap.NewNop()
-	xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now)
-
-	require.NoError(t, err)
-	require.Equal(t, 2, len(xfers))
-	require.Equal(t, 0, len(pendings))
-
-	// Updated old pending events get placed at the end, so we need to sort into timestamp order.
-	sort.SliceStable(xfers, func(i, j int) bool {
-		return xfers[i].Timestamp.Before(xfers[j].Timestamp)
-	})
-
-	assert.Equal(t, xfer1, xfers[0])
-	assert.Equal(t, xfer2, xfers[1])
-
-	// Make sure the old transfer got dropped from the database and rewritten in the new format.
-	assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(oldTransferMsgID(xfer1)))
-	assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1)))
-
-	// And make sure the other transfer is still there.
-	assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer2)))
-
-	// Make sure we can still read the database after the conversion.
-	xfers, pendings, err = db.GetChainGovernorDataForTime(logger, now)
-
-	require.NoError(t, err)
-	require.Equal(t, 2, len(xfers))
-	require.Equal(t, 0, len(pendings))
-
-	// Updated old pending events get placed at the end, so we need to sort into timestamp order.
-	sort.SliceStable(xfers, func(i, j int) bool {
-		return xfers[i].Timestamp.Before(xfers[j].Timestamp)
-	})
-
-	assert.Equal(t, xfer1, xfers[0])
-	assert.Equal(t, xfer2, xfers[1])
-}

+ 52 - 0
node/pkg/governor/README.md

@@ -0,0 +1,52 @@
+# Governor
+
+## Performing a database upgrade
+
+A database upgrade is required whenever the serialized format of a `MessagePublication` changes.
+This occurs if a field is added, removed, or changed or if the marshaling format changes.
+
+The Governor has to handle this in a special way because it must be able to read the old format and write the new format.
+This is required so that the Governor can continue to monitor transfers and messages that have already been recorded.
+It only needs to support the old format for the duration of its sliding window as the old transfers and messages will
+automatically be dropped after the duration of the window.
+
+### Example upgrade
+
+Commit `1ed88d1` performs a modification of the `MessagePublication` struct and upgrades both the Governor and the Accountant.
+
+### Upgrade Process
+
+When upgrading the database format, follow these steps:
+
+1. **Update prefixes in `node/pkg/db/governor.go`**:
+   - Move current prefixes to `old*Prefix` constants (e.g., `transferPrefix` → `oldTransferPrefix`)
+   - Increment version number in new prefixes (e.g., `GOV:XFER4:` → `GOV:XFER5:`)
+   - Update corresponding length constants
+
+2. **Update database functions**:
+   - Ensure `Is*` functions use new prefixes for current format detection
+   - Ensure `isOld*` functions use old prefixes for legacy format detection
+   - Update `*MsgID` functions to use new prefixes
+
+3. **Update unit tests in `node/pkg/db/governor_test.go`**:
+   - Update existing tests to use new prefixes
+   - Create separate test functions for each version using naming pattern `TestIs*V[N]`
+   - Add function comments explaining version suffix and prefix mapping
+   - Example: `TestIsTransferV4` tests `GOV:XFER4:`, `TestIsTransferV3` tests `GOV:XFER3:`
+
+4. **Test coverage**:
+   - Ensure both current and legacy formats are tested
+   - Include round-trip tests to verify serialization compatibility:
+      - Save and load to database
+      - Marshal and unmarshal 
+
+#### Database APIs
+
+The actual database CRUD calls should be kept version-agnostic to avoid an explosion in the number of versioned
+methods and the accompanying maintenance burden.
+
+### Live Migration
+
+When the Governor restarts and loads its data from the key-value store, it marks which transfers and/or messages
+are in the old format and saves them in the new format right away. It also deletes the records using the old format.
+This minimizes the time needed to support the two different versions.

+ 8 - 31
node/pkg/governor/governor_db.go

@@ -3,6 +3,7 @@
 package governor
 
 import (
+	"fmt"
 	"sort"
 	"time"
 
@@ -21,6 +22,7 @@ func (gov *ChainGovernor) loadFromDB() error {
 // loadFromDBAlreadyLocked method loads transfers and pending data from the database and modifies the corresponding fields in the ChainGovernor.
 // These fields are slices of transfers or pendingTransfers and will be sorted by their Timestamp property.
 // Modifies the state of the database as a side-effect: 'transfers' that are older than 24 hours are deleted.
+// It assumes that the Governor's mutex is already locked.
 func (gov *ChainGovernor) loadFromDBAlreadyLocked() error {
 	xfers, pending, err := gov.db.GetChainGovernorData(gov.logger)
 	if err != nil {
@@ -65,45 +67,20 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 	msg := &pending.Msg
 	ce, exists := gov.chains[msg.EmitterChain]
 	if !exists {
-		gov.logger.Error("reloaded pending transfer for unsupported chain, dropping it",
-			zap.String("MsgID", msg.MessageIDString()),
-			zap.String("txID", msg.TxIDString()),
-			zap.Stringer("Timestamp", msg.Timestamp),
-			zap.Uint32("Nonce", msg.Nonce),
-			zap.Uint64("Sequence", msg.Sequence),
-			zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
-			zap.Stringer("EmitterChain", msg.EmitterChain),
-			zap.Stringer("EmitterAddress", msg.EmitterAddress),
-		)
+		gov.logger.Error("reloaded pending transfer for unsupported chain, dropping it", msg.ZapFields()...)
 		return
 	}
 
 	if msg.EmitterAddress != ce.emitterAddr {
-		gov.logger.Error("reloaded pending transfer for unsupported emitter address, dropping it",
-			zap.String("MsgID", msg.MessageIDString()),
-			zap.String("txID", msg.TxIDString()),
-			zap.Stringer("Timestamp", msg.Timestamp),
-			zap.Uint32("Nonce", msg.Nonce),
-			zap.Uint64("Sequence", msg.Sequence),
-			zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
-			zap.Stringer("EmitterChain", msg.EmitterChain),
-			zap.Stringer("EmitterAddress", msg.EmitterAddress),
-		)
+		gov.logger.Error("reloaded pending transfer for unsupported emitter address, dropping it", msg.ZapFields()...)
 		return
 	}
 
 	payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
 	if err != nil {
-		gov.logger.Error("failed to parse payload for reloaded pending transfer, dropping it",
-			zap.String("MsgID", msg.MessageIDString()),
-			zap.String("txID", msg.TxIDString()),
-			zap.Stringer("Timestamp", msg.Timestamp),
-			zap.Uint32("Nonce", msg.Nonce),
-			zap.Uint64("Sequence", msg.Sequence),
-			zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
-			zap.Stringer("EmitterChain", msg.EmitterChain),
-			zap.Stringer("EmitterAddress", msg.EmitterAddress),
-			zap.Error(err),
+		gov.logger.Error(
+			fmt.Sprintf("failed to parse payload for reloaded pending transfer, dropping it %v", zap.Error(err)),
+			msg.ZapFields()...,
 		)
 		return
 	}
@@ -163,7 +140,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
 	gov.msgsSeen[hash] = transferEnqueued
 }
 
-// reloadTransfer method processes a db.Transfer and validates that it should be loaded into `gov`.
+// reloadTransfer method processes a [db.Transfer] and validates that it should be loaded into `gov`.
 // Modifies `gov` as a side-effect: when a valid transfer is loaded, the properties 'transfers' and 'msgsSeen' are
 // updated with information about the loaded transfer. In the case where a flow-canceling asset's transfer is loaded,
 // both chain entries (emitter and target) will be updated.