Ver código fonte

Revert "node: governor listen to quorum gossip (#1487)" (#1682)

This reverts commit 0dbd0b66280c0e5d407cff2ef0740747e49094fd.
tbjump 3 anos atrás
pai
commit
d5368b32c4

+ 5 - 14
node/cmd/guardiand/node.go

@@ -459,22 +459,13 @@ func runNode(cmd *cobra.Command, args []string) {
 
 	// In devnet mode, we automatically set a number of flags that rely on deterministic keys.
 	if *unsafeDevMode {
-		// When running multiple guardians in tilt, we only see p2p traffic from others if they are also bootstrap hosts.
-		p2pStr := ""
-		for idx := 0; idx < int(*devNumGuardians); idx++ {
-			key, err := peer.IDFromPrivateKey(devnet.DeterministicP2PPrivKeyByIndex(int64(idx)))
-			if err != nil {
-				panic(err)
-			}
-
-			if p2pStr != "" {
-				p2pStr = p2pStr + ","
-			}
-			p2pStr = p2pStr + fmt.Sprintf("/dns4/guardian-%d.guardian/udp/%d/quic/p2p/%s", idx, *p2pPort, key.String())
+		g0key, err := peer.IDFromPrivateKey(devnet.DeterministicP2PPrivKeyByIndex(0))
+		if err != nil {
+			panic(err)
 		}
 
-		*p2pBootstrap = p2pStr
-		logger.Info("running in dev mode", zap.Uint("devNumGuardians", *devNumGuardians), zap.String("p2pBootstrap", *p2pBootstrap))
+		// Use the first guardian node as bootstrap
+		*p2pBootstrap = fmt.Sprintf("/dns4/guardian-0.guardian/udp/%d/quic/p2p/%s", *p2pPort, g0key.String())
 
 		// Deterministic ganache ETH devnet address.
 		*ethContract = devnet.GanacheWormholeContractAddress.Hex()

+ 6 - 13
node/pkg/db/governor.go

@@ -264,10 +264,8 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
 				}
 
 				key := oldPendingMsgID(&pending.Msg)
-				if err := d.db.Update(func(txn *badger.Txn) error {
-					err := txn.Delete(key)
-					return err
-				}); err != nil {
+				err = d.db.DropPrefix(key)
+				if err != nil {
 					return fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pending.Msg.MessageIDString(), err)
 				}
 			}
@@ -317,11 +315,8 @@ func (d *Database) StorePendingMsg(pending *PendingTransfer) error {
 // This is called by the chain governor to delete a transfer after the time limit has expired.
 func (d *Database) DeleteTransfer(t *Transfer) error {
 	key := TransferMsgID(t)
-
-	if err := d.db.Update(func(txn *badger.Txn) error {
-		err := txn.Delete(key)
-		return err
-	}); err != nil {
+	err := d.db.DropPrefix(key)
+	if err != nil {
 		return fmt.Errorf("failed to delete transfer msg for key [%v]: %w", key, err)
 	}
 
@@ -331,10 +326,8 @@ func (d *Database) DeleteTransfer(t *Transfer) error {
 // This is called by the chain governor to delete a pending transfer.
 func (d *Database) DeletePendingMsg(pending *PendingTransfer) error {
 	key := PendingMsgID(&pending.Msg)
-	if err := d.db.Update(func(txn *badger.Txn) error {
-		err := txn.Delete(key)
-		return err
-	}); err != nil {
+	err := d.db.DropPrefix(key)
+	if err != nil {
 		return fmt.Errorf("failed to delete pending msg for key [%v]: %w", key, err)
 	}
 

+ 0 - 19
node/pkg/db/governor_test.go

@@ -15,13 +15,6 @@ import (
 	"go.uber.org/zap"
 )
 
-func (d *Database) rowExistsInDB(key []byte) error {
-	return d.db.View(func(txn *badger.Txn) error {
-		_, err := txn.Get(key)
-		return err
-	})
-}
-
 func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
 	tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
 	require.NoError(t, err)
@@ -188,14 +181,8 @@ func TestDeleteTransfer(t *testing.T) {
 	err2 := db.StoreTransfer(xfer1)
 	require.NoError(t, err2)
 
-	// Make sure the xfer exists in the db.
-	assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1)))
-
 	err3 := db.DeleteTransfer(xfer1)
 	require.NoError(t, err3)
-
-	// Make sure the xfer is no longer in the db.
-	assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(TransferMsgID(xfer1)))
 }
 
 func TestStorePendingMsg(t *testing.T) {
@@ -253,14 +240,8 @@ func TestDeletePendingMsg(t *testing.T) {
 	err3 := db.StorePendingMsg(pending)
 	require.NoError(t, err3)
 
-	// Make sure the pending transfer exists in the db.
-	assert.NoError(t, db.rowExistsInDB(PendingMsgID(msg)))
-
 	err4 := db.DeletePendingMsg(pending)
 	assert.Nil(t, err4)
-
-	// Make sure the pending transfer is no longer in the db.
-	assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(PendingMsgID(msg)))
 }
 
 func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {

+ 34 - 176
node/pkg/governor/governor.go

@@ -46,9 +46,6 @@ const (
 	TestNetMode = 2
 	DevNetMode  = 3
 	GoTestMode  = 4
-
-	transferComplete = true
-	transferEnqueued = false
 )
 
 // WARNING: Change me in ./node/db as well
@@ -121,7 +118,6 @@ type ChainGovernor struct {
 	tokens                map[tokenKey]*tokenEntry
 	tokensByCoinGeckoId   map[string][]*tokenEntry
 	chains                map[vaa.ChainID]*chainEntry
-	msgsById              map[string]bool // Use consts transferComplete and transferEnqueued.
 	msgsToPublish         []*common.MessagePublication
 	dayLengthInMinutes    int
 	coinGeckoQuery        string
@@ -143,7 +139,6 @@ func NewChainGovernor(
 		tokens:              make(map[tokenKey]*tokenEntry),
 		tokensByCoinGeckoId: make(map[string][]*tokenEntry),
 		chains:              make(map[vaa.ChainID]*chainEntry),
-		msgsById:            make(map[string]bool),
 		env:                 env,
 	}
 }
@@ -292,8 +287,6 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		return false, fmt.Errorf("msg is nil")
 	}
 
-	msgId := msg.MessageIDString()
-
 	gov.mutex.Lock()
 	defer gov.mutex.Unlock()
 
@@ -301,25 +294,25 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 
 	// If we don't care about this chain, the VAA can be published.
 	if !exists {
-		gov.logger.Info("cgov: ignoring vaa because the emitter chain is not configured", zap.String("msgID", msgId))
+		gov.logger.Info("cgov: ignoring vaa because the emitter chain is not configured", zap.String("msgID", msg.MessageIDString()))
 		return true, nil
 	}
 
 	// If we don't care about this emitter, the VAA can be published.
 	if msg.EmitterAddress != ce.emitterAddr {
-		gov.logger.Info("cgov: ignoring vaa because the emitter address is not configured", zap.String("msgID", msgId))
+		gov.logger.Info("cgov: ignoring vaa because the emitter address is not configured", zap.String("msgID", msg.MessageIDString()))
 		return true, nil
 	}
 
 	// We only care about transfers.
 	if !vaa.IsTransfer(msg.Payload) {
-		gov.logger.Info("cgov: ignoring vaa because it is not a transfer", zap.String("msgID", msgId))
+		gov.logger.Info("cgov: ignoring vaa because it is not a transfer", zap.String("msgID", msg.MessageIDString()))
 		return true, nil
 	}
 
 	payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
 	if err != nil {
-		gov.logger.Error("cgov: failed to decode vaa", zap.String("msgID", msgId), zap.Error(err))
+		gov.logger.Error("cgov: failed to decode vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
 		return true, err
 	}
 
@@ -327,33 +320,26 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 	tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress}
 	token, exists := gov.tokens[tk]
 	if !exists {
-		gov.logger.Info("cgov: ignoring vaa because the token is not in the list", zap.String("msgID", msgId))
+		gov.logger.Info("cgov: ignoring vaa because the token is not in the list", zap.String("msgID", msg.MessageIDString()))
 		return true, nil
 	}
 
-	// If we've already seen this message via quorum, we can publish it.
-	xferComplete, exists := gov.msgsById[msgId]
-	if exists {
-		gov.logger.Info("cgov: ignoring vaa because it has already been seen", zap.String("msgID", msgId))
-		return xferComplete, nil
-	}
-
 	startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
-	prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
+	prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
 	if err != nil {
-		gov.logger.Error("cgov: failed to trim transfers", zap.String("msgID", msgId), zap.Error(err))
+		gov.logger.Error("cgov: failed to trim transfers", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
 		return false, err
 	}
 
 	value, err := computeValue(payload.Amount, token)
 	if err != nil {
-		gov.logger.Error("cgov: failed to compute value of transfer", zap.String("msgID", msgId), zap.Error(err))
+		gov.logger.Error("cgov: failed to compute value of transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
 		return false, err
 	}
 
 	newTotalValue := prevTotalValue + value
 	if newTotalValue < prevTotalValue {
-		gov.logger.Error("cgov: total value has overflowed", zap.String("msgID", msgId), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue))
+		gov.logger.Error("cgov: total value has overflowed", zap.String("msgID", msg.MessageIDString()), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue))
 		return false, fmt.Errorf("total value has overflowed")
 	}
 
@@ -366,7 +352,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			zap.Uint64("value", value),
 			zap.Uint64("prevTotalValue", prevTotalValue),
 			zap.Uint64("newTotalValue", newTotalValue),
-			zap.String("msgID", msgId),
+			zap.String("msgID", msg.MessageIDString()),
 			zap.Stringer("releaseTime", releaseTime),
 			zap.Uint64("bigTransactionSize", ce.bigTransactionSize),
 		)
@@ -378,17 +364,16 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 			zap.Uint64("prevTotalValue", prevTotalValue),
 			zap.Uint64("newTotalValue", newTotalValue),
 			zap.Stringer("releaseTime", releaseTime),
-			zap.String("msgID", msgId),
+			zap.String("msgID", msg.MessageIDString()),
 		)
 	}
 
 	if enqueueIt {
 		dbData := db.PendingTransfer{ReleaseTime: releaseTime, Msg: *msg}
 		ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: dbData})
