Bläddra i källkod

Node: Don't request reobservation on reobservations (#3397)

* Node: Don't request reobservation on reobservation

* Acct should handle reloading old pending transfers
bruce-riley 2 år sedan
förälder
incheckning
41fa0ecc0e

+ 71 - 4
node/pkg/common/chainlock.go

@@ -5,6 +5,7 @@ import (
 	"encoding/binary"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"time"
 
@@ -14,6 +15,9 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 )
 
+const HashLength = 32
+const AddressLength = 32
+
 type MessagePublication struct {
 	TxHash    common.Hash // TODO: rename to identifier? on Solana, this isn't actually the tx hash
 	Timestamp time.Time
@@ -24,6 +28,7 @@ type MessagePublication struct {
 	EmitterChain     vaa.ChainID
 	EmitterAddress   vaa.Address
 	Payload          []byte
+	IsReobservation  bool
 
 	// Unreliable indicates if this message can be reobserved. If a message is considered unreliable it cannot be
 	// reobserved.
@@ -38,7 +43,7 @@ func (msg *MessagePublication) MessageIDString() string {
 	return fmt.Sprintf("%v/%v/%v", uint16(msg.EmitterChain), msg.EmitterAddress, msg.Sequence)
 }
 
-const minMsgLength = 88
+const minMsgLength = 88 // Marshalled length with empty payload
 
 func (msg *MessagePublication) Marshal() ([]byte, error) {
 	buf := new(bytes.Buffer)
@@ -50,12 +55,70 @@ func (msg *MessagePublication) Marshal() ([]byte, error) {
 	vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel)
 	vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain)
 	buf.Write(msg.EmitterAddress[:])
+	vaa.MustWrite(buf, binary.BigEndian, msg.IsReobservation)
 	buf.Write(msg.Payload)
 
 	return buf.Bytes(), nil
 }
 
-// Unmarshal deserializes the binary representation of a VAA
+const oldMinMsgLength = 83 // Old marshalled length with empty payload
+
+// UnmarshalOldMessagePublicationBeforeIsReobservation deserializes a MessagePublication from prior to the addition of IsReobservation.
+// This function can be deleted once all guardians have been upgraded. That's why the code is just duplicated.
+func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessagePublication, error) {
+	if len(data) < oldMinMsgLength {
+		return nil, errors.New("message is too short")
+	}
+
+	msg := &MessagePublication{}
+
+	reader := bytes.NewReader(data[:])
+
+	txHash := common.Hash{}
+	if n, err := reader.Read(txHash[:]); err != nil || n != HashLength {
+		return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err)
+	}
+	msg.TxHash = txHash
+
+	unixSeconds := uint32(0)
+	if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
+		return nil, fmt.Errorf("failed to read timestamp: %w", err)
+	}
+	msg.Timestamp = time.Unix(int64(unixSeconds), 0)
+
+	if err := binary.Read(reader, binary.BigEndian, &msg.Nonce); err != nil {
+		return nil, fmt.Errorf("failed to read nonce: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &msg.Sequence); err != nil {
+		return nil, fmt.Errorf("failed to read sequence: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &msg.ConsistencyLevel); err != nil {
+		return nil, fmt.Errorf("failed to read consistency level: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &msg.EmitterChain); err != nil {
+		return nil, fmt.Errorf("failed to read emitter chain: %w", err)
+	}
+
+	emitterAddress := vaa.Address{}
+	if n, err := reader.Read(emitterAddress[:]); err != nil || n != AddressLength {
+		return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
+	}
+	msg.EmitterAddress = emitterAddress
+
+	payload := make([]byte, reader.Len())
+	n, err := reader.Read(payload)
+	if err != nil || n == 0 {
+		return nil, fmt.Errorf("failed to read payload [%d]: %w", n, err)
+	}
+	msg.Payload = payload[:n]
+
+	return msg, nil
+}
+
+// UnmarshalMessagePublication deserializes a MessagePublication
 func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
 	if len(data) < minMsgLength {
 		return nil, fmt.Errorf("message is too short")
@@ -66,7 +129,7 @@ func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
 	reader := bytes.NewReader(data[:])
 
 	txHash := common.Hash{}
-	if n, err := reader.Read(txHash[:]); err != nil || n != 32 {
+	if n, err := reader.Read(txHash[:]); err != nil || n != HashLength {
 		return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err)
 	}
 	msg.TxHash = txHash
@@ -94,11 +157,15 @@ func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) {
 	}
 
 	emitterAddress := vaa.Address{}
-	if n, err := reader.Read(emitterAddress[:]); err != nil || n != 32 {
+	if n, err := reader.Read(emitterAddress[:]); err != nil || n != AddressLength {
 		return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
 	}
 	msg.EmitterAddress = emitterAddress
 
+	if err := binary.Read(reader, binary.BigEndian, &msg.IsReobservation); err != nil {
+		return nil, fmt.Errorf("failed to read isReobservation: %w", err)
+	}
+
 	payload := make([]byte, reader.Len())
 	n, err := reader.Read(payload)
 	if err != nil || n == 0 {

+ 91 - 24
node/pkg/db/accountant.go

@@ -31,11 +31,22 @@ func (d *MockAccountantDB) AcctGetData(logger *zap.Logger) ([]*common.MessagePub
 	return nil, nil
 }
 
-const acctPendingTransfer = "ACCT:PXFER:"
+const acctOldPendingTransfer = "ACCT:PXFER:"
+const acctOldPendingTransferLen = len(acctOldPendingTransfer)
+
+const acctPendingTransfer = "ACCT:PXFER2:"
 const acctPendingTransferLen = len(acctPendingTransfer)
 
 const acctMinMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")
 
+func acctOldPendingTransferMsgID(msgId string) []byte {
+	return []byte(fmt.Sprintf("%v%v", acctOldPendingTransfer, msgId))
+}
+
+func acctIsOldPendingTransfer(keyBytes []byte) bool {
+	return (len(keyBytes) >= acctOldPendingTransferLen+acctMinMsgIdLen) && (string(keyBytes[0:acctOldPendingTransferLen]) == acctOldPendingTransfer)
+}
+
 func acctPendingTransferMsgID(msgId string) []byte {
 	return []byte(fmt.Sprintf("%v%v", acctPendingTransfer, msgId))
 }
@@ -47,36 +58,92 @@ func acctIsPendingTransfer(keyBytes []byte) bool {
 // This is called by the accountant on start up to reload pending transfers.
 func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication, error) {
 	pendingTransfers := []*common.MessagePublication{}
-	prefixBytes := []byte(acctPendingTransfer)
-	err := d.db.View(func(txn *badger.Txn) error {
-		opts := badger.DefaultIteratorOptions
-		opts.PrefetchSize = 10
-		it := txn.NewIterator(opts)
-		defer it.Close()
-		for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() {
-			item := it.Item()
-			key := item.Key()
-			val, err := item.ValueCopy(nil)
-			if err != nil {
-				return err
+	var err error
+	{
+		prefixBytes := []byte(acctPendingTransfer)
+		err = d.db.View(func(txn *badger.Txn) error {
+			opts := badger.DefaultIteratorOptions
+			opts.PrefetchSize = 10
+			it := txn.NewIterator(opts)
+			defer it.Close()
+			for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() {
+				item := it.Item()
+				key := item.Key()
+				val, err := item.ValueCopy(nil)
+				if err != nil {
+					return err
+				}
+
+				if acctIsPendingTransfer(key) {
+					var pt common.MessagePublication
+					err := json.Unmarshal(val, &pt)
+					if err != nil {
+						logger.Error("failed to unmarshal pending transfer for key", zap.String("key", string(key[:])), zap.Error(err))
+						continue
+					}
+
+					pendingTransfers = append(pendingTransfers, &pt)
+				} else {
+					return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key))
+				}
+			}
+
+			return nil
+		})
+	}
+
+	// See if we have any old format pending transfers.
+	if err == nil {
+		oldPendingTransfers := []*common.MessagePublication{}
+		prefixBytes := []byte(acctOldPendingTransfer)
+		err = d.db.View(func(txn *badger.Txn) error {
+			opts := badger.DefaultIteratorOptions
+			opts.PrefetchSize = 10
+			it := txn.NewIterator(opts)
+			defer it.Close()
+			for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() {
+				item := it.Item()
+				key := item.Key()
+				val, err := item.ValueCopy(nil)
+				if err != nil {
+					return err
+				}
+
+				if acctIsOldPendingTransfer(key) {
+					pt, err := common.UnmarshalOldMessagePublicationBeforeIsReobservation(val)
+					if err != nil {
+						logger.Error("failed to unmarshal old pending transfer for key", zap.String("key", string(key[:])), zap.Error(err))
+						continue
+					}
+
+					oldPendingTransfers = append(oldPendingTransfers, pt)
+				} else {
+					return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key))
+				}
 			}
 
-			if acctIsPendingTransfer(key) {
-				var pt common.MessagePublication
-				err := json.Unmarshal(val, &pt)
+			return nil
+		})
+
+		if err == nil && len(oldPendingTransfers) != 0 {
+			pendingTransfers = append(pendingTransfers, oldPendingTransfers...)
+			for _, pt := range oldPendingTransfers {
+				logger.Info("updating format of database entry for pending vaa", zap.String("msgId", pt.MessageIDString()))
+				err := d.AcctStorePendingTransfer(pt)
 				if err != nil {
-					logger.Error("failed to unmarshal pending transfer for key", zap.String("key", string(key[:])), zap.Error(err))
-					continue
+					return pendingTransfers, fmt.Errorf("failed to write new pending msg for key [%v]: %w", pt.MessageIDString(), err)
 				}
 
-				pendingTransfers = append(pendingTransfers, &pt)
-			} else {
-				return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key))
+				key := acctOldPendingTransferMsgID(pt.MessageIDString())
+				if err := d.db.Update(func(txn *badger.Txn) error {
+					err := txn.Delete(key)
+					return err
+				}); err != nil {
+					return pendingTransfers, fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pt.MessageIDString(), err)
+				}
 			}
 		}
-
-		return nil
-	})
+	}
 
 	return pendingTransfers, err
 }

