瀏覽代碼

Node: governor not handling duplicates properly (#1772)

* Node: governor not handling duplicates properly

* Minor tweaks

* Logging changes to help debugging
bruce-riley 3 年之前
父節點
當前提交
e1a6a1f85a

+ 15 - 0
node/pkg/common/chainlock.go

@@ -105,3 +105,18 @@ func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
 
 	return msg, nil
 }
+
+func (msg *MessagePublication) CreateVAA(gsIndex uint32) *vaa.VAA {
+	return &vaa.VAA{
+		Version:          vaa.SupportedVAAVersion,
+		GuardianSetIndex: gsIndex,
+		Signatures:       nil,
+		Timestamp:        msg.Timestamp,
+		Nonce:            msg.Nonce,
+		EmitterChain:     msg.EmitterChain,
+		EmitterAddress:   msg.EmitterAddress,
+		Payload:          msg.Payload,
+		Sequence:         msg.Sequence,
+		ConsistencyLevel: msg.ConsistencyLevel,
+	}
+}

+ 127 - 8
node/pkg/db/governor.go

@@ -55,6 +55,7 @@ type Transfer struct {
 	EmitterChain   vaa.ChainID
 	EmitterAddress vaa.Address
 	MsgID          string
+	Hash           string
 }
 
 func (t *Transfer) Marshal() ([]byte, error) {
@@ -66,7 +67,14 @@ func (t *Transfer) Marshal() ([]byte, error) {
 	buf.Write(t.OriginAddress[:])
 	vaa.MustWrite(buf, binary.BigEndian, t.EmitterChain)
 	buf.Write(t.EmitterAddress[:])
-	buf.Write([]byte(t.MsgID))
+	vaa.MustWrite(buf, binary.BigEndian, uint16(len(t.MsgID)))
+	if len(t.MsgID) > 0 {
+		buf.Write([]byte(t.MsgID))
+	}
+	vaa.MustWrite(buf, binary.BigEndian, uint16(len(t.Hash)))
+	if len(t.Hash) > 0 {
+		buf.Write([]byte(t.Hash))
+	}
 	return buf.Bytes(), nil
 }
 
@@ -105,6 +113,72 @@ func UnmarshalTransfer(data []byte) (*Transfer, error) {
 	}
 	t.EmitterAddress = emitterAddress
 
+	msgIdLen := uint16(0)
+	if err := binary.Read(reader, binary.BigEndian, &msgIdLen); err != nil {
+		return nil, fmt.Errorf("failed to read msgID length: %w", err)
+	}
+
+	if msgIdLen > 0 {
+		msgID := make([]byte, msgIdLen)
+		n, err := reader.Read(msgID)
+		if err != nil || n == 0 {
+			return nil, fmt.Errorf("failed to read vaa id [%d]: %w", n, err)
+		}
+		t.MsgID = string(msgID[:n])
+	}
+
+	hashLen := uint16(0)
+	if err := binary.Read(reader, binary.BigEndian, &hashLen); err != nil {
+		return nil, fmt.Errorf("failed to read hash length: %w", err)
+	}
+
+	if hashLen > 0 {
+		hash := make([]byte, hashLen)
+		n, err := reader.Read(hash)
+		if err != nil || n == 0 {
+			return nil, fmt.Errorf("failed to read hash [%d]: %w", n, err)
+		}
+		t.Hash = string(hash[:n])
+	}
+
+	return t, nil
+}
+
+func unmarshalOldTransfer(data []byte) (*Transfer, error) {
+	t := &Transfer{}
+
+	reader := bytes.NewReader(data[:])
+
+	unixSeconds := uint32(0)
+	if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
+		return nil, fmt.Errorf("failed to read timestamp: %w", err)
+	}
+	t.Timestamp = time.Unix(int64(unixSeconds), 0)
+
+	if err := binary.Read(reader, binary.BigEndian, &t.Value); err != nil {
+		return nil, fmt.Errorf("failed to read value: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &t.OriginChain); err != nil {
+		return nil, fmt.Errorf("failed to read token chain id: %w", err)
+	}
+
+	originAddress := vaa.Address{}
+	if n, err := reader.Read(originAddress[:]); err != nil || n != 32 {
+		return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
+	}
+	t.OriginAddress = originAddress
+
+	if err := binary.Read(reader, binary.BigEndian, &t.EmitterChain); err != nil {
+		return nil, fmt.Errorf("failed to read token chain id: %w", err)
+	}
+
+	emitterAddress := vaa.Address{}
+	if n, err := reader.Read(emitterAddress[:]); err != nil || n != 32 {
+		return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
+	}
+	t.EmitterAddress = emitterAddress
+
 	msgID := make([]byte, 256)
 	n, err := reader.Read(msgID)
 	if err != nil || n == 0 {
@@ -162,7 +236,10 @@ func UnmarshalPendingTransfer(data []byte) (*PendingTransfer, error) {
 	return p, nil
 }
 
-const transfer = "GOV:XFER:"
+const oldTransfer = "GOV:XFER:"
+const oldTransferLen = len(oldTransfer)
+
+const transfer = "GOV:XFER2:"
 const transferLen = len(transfer)
 
 // Since we are changing the DB format of pending entries, we will use a new tag in the pending key field.
@@ -181,6 +258,10 @@ func TransferMsgID(t *Transfer) []byte {
 	return []byte(fmt.Sprintf("%v%v", transfer, t.MsgID))
 }
 
+func oldTransferMsgID(t *Transfer) []byte {
+	return []byte(fmt.Sprintf("%v%v", oldTransfer, t.MsgID))
+}
+
 func PendingMsgID(k *common.MessagePublication) []byte {
 	return []byte(fmt.Sprintf("%v%v", pending, k.MessageIDString()))
 }
@@ -193,6 +274,10 @@ func IsTransfer(keyBytes []byte) bool {
 	return (len(keyBytes) >= transferLen+minMsgIdLen) && (string(keyBytes[0:transferLen]) == transfer)
 }
 
+func isOldTransfer(keyBytes []byte) bool {
+	return (len(keyBytes) >= oldTransferLen+minMsgIdLen) && (string(keyBytes[0:oldTransferLen]) == oldTransfer)
+}
+
 func IsPendingMsg(keyBytes []byte) bool {
 	return (len(keyBytes) >= pendingLen+minMsgIdLen) && (string(keyBytes[0:pendingLen]) == pending)
 }
@@ -207,6 +292,7 @@ func (d *Database) GetChainGovernorData(logger *zap.Logger) (transfers []*Transf
 }
 
 func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time) (transfers []*Transfer, pending []*PendingTransfer, err error) {
+	oldTransfers := []*Transfer{}
 	oldPendingToUpdate := []*PendingTransfer{}
 	err = d.db.View(func(txn *badger.Txn) error {
 		opts := badger.DefaultIteratorOptions
@@ -252,6 +338,14 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
 				p := &PendingTransfer{ReleaseTime: now.Add(maxEnqueuedTime), Msg: *msg}
 				pending = append(pending, p)
 				oldPendingToUpdate = append(oldPendingToUpdate, p)
+			} else if isOldTransfer(key) {
+				v, err := unmarshalOldTransfer(val)
+				if err != nil {
+					return err
+				}
+
+				transfers = append(transfers, v)
+				oldTransfers = append(oldTransfers, v)
 			}
 		}
 
@@ -264,12 +358,33 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
 				}
 
 				key := oldPendingMsgID(&pending.Msg)
-				err = d.db.DropPrefix(key)
-				if err != nil {
+				if err := d.db.Update(func(txn *badger.Txn) error {
+					err := txn.Delete(key)
+					return err
+				}); err != nil {
 					return fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pending.Msg.MessageIDString(), err)
 				}
 			}
 		}