-		gov.msgsById[msgId] = transferEnqueued
 		err = gov.db.StorePendingMsg(&dbData)
 		if err != nil {
-			gov.logger.Error("cgov: failed to store pending vaa", zap.String("msgID", msgId), zap.Error(err))
+			gov.logger.Error("cgov: failed to store pending vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
 			return false, err
 		}
 
@@ -399,143 +384,19 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
 		zap.Uint64("value", value),
 		zap.Uint64("prevTotalValue", prevTotalValue),
 		zap.Uint64("newTotalValue", newTotalValue),
-		zap.String("msgID", msgId))
+		zap.String("msgID", msg.MessageIDString()))
 
-	xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msgId}
-	ce.transfers = append(ce.transfers, xfer)
-	gov.msgsById[msgId] = transferComplete
-	err = gov.db.StoreTransfer(xfer)
+	xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msg.MessageIDString()}
+	ce.transfers = append(ce.transfers, &xfer)
+	err = gov.db.StoreTransfer(&xfer)
 	if err != nil {
-		gov.logger.Error("cgov: failed to store transfer", zap.String("msgID", msgId), zap.Error(err))
+		gov.logger.Error("cgov: failed to store transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
 		return false, err
 	}
 
 	return true, nil
 }
 
-// Note: This function only gets called once for each VAA. If the processor finds the VAA in the DB, this function will not be called.
-func (gov *ChainGovernor) ProcessInboundQuorum(v *vaa.VAA) {
-	if v == nil {
-		gov.logger.Error("cgov: received inbound quorum event with nil vaa")
-		return
-	}
-
-	msgId := v.MessageID()
-
-	gov.mutex.Lock()
-	defer gov.mutex.Unlock()
-
-	ce, exists := gov.chains[v.EmitterChain]
-
-	// If we don't care about this chain, we can ignore this VAA.
-	if !exists {
-		gov.logger.Info("cgov: ignoring incoming quorum vaa because the emitter chain is not configured", zap.String("msgID", msgId))
-		return
-	}
-
-	// If we don't care about this emitter,  we can ignore this VAA.
-	if v.EmitterAddress != ce.emitterAddr {
-		gov.logger.Info("cgov: ignoring incoming quorum vaa because the emitter address is not configured", zap.String("msgID", msgId))
-		return
-	}
-
-	// We only care about transfers.
-	if !vaa.IsTransfer(v.Payload) {
-		gov.logger.Info("cgov: ignoring incoming quorum vaa because it is not a transfer", zap.String("msgID", msgId))
-		return
-	}
-
-	payload, err := vaa.DecodeTransferPayloadHdr(v.Payload)
-	if err != nil {
-		gov.logger.Error("cgov: failed to decode incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
-		return
-	}
-
-	// If we don't care about this token,  we can ignore this VAA.
-	tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress}
-	token, exists := gov.tokens[tk]
-	if !exists {
-		gov.logger.Info("cgov: ignoring incoming quorum vaa because the token is not in the list", zap.String("msgID", msgId))
-		return
-	}
-
-	// See if we have already processed this VAA.
-	xferComplete, exists := gov.msgsById[msgId]
-	if exists {
-		if xferComplete {
-			gov.logger.Info("cgov: ignoring incoming quorum vaa because we've already seen it", zap.String("msgID", msgId))
-			return
-		}
-
-		// If we get here, the VAA is enqueued.
-		foundIt := false
-		for _, ce := range gov.chains {
-			for idx, pe := range ce.pending {
-				if msgId == pe.dbData.Msg.MessageIDString() {
-					gov.logger.Info("cgov: received incoming quorum for transfer in the enqueued list, removing it from the pending list", zap.String("msgID", msgId))
-					if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil {
-						gov.logger.Error("cgov: failed to delete pending entry", zap.String("msgId", msgId), zap.Error(err))
-						// Continue on so that our data structures are accurate.
-					}
-
-					ce.pending = append(ce.pending[:idx], ce.pending[idx+1:]...)
-					delete(gov.msgsById, msgId)
-					foundIt = true
-					break
-				}
-			}
-
-			if foundIt {
-				break
-			}
-		}
-
-		if !foundIt {
-			gov.logger.Error("cgov: failed to find pending entry for incoming quorum vaa, adding it to the list of transfers", zap.String("msgId", msgId))
-		}
-	}
-
-	value, err := computeValue(payload.Amount, token)
-	if err != nil {
-		gov.logger.Error("cgov: failed to compute value of incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
-		return
-	}
-
-	now := time.Now()
-	startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
-	prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
-	if err != nil {
-		gov.logger.Error("cgov: failed to trim transfers for incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
-		return
-	}
-
-	newTotalValue := prevTotalValue + value
-
-	if newTotalValue > ce.dailyLimit {
-		gov.logger.Error("cgov: incoming quorum vaa put us over the daily limit",
-			zap.Uint64("value", value),
-			zap.Uint64("prevTotalValue", prevTotalValue),
-			zap.Uint64("newTotalValue", newTotalValue),
-			zap.Uint64("dailyLimit", ce.dailyLimit),
-			zap.String("msgID", msgId))
-	} else {
-		gov.logger.Info("cgov: adding incoming quorum vaa",
-			zap.Uint64("value", value),
-			zap.Uint64("prevTotalValue", prevTotalValue),
-			zap.Uint64("newTotalValue", newTotalValue),
-			zap.String("msgID", msgId))
-	}
-
-	xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: v.EmitterChain, EmitterAddress: v.EmitterAddress, MsgID: msgId}
-	ce.transfers = append(ce.transfers, xfer)
-	gov.msgsById[msgId] = transferComplete
-	err = gov.db.StoreTransfer(xfer)
-	if err != nil {
-		gov.logger.Error("cgov: failed to store transfer for incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
-		return
-	}
-}
-
 func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) {
 	return gov.CheckPendingForTime(time.Now())
 }
@@ -558,7 +419,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
 		// Keep going as long as we find something that will fit.
 		for {
 			foundOne := false
-			prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
+			prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
 			if err != nil {
 				gov.logger.Error("cgov: failed to trim transfers", zap.Error(err))
 				gov.msgsToPublish = msgsToPublish
@@ -567,13 +428,12 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
 
 			// Keep going until we find something that fits or hit the end.
 			for idx, pe := range ce.pending {
-				msgId := pe.dbData.Msg.MessageIDString()
 				value, err := computeValue(pe.amount, pe.token)
 				if err != nil {
 					gov.logger.Error("cgov: failed to compute value for pending vaa",
 						zap.Stringer("amount", pe.amount),
 						zap.Stringer("price", pe.token.price),
-						zap.String("msgID", msgId),
+						zap.String("msgID", pe.dbData.Msg.MessageIDString()),
 						zap.Error(err),
 					)
 
@@ -593,7 +453,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
 						zap.Stringer("price", pe.token.price),
 						zap.Uint64("value", value),
 						zap.Stringer("releaseTime", pe.dbData.ReleaseTime),
-						zap.String("msgID", msgId))
+						zap.String("msgID", pe.dbData.Msg.MessageIDString()))
 				} else if now.After(pe.dbData.ReleaseTime) {
 					countsTowardsTransfers = false
 					gov.logger.Info("cgov: posting pending vaa because the release time has been reached",
@@ -601,7 +461,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
 						zap.Stringer("price", pe.token.price),
 						zap.Uint64("value", value),
 						zap.Stringer("releaseTime", pe.dbData.ReleaseTime),
-						zap.String("msgID", msgId))
+						zap.String("msgID", pe.dbData.Msg.MessageIDString()))
 				} else {
 					newTotalValue := prevTotalValue + value
 					if newTotalValue < prevTotalValue {
@@ -620,24 +480,21 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
 						zap.Uint64("value", value),
 						zap.Uint64("prevTotalValue", prevTotalValue),
 						zap.Uint64("newTotalValue", newTotalValue),
-						zap.String("msgID", msgId))
+						zap.String("msgID", pe.dbData.Msg.MessageIDString()))
 				}
 
 				// If we get here, publish it and remove it from the pending list.
 				msgsToPublish = append(msgsToPublish, &pe.dbData.Msg)
 
 				if countsTowardsTransfers {
-					gov.msgsById[msgId] = transferComplete
-					xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr,
-						EmitterChain: pe.dbData.Msg.EmitterChain, EmitterAddress: pe.dbData.Msg.EmitterAddress, MsgID: msgId}
-					ce.transfers = append(ce.transfers, xfer)
+					xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr,
+						EmitterChain: pe.dbData.Msg.EmitterChain, EmitterAddress: pe.dbData.Msg.EmitterAddress, MsgID: pe.dbData.Msg.MessageIDString()}
+					ce.transfers = append(ce.transfers, &xfer)
 