+ 108 - 13
node/pkg/db/accountant_test.go

@@ -1,6 +1,10 @@
 package db
 
 import (
+	"bytes"
+	"encoding/binary"
+	"os"
+	"sort"
 	"testing"
 	"time"
 
@@ -30,19 +34,22 @@ func TestAcctPendingTransferMsgID(t *testing.T) {
 		ConsistencyLevel: 16,
 	}
 
-	assert.Equal(t, []byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctPendingTransferMsgID(msg1.MessageIDString()))
+	assert.Equal(t, []byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctOldPendingTransferMsgID(msg1.MessageIDString()))
+	assert.Equal(t, []byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctPendingTransferMsgID(msg1.MessageIDString()))
 }
 
 func TestAcctIsPendingTransfer(t *testing.T) {
-	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:1")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:1/1/1")))
-	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
-	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
+	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1/1/1")))
+	assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
+	assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
 	assert.Equal(t, false, acctIsPendingTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, acctIsPendingTransfer([]byte{0x01, 0x02, 0x03, 0x04}))
 	assert.Equal(t, false, acctIsPendingTransfer([]byte{}))
+	assert.Equal(t, true, acctIsOldPendingTransfer([]byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, acctIsOldPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 }
 
 func TestAcctStoreAndDeletePendingTransfers(t *testing.T) {
@@ -121,9 +128,9 @@ func TestAcctGetEmptyData(t *testing.T) {
 
 	logger, _ := zap.NewDevelopment()
 
-	pendingTransfers, err := db.AcctGetData(logger)
+	pendings, err := db.AcctGetData(logger)
 	require.NoError(t, err)
-	assert.Equal(t, 0, len(pendingTransfers))
+	assert.Equal(t, 0, len(pendings))
 }
 
 func TestAcctGetData(t *testing.T) {
@@ -186,10 +193,98 @@ func TestAcctGetData(t *testing.T) {
 	err = db.AcctStorePendingTransfer(&msg1a)
 	require.NoError(t, err)
 
-	pendingTransfers, err := db.AcctGetData(logger)
+	pendings, err := db.AcctGetData(logger)
 	require.NoError(t, err)
-	require.Equal(t, 2, len(pendingTransfers))
+	require.Equal(t, 2, len(pendings))
 
-	assert.Equal(t, msg1a, *pendingTransfers[0])
-	assert.Equal(t, *msg2, *pendingTransfers[1])
+	assert.Equal(t, msg1a, *pendings[0])
+	assert.Equal(t, *msg2, *pendings[1])
+}
+
+func TestAcctLoadingOldPendings(t *testing.T) {
+	dbPath := t.TempDir()
+	db, err := Open(dbPath)
+	if err != nil {
+		t.Error("failed to open database")
+	}
+	defer db.Close()
+	defer os.Remove(dbPath)
+
+	tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
+	require.NoError(t, err)
+
+	now := time.Unix(time.Now().Unix(), 0)
+
+	// Write the first pending event in the old format.
+	pending1 := &common.MessagePublication{
+		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        now,
+		Nonce:            123456,
+		Sequence:         789101112131417,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+		ConsistencyLevel: 16,
+		// IsReobservation will not be serialized. It should be set to false on reload.
+	}
+
+	db.acctStoreOldPendingTransfer(t, pending1)
+	require.Nil(t, err)
+
+	now2 := now.Add(time.Second * 5)
+
+	// Write the second one in the new format.
+	pending2 := &common.MessagePublication{
+		TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
+		Timestamp:        now2,
+		Nonce:            123456,
+		Sequence:         789101112131418,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddr,
+		Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+		ConsistencyLevel: 16,
+		IsReobservation:  true,
+	}
+
+	err = db.AcctStorePendingTransfer(pending2)
+	require.Nil(t, err)
+
+	logger := zap.NewNop()
+	pendings, err := db.AcctGetData(logger)
+	require.NoError(t, err)
+	require.Equal(t, 2, len(pendings))
+
+	// Updated old pending events get placed at the end, so we need to sort into timestamp order.
+	sort.SliceStable(pendings, func(i, j int) bool {
+		return pendings[i].Timestamp.Before(pendings[j].Timestamp)
+	})
+
+	assert.Equal(t, *pending1, *pendings[0])
+	assert.Equal(t, *pending2, *pendings[1])
+
+	// Make sure we can reload the updated pendings.
+	pendings2, err := db.AcctGetData(logger)
+
+	require.Nil(t, err)
+	require.Equal(t, 2, len(pendings2))
+
+	assert.Equal(t, pending1, pendings2[0])
+	assert.Equal(t, pending2, pendings2[1])
+}
+
+func (d *Database) acctStoreOldPendingTransfer(t *testing.T, msg *common.MessagePublication) {
+	buf := new(bytes.Buffer)
+
+	b := marshalOldMessagePublication(msg)
+
+	vaa.MustWrite(buf, binary.BigEndian, b)
+
+	err := d.db.Update(func(txn *badger.Txn) error {
+		if err := txn.Set(acctOldPendingTransferMsgID(msg.MessageIDString()), buf.Bytes()); err != nil {
+			return err
+		}
+		return nil
+	})
+
+	require.NoError(t, err)
 }

+ 19 - 25
node/pkg/db/governor.go

@@ -13,9 +13,6 @@ import (
 	"go.uber.org/zap"
 )
 
-// WARNING: Change me in ./node/governor as well
-const maxEnqueuedTime = time.Hour * 24
-
 type GovernorDB interface {
 	StoreTransfer(t *Transfer) error
 	StorePendingMsg(k *PendingTransfer) error
@@ -209,7 +206,7 @@ func (p *PendingTransfer) Marshal() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
-func UnmarshalPendingTransfer(data []byte) (*PendingTransfer, error) {
+func UnmarshalPendingTransfer(data []byte, isOld bool) (*PendingTransfer, error) {
 	p := &PendingTransfer{}
 
 	reader := bytes.NewReader(data[:])
@@ -227,9 +224,14 @@ func UnmarshalPendingTransfer(data []byte) (*PendingTransfer, error) {
 		return nil, fmt.Errorf("failed to read pending transfer msg [%d]: %w", n, err)
 	}
 
-	msg, err := common.UnmarshalMessagePublication(buf)
+	var msg *common.MessagePublication
+	if isOld {
+		msg, err = common.UnmarshalOldMessagePublicationBeforeIsReobservation(buf)
+	} else {
+		msg, err = common.UnmarshalMessagePublication(buf)
+	}
 	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshal pending transfer msg: %w", err)
+		return nil, fmt.Errorf("failed to unmarshal pending transfer msg, isOld: %t: %w", isOld, err)
 	}
 
 	p.Msg = *msg
@@ -243,13 +245,13 @@ const transfer = "GOV:XFER2:"
 const transferLen = len(transfer)
 
 // Since we are changing the DB format of pending entries, we will use a new tag in the pending key field.
-// The first time we run this new release, any existing entries with the "GOV:PENDING" tag will get converted
-// to the new format and given the "GOV:PENDING2" format. In a future release, the "GOV:PENDING" code can be deleted.
+// The first time we run this new release, any existing entries with the "GOV:PENDING2" tag will get converted
+// to the new format and given the "GOV:PENDING3" format. In a future release, the "GOV:PENDING2" code can be deleted.
 
-const oldPending = "GOV:PENDING:"
+const oldPending = "GOV:PENDING2:"
 const oldPendingLen = len(oldPending)
 
-const pending = "GOV:PENDING2:"
+const pending = "GOV:PENDING3:"
 const pendingLen = len(pending)
 
 const minMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")
@@ -308,20 +310,20 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
 			}
 
 			if IsPendingMsg(key) {
-				p, err := UnmarshalPendingTransfer(val)
+				p, err := UnmarshalPendingTransfer(val, false)
 				if err != nil {
 					return err
 				}
 
-				if time.Until(p.ReleaseTime) > maxEnqueuedTime {
-					p.ReleaseTime = now.Add(maxEnqueuedTime)
-					err := d.StorePendingMsg(p)
-					if err != nil {
-						return fmt.Errorf("failed to write new pending msg for key [%v]: %w", p.Msg.MessageIDString(), err)
-					}
+				pending = append(pending, p)
+			} else if isOldPendingMsg(key) {
+				p, err := UnmarshalPendingTransfer(val, true)
+				if err != nil {
+					return err
 				}
 
 				pending = append(pending, p)
+				oldPendingToUpdate = append(oldPendingToUpdate, p)
 			} else if IsTransfer(key) {
 				v, err := UnmarshalTransfer(val)
 				if err != nil {
@@ -329,15 +331,7 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
 				}
 
 				transfers = append(transfers, v)
-			} else if isOldPendingMsg(key) {
-				msg, err := common.UnmarshalMessagePublication(val)
-				if err != nil {
-					return err
-				}
 
-				p := &PendingTransfer{ReleaseTime: now.Add(maxEnqueuedTime), Msg: *msg}
-				pending = append(pending, p)
-				oldPendingToUpdate = append(oldPendingToUpdate, p)
 			} else if isOldTransfer(key) {
 				v, err := unmarshalOldTransfer(val)
 				if err != nil {

+ 42 - 17
node/pkg/db/governor_test.go

@@ -70,7 +70,7 @@ func TestPendingMsgID(t *testing.T) {
 		ConsistencyLevel: 16,
 	}
 
-	assert.Equal(t, []byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1))
+	assert.Equal(t, []byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1))
 }
 
 func TestTransferMsgID(t *testing.T) {
@@ -110,18 +110,18 @@ func TestIsTransfer(t *testing.T) {
 }
 
 func TestIsPendingMsg(t *testing.T) {
-	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1/1/1")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
-	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
-	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/1/1")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
+	assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
+	assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 	assert.Equal(t, false, IsPendingMsg([]byte{0x01, 0x02, 0x03, 0x04}))
 	assert.Equal(t, false, IsPendingMsg([]byte{}))
-	assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
-	assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
+	assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
 }
 
 func TestGetChainGovernorData(t *testing.T) {
@@ -286,6 +286,7 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
 		EmitterAddress:   tokenBridgeAddr,
 		Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
 		ConsistencyLevel: 16,
+		IsReobservation:  true,
 	}
 
 	pending1 := &PendingTransfer{
@@ -296,12 +297,12 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
 	bytes, err := pending1.Marshal()
 	require.NoError(t, err)
 
-	pending2, err := UnmarshalPendingTransfer(bytes)
+	pending2, err := UnmarshalPendingTransfer(bytes, false)
 	require.NoError(t, err)
 
 	assert.Equal(t, pending1, pending2)
 
-	expectedPendingKey := "GOV:PENDING2:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
+	expectedPendingKey := "GOV:PENDING3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
 	assert.Equal(t, expectedPendingKey, string(PendingMsgID(&pending2.Msg)))
 }
 
@@ -395,18 +396,40 @@ func TestStoreAndReloadTransfers(t *testing.T) {
 	assert.Equal(t, pending2, pending[1])
 }
 
-func (d *Database) storeOldPendingMsg(t *testing.T, k *common.MessagePublication) {
-	b, _ := k.Marshal()
+func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) {
+	buf := new(bytes.Buffer)
+
+	vaa.MustWrite(buf, binary.BigEndian, uint32(p.ReleaseTime.Unix()))
+
+	b := marshalOldMessagePublication(&p.Msg)
+
+	vaa.MustWrite(buf, binary.BigEndian, b)
 
 	err := d.db.Update(func(txn *badger.Txn) error {
-		if err := txn.Set(oldPendingMsgID(k), b); err != nil {
+		if err := txn.Set(oldPendingMsgID(&p.Msg), buf.Bytes()); err != nil {
 			return err
 		}
 		return nil
 	})
+
 	require.NoError(t, err)
 }
 
+func marshalOldMessagePublication(msg *common.MessagePublication) []byte {
+	buf := new(bytes.Buffer)
+
+	buf.Write(msg.TxHash[:])
+	vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix()))
+	vaa.MustWrite(buf, binary.BigEndian, msg.Nonce)
+	vaa.MustWrite(buf, binary.BigEndian, msg.Sequence)
+	vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel)
+	vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain)
+	buf.Write(msg.EmitterAddress[:])
+	buf.Write(msg.Payload)
+
+	return buf.Bytes()
+}
+
 func TestLoadingOldPendingTransfers(t *testing.T) {
 	dbPath := t.TempDir()
 	db, err := Open(dbPath)
@@ -454,7 +477,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 
 	// Write the first pending event in the old format.
 	pending1 := &PendingTransfer{
-		ReleaseTime: now.Add(time.Hour * 72), // Since we are writing this in the old format, this will not get stored, but computed on reload.
+		ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.,
 		Msg: common.MessagePublication{
 			TxHash:           eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
 			Timestamp:        now,
@@ -464,10 +487,11 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 			EmitterAddress:   tokenBridgeAddr,
 			Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
 			ConsistencyLevel: 16,
+			// IsReobservation will not be serialized. It should be set to false on reload.
 		},
 	}
 
-	db.storeOldPendingMsg(t, &pending1.Msg)
+	db.storeOldPendingMsg(t, pending1)
 	require.Nil(t, err)
 
 	now2 := now.Add(time.Second * 5)
@@ -484,6 +508,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
 			EmitterAddress:   tokenBridgeAddr,
 			Payload:          []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
 			ConsistencyLevel: 16,
+			IsReobservation:  true,
 		},
 	}
 

+ 0 - 1
node/pkg/governor/governor.go

@@ -47,7 +47,6 @@ const (
 	transferEnqueued = false
 )
 
-// WARNING: Change me in ./node/db as well
 const maxEnqueuedTime = time.Hour * 24
 
 type (

+ 6 - 0
node/pkg/processor/cleanup.go

@@ -163,6 +163,12 @@ func (p *Processor) handleCleanup(ctx context.Context) {
 					break
 				}
 
+				// Reobservation requests should not be resubmitted but we will keep waiting for more observations.
+				if s.ourObservation.IsReobservation() {
+					p.logger.Debug("not submitting reobservation request for reobservation", zap.String("digest", hash), zap.Duration("delta", delta))
+					break
+				}
+
 				// If we have already stored this VAA, there is no reason for us to request reobservation.
 				alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s)
 				if err != nil {

+ 5 - 2
node/pkg/processor/message.go

@@ -76,7 +76,8 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
 			Sequence:         k.Sequence,
 			ConsistencyLevel: k.ConsistencyLevel,
 		},
-		Unreliable: k.Unreliable,
+		Unreliable:    k.Unreliable,
+		Reobservation: k.IsReobservation,
 	}
 
 	// Generate digest of the unsigned VAA.