+
+		if len(oldTransfers) != 0 {
+			for _, xfer := range oldTransfers {
+				logger.Info("cgov: updating format of database entry for completed transfer", zap.String("msgId", xfer.MsgID))
+				err := d.StoreTransfer(xfer)
+				if err != nil {
+					return fmt.Errorf("failed to write new completed transfer for key [%v]: %w", xfer.MsgID, err)
+				}
+
+				key := oldTransferMsgID(xfer)
+				if err := d.db.Update(func(txn *badger.Txn) error {
+					err := txn.Delete(key)
+					return err
+				}); err != nil {
+					return fmt.Errorf("failed to delete old completed transfer for key [%v]: %w", xfer.MsgID, err)
+				}
+			}
+		}
+
 		return nil
 	})
 
@@ -315,8 +430,10 @@ func (d *Database) StorePendingMsg(pending *PendingTransfer) error {
 // This is called by the chain governor to delete a transfer after the time limit has expired.
 func (d *Database) DeleteTransfer(t *Transfer) error {
 	key := TransferMsgID(t)
-	err := d.db.DropPrefix(key)
-	if err != nil {
+	if err := d.db.Update(func(txn *badger.Txn) error {
+		err := txn.Delete(key)
+		return err
+	}); err != nil {
 		return fmt.Errorf("failed to delete transfer msg for key [%v]: %w", key, err)
 	}
 
@@ -326,8 +443,10 @@ func (d *Database) DeleteTransfer(t *Transfer) error {
 // This is called by the chain governor to delete a pending transfer.
 func (d *Database) DeletePendingMsg(pending *PendingTransfer) error {
 	key := PendingMsgID(&pending.Msg)
-	err := d.db.DropPrefix(key)
-	if err != nil {
+	if err := d.db.Update(func(txn *badger.Txn) error {
+		err := txn.Delete(key)
+		return err
+	}); err != nil {
 		return fmt.Errorf("failed to delete pending msg for key [%v]: %w", key, err)
 	}
 

+ 182 - 9
node/pkg/db/governor_test.go

@@ -1,6 +1,9 @@
 package db
 
 import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
 	"os"
 	"sort"
 	"testing"
@@ -15,6 +18,13 @@ import (
 	"go.uber.org/zap"
 )
 
+func (d *Database) rowExistsInDB(key []byte) error {
+	return d.db.View(func(txn *badger.Txn) error {
+		_, err := txn.Get(key)
+		return err
+	})
+}
+
 func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
 	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
 	require.NoError(t, err)
@@ -30,6 +40,7 @@ func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		Hash:           "Hash1",
 	}
 
 	bytes, err := xfer1.Marshal()
@@ -40,7 +51,7 @@ func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
 
 	assert.Equal(t, xfer1, xfer2)
 
-	expectedTransferKey := "GOV:XFER:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
+	expectedTransferKey := "GOV:XFER2:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
 	assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
 }
 
@@ -77,26 +88,30 @@ func TestTransferMsgID(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		Hash:           "Hash1",
 	}
 
-	assert.Equal(t, []byte("GOV:XFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), TransferMsgID(xfer))
+	assert.Equal(t, []byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), TransferMsgID(xfer))
 }
 
 func TestIsTransfer(t *testing.T) {
-	assert.Equal(t, true, IsTransfer([]byte("GOV:XFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER:")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER:1")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER:1/1/1")))
-	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
-	assert.Equal(t, true, IsTransfer([]byte("GOV:XFER:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
+	assert.Equal(t, true, IsTransfer([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:")))
+	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:1")))
+	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:1/1/1")))
+	assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
+	assert.Equal(t, true, IsTransfer([]byte("GOV:XFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
 	assert.Equal(t, false, IsTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, IsTransfer([]byte{0x01, 0x02, 0x03, 0x04}))
 	assert.Equal(t, false, IsTransfer([]byte{}))
+	assert.Equal(t, true, isOldTransfer([]byte("GOV:XFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, isOldTransfer([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+
 }
 
 func TestIsPendingMsg(t *testing.T) {
 	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:")))
 	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1")))
 	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1/1/1")))
@@ -148,6 +163,7 @@ func TestStoreTransfer(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		Hash:           "Hash1",
 	}
 
 	err2 := db.StoreTransfer(xfer1)
@@ -176,13 +192,20 @@ func TestDeleteTransfer(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		Hash:           "Hash1",
 	}
 
 	err2 := db.StoreTransfer(xfer1)
 	require.NoError(t, err2)
 
+	// Make sure the xfer exists in the db.
+	assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1)))
+
 	err3 := db.DeleteTransfer(xfer1)
 	require.NoError(t, err3)
+
+	// Make sure the xfer is no longer in the db.
+	assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(TransferMsgID(xfer1)))
 }
 
 func TestStorePendingMsg(t *testing.T) {
@@ -240,8 +263,14 @@ func TestDeletePendingMsg(t *testing.T) {
 	err3 := db.StorePendingMsg(pending)
 	require.NoError(t, err3)
 
+	// Make sure the pending transfer exists in the db.
+	assert.NoError(t, db.rowExistsInDB(PendingMsgID(msg)))
+
 	err4 := db.DeletePendingMsg(pending)
 	assert.Nil(t, err4)
+
+	// Make sure the pending transfer is no longer in the db.
+	assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(PendingMsgID(msg)))
 }
 
 func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
@@ -299,6 +328,7 @@ func TestStoreAndReloadTransfers(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		Hash:           "Hash1",
 	}
 
 	err = db.StoreTransfer(xfer1)
@@ -312,6 +342,7 @@ func TestStoreAndReloadTransfers(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
+		Hash:           "Hash2",
 	}
 
 	err = db.StoreTransfer(xfer2)
@@ -399,6 +430,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		Hash:           "Hash1",
 	}
 
 	err = db.StoreTransfer(xfer1)
@@ -412,6 +444,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 		EmitterChain:   vaa.ChainIDEthereum,
 		EmitterAddress: tokenBridgeAddr,
 		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
+		Hash:           "Hash2",
 	}
 
 	err = db.StoreTransfer(xfer2)
@@ -487,3 +520,143 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 	assert.Equal(t, pending1.Msg, pendings2[0].Msg)
 	assert.Equal(t, pending2.Msg, pendings2[1].Msg)
 }
+
+func marshalOldTransfer(xfer *Transfer) []byte {
+	buf := new(bytes.Buffer)
+	vaa.MustWrite(buf, binary.BigEndian, uint32(xfer.Timestamp.Unix()))
+	vaa.MustWrite(buf, binary.BigEndian, xfer.Value)
+	vaa.MustWrite(buf, binary.BigEndian, xfer.OriginChain)
+	buf.Write(xfer.OriginAddress[:])
+	vaa.MustWrite(buf, binary.BigEndian, xfer.EmitterChain)
+	buf.Write(xfer.EmitterAddress[:])
+	buf.Write([]byte(xfer.MsgID))
+	return buf.Bytes()
+}
+
+func (d *Database) storeOldTransfer(t *testing.T, xfer *Transfer) error {
+	key := []byte(fmt.Sprintf("%v%v", oldTransfer, xfer.MsgID))
+	b := marshalOldTransfer(xfer)
+
+	err := d.db.Update(func(txn *badger.Txn) error {
+		if err := txn.Set(key, b); err != nil {
+			return err
+		}
+		return nil
+	})
+	require.NoError(t, err)
+	return nil
+}
+
+func TestDeserializeOfOldTransfer(t *testing.T) {
+	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
+	require.NoError(t, err)
+
+	xfer1 := &Transfer{
+		Timestamp:      time.Unix(int64(1654516425), 0),
+		Value:          125000,
+		OriginChain:    vaa.ChainIDEthereum,
+		OriginAddress:  tokenAddr,
+		EmitterChain:   vaa.ChainIDEthereum,
+		EmitterAddress: tokenBridgeAddr,
+		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		// Do not set the Hash.
+	}
+
+	bytes := marshalOldTransfer(xfer1)
+
+	xfer2, err := unmarshalOldTransfer(bytes)
+	require.NoError(t, err)
+
+	assert.Equal(t, xfer1, xfer2)
+
+	expectedTransferKey := "GOV:XFER2:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
+	assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
+}
+
+func TestOldTransfersUpdatedWhenReloading(t *testing.T) {
+	dbPath := t.TempDir()
+	db, err := Open(dbPath)
+	if err != nil {
+		t.Error("failed to open database")
+	}
+	defer db.Close()
+	defer os.Remove(dbPath)
+
+	tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
+	require.NoError(t, err)
+
+	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	// Write the first transfer in the old format.
+	xfer1 := &Transfer{
+		Timestamp:      time.Unix(int64(1654516425), 0),
+		Value:          125000,
+		OriginChain:    vaa.ChainIDEthereum,
+		OriginAddress:  tokenAddr,
+		EmitterChain:   vaa.ChainIDEthereum,
+		EmitterAddress: tokenBridgeAddr,
+		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
+		// Do not set the Hash.
+	}
+
+	err = db.storeOldTransfer(t, xfer1)
+	require.NoError(t, err)
+
+	// Write the second one in the new format.
+	xfer2 := &Transfer{
+		Timestamp:      time.Unix(int64(1654516430), 0),
+		Value:          125000,
+		OriginChain:    vaa.ChainIDEthereum,
+		OriginAddress:  tokenAddr,
+		EmitterChain:   vaa.ChainIDEthereum,
+		EmitterAddress: tokenBridgeAddr,
+		MsgID:          "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
+		Hash:           "Hash2",
+	}
+
+	err = db.StoreTransfer(xfer2)
+	require.NoError(t, err)
+
+	now := time.Unix(time.Now().Unix(), 0)
+
+	logger := zap.NewNop()
+	xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now)
+
+	require.NoError(t, err)
+	require.Equal(t, 2, len(xfers))
+	require.Equal(t, 0, len(pendings))
+
+	// Updated old pending events get placed at the end, so we need to sort into timestamp order.
+	sort.SliceStable(xfers, func(i, j int) bool {
+		return xfers[i].Timestamp.Before(xfers[j].Timestamp)
+	})
+
+	assert.Equal(t, xfer1, xfers[0])
+	assert.Equal(t, xfer2, xfers[1])
+
+	// Make sure the old transfer got dropped from the database and rewritten in the new format.
+	assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(oldTransferMsgID(xfer1)))
+	assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1)))
+
+	// And make sure the other transfer is still there.
+	assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer2)))
+
+	// Make sure we can still read the database after the conversion.
+	xfers, pendings, err = db.GetChainGovernorDataForTime(logger, now)
+
+	require.NoError(t, err)
+	require.Equal(t, 2, len(xfers))
+	require.Equal(t, 0, len(pendings))
+
+	// Updated old pending events get placed at the end, so we need to sort into timestamp order.
+	sort.SliceStable(xfers, func(i, j int) bool {
+		return xfers[i].Timestamp.Before(xfers[j].Timestamp)
+	})
+
+	assert.Equal(t, xfer1, xfers[0])
+	assert.Equal(t, xfer2, xfers[1])
+}