-					if err := gov.db.StoreTransfer(xfer); err != nil {
+					if err := gov.db.StoreTransfer(&xfer); err != nil {
 						gov.msgsToPublish = msgsToPublish
 						return nil, err
 					}
-				} else {
-					delete(gov.msgsById, msgId)
 				}
 
 				if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil {
@@ -678,12 +535,12 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) {
 	return value, nil
 }
 
-func (gov *ChainGovernor) TrimAndSumValueForChain(ce *chainEntry, startTime time.Time) (sum uint64, err error) {
-	sum, ce.transfers, err = gov.TrimAndSumValue(ce.transfers, startTime)
+func (ce *chainEntry) TrimAndSumValue(startTime time.Time, db db.GovernorDB) (sum uint64, err error) {
+	sum, ce.transfers, err = TrimAndSumValue(ce.transfers, startTime, db)
 	return sum, err
 }
 
-func (gov *ChainGovernor) TrimAndSumValue(transfers []*db.Transfer, startTime time.Time) (uint64, []*db.Transfer, error) {
+func TrimAndSumValue(transfers []*db.Transfer, startTime time.Time, db db.GovernorDB) (uint64, []*db.Transfer, error) {
 	if len(transfers) == 0 {
 		return 0, transfers, nil
 	}
@@ -700,10 +557,11 @@ func (gov *ChainGovernor) TrimAndSumValue(transfers []*db.Transfer, startTime ti
 	}
 
 	if trimIdx >= 0 {
-		for idx := 0; idx <= trimIdx; idx++ {
-			delete(gov.msgsById, transfers[idx].MsgID)
-			if err := gov.db.DeleteTransfer(transfers[idx]); err != nil {
-				return 0, transfers, err
+		if db != nil {
+			for idx := 0; idx <= trimIdx; idx++ {
+				if err := db.DeleteTransfer(transfers[idx]); err != nil {
+					return 0, transfers, err
+				}
 			}
 		}
 

+ 25 - 52
node/pkg/governor/governor_db.go

@@ -58,11 +58,10 @@ func (gov *ChainGovernor) loadFromDBAlreadyLocked() error {
 
 func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now time.Time) {
 	msg := &pending.Msg
-	msgId := msg.MessageIDString()
 	ce, exists := gov.chains[msg.EmitterChain]
 	if !exists {
 		gov.logger.Error("cgov: reloaded pending transfer for unsupported chain, dropping it",
-			zap.String("MsgID", msgId),
+			zap.String("MsgID", msg.MessageIDString()),
 			zap.Stringer("TxHash", msg.TxHash),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
@@ -76,7 +75,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
 
 	if msg.EmitterAddress != ce.emitterAddr {
 		gov.logger.Error("cgov: reloaded pending transfer for unsupported emitter address, dropping it",
-			zap.String("MsgID", msgId),
+			zap.String("MsgID", msg.MessageIDString()),
 			zap.Stringer("TxHash", msg.TxHash),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
@@ -91,7 +90,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
 	payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
 	if err != nil {
 		gov.logger.Error("cgov: failed to parse payload for reloaded pending transfer, dropping it",
-			zap.String("MsgID", msgId),
+			zap.String("MsgID", msg.MessageIDString()),
 			zap.Stringer("TxHash", msg.TxHash),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
@@ -110,7 +109,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
 	token, exists := gov.tokens[tk]
 	if !exists {
 		gov.logger.Error("cgov: reloaded pending transfer for unsupported token, dropping it",
-			zap.String("MsgID", msgId),
+			zap.String("MsgID", msg.MessageIDString()),
 			zap.Stringer("TxHash", msg.TxHash),
 			zap.Stringer("Timestamp", msg.Timestamp),
 			zap.Uint32("Nonce", msg.Nonce),
@@ -124,34 +123,19 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
 		return
 	}
 
-	if _, exists := gov.msgsById[msgId]; !exists {
-		gov.logger.Info("cgov: reloaded pending transfer",
-			zap.String("MsgID", msgId),
-			zap.Stringer("TxHash", msg.TxHash),
-			zap.Stringer("Timestamp", msg.Timestamp),
-			zap.Uint32("Nonce", msg.Nonce),
-			zap.Uint64("Sequence", msg.Sequence),
-			zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
-			zap.Stringer("EmitterChain", msg.EmitterChain),
-			zap.Stringer("EmitterAddress", msg.EmitterAddress),
-			zap.Stringer("Amount", payload.Amount),
-		)
-
-		ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: *pending})
-		gov.msgsById[msgId] = transferEnqueued
-	} else {
-		gov.logger.Info("cgov: dropping duplicate pending transfer",
-			zap.String("MsgID", msgId),
-			zap.Stringer("TxHash", msg.TxHash),
-			zap.Stringer("Timestamp", msg.Timestamp),
-			zap.Uint32("Nonce", msg.Nonce),
-			zap.Uint64("Sequence", msg.Sequence),
-			zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
-			zap.Stringer("EmitterChain", msg.EmitterChain),
-			zap.Stringer("EmitterAddress", msg.EmitterAddress),
-			zap.Stringer("Amount", payload.Amount),
-		)
-	}
+	gov.logger.Info("cgov: reloaded pending transfer",
+		zap.String("MsgID", msg.MessageIDString()),
+		zap.Stringer("TxHash", msg.TxHash),
+		zap.Stringer("Timestamp", msg.Timestamp),
+		zap.Uint32("Nonce", msg.Nonce),
+		zap.Uint64("Sequence", msg.Sequence),
+		zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
+		zap.Stringer("EmitterChain", msg.EmitterChain),
+		zap.Stringer("EmitterAddress", msg.EmitterAddress),
+		zap.Stringer("Amount", payload.Amount),
+	)
+
+	ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: *pending})
 }
 
 func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, startTime time.Time) {
@@ -191,24 +175,13 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
 		return
 	}
 
-	if _, exists := gov.msgsById[xfer.MsgID]; !exists {
-		gov.logger.Info("cgov: reloaded transfer",
-			zap.Stringer("Timestamp", xfer.Timestamp),
-			zap.Uint64("Value", xfer.Value),
-			zap.Stringer("OriginChain", xfer.OriginChain),
-			zap.Stringer("OriginAddress", xfer.OriginAddress),
-			zap.String("MsgID", xfer.MsgID),
-		)
+	gov.logger.Info("cgov: reloaded transfer",
+		zap.Stringer("Timestamp", xfer.Timestamp),
+		zap.Uint64("Value", xfer.Value),
+		zap.Stringer("OriginChain", xfer.OriginChain),
+		zap.Stringer("OriginAddress", xfer.OriginAddress),
+		zap.String("MsgID", xfer.MsgID),
+	)
 
-		ce.transfers = append(ce.transfers, xfer)
-		gov.msgsById[xfer.MsgID] = transferComplete
-	} else {
-		gov.logger.Info("cgov: dropping duplicate transfer",
-			zap.Stringer("Timestamp", xfer.Timestamp),
-			zap.Uint64("Value", xfer.Value),
-			zap.Stringer("OriginChain", xfer.OriginChain),
-			zap.Stringer("OriginAddress", xfer.OriginAddress),
-			zap.String("MsgID", xfer.MsgID),
-		)
-	}
+	ce.transfers = append(ce.transfers, xfer)
 }

+ 21 - 503
node/pkg/governor/governor_test.go

@@ -111,27 +111,17 @@ func (gov *ChainGovernor) getStatsForAllChains() (numTrans int, valueTrans uint6
 }
 
 func TestTrimEmptyTransfers(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
 	var transfers []*db.Transfer
-	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now)
+	sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil)
 	require.NoError(t, err)
 	assert.Equal(t, uint64(0), sum)
 	assert.Equal(t, 0, len(updatedTransfers))
 }
 
 func TestSumAllFromToday(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -139,18 +129,13 @@ func TestSumAllFromToday(t *testing.T) {
 	transferTime, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 11:00am (CST)")
 	require.NoError(t, err)
 	transfers = append(transfers, &db.Transfer{Value: 125000, Timestamp: transferTime})
-	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
+	sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
 	require.NoError(t, err)
 	assert.Equal(t, uint64(125000), sum)
 	assert.Equal(t, 1, len(updatedTransfers))
 }
 
 func TestTrimOneOfTwoTransfers(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -167,18 +152,13 @@ func TestTrimOneOfTwoTransfers(t *testing.T) {
 	transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2})
 	assert.Equal(t, 2, len(transfers))
 
-	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
+	sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
 	require.NoError(t, err)
 	assert.Equal(t, 1, len(updatedTransfers))
 	assert.Equal(t, uint64(225000), sum)
 }
 
 func TestTrimSeveralTransfers(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -208,18 +188,13 @@ func TestTrimSeveralTransfers(t *testing.T) {
 
 	assert.Equal(t, 5, len(transfers))
 
-	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
+	sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
 	require.NoError(t, err)
 	assert.Equal(t, 3, len(updatedTransfers))
 	assert.Equal(t, uint64(465000), sum)
 }
 
 func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
 	now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
 	require.NoError(t, err)
 
@@ -234,7 +209,7 @@ func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) {
 	transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2})
 	assert.Equal(t, 2, len(transfers))
 
-	sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now)
+	sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil)
 	require.NoError(t, err)
 	assert.Equal(t, 0, len(updatedTransfers))
 	assert.Equal(t, uint64(0), sum)
@@ -317,7 +292,6 @@ func TestVaaForUninterestingEmitterChain(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 0, len(gov.msgsById))
 }
 
 func TestVaaForUninterestingEmitterAddress(t *testing.T) {
@@ -350,7 +324,6 @@ func TestVaaForUninterestingEmitterAddress(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 0, len(gov.msgsById))
 }
 
 func TestVaaForUninterestingPayloadType(t *testing.T) {
@@ -383,7 +356,6 @@ func TestVaaForUninterestingPayloadType(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 0, len(gov.msgsById))
 }
 
 // Note this method assumes 18 decimals for the amount.
@@ -487,7 +459,6 @@ func TestVaaForUninterestingToken(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 0, len(gov.msgsById))
 }
 
 func TestTransfersUpToAndOverTheLimit(t *testing.T) {
@@ -518,7 +489,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	)
 
 	// The first two transfers should be accepted.