@@ -100,7 +101,9 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
 		zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())),
 		zap.Uint8("consistency_level", k.ConsistencyLevel),
 		zap.String("message_id", v.MessageID()),
-		zap.String("signature", hex.EncodeToString(s)))
+		zap.String("signature", hex.EncodeToString(s)),
+		zap.Bool("isReobservation", k.IsReobservation),
+	)
 
 	messagesSignedTotal.With(prometheus.Labels{
 		"emitter_chain": k.EmitterChain.String()}).Add(1)

+ 2 - 0
node/pkg/processor/processor.go

@@ -40,6 +40,8 @@ type (
 		SigningDigest() ethcommon.Hash
 		// IsReliable returns whether this message is considered reliable meaning it can be reobserved.
 		IsReliable() bool
+		// IsReobservation returns whether this message is the result of a reobservation request.
+		IsReobservation() bool
 		// HandleQuorum finishes processing the observation once a quorum of signatures have
 		// been received for it.
 		HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)

+ 6 - 1
node/pkg/processor/vaa.go

@@ -7,7 +7,8 @@ import (
 
 type VAA struct {
 	vaa.VAA
-	Unreliable bool
+	Unreliable    bool
+	Reobservation bool
 }
 
 func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
@@ -41,3 +42,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
 func (v *VAA) IsReliable() bool {
 	return !v.Unreliable
 }
+
+func (v *VAA) IsReobservation() bool {
+	return v.Reobservation
+}

+ 4 - 3
node/pkg/watchers/algorand/watcher.go

@@ -130,7 +130,7 @@ func gatherObservations(e *Watcher, t types.SignedTxnWithAD, depth int, logger *
 // lookAtTxn takes an outer transaction from the block.payset and gathers
 // observations from messages emitted in nested inner transactions
 // then passes them on the relevant channels
-func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.Logger) {
+func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.Logger, isReobservation bool) {
 
 	observations := gatherObservations(e, t.SignedTxnWithAD, 0, logger)
 
@@ -165,6 +165,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.
 			EmitterAddress:   obs.emitterAddress,
 			Payload:          obs.payload,
 			ConsistencyLevel: 0,
+			IsReobservation:  isReobservation,
 		}
 
 		algorandMessagesConfirmed.Inc()
@@ -261,7 +262,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				}
 
 				for _, element := range block.Payset {
-					lookAtTxn(e, element, block, logger)
+					lookAtTxn(e, element, block, logger, true)
 				}
 			}
 