+ 109 - 23
node/pkg/governor/governor.go

@@ -27,6 +27,7 @@ package governor
 
 import (
 	"context"
+	"encoding/hex"
 	"fmt"
 	"math"
 	"math/big"
@@ -46,6 +47,9 @@ const (
 	TestNetMode = 2
 	DevNetMode  = 3
 	GoTestMode  = 4
+
+	transferComplete = true
+	transferEnqueued = false
 )
 
 // WARNING: Change me in ./node/db as well
@@ -91,6 +95,7 @@ type (
 	pendingEntry struct {
 		token  *tokenEntry // Store a reference to the token so we can get the current price to compute the value each interval.
 		amount *big.Int
+		hash   string
 		dbData db.PendingTransfer // This info gets persisted in the DB.
 	}
 
@@ -118,6 +123,7 @@ type ChainGovernor struct {
 	tokens                map[tokenKey]*tokenEntry
 	tokensByCoinGeckoId   map[string][]*tokenEntry
 	chains                map[vaa.ChainID]*chainEntry
+	msgsSeen              map[string]bool // Key is hash, payload is consts transferComplete and transferEnqueued.
 	msgsToPublish         []*common.MessagePublication
 	dayLengthInMinutes    int
 	coinGeckoQuery        string
@@ -139,6 +145,7 @@ func NewChainGovernor(
 		tokens:              make(map[tokenKey]*tokenEntry),
 		tokensByCoinGeckoId: make(map[string][]*tokenEntry),
 		chains:              make(map[vaa.ChainID]*chainEntry),
+		msgsSeen:            make(map[string]bool),
 		env:                 env,
 	}
 }
@@ -324,22 +331,58 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		return true, nil
 	}
 
+	hash := gov.HashFromMsg(msg)
+	xferComplete, alreadySeen := gov.msgsSeen[hash]
+	if alreadySeen {
+		if !xferComplete {
+			gov.logger.Info("cgov: ignoring duplicate vaa because it is enqueued",
+				zap.String("msgID", msg.MessageIDString()),
+				zap.String("hash", hash),
+				zap.Stringer("txHash", msg.TxHash),
+			)
+			return false, nil
+		}
+
+		gov.logger.Info("cgov: allowing duplicate vaa to be published again, but not adding it to the notional value",
+			zap.String("msgID", msg.MessageIDString()),
+			zap.String("hash", hash),
+			zap.Stringer("txHash", msg.TxHash),
+		)
+		return true, nil
+	}
+
 	startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
-	prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
+	prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
 	if err != nil {
-		gov.logger.Error("cgov: failed to trim transfers", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
+		gov.logger.Error("cgov: failed to trim transfers",
+			zap.String("msgID", msg.MessageIDString()),
+			zap.String("hash", hash),
+			zap.Stringer("txHash", msg.TxHash),
+			zap.Error(err),
+		)
 		return false, err
 	}
 
 	value, err := computeValue(payload.Amount, token)
 	if err != nil {
-		gov.logger.Error("cgov: failed to compute value of transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
+		gov.logger.Error("cgov: failed to compute value of transfer",
+			zap.String("msgID", msg.MessageIDString()),
+			zap.String("hash", hash),
+			zap.Stringer("txHash", msg.TxHash),
+			zap.Error(err),
+		)
 		return false, err
 	}
 
 	newTotalValue := prevTotalValue + value
 	if newTotalValue < prevTotalValue {
-		gov.logger.Error("cgov: total value has overflowed", zap.String("msgID", msg.MessageIDString()), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue))
+		gov.logger.Error("cgov: total value has overflowed",
+			zap.String("msgID", msg.MessageIDString()),
+			zap.String("hash", hash),
+			zap.Stringer("txHash", msg.TxHash),
+			zap.Uint64("prevTotalValue", prevTotalValue),
+			zap.Uint64("newTotalValue", newTotalValue),
+		)
 		return false, fmt.Errorf("total value has overflowed")
 	}
 
@@ -355,6 +398,8 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			zap.String("msgID", msg.MessageIDString()),
 			zap.Stringer("releaseTime", releaseTime),
 			zap.Uint64("bigTransactionSize", ce.bigTransactionSize),
+			zap.String("hash", hash),
+			zap.Stringer("txHash", msg.TxHash),
 		)
 	} else if newTotalValue > ce.dailyLimit {
 		enqueueIt = true
@@ -365,18 +410,26 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			zap.Uint64("newTotalValue", newTotalValue),
 			zap.Stringer("releaseTime", releaseTime),
 			zap.String("msgID", msg.MessageIDString()),
+			zap.String("hash", hash),
+			zap.Stringer("txHash", msg.TxHash),
 		)
 	}
 
 	if enqueueIt {
 		dbData := db.PendingTransfer{ReleaseTime: releaseTime, Msg: *msg}
-		ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: dbData})
 		err = gov.db.StorePendingMsg(&dbData)
 		if err != nil {
-			gov.logger.Error("cgov: failed to store pending vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
+			gov.logger.Error("cgov: failed to store pending vaa",
+				zap.String("msgID", msg.MessageIDString()),
+				zap.String("hash", hash),
+				zap.Stringer("txHash", msg.TxHash),
+				zap.Error(err),
+			)
 			return false, err
 		}
 
+		ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, hash: hash, dbData: dbData})
+		gov.msgsSeen[hash] = transferEnqueued
 		return false, nil
 	}
 