-	msg1 := common.MessagePublication{
+	msg := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
@@ -529,7 +500,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 		Payload:          payloadBytes1,
 	}
 
-	canPost, err := gov.ProcessMsgForTime(&msg1, time.Now())
+	canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
@@ -538,20 +509,8 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(2218), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
-	msg2 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-		Timestamp:        time.Unix(int64(1654543099), 0),
-		Nonce:            uint32(1),
-		Sequence:         uint64(2),
-		EmitterChain:     vaa.ChainIDEthereum,
-		EmitterAddress:   tokenBridgeAddr,
-		ConsistencyLevel: uint8(32),
-		Payload:          payloadBytes1,
-	}
-
-	canPost, err = gov.ProcessMsgForTime(&msg2, time.Now())
+	canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
@@ -560,7 +519,6 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(4436), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 2, len(gov.msgsById))
 
 	// But the third one should be queued up.
 	payloadBytes2 := buildMockTransferPayloadBytes(1,
@@ -571,18 +529,9 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 		1250,
 	)
 
-	msg3 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-		Timestamp:        time.Unix(int64(1654543099), 0),
-		Nonce:            uint32(1),
-		Sequence:         uint64(3),
-		EmitterChain:     vaa.ChainIDEthereum,
-		EmitterAddress:   tokenBridgeAddr,
-		ConsistencyLevel: uint8(32),
-		Payload:          payloadBytes2,
-	}
+	msg.Payload = payloadBytes2
 