@@ -287,7 +288,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 					}
 
 					for _, element := range block.Payset {
-						lookAtTxn(e, element, block, logger)
+						lookAtTxn(e, element, block, logger, false)
 					}
 					e.next_round = e.next_round + 1
 

+ 4 - 3
node/pkg/watchers/aptos/watcher.go

@@ -146,7 +146,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				if !data.Exists() {
 					break
 				}
-				e.observeData(logger, data, nativeSeq)
+				e.observeData(logger, data, nativeSeq, true)
 			}
 
 		case <-timer.C:
@@ -201,7 +201,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				if !data.Exists() {
 					continue
 				}
-				e.observeData(logger, data, eventSequence.Uint())
+				e.observeData(logger, data, eventSequence.Uint(), false)
 			}
 
 			health, err := e.retrievePayload(aptosHealth)
@@ -250,7 +250,7 @@ func (e *Watcher) retrievePayload(s string) ([]byte, error) {
 	return body, err
 }
 
-func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq uint64) {
+func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq uint64, isReobservation bool) {
 	em := data.Get("sender")
 	if !em.Exists() {
 		logger.Error("sender field missing")
@@ -313,6 +313,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u
 		EmitterAddress:   a,
 		Payload:          pl,
 		ConsistencyLevel: uint8(consistencyLevel.Uint()),
+		IsReobservation:  isReobservation,
 	}
 
 	aptosMessagesConfirmed.Inc()

+ 1 - 0
node/pkg/watchers/cosmwasm/watcher.go

@@ -304,6 +304,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 
 				msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, contractAddressLogKey)
 				for _, msg := range msgs {
+					msg.IsReobservation = true
 					e.msgC <- msg
 					messagesConfirmed.WithLabelValues(networkName).Inc()
 				}