@@ -384,16 +437,32 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		zap.Uint64("value", value),
 		zap.Uint64("prevTotalValue", prevTotalValue),
 		zap.Uint64("newTotalValue", newTotalValue),
-		zap.String("msgID", msg.MessageIDString()))
-
-	xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msg.MessageIDString()}
-	ce.transfers = append(ce.transfers, &xfer)
+		zap.String("msgID", msg.MessageIDString()),
+		zap.String("hash", hash),
+		zap.Stringer("txHash", msg.TxHash),
+	)
+
+	xfer := db.Transfer{Timestamp: now,
+		Value:          value,
+		OriginChain:    token.token.chain,
+		OriginAddress:  token.token.addr,
+		EmitterChain:   msg.EmitterChain,
+		EmitterAddress: msg.EmitterAddress,
+		MsgID:          msg.MessageIDString(),
+		Hash:           hash,
+	}
 	err = gov.db.StoreTransfer(&xfer)
 	if err != nil {
-		gov.logger.Error("cgov: failed to store transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
+		gov.logger.Error("cgov: failed to store transfer",
+			zap.String("msgID", msg.MessageIDString()),
+			zap.String("hash", hash), zap.Error(err),
+			zap.Stringer("txHash", msg.TxHash),
+		)
 		return false, err
 	}
 
+	ce.transfers = append(ce.transfers, &xfer)
+	gov.msgsSeen[hash] = transferComplete
 	return true, nil
 }
 