-	canPost, err = gov.ProcessMsgForTime(&msg3, time.Now())
+	canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
@@ -591,21 +540,10 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(4436), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(2218274), valuePending)
-	assert.Equal(t, 3, len(gov.msgsById))
 
 	// But a small one should still go through.
-	msg4 := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-		Timestamp:        time.Unix(int64(1654543099), 0),
-		Nonce:            uint32(1),
-		Sequence:         uint64(4),
-		EmitterChain:     vaa.ChainIDEthereum,
-		EmitterAddress:   tokenBridgeAddr,
-		ConsistencyLevel: uint8(32),
-		Payload:          payloadBytes1,
-	}
-
-	canPost, err = gov.ProcessMsgForTime(&msg4, time.Now())
+	msg.Payload = payloadBytes1
+	canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
 	require.NoError(t, err)
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
@@ -614,7 +552,6 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
 	assert.Equal(t, uint64(4436+2218), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(2218274), valuePending)
-	assert.Equal(t, 4, len(gov.msgsById))
 }
 
 func TestPendingTransferBeingReleased(t *testing.T) {
@@ -666,7 +603,6 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
 	// And so should the second.
 	payloadBytes2 := buildMockTransferPayloadBytes(1,
@@ -681,7 +617,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(2),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -698,7 +634,6 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 2, len(gov.msgsById))
 
 	// But the third one should be queued up.
 	payloadBytes3 := buildMockTransferPayloadBytes(1,
@@ -713,7 +648,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(3),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -730,7 +665,6 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(496893), valuePending)
-	assert.Equal(t, 3, len(gov.msgsById))
 
 	// And so should the fourth one.
 	payloadBytes4 := buildMockTransferPayloadBytes(1,
@@ -745,7 +679,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(4),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -762,7 +696,6 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(496893+532385), valuePending)
-	assert.Equal(t, 4, len(gov.msgsById))
 
 	// If we check pending before noon, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
@@ -775,7 +708,6 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(496893+532385), valuePending)
-	assert.Equal(t, 4, len(gov.msgsById))
 
 	// But at 3pm, the first one should drop off and the first queued one should get posted.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)")