+ 1 - 0
node/pkg/watchers/evm/watcher.go

@@ -423,6 +423,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 				}
 
 				for _, msg := range msgs {
+					msg.IsReobservation = true
 					if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
 						logger.Info("re-observed message publication transaction, publishing it immediately",
 							zap.Stringer("tx", msg.TxHash),

+ 1 - 0
node/pkg/watchers/ibc/watcher.go

@@ -476,6 +476,7 @@ func (w *Watcher) handleObservationRequests(ctx context.Context, ce *chainEntry)
 					}
 
 					if evt != nil {
+						evt.Msg.IsReobservation = true
 						if err := w.processIbcReceivePublishEvent(evt, "reobservation"); err != nil {
 							return fmt.Errorf("failed to process reobserved IBC event: %w", err)
 						}

+ 3 - 1
node/pkg/watchers/mock/watcher.go

@@ -41,7 +41,9 @@ func NewWatcherRunnable(
 				logger.Info("Received obsv request", zap.String("log_msg_type", "obsv_req_received"), zap.String("tx_hash", hash.Hex()))
 				msg, ok := c.ObservationDb[hash]
 				if ok {
-					msgC <- msg
+					msg2 := *msg
+					msg2.IsReobservation = true
+					msgC <- &msg2
 				}
 			}
 		}

+ 1 - 1
node/pkg/watchers/near/poll.go

@@ -21,7 +21,7 @@ func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, ch
 
 	result := make([]*transactionProcessingJob, len(txns))
 	for i, tx := range txns {
-		result[i] = newTransactionProcessingJob(tx.Hash, tx.SignerId)
+		result[i] = newTransactionProcessingJob(tx.Hash, tx.SignerId, false)
 	}
 	return result, nil
 }