@@ -419,7 +488,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
 		// Keep going as long as we find something that will fit.
 		for {
 			foundOne := false
-			prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
+			prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
 			if err != nil {
 				gov.logger.Error("cgov: failed to trim transfers", zap.Error(err))
 				gov.msgsToPublish = msgsToPublish
@@ -487,14 +556,25 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
 				msgsToPublish = append(msgsToPublish, &pe.dbData.Msg)
 
 				if countsTowardsTransfers {
-					xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr,
-						EmitterChain: pe.dbData.Msg.EmitterChain, EmitterAddress: pe.dbData.Msg.EmitterAddress, MsgID: pe.dbData.Msg.MessageIDString()}
-					ce.transfers = append(ce.transfers, &xfer)
+					xfer := db.Transfer{Timestamp: now,
+						Value:          value,
+						OriginChain:    pe.token.token.chain,
+						OriginAddress:  pe.token.token.addr,
+						EmitterChain:   pe.dbData.Msg.EmitterChain,
+						EmitterAddress: pe.dbData.Msg.EmitterAddress,
+						MsgID:          pe.dbData.Msg.MessageIDString(),
+						Hash:           pe.hash,
+					}
 
 					if err := gov.db.StoreTransfer(&xfer); err != nil {
 						gov.msgsToPublish = msgsToPublish
 						return nil, err
 					}
+
+					ce.transfers = append(ce.transfers, &xfer)
+					gov.msgsSeen[pe.hash] = transferComplete
+				} else {
+					delete(gov.msgsSeen, pe.hash)
 				}
 
 				if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil {
@@ -535,12 +615,12 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) {
 	return value, nil
 }
 
-func (ce *chainEntry) TrimAndSumValue(startTime time.Time, db db.GovernorDB) (sum uint64, err error) {
-	sum, ce.transfers, err = TrimAndSumValue(ce.transfers, startTime, db)
+func (gov *ChainGovernor) TrimAndSumValueForChain(ce *chainEntry, startTime time.Time) (sum uint64, err error) {
+	sum, ce.transfers, err = gov.TrimAndSumValue(ce.transfers, startTime)
 	return sum, err
 }
 
-func TrimAndSumValue(transfers []*db.Transfer, startTime time.Time, db db.GovernorDB) (uint64, []*db.Transfer, error) {
+func (gov *ChainGovernor) TrimAndSumValue(transfers []*db.Transfer, startTime time.Time) (uint64, []*db.Transfer, error) {
 	if len(transfers) == 0 {
 		return 0, transfers, nil
 	}
@@ -557,12 +637,12 @@ func TrimAndSumValue(transfers []*db.Transfer, startTime time.Time, db db.Govern
 	}
 
 	if trimIdx >= 0 {
-		if db != nil {
-			for idx := 0; idx <= trimIdx; idx++ {
-				if err := db.DeleteTransfer(transfers[idx]); err != nil {
-					return 0, transfers, err
-				}
+		for idx := 0; idx <= trimIdx; idx++ {
+			if err := gov.db.DeleteTransfer(transfers[idx]); err != nil {
+				return 0, transfers, err
 			}
+
+			delete(gov.msgsSeen, transfers[idx].Hash)
 		}
 
 		transfers = transfers[trimIdx+1:]
@@ -574,3 +654,9 @@ func TrimAndSumValue(transfers []*db.Transfer, startTime time.Time, db db.Govern
 func (tk tokenKey) String() string {
 	return tk.chain.String() + ":" + tk.addr.String()
 }
+
+func (gov *ChainGovernor) HashFromMsg(msg *common.MessagePublication) string {
+	v := msg.CreateVAA(0) // We can pass zero in as the guardian set index because it is not part of the digest.
+	digest := v.SigningMsg()
+	return hex.EncodeToString(digest.Bytes())
+}

+ 53 - 8
node/pkg/governor/governor_db.go

@@ -123,6 +123,24 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
 		return
 	}
 
+	hash := gov.HashFromMsg(msg)
+
+	if _, alreadyExists := gov.msgsSeen[hash]; alreadyExists {
+		gov.logger.Error("cgov: not reloading pending transfer because it is a duplicate",
+			zap.String("MsgID", msg.MessageIDString()),
+			zap.Stringer("TxHash", msg.TxHash),
+			zap.Stringer("Timestamp", msg.Timestamp),
+			zap.Uint32("Nonce", msg.Nonce),
+			zap.Uint64("Sequence", msg.Sequence),
+			zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
+			zap.Stringer("EmitterChain", msg.EmitterChain),
+			zap.Stringer("EmitterAddress", msg.EmitterAddress),
+			zap.Stringer("Amount", payload.Amount),
+			zap.String("Hash", hash),
+		)
+		return
+	}
+
 	gov.logger.Info("cgov: reloaded pending transfer",
 		zap.String("MsgID", msg.MessageIDString()),
 		zap.Stringer("TxHash", msg.TxHash),
@@ -133,9 +151,11 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
 		zap.Stringer("EmitterChain", msg.EmitterChain),
 		zap.Stringer("EmitterAddress", msg.EmitterAddress),
 		zap.Stringer("Amount", payload.Amount),
+		zap.String("Hash", hash),
 	)
 
-	ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: *pending})
+	ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, hash: hash, dbData: *pending})
+	gov.msgsSeen[hash] = transferEnqueued
 }
 
 func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, startTime time.Time) {
@@ -175,13 +195,38 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
 		return
 	}
 
-	gov.logger.Info("cgov: reloaded transfer",
-		zap.Stringer("Timestamp", xfer.Timestamp),
-		zap.Uint64("Value", xfer.Value),
-		zap.Stringer("OriginChain", xfer.OriginChain),
-		zap.Stringer("OriginAddress", xfer.OriginAddress),
-		zap.String("MsgID", xfer.MsgID),
-	)
+	if _, alreadyExists := gov.msgsSeen[xfer.Hash]; alreadyExists {
+		gov.logger.Info("cgov: not reloading transfer because it is a duplicate",
+			zap.Stringer("Timestamp", xfer.Timestamp),
+			zap.Uint64("Value", xfer.Value),
+			zap.Stringer("OriginChain", xfer.OriginChain),
+			zap.Stringer("OriginAddress", xfer.OriginAddress),
+			zap.String("MsgID", xfer.MsgID),
+			zap.String("Hash", xfer.Hash),
+		)
+		return
+	}
+
+	if xfer.Hash != "" {
+		gov.logger.Info("cgov: reloaded transfer",
+			zap.Stringer("Timestamp", xfer.Timestamp),
+			zap.Uint64("Value", xfer.Value),
+			zap.Stringer("OriginChain", xfer.OriginChain),
+			zap.Stringer("OriginAddress", xfer.OriginAddress),
+			zap.String("MsgID", xfer.MsgID),
+			zap.String("Hash", xfer.Hash),
+		)
+
+		gov.msgsSeen[xfer.Hash] = transferComplete
+	} else {
+		gov.logger.Error("cgov: reloaded transfer that does not have a hash, will not be able to detect a duplicate",
+			zap.Stringer("Timestamp", xfer.Timestamp),
+			zap.Uint64("Value", xfer.Value),
+			zap.Stringer("OriginChain", xfer.OriginChain),
+			zap.Stringer("OriginAddress", xfer.OriginAddress),
+			zap.String("MsgID", xfer.MsgID),
+		)
+	}
 
 	ce.transfers = append(ce.transfers, xfer)
 }

+ 424 - 15
node/pkg/governor/governor_test.go