@@ -789,7 +721,6 @@ func TestPendingTransferBeingReleased(t *testing.T) {
 	assert.Equal(t, uint64(488020+496893), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(532385), valuePending)
-	assert.Equal(t, 3, len(gov.msgsById))
 }
 
 func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
@@ -839,14 +770,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
 	// And so should the second.
 	msg2 := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(2),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -869,14 +799,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 2, len(gov.msgsById))
 
 	// But the third, big one should be queued up.
 	msg3 := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(3),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -899,14 +828,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(887309), valuePending)
-	assert.Equal(t, 3, len(gov.msgsById))
 
 	// A fourth, smaller, but still too big one, should get enqueued.
 	msg4 := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(4),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -929,14 +857,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(887309+177461), valuePending)
-	assert.Equal(t, 4, len(gov.msgsById))
 
 	// A fifth, smaller, but still too big one, should also get enqueued.
 	msg5 := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(5),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -959,14 +886,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 3, numPending)
 	assert.Equal(t, uint64(887309+177461+179236), valuePending)
-	assert.Equal(t, 5, len(gov.msgsById))
 
 	// A sixth, big one should also get enqueued.
 	msg6 := common.MessagePublication{
 		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 		Timestamp:        time.Unix(int64(1654543099), 0),
 		Nonce:            uint32(1),
-		Sequence:         uint64(6),
+		Sequence:         uint64(1),
 		EmitterChain:     vaa.ChainIDEthereum,
 		EmitterAddress:   tokenBridgeAddr,
 		ConsistencyLevel: uint8(32),
@@ -989,7 +915,6 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 4, numPending)
 	assert.Equal(t, uint64(887309+177461+179236+889084), valuePending)
-	assert.Equal(t, 6, len(gov.msgsById))
 
 	// If we check pending before noon, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
@@ -1002,7 +927,6 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(479147+488020), valueTrans)
 	assert.Equal(t, 4, numPending)
 	assert.Equal(t, uint64(887309+177461+179236+889084), valuePending)
-	assert.Equal(t, 6, len(gov.msgsById))
 
 	// But at 3pm, the first one should drop off. This should result in the second and third, smaller pending ones being posted, but not the two big ones.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)")
@@ -1016,7 +940,6 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
 	assert.Equal(t, uint64(488020+177461+179236), valueTrans)
 	assert.Equal(t, 2, numPending)
 	assert.Equal(t, uint64(887309+889084), valuePending)
-	assert.Equal(t, 5, len(gov.msgsById))
 }
 
 func TestMainnetConfigIsValid(t *testing.T) {
@@ -1086,7 +1009,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
 	// And so should the second.
 	msg2 := common.MessagePublication{
@@ -1116,7 +1038,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730+88730), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 2, len(gov.msgsById))
 
 	// But the third big one should get enqueued.
 	msg3 := common.MessagePublication{
@@ -1146,7 +1067,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730+88730), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
-	assert.Equal(t, 3, len(gov.msgsById))
 
 	// If we check pending before noon, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
@@ -1160,14 +1080,12 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730+88730), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
-	assert.Equal(t, 3, len(gov.msgsById))
 
 	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
 	assert.Equal(t, 2, numTrans)
 	assert.Equal(t, uint64(88730+88730), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
-	assert.Equal(t, 3, len(gov.msgsById))
 
 	// But just after noon, the first one should drop off. The big pending one should not be affected.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
@@ -1181,7 +1099,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(88730), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
-	assert.Equal(t, 2, len(gov.msgsById))
 
 	// And Just after 6pm, the second one should drop off. The big pending one should still not be affected.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 6:01pm (CST)")
@@ -1195,7 +1112,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
 	// 23 hours after the big transaction is enqueued, it should still be there.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 3, 2022 at 1:01am (CST)")
@@ -1209,9 +1125,8 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
-	// But then the operator resets the release time.
+	// // But then the operator resets the release time.
 	_, err = gov.resetReleaseTimerForTime(msg3.MessageIDString(), now)
 	require.NoError(t, err)
 
@@ -1227,7 +1142,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(177461), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
 	// But finally, a full 24hrs, it should get released.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 4, 2022 at 1:01am (CST)")
@@ -1241,7 +1155,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 0, len(gov.msgsById))
 
 	// But the big transaction should not affect the daily notional.
 	ce, exists := gov.chains[vaa.ChainIDEthereum]
@@ -1303,7 +1216,6 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(88730), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
 	// If we check 23hrs later, nothing should happen.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 11:00am (CST)")