+ 1 - 0
node/pkg/watchers/near/tx_processing.go

@@ -243,6 +243,7 @@ func (e *Watcher) processWormholeLog(logger *zap.Logger, _ context.Context, job
 		EmitterAddress:   a,
 		Payload:          pl,
 		ConsistencyLevel: 0,
+		IsReobservation:  job.isReobservation,
 	}
 
 	// tell everyone about it

+ 4 - 2
node/pkg/watchers/near/watcher.go

@@ -56,6 +56,7 @@ type (
 		creationTime    time.Time
 		retryCounter    uint
 		delay           time.Duration
+		isReobservation bool
 
 		// set during processing
 		hasWormholeMsg bool // set during processing; whether this transaction emitted a Wormhole message
@@ -111,13 +112,14 @@ func NewWatcher(
 	}
 }
 
-func newTransactionProcessingJob(txHash string, senderAccountId string) *transactionProcessingJob {
+func newTransactionProcessingJob(txHash string, senderAccountId string, isReobservation bool) *transactionProcessingJob {
 	return &transactionProcessingJob{
 		txHash,
 		senderAccountId,
 		time.Now(),
 		0,
 		initialTxProcDelay,
+		isReobservation,
 		false,
 	}
 }
@@ -204,7 +206,7 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error {
 			// This value is used by NEAR to determine which shard to query. An incorrect value here is not a security risk but could lead to reobservation requests failing.
 			// Guardians currently run nodes for all shards and the API seems to be returning the correct results independent of the set senderAccountId but this could change in the future.
 			// Fixing this would require adding the transaction sender account ID to the observation request.
-			job := newTransactionProcessingJob(txHash, e.wormholeAccount)
+			job := newTransactionProcessingJob(txHash, e.wormholeAccount, true)
 			err := e.schedule(ctx, job, time.Nanosecond)
 			if err != nil {
 				// Error-level logging here because this is after an re-observation request already, which should be infrequent

+ 1 - 0
node/pkg/watchers/near/watcher_test.go

@@ -150,6 +150,7 @@ func (testCase *testCase) run(ctx context.Context) error {
 	// assert that messages were re-observed correctly...
 	expectedMsgReObserved := map[string]*testMessageTracker{}
 	for _, em := range testCase.expectedMsgReObserved {
+		em.IsReobservation = true
 		expectedMsgReObserved[em.MessageIDString()] = &testMessageTracker{MessagePublication: em, seen: false}
 	}
 

+ 21 - 20
node/pkg/watchers/solana/client.go

@@ -311,7 +311,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
 			case <-ctx.Done():
 				return nil
 			case msg := <-s.pumpData:
-				err := s.processAccountSubscriptionData(ctx, logger, msg)
+				err := s.processAccountSubscriptionData(ctx, logger, msg, false)
 				if err != nil {
 					p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
 					solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "account_subscription_data").Inc()
@@ -327,7 +327,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
 				logger.Info("received observation request", zap.String("account", acc.String()))
 
 				rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
-				s.fetchMessageAccount(rCtx, logger, acc, 0)
+				s.fetchMessageAccount(rCtx, logger, acc, 0, true)
 				cancel()
 			case <-timer.C:
 				// Get current slot height
@@ -370,7 +370,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
 					for slot := rangeStart; slot <= rangeEnd; slot++ {
 						_slot := slot
 						common.RunWithScissors(ctx, s.errC, "SolanaWatcherSlotFetcher", func(ctx context.Context) error {
-							s.retryFetchBlock(ctx, logger, _slot, 0)
+							s.retryFetchBlock(ctx, logger, _slot, 0, false)
 							return nil
 						})
 					}
@@ -389,8 +389,8 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
 	}
 }
 
-func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, retry uint) {
-	ok := s.fetchBlock(ctx, logger, slot, 0)
+func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, retry uint, isReobservation bool) {
+	ok := s.fetchBlock(ctx, logger, slot, 0, isReobservation)
 
 	if !ok {
 		if retry >= maxRetries {
@@ -409,13 +409,13 @@ func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger,
 			zap.Uint("retry", retry))
 
 		common.RunWithScissors(ctx, s.errC, "retryFetchBlock", func(ctx context.Context) error {
-			s.retryFetchBlock(ctx, logger, slot, retry+1)
+			s.retryFetchBlock(ctx, logger, slot, retry+1, isReobservation)
 			return nil
 		})
 	}
 }
 
-func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, emptyRetry uint) (ok bool) {
+func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, emptyRetry uint, isReobservation bool) (ok bool) {
 	logger.Debug("requesting block",
 		zap.Uint64("slot", slot),
 		zap.String("commitment", string(s.commitment)),
@@ -455,7 +455,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
 			if emptyRetry < maxEmptyRetry {
 				common.RunWithScissors(ctx, s.errC, "delayedFetchBlock", func(ctx context.Context) error {
 					time.Sleep(retryDelay)
-					s.fetchBlock(ctx, logger, slot, emptyRetry+1)
+					s.fetchBlock(ctx, logger, slot, emptyRetry+1, isReobservation)
 					return nil
 				})
 			}
@@ -545,7 +545,7 @@ OUTER:
 
 		// Find top-level instructions
 		for i, inst := range tx.Message.Instructions {
-			found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i)
+			found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
 			if err != nil {
 				logger.Error("malformed Wormhole instruction",
 					zap.Error(err),
@@ -591,7 +591,7 @@ OUTER:
 
 		for _, inner := range tr.Meta.InnerInstructions {
 			for i, inst := range inner.Instructions {
-				_, err = s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i)
+				_, err = s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
 				if err != nil {
 					logger.Error("malformed Wormhole instruction",
 						zap.Error(err),
@@ -614,7 +614,7 @@ OUTER:
 	return true
 }
 
-func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int) (bool, error) {
+func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int, isReobservation bool) (bool, error) {
 	if inst.ProgramIDIndex != programIndex {
 		return false, nil
 	}
@@ -657,15 +657,15 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg
 		zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx))
 
 	common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error {
-		s.retryFetchMessageAccount(ctx, logger, acc, slot, 0)
+		s.retryFetchMessageAccount(ctx, logger, acc, slot, 0, isReobservation)
 		return nil
 	})
 
 	return true, nil
 }
 
-func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, retry uint) {
-	retryable := s.fetchMessageAccount(ctx, logger, acc, slot)
+func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, retry uint, isReobservation bool) {
+	retryable := s.fetchMessageAccount(ctx, logger, acc, slot, isReobservation)
 
 	if retryable {
 		if retry >= maxRetries {
@@ -686,13 +686,13 @@ func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *za
 			zap.Uint("retry", retry))
 
 		common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error {
-			s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1)
+			s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1, isReobservation)
 			return nil
 		})
 	}
 }
 