@@ -111,17 +111,27 @@ func (gov *ChainGovernor) getStatsForAllChains() (numTrans int, valueTrans uint6
 }
 
 func TestTrimEmptyTransfers(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
 	var transfers []*db.Transfer
-	sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil)
+	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now)
 	require.NoError(t, err)
 	assert.Equal(t, uint64(0), sum)
 	assert.Equal(t, 0, len(updatedTransfers))
 }
 
 func TestSumAllFromToday(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -129,13 +139,18 @@ func TestSumAllFromToday(t *testing.T) {
 	transferTime, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 11:00am (CST)")
 	require.NoError(t, err)
 	transfers = append(transfers, &db.Transfer{Value: 125000, Timestamp: transferTime})
-	sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
+	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
 	require.NoError(t, err)
 	assert.Equal(t, uint64(125000), sum)
 	assert.Equal(t, 1, len(updatedTransfers))
 }
 
 func TestTrimOneOfTwoTransfers(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -152,13 +167,18 @@ func TestTrimOneOfTwoTransfers(t *testing.T) {
 	transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2})
 	assert.Equal(t, 2, len(transfers))
 
-	sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
+	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
 	require.NoError(t, err)
 	assert.Equal(t, 1, len(updatedTransfers))
 	assert.Equal(t, uint64(225000), sum)
 }
 
 func TestTrimSeveralTransfers(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -188,13 +208,18 @@ func TestTrimSeveralTransfers(t *testing.T) {
 
 	assert.Equal(t, 5, len(transfers))
 
-	sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
+	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
 	require.NoError(t, err)
 	assert.Equal(t, 3, len(updatedTransfers))
 	assert.Equal(t, uint64(465000), sum)
 }
 
 func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -209,7 +234,7 @@ func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) {
 	transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2})
 	assert.Equal(t, 2, len(transfers))
 
-	sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil)
+	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now)
 	require.NoError(t, err)
 	assert.Equal(t, 0, len(updatedTransfers))
 	assert.Equal(t, uint64(0), sum)
@@ -324,6 +349,7 @@ func TestVaaForUninterestingEmitterAddress(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 0, len(gov.msgsSeen))
 }
 
 func TestVaaForUninterestingPayloadType(t *testing.T) {
@@ -356,6 +382,7 @@ func TestVaaForUninterestingPayloadType(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 0, len(gov.msgsSeen))
 }
 
 // Note this method assumes 18 decimals for the amount.
@@ -459,6 +486,7 @@ func TestVaaForUninterestingToken(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 0, len(gov.msgsSeen))
 }
 
 func TestTransfersUpToAndOverTheLimit(t *testing.T) {
@@ -489,7 +517,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	)
 
 	// The first two transfers should be accepted.
-	msg := common.MessagePublication{
+	msg1 := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
@@ -500,7 +528,18 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 		Payload:          payloadBytes1,
 	}
 
-	canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
+	msg2 := common.MessagePublication{
+		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        time.Unix(int64(1654543099), 0),
+		Nonce:            uint32(1),
+		Sequence:         uint64(2),
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		ConsistencyLevel: uint8(32),
+		Payload:          payloadBytes1,
+	}
+
+	canPost, err := gov.ProcessMsgForTime(&msg1, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
@@ -509,8 +548,9 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(2218), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
-	canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
+	canPost, err = gov.ProcessMsgForTime(&msg2, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
@@ -519,6 +559,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(4436), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 2, len(gov.msgsSeen))
 
 	// But the third one should be queued up.
 	payloadBytes2 := buildMockTransferPayloadBytes(1,
@@ -529,9 +570,18 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 		1250,
 	)
 
-	msg.Payload = payloadBytes2
+	msg3 := common.MessagePublication{
+		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        time.Unix(int64(1654543099), 0),
+		Nonce:            uint32(1),
+		Sequence:         uint64(3),
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		ConsistencyLevel: uint8(32),
+		Payload:          payloadBytes2,
+	}
 
-	canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
+	canPost, err = gov.ProcessMsgForTime(&msg3, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
@@ -540,10 +590,21 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(4436), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(2218274), valuePending)
+	assert.Equal(t, 3, len(gov.msgsSeen))
 
 	// But a small one should still go through.
-	msg.Payload = payloadBytes1
-	canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
+	msg4 := common.MessagePublication{
+		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        time.Unix(int64(1654543099), 0),
+		Nonce:            uint32(1),
+		Sequence:         uint64(4),
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		ConsistencyLevel: uint8(32),
+		Payload:          payloadBytes1,
+	}
+
+	canPost, err = gov.ProcessMsgForTime(&msg4, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
@@ -552,6 +613,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(4436+2218), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(2218274), valuePending)
+	assert.Equal(t, 4, len(gov.msgsSeen))
 }
 
 func TestPendingTransferBeingReleased(t *testing.T) {
@@ -603,6 +665,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
 	// And so should the second.
 	payloadBytes2 := buildMockTransferPayloadBytes(1,
@@ -634,6 +697,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 2, len(gov.msgsSeen))
 
 	// But the third one should be queued up.
 	payloadBytes3 := buildMockTransferPayloadBytes(1,
@@ -665,6 +729,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(496893), valuePending)
+	assert.Equal(t, 3, len(gov.msgsSeen))
 
 	// And so should the fourth one.
 	payloadBytes4 := buildMockTransferPayloadBytes(1,
@@ -696,6 +761,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(496893+532385), valuePending)
+	assert.Equal(t, 4, len(gov.msgsSeen))
 
 	// If we check pending before noon, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
@@ -708,6 +774,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(496893+532385), valuePending)
+	assert.Equal(t, 4, len(gov.msgsSeen))
 
 	// But at 3pm, the first one should drop off and the first queued one should get posted.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)")
@@ -721,6 +788,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(488020+496893), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(532385), valuePending)
+	assert.Equal(t, 3, len(gov.msgsSeen))
 }
 
 func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
@@ -770,6 +838,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
 	// And so should the second.
 	msg2 := common.MessagePublication{
@@ -799,6 +868,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 2, len(gov.msgsSeen))
 
 	// But the third, big one should be queued up.
 	msg3 := common.MessagePublication{
@@ -828,6 +898,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(887309), valuePending)
+	assert.Equal(t, 3, len(gov.msgsSeen))
 
 	// A fourth, smaller, but still too big one, should get enqueued.
 	msg4 := common.MessagePublication{
@@ -857,6 +928,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(887309+177461), valuePending)
+	assert.Equal(t, 4, len(gov.msgsSeen))
 
 	// A fifth, smaller, but still too big one, should also get enqueued.
 	msg5 := common.MessagePublication{
@@ -886,6 +958,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 3, numPending)
 	assert.Equal(t, uint64(887309+177461+179236), valuePending)