@@ -1317,7 +1229,6 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 1, numPending)
 	assert.Equal(t, uint64(88730), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
 
 	// But after 24hrs, it should get released.
 	now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
@@ -1331,7 +1242,6 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
 	assert.Equal(t, uint64(0), valueTrans)
 	assert.Equal(t, 0, numPending)
 	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 0, len(gov.msgsById))
 }
 
 func TestIsBigTransfer(t *testing.T) {
@@ -1400,395 +1310,3 @@ func TestTransferPayloadTooShort(t *testing.T) {
 	canPost := gov.ProcessMsg(&msg)
 	assert.Equal(t, false, canPost)
 }
-
-func TestQuorumAfterTransferCompleteIsIgnored(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
-	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
-	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
-	tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
-	tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
-	require.NoError(t, err)
-
-	gov.setDayLengthInMinutes(24 * 60)
-	err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
-	require.NoError(t, err)
-	err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
-	require.NoError(t, err)
-
-	payloadBytes1 := buildMockTransferPayloadBytes(1,
-		vaa.ChainIDEthereum,
-		tokenAddrStr,
-		vaa.ChainIDPolygon,
-		toAddrStr,
-		1.25,
-	)
-
-	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-		Timestamp:        time.Unix(int64(1654543099), 0),
-		Nonce:            uint32(1),
-		Sequence:         uint64(1),
-		EmitterChain:     vaa.ChainIDEthereum,
-		EmitterAddress:   tokenBridgeAddr,
-		ConsistencyLevel: uint8(32),
-		Payload:          payloadBytes1,
-	}
-
-	canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
-	require.NoError(t, err)
-
-	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
-	assert.Equal(t, true, canPost)
-	assert.Equal(t, 1, numTrans)
-	assert.Equal(t, uint64(2218), valueTrans)
-	assert.Equal(t, 0, numPending)
-	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
-
-	v := &vaa.VAA{
-		Version:          vaa.SupportedVAAVersion,
-		GuardianSetIndex: 1,
-		Signatures:       nil,
-		Timestamp:        msg.Timestamp,
-		Nonce:            msg.Nonce,
-		EmitterChain:     msg.EmitterChain,
-		EmitterAddress:   msg.EmitterAddress,
-		Payload:          msg.Payload,
-		Sequence:         msg.Sequence,
-		ConsistencyLevel: msg.ConsistencyLevel,
-	}
-
-	gov.ProcessInboundQuorum(v)
-
-	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
-	assert.Equal(t, 1, numTrans)
-	assert.Equal(t, uint64(2218), valueTrans)
-	assert.Equal(t, 0, numPending)
-	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
-}
-
-func TestQuorumWhenEnqueuedUpdatesTheWindowAndDropsTheEnqueuedEvent(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
-	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
-	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
-	tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
-	tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
-	require.NoError(t, err)
-
-	gov.setDayLengthInMinutes(24 * 60)
-	err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
-	require.NoError(t, err)
-	err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
-	require.NoError(t, err)
-
-	payloadBytes1 := buildMockTransferPayloadBytes(1,
-		vaa.ChainIDEthereum,
-		tokenAddrStr,
-		vaa.ChainIDPolygon,
-		toAddrStr,
-		1000,
-	)
-
-	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-		Timestamp:        time.Unix(int64(1654543099), 0),
-		Nonce:            uint32(1),
-		Sequence:         uint64(1),
-		EmitterChain:     vaa.ChainIDEthereum,
-		EmitterAddress:   tokenBridgeAddr,
-		ConsistencyLevel: uint8(32),
-		Payload:          payloadBytes1,
-	}
-
-	canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
-	require.NoError(t, err)
-
-	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
-	assert.Equal(t, false, canPost)
-	assert.Equal(t, 0, numTrans)
-	assert.Equal(t, uint64(0), valueTrans)
-	assert.Equal(t, 1, numPending)
-	assert.Equal(t, uint64(1_774_619), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
-
-	v := &vaa.VAA{
-		Version:          vaa.SupportedVAAVersion,
-		GuardianSetIndex: 1,
-		Signatures:       nil,
-		Timestamp:        msg.Timestamp,
-		Nonce:            msg.Nonce,
-		EmitterChain:     msg.EmitterChain,
-		EmitterAddress:   msg.EmitterAddress,
-		Payload:          msg.Payload,
-		Sequence:         msg.Sequence,
-		ConsistencyLevel: msg.ConsistencyLevel,
-	}
-
-	gov.ProcessInboundQuorum(v)
-
-	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
-	assert.Equal(t, 1, numTrans)
-	assert.Equal(t, uint64(1_774_619), valueTrans)
-	assert.Equal(t, 0, numPending)
-	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
-}
-
-func TestQuorumBeforeLocalMessageUpdatesTheWindow(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
-	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
-	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
-	tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
-	tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
-	require.NoError(t, err)
-
-	gov.setDayLengthInMinutes(24 * 60)
-	err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
-	require.NoError(t, err)
-	err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
-	require.NoError(t, err)
-
-	payloadBytes1 := buildMockTransferPayloadBytes(1,
-		vaa.ChainIDEthereum,
-		tokenAddrStr,
-		vaa.ChainIDPolygon,
-		toAddrStr,
-		1.25,
-	)
-
-	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-		Timestamp:        time.Unix(int64(1654543099), 0),
-		Nonce:            uint32(1),
-		Sequence:         uint64(1),
-		EmitterChain:     vaa.ChainIDEthereum,
-		EmitterAddress:   tokenBridgeAddr,
-		ConsistencyLevel: uint8(32),
-		Payload:          payloadBytes1,
-	}
-
-	v := &vaa.VAA{
-		Version:          vaa.SupportedVAAVersion,
-		GuardianSetIndex: 1,
-		Signatures:       nil,
-		Timestamp:        msg.Timestamp,
-		Nonce:            msg.Nonce,
-		EmitterChain:     msg.EmitterChain,
-		EmitterAddress:   msg.EmitterAddress,
-		Payload:          msg.Payload,
-		Sequence:         msg.Sequence,
-		ConsistencyLevel: msg.ConsistencyLevel,
-	}
-
-	gov.ProcessInboundQuorum(v)
-
-	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
-	assert.Equal(t, 1, numTrans)
-	assert.Equal(t, uint64(2218), valueTrans)
-	assert.Equal(t, 0, numPending)
-	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
-}
-
-func TestLocalMessageAfterQuorumIsPublishedButNotAddedToTheWindowAgain(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
-	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
-	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
-	tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
-	tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
-	require.NoError(t, err)
-
-	gov.setDayLengthInMinutes(24 * 60)
-	err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
-	require.NoError(t, err)
-	err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
-	require.NoError(t, err)
-
-	payloadBytes1 := buildMockTransferPayloadBytes(1,
-		vaa.ChainIDEthereum,
-		tokenAddrStr,
-		vaa.ChainIDPolygon,
-		toAddrStr,
-		1.25,
-	)
-
-	msg := common.MessagePublication{
-		TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-		Timestamp:        time.Unix(int64(1654543099), 0),
-		Nonce:            uint32(1),
-		Sequence:         uint64(1),
-		EmitterChain:     vaa.ChainIDEthereum,
-		EmitterAddress:   tokenBridgeAddr,
-		ConsistencyLevel: uint8(32),
-		Payload:          payloadBytes1,
-	}
-
-	v := &vaa.VAA{
-		Version:          vaa.SupportedVAAVersion,
-		GuardianSetIndex: 1,
-		Signatures:       nil,
-		Timestamp:        msg.Timestamp,
-		Nonce:            msg.Nonce,
-		EmitterChain:     msg.EmitterChain,
-		EmitterAddress:   msg.EmitterAddress,
-		Payload:          msg.Payload,
-		Sequence:         msg.Sequence,
-		ConsistencyLevel: msg.ConsistencyLevel,
-	}
-
-	gov.ProcessInboundQuorum(v)
-
-	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
-	assert.Equal(t, 1, numTrans)
-	assert.Equal(t, uint64(2218), valueTrans)
-	assert.Equal(t, 0, numPending)
-	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
-
-	canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
-	require.NoError(t, err)
-
-	numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
-	assert.Equal(t, true, canPost)
-	assert.Equal(t, 1, numTrans)
-	assert.Equal(t, uint64(2218), valueTrans)
-	assert.Equal(t, 0, numPending)
-	assert.Equal(t, uint64(0), valuePending)
-	assert.Equal(t, 1, len(gov.msgsById))
-}
-
-func TestDontReloadDuplicates(t *testing.T) {
-	ctx := context.Background()
-	gov, err := newChainGovernorForTest(ctx)
-
-	require.NoError(t, err)
-	assert.NotNil(t, gov)
-
-	emitterAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
-	emitterAddr, err := vaa.StringToAddress(emitterAddrStr)
-	require.NoError(t, err)
-
-	tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
-	tokenAddr, err := vaa.StringToAddress(tokenAddrStr)
-	require.NoError(t, err)
-	toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
-
-	require.NoError(t, err)
-
-	gov.setDayLengthInMinutes(24 * 60)
-	err = gov.setChainForTesting(vaa.ChainIDEthereum, emitterAddrStr, 1000000, 0)
-	require.NoError(t, err)
-	err = gov.setTokenForTesting(vaa.ChainIDEthereum, emitterAddrStr, "WETH", 1774.62)
-	require.NoError(t, err)
-
-	now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
-	startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
-
-	var xfers []*db.Transfer
-
-	xfer1 := &db.Transfer{
-		Timestamp:      startTime.Add(time.Minute * 5),
-		Value:          uint64(1000),
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: emitterAddr,
-		MsgID:          "2/" + emitterAddrStr + "/125",
-	}
-	xfers = append(xfers, xfer1)
-
-	xfer2 := &db.Transfer{
-		Timestamp:      startTime.Add(time.Minute * 5),
-		Value:          uint64(2000),
-		OriginChain:    vaa.ChainIDEthereum,
-		OriginAddress:  tokenAddr,
-		EmitterChain:   vaa.ChainIDEthereum,
-		EmitterAddress: emitterAddr,
-		MsgID:          "2/" + emitterAddrStr + "/126",
-	}
-	xfers = append(xfers, xfer2)
-
-	// Add a duplicate of each transfer
-	xfers = append(xfers, xfer1)
-	xfers = append(xfers, xfer2)
-	assert.Equal(t, 4, len(xfers))
-
-	payload1 := buildMockTransferPayloadBytes(1,
-		vaa.ChainIDEthereum,
-		tokenAddrStr,
-		vaa.ChainIDPolygon,
-		toAddrStr,
-		1.25,
-	)
-
-	var pendings []*db.PendingTransfer
-	pending1 := &db.PendingTransfer{
-		ReleaseTime: now.Add(time.Hour * 24),
-		Msg: common.MessagePublication{
-			TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-			Timestamp:        time.Unix(int64(1654543099), 0),
-			Nonce:            uint32(1),
-			Sequence:         uint64(200),
-			EmitterChain:     vaa.ChainIDEthereum,
-			EmitterAddress:   emitterAddr,
-			ConsistencyLevel: uint8(32),
-			Payload:          payload1,
-		},
-	}
-	pendings = append(pendings, pending1)
-
-	pending2 := &db.PendingTransfer{
-		ReleaseTime: now.Add(time.Hour * 24),
-		Msg: common.MessagePublication{
-			TxHash:           hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
-			Timestamp:        time.Unix(int64(1654543099), 0),
-			Nonce:            uint32(1),
-			Sequence:         uint64(201),
-			EmitterChain:     vaa.ChainIDEthereum,
-			EmitterAddress:   emitterAddr,
-			ConsistencyLevel: uint8(32),
-			Payload:          payload1,
-		},
-	}
-	pendings = append(pendings, pending2)
-
-	// Add a duplicate of each pending transfer
-	pendings = append(pendings, pending1)
-	pendings = append(pendings, pending2)
-	assert.Equal(t, 4, len(pendings))
-
-	for _, p := range xfers {
-		gov.reloadTransfer(p, now, startTime)
-	}
-
-	for _, p := range pendings {
-		gov.reloadPendingTransfer(p, now)
-	}
-
-	numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
-	assert.Equal(t, 2, numTrans)
-	assert.Equal(t, uint64(3000), valueTrans)
-	assert.Equal(t, 2, numPending)
-	assert.Equal(t, uint64(4436), valuePending)
-}