-func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64) (retryable bool) {
+func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, isReobservation bool) (retryable bool) {
 	// Fetching account
 	rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
 	defer cancel()
@@ -741,11 +741,11 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
 		zap.Stringer("account", acc),
 		zap.Binary("data", data))
 
-	s.processMessageAccount(logger, data, acc)
+	s.processMessageAccount(logger, data, acc, isReobservation)
 	return false
 }
 
-func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger *zap.Logger, data []byte) error {
+func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger *zap.Logger, data []byte, isReobservation bool) error {
 	// Do we have an error on the subscription?
 	var e EventSubscriptionError
 	err := json.Unmarshal(data, &e)
@@ -795,7 +795,7 @@ func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger
 	switch string(data[:3]) {
 	case accountPrefixReliable, accountPrefixUnreliable:
 		acc := solana.PublicKeyFromBytes([]byte(value.Pubkey))
-		s.processMessageAccount(logger, data, acc)
+		s.processMessageAccount(logger, data, acc, isReobservation)
 	default:
 		break
 	}
@@ -803,7 +803,7 @@ func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger
 	return nil
 }
 
-func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) {
+func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey, isReobservation bool) {
 	proposal, err := ParseMessagePublicationAccount(data)
 	if err != nil {
 		solanaAccountSkips.WithLabelValues(s.networkName, "parse_transfer_out").Inc()
@@ -837,6 +837,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
 		EmitterAddress:   proposal.EmitterAddress,
 		Payload:          proposal.Payload,
 		ConsistencyLevel: proposal.ConsistencyLevel,
+		IsReobservation:  isReobservation,
 		Unreliable:       !reliable,
 	}
 

+ 4 - 3
node/pkg/watchers/sui/watcher.go

@@ -173,7 +173,7 @@ func NewWatcher(
 	}
 }
 
-func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error {
+func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservation bool) error {
 	if body.ID.TxDigest == nil {
 		return errors.New("Missing TxDigest field")
 	}
@@ -249,6 +249,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error {
 		EmitterAddress:   emitter,
 		Payload:          fields.Payload,
 		ConsistencyLevel: *fields.ConsistencyLevel,
+		IsReobservation:  isReobservation,
 	}
 
 	suiMessagesConfirmed.Inc()
@@ -374,7 +375,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				}
 
 				if res.Params != nil && (*res.Params).Result != nil {
-					err := e.inspectBody(logger, *(*res.Params).Result)
+					err := e.inspectBody(logger, *(*res.Params).Result, false)
 					if err != nil {
 						logger.Error(fmt.Sprintf("inspectBody: %s", err.Error()))
 					}
@@ -491,7 +492,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				}
 
 				for i, chunk := range res.Result {
-					err := e.inspectBody(logger, chunk)
+					err := e.inspectBody(logger, chunk, true)
 					if err != nil {
 						logger.Info("skipping event data in result", zap.String("txhash", tx58), zap.Int("index", i), zap.Error(err))
 					}