+	assert.Equal(t, 5, len(gov.msgsSeen))
 
 	// A sixth, big one should also get enqueued.
 	msg6 := common.MessagePublication{
@@ -915,6 +988,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 4, numPending)
 	assert.Equal(t, uint64(887309+177461+179236+889084), valuePending)
+	assert.Equal(t, 6, len(gov.msgsSeen))
 
 	// If we check pending before noon, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
@@ -927,6 +1001,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 4, numPending)
 	assert.Equal(t, uint64(887309+177461+179236+889084), valuePending)
+	assert.Equal(t, 6, len(gov.msgsSeen))
 
 	// But at 3pm, the first one should drop off. This should result in the second and third, smaller pending ones being posted, but not the two big ones.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)")
@@ -940,6 +1015,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(488020+177461+179236), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(887309+889084), valuePending)
+	assert.Equal(t, 5, len(gov.msgsSeen))
 }
 
 func TestMainnetConfigIsValid(t *testing.T) {
@@ -1009,6 +1085,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
 	// And so should the second.
 	msg2 := common.MessagePublication{
@@ -1038,6 +1115,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730+88730), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 2, len(gov.msgsSeen))
 
 	// But the third big one should get enqueued.
 	msg3 := common.MessagePublication{
@@ -1067,6 +1145,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730+88730), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
+	assert.Equal(t, 3, len(gov.msgsSeen))
 
 	// If we check pending before noon, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
@@ -1080,6 +1159,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730+88730), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
+	assert.Equal(t, 3, len(gov.msgsSeen))
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
 	assert.Equal(t, 2, numTrans)
@@ -1099,6 +1179,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
+	assert.Equal(t, 2, len(gov.msgsSeen))
 
 	// And Just after 6pm, the second one should drop off. The big pending one should still not be affected.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 6:01pm (CST)")
@@ -1125,8 +1206,9 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
-	// // But then the operator resets the release time.
+	// But then the operator resets the release time.
 	_, err = gov.resetReleaseTimerForTime(msg3.MessageIDString(), now)
 	require.NoError(t, err)
 
@@ -1142,6 +1224,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
 	// But finally, a full 24hrs, it should get released.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 4, 2022 at 1:01am (CST)")
@@ -1155,6 +1238,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 0, len(gov.msgsSeen))
 
 	// But the big transaction should not affect the daily notional.
 	ce, exists := gov.chains[vaa.ChainIDEthereum]
@@ -1216,6 +1300,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(88730), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
 	// If we check 23hrs later, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 11:00am (CST)")
@@ -1229,6 +1314,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(88730), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
 
 	// But after 24hrs, it should get released.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
@@ -1242,6 +1328,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 0, len(gov.msgsSeen))
 }
 
 func TestIsBigTransfer(t *testing.T) {
@@ -1290,7 +1377,6 @@ func TestTransferPayloadTooShort(t *testing.T) {
 
 	payloadBytes1 = payloadBytes1[0 : len(payloadBytes1)-1]
 
-	// The first two transfers should be accepted.
 	msg := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
@@ -1305,8 +1391,331 @@ func TestTransferPayloadTooShort(t *testing.T) {
 	// The low level method should return an error.
 	_, err = gov.ProcessMsgForTime(&msg, time.Now())
 	assert.EqualError(t, err, "buffer too short")
+	assert.Equal(t, 0, len(gov.msgsSeen))
 
-	// The higher level method should false, saying we should not publish.
+	// The higher level method should return false, saying we should not publish.
 	canPost := gov.ProcessMsg(&msg)
 	assert.Equal(t, false, canPost)
+	assert.Equal(t, 0, len(gov.msgsSeen))
+}
+
+func TestDontReloadDuplicates(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
+	emitterAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
+	emitterAddr, err := vaa.StringToAddress(emitterAddrStr)
+	require.NoError(t, err)
+
+	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
+	tokenAddr, err := vaa.StringToAddress(tokenAddrStr)
+	require.NoError(t, err)
+	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
+
+	require.NoError(t, err)
+
+	gov.setDayLengthInMinutes(24 * 60)
+	err = gov.setChainForTesting(vaa.ChainIDEthereum, emitterAddrStr, 1000000, 0)
+	require.NoError(t, err)
+	err = gov.setTokenForTesting(vaa.ChainIDEthereum, emitterAddrStr, "WETH", 1774.62)
+	require.NoError(t, err)
+
+	now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
+	startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
+
+	var xfers []*db.Transfer
+
+	xfer1 := &db.Transfer{
+		Timestamp:      startTime.Add(time.Minute * 5),
+		Value:          uint64(1000),
+		OriginChain:    vaa.ChainIDEthereum,
+		OriginAddress:  tokenAddr,
+		EmitterChain:   vaa.ChainIDEthereum,
+		EmitterAddress: emitterAddr,
+		MsgID:          "2/" + emitterAddrStr + "/125",
+		Hash:           "Hash1",
+	}
+	xfers = append(xfers, xfer1)
+
+	xfer2 := &db.Transfer{
+		Timestamp:      startTime.Add(time.Minute * 5),
+		Value:          uint64(2000),
+		OriginChain:    vaa.ChainIDEthereum,
+		OriginAddress:  tokenAddr,
+		EmitterChain:   vaa.ChainIDEthereum,
+		EmitterAddress: emitterAddr,
+		MsgID:          "2/" + emitterAddrStr + "/126",
+		Hash:           "Hash2",
+	}
+	xfers = append(xfers, xfer2)
+
+	// Add a duplicate of each transfer
+	xfers = append(xfers, xfer1)
+	xfers = append(xfers, xfer2)
+	assert.Equal(t, 4, len(xfers))
+
+	payload1 := buildMockTransferPayloadBytes(1,
+		vaa.ChainIDEthereum,
+		tokenAddrStr,
+		vaa.ChainIDPolygon,
+		toAddrStr,
+		1.25,
+	)
+
+	var pendings []*db.PendingTransfer
+	pending1 := &db.PendingTransfer{
+		ReleaseTime: now.Add(time.Hour * 24),
+		Msg: common.MessagePublication{
+			TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+			Timestamp:        time.Unix(int64(1654543099), 0),
+			Nonce:            uint32(1),
+			Sequence:         uint64(200),
+			EmitterChain:     vaa.ChainIDEthereum,
+			EmitterAddress:   emitterAddr,
+			ConsistencyLevel: uint8(32),
+			Payload:          payload1,
+		},
+	}
+	pendings = append(pendings, pending1)
+
+	pending2 := &db.PendingTransfer{
+		ReleaseTime: now.Add(time.Hour * 24),
+		Msg: common.MessagePublication{
+			TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+			Timestamp:        time.Unix(int64(1654543099), 0),
+			Nonce:            uint32(1),
+			Sequence:         uint64(201),
+			EmitterChain:     vaa.ChainIDEthereum,
+			EmitterAddress:   emitterAddr,
+			ConsistencyLevel: uint8(32),
+			Payload:          payload1,
+		},
+	}
+	pendings = append(pendings, pending2)
+
+	// Add a duplicate of each pending transfer
+	pendings = append(pendings, pending1)
+	pendings = append(pendings, pending2)
+	assert.Equal(t, 4, len(pendings))
+
+	for _, p := range xfers {
+		gov.reloadTransfer(p, now, startTime)
+	}
+
+	for _, p := range pendings {
+		gov.reloadPendingTransfer(p, now)
+	}
+
+	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
+	assert.Equal(t, 2, numTrans)
+	assert.Equal(t, uint64(3000), valueTrans)
+	assert.Equal(t, 2, numPending)
+	assert.Equal(t, uint64(4436), valuePending)
+}
+
+func TestReobservationOfPublishedMsg(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
+	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
+	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
+	tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
+	tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
+	require.NoError(t, err)
+
+	gov.setDayLengthInMinutes(24 * 60)
+	err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 100000)
+	require.NoError(t, err)
+	err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
+	require.NoError(t, err)
+
+	// The first transfer should be accepted.
+	msg := common.MessagePublication{
+		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        time.Unix(int64(1654543099), 0),
+		Nonce:            uint32(1),
+		Sequence:         uint64(1),
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		ConsistencyLevel: uint8(32),
+		Payload: buildMockTransferPayloadBytes(1,
+			vaa.ChainIDEthereum,
+			tokenAddrStr,
+			vaa.ChainIDPolygon,
+			toAddrStr,
+			50,
+		),
+	}
+
+	now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:10pm (CST)")
+	canPost, err := gov.ProcessMsgForTime(&msg, now)
+	require.NoError(t, err)
+
+	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
+	assert.Equal(t, true, canPost)
+	assert.Equal(t, 1, numTrans)
+	assert.Equal(t, uint64(88730), valueTrans)
+	assert.Equal(t, 0, numPending)
+	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
+
+	// A reobservation of the same message should get published but should not affect the notional value.
+	canPost, err = gov.ProcessMsgForTime(&msg, now)
+	require.NoError(t, err)
+
+	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
+	assert.Equal(t, true, canPost)
+	assert.Equal(t, 1, numTrans)
+	assert.Equal(t, uint64(88730), valueTrans)
+	assert.Equal(t, 0, numPending)
+	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
+}
+
+func TestReobservationOfEnqueued(t *testing.T) {
+	// The duplicate should not get published and not get enqueued again.
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
+	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
+	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
+	tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
+	tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
+	require.NoError(t, err)
+
+	gov.setDayLengthInMinutes(24 * 60)
+	err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 100000)
+	require.NoError(t, err)
+	err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
+	require.NoError(t, err)
+
+	// A big transfer should get enqueued.
+	msg := common.MessagePublication{
+		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        time.Unix(int64(1654543099), 0),
+		Nonce:            uint32(1),
+		Sequence:         uint64(1),
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		ConsistencyLevel: uint8(32),
+		Payload: buildMockTransferPayloadBytes(1,
+			vaa.ChainIDEthereum,
+			tokenAddrStr,
+			vaa.ChainIDPolygon,
+			toAddrStr,
+			5000,
+		),
+	}
+
+	now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:10pm (CST)")
+	canPost, err := gov.ProcessMsgForTime(&msg, now)
+	require.NoError(t, err)
+
+	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
+	assert.Equal(t, false, canPost)
+	assert.Equal(t, 0, numTrans)
+	assert.Equal(t, uint64(0), valueTrans)
+	assert.Equal(t, 1, numPending)
+	assert.Equal(t, uint64(8_873_099), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
+
+	// A reobservation of the same message should not get published and should not get enqueued again.
+	canPost, err = gov.ProcessMsgForTime(&msg, now)
+	require.NoError(t, err)
+
+	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
+	assert.Equal(t, false, canPost)
+	assert.Equal(t, 0, numTrans)
+	assert.Equal(t, uint64(0), valueTrans)
+	assert.Equal(t, 1, numPending)
+	assert.Equal(t, uint64(8_873_099), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
+}
+
+func TestReusedMsgIdWithDifferentPayloadGetsProcessed(t *testing.T) {
+	ctx := context.Background()
+	gov, err := newChainGovernorForTest(ctx)
+
+	require.NoError(t, err)
+	assert.NotNil(t, gov)
+
+	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
+	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
+	tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
+	tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
+	require.NoError(t, err)
+
+	gov.setDayLengthInMinutes(24 * 60)
+	err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 100000)
+	require.NoError(t, err)
+	err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
+	require.NoError(t, err)
+
+	// The first transfer should be accepted.
+	msg1 := common.MessagePublication{
+		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        time.Unix(int64(1654543099), 0),
+		Nonce:            uint32(1),
+		Sequence:         uint64(1),
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		ConsistencyLevel: uint8(32),
+		Payload: buildMockTransferPayloadBytes(1,
+			vaa.ChainIDEthereum,
+			tokenAddrStr,
+			vaa.ChainIDPolygon,
+			toAddrStr,
+			50,
+		),
+	}
+
+	now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:10pm (CST)")
+	canPost, err := gov.ProcessMsgForTime(&msg1, now)
+	require.NoError(t, err)
+
+	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
+	assert.Equal(t, true, canPost)
+	assert.Equal(t, 1, numTrans)
+	assert.Equal(t, uint64(88730), valueTrans)
+	assert.Equal(t, 0, numPending)
+	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 1, len(gov.msgsSeen))
+
+	// A second message with the same msgId but a different payload should also get published and apply to the notional value.
+	msg2 := common.MessagePublication{
+		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        time.Unix(int64(1654543099), 0),
+		Nonce:            uint32(1),
+		Sequence:         uint64(1),
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		ConsistencyLevel: uint8(32),
+		Payload: buildMockTransferPayloadBytes(1,
+			vaa.ChainIDEthereum,
+			tokenAddrStr,
+			vaa.ChainIDPolygon,
+			toAddrStr,
+			5,
+		),
+	}
+
+	canPost, err = gov.ProcessMsgForTime(&msg2, now)
+	require.NoError(t, err)
+
+	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
+	assert.Equal(t, true, canPost)
+	assert.Equal(t, 2, numTrans)
+	assert.Equal(t, uint64(97603), valueTrans)
+	assert.Equal(t, 0, numPending)
+	assert.Equal(t, uint64(0), valuePending)
+	assert.Equal(t, 2, len(gov.msgsSeen))
 }