+ 1 - 6
node/pkg/processor/observation.go

@@ -8,7 +8,6 @@ import (
 
 	node_common "github.com/certusone/wormhole/node/pkg/common"
 	"github.com/certusone/wormhole/node/pkg/db"
-	"github.com/certusone/wormhole/node/pkg/governor"
 	"github.com/mr-tron/base58"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -219,7 +218,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
 	}
 }
 
-func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum, gov *governor.ChainGovernor) {
+func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) {
 	v, err := vaa.Unmarshal(m.Vaa)
 	if err != nil {
 		p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message",
@@ -313,8 +312,4 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
 		return
 	}
 	p.attestationEvents.ReportVAAQuorum(v)
-
-	if gov != nil {
-		gov.ProcessInboundQuorum(v)
-	}
 }

+ 2 - 2
node/pkg/processor/observation_test.go

@@ -50,7 +50,7 @@ func TestHandleInboundSignedVAAWithQuorum_NilGuardianSet(t *testing.T) {
 	processor := Processor{}
 	processor.logger = observedLogger
 
-	processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum, nil)
+	processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum)
 
 	// Check to see if we got an error, which we should have,
 	// because a `gs` is not defined on processor
@@ -114,7 +114,7 @@ func TestHandleInboundSignedVAAWithQuorum(t *testing.T) {
 			processor.gs = &guardianSet
 			processor.logger = observedLogger
 
-			processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum, nil)
+			processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum)
 
 			// Check to see if we got an error, which we should have
 			assert.Equal(t, 1, observedLogs.Len())

+ 1 - 1
node/pkg/processor/processor.go

@@ -212,7 +212,7 @@ func (p *Processor) Run(ctx context.Context) error {
 		case m := <-p.obsvC:
 			p.handleObservation(ctx, m)
 		case m := <-p.signedInC:
-			p.handleInboundSignedVAAWithQuorum(ctx, m, p.governor)
+			p.handleInboundSignedVAAWithQuorum(ctx, m)
 		case <-p.cleanup.C:
 			p.handleCleanup(ctx)
 		case <-govTimer.C: