node: Add Notary package for verifying MessagePublications (#4315)

This PR adds a "Notary" package that evaluates the verification state of a Message Publication. It returns a Verdict on what should happen to the message, e.g. it reports that an `Anomalous` message should be delayed.

It stores messages that should be Delayed or Blackholed in a database (BadgerDB).
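
The mapping from verification state to verdict can be pictured with a minimal sketch. The type and constant names below (`Verdict`, `Approve`, `Delay`, `Blackhole`, `Valid`) are hypothetical and chosen for illustration; they are not taken from the package's API. Approving every state other than `Anomalous` and `Rejected` is also an assumption.

```go
package main

import "fmt"

// VerificationState is an illustrative stand-in for the state attached to a
// message publication. Only Anomalous and Rejected are named in this PR
// description; Valid is an assumption.
type VerificationState int

const (
	NotVerified VerificationState = iota
	Valid
	Anomalous
	Rejected
)

// Verdict is a hypothetical name for the Notary's decision.
type Verdict int

const (
	Approve Verdict = iota
	Delay
	Blackhole
)

func (v Verdict) String() string {
	switch v {
	case Approve:
		return "Approve"
	case Delay:
		return "Delay"
	case Blackhole:
		return "Blackhole"
	default:
		return "Unknown"
	}
}

// verdictFor maps a verification state to a verdict: Anomalous messages are
// delayed, Rejected messages are blackholed, and (by assumption) everything
// else is approved.
func verdictFor(s VerificationState) Verdict {
	switch s {
	case Anomalous:
		return Delay
	case Rejected:
		return Blackhole
	default:
		return Approve
	}
}

func main() {
	fmt.Println(verdictFor(Anomalous))
	fmt.Println(verdictFor(Rejected))
	fmt.Println(verdictFor(Valid))
}
```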

## Changes to the Processor
The processor now inspects the Notary's queue of delayed messages and skips publishing them until their ReleaseTime has passed. Once a message is ready for release, the processor passes it to the Governor and Accountant as normal (assuming the Guardian has them enabled).

# Design decisions + supporting tasks

## Database

The package uses the BadgerDB key-value store to delay and persist messages, similar to the Governor. This allows the Notary's decisions to persist across restarts.

An implementation is possible that would use only in-memory representations, but this has some drawbacks:
* We would lose messages that are marked as delayed on a node restart
* Messages with a `Rejected` status must never be processed. We need a way to track this permanently, including across restarts.

Rather than use the existing `Database` struct (like Governor and Accountant), a new `NotaryDB` struct was added that is a wrapper for the BadgerDB connection. The goal here is to isolate the Notary-related API from other modules.

## Creation of min-heap data structure PendingMessageQueue

Conceptually, it makes sense to store delayed messages according to their release time. If the collection is always sorted, we can access elements from one side or the other until one message is too recent to process. In that case, all of the other messages will also be too new, and we can exit early.

This also allows us to be flexible with our delay time: we store the time at which the message should be released rather than the time _at which_ it was stored. We can delay a message for one minute, a day, or a year. As long as the ordering holds, we don't need to iterate over many items.

In order to achieve this, I've added a min-heap data structure with a safe API that allows for storing messages this way.

# Future Work

## Add drop/release/reset admin commands

This package should allow a super-minority of Guardians to forcibly drop, release, or reset the delay time of delayed or blackholed messages. This helps when the Notary has delayed an Anomalous message that is in fact fraudulent: in that case, the message can be moved to the 'blackhole' deny-list. Alternatively, if the Transfer Verifier or Notary has a bug, this allows the Guardians to release a harmless message.

## Add Guardian p2p feature flags
Similar to: https://github.com/wormhole-foundation/wormhole/pull/4317

This change will be done in a follow-up PR.
John Saigle, 1 month ago
parent
commit
dea09ba835

+ 5 - 3
.github/CODEOWNERS

@@ -52,9 +52,6 @@
 
 /node/cmd/ @panoel @evan-gray
 
-## Transfer Verifier standalone tool
-
-/node/cmd/txverifier/ @djb15 @johnsaigle @mdulin2 @pleasew8t
 
 ## DB
 
@@ -91,6 +88,7 @@
 ## Transfer Verifier
 
 /node/pkg/txverifier/ @djb15 @johnsaigle @mdulin2 @pleasew8t
+/node/cmd/txverifier/ @djb15 @johnsaigle @mdulin2 @pleasew8t
 
 ## Watchers
 
@@ -100,6 +98,10 @@
 /node/go.mod @bemic @djb15 @johnsaigle @mdulin2 @pleasew8t
 /sdk/go.mod @bemic @djb15 @johnsaigle @mdulin2 @pleasew8t
 
+## Notary
+
+/node/pkg/notary/ @djb15 @johnsaigle @mdulin2 @pleasew8t
+
 ## Hacks / Tools
 
 /node/hack/ @panoel @evan-gray

+ 2 - 1
devnet/node.yaml

@@ -149,7 +149,8 @@ spec:
             - --ccqAllowedRequesters=beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe,25021A4FCAf61F2EADC8202D3833Df48B2Fa0D54
             - --ccqAllowedPeers=12D3KooWSnju8zhywCYVi2JwTqky1sySPnmtYLsHHzc4WerMnDQH,12D3KooWM6WqedfR6ehtTd1y6rJu3ZUrEkTjcJJnJZYesjd89zj8
             - --transferVerifierEnabledChainIDs=2
-            # - --logLevel=debug
+            - --notaryEnabled=true
+            - --logLevel=warn
           securityContext:
             capabilities:
               add:

+ 6 - 1
node/cmd/guardiand/node.go

@@ -294,7 +294,8 @@ var (
 	txVerifierChains []vaa.ChainID
 
 	// featureFlags are additional static flags that should be published in P2P heartbeats.
-	featureFlags []string
+	featureFlags  []string
+	notaryEnabled *bool
 )
 
 func init() {
@@ -526,6 +527,8 @@ func init() {
 	subscribeToVAAs = NodeCmd.Flags().Bool("subscribeToVAAs", false, "Guardiand should subscribe to incoming signed VAAs, set to true if running a public RPC node")
 
 	transferVerifierEnabledChainIDs = NodeCmd.Flags().UintSlice("transferVerifierEnabledChainIDs", make([]uint, 0), "Transfer Verifier will be enabled for these chain IDs (comma-separated)")
+
+	notaryEnabled = NodeCmd.Flags().Bool("notaryEnabled", false, "Run the notary")
 }
 
 var (
@@ -640,6 +643,7 @@ func runNode(cmd *cobra.Command, args []string) {
 	}
 
 	// Override the default go-log config, which uses a magic environment variable.
+	logger.Info("setting level for all loggers", zap.String("level", logger.Level().String()))
 	ipfslog.SetAllLoggers(lvl)
 
 	if viper.ConfigFileUsed() != "" {
@@ -1889,6 +1893,7 @@ func runNode(cmd *cobra.Command, args []string) {
 		node.GuardianOptionWatchers(watcherConfigs, ibcWatcherConfig),
 		node.GuardianOptionAccountant(*accountantWS, *accountantContract, *accountantCheckEnabled, accountantWormchainConn, *accountantNttContract, accountantNttWormchainConn),
 		node.GuardianOptionGovernor(*chainGovernorEnabled, *governorFlowCancelEnabled, *coinGeckoApiKey),
+		node.GuardianOptionNotary(*notaryEnabled),
 		node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn),
 		node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters),
 		node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),

+ 0 - 2
node/go.mod

@@ -379,5 +379,3 @@ replace github.com/wormhole-foundation/wormchain => ../wormchain
 replace github.com/CosmWasm/wasmd v0.30.0 => github.com/wormhole-foundation/wasmd v0.30.0-wormchain-2
 
 replace github.com/cosmos/cosmos-sdk => github.com/wormhole-foundation/cosmos-sdk v0.45.9-wormhole
-
-replace github.com/certusone/wormhole/node/pkg/txverifier => ./pkg/txverifier

+ 29 - 11
node/pkg/common/chainlock.go

@@ -55,7 +55,6 @@ const (
 )
 
 var (
-	ErrBinaryWrite              = errors.New("failed to write binary data")
 	ErrInvalidBinaryBool        = errors.New("invalid binary bool (neither 0x00 nor 0x01)")
 	ErrInvalidVerificationState = errors.New("invalid verification state")
 )
@@ -71,12 +70,12 @@ func (e ErrUnexpectedEndOfRead) Error() string {
 
 // ErrInputSize is returned when the input size is not the expected size during marshaling.
 type ErrInputSize struct {
-	msg string
-	got int
+	Msg string
+	Got int
 }
 
 func (e ErrInputSize) Error() string {
-	return fmt.Sprintf("wrong size: %s. expected >= %d bytes, got %d", e.msg, marshaledMsgLenMin, e.got)
+	return fmt.Sprintf("wrong size: %s. expected >= %d bytes, got %d", e.Msg, marshaledMsgLenMin, e.Got)
 }
 
 // MaxSafeInputSize defines the maximum safe size for untrusted input from `io` Readers.
@@ -86,6 +85,25 @@ const MaxSafeInputSize = 128 * 1024 * 1024 // 128MB (arbitrary)
 
 var ErrInputTooLarge = errors.New("input data exceeds maximum allowed size")
 
+var (
+	ErrBinaryWrite         = errors.New("failed to write binary data")
+	ErrTxIDTooLong         = errors.New("field TxID too long")
+	ErrTxIDTooShort        = errors.New("field TxID too short")
+	ErrInvalidPayload      = errors.New("field payload too long")
+	ErrDataTooShort        = errors.New("data too short")
+	ErrTimestampTooShort   = errors.New("data too short for timestamp")
+	ErrNonceTooShort       = errors.New("data too short for nonce")
+	ErrSequenceTooShort    = errors.New("data too short for sequence")
+	ErrConsistencyTooShort = errors.New("data too short for consistency level")
+	ErrChainTooShort       = errors.New("data too short for emitter chain")
+	ErrAddressTooShort     = errors.New("data too short for emitter address")
+	ErrReobsTooShort       = errors.New("data too short for IsReobservation")
+	ErrUnreliableTooShort  = errors.New("data too short for Unreliable")
+	ErrVerStateTooShort    = errors.New("data too short for verification state")
+	ErrPayloadLenTooShort  = errors.New("data too short for payload length")
+	ErrPayloadTooShort     = errors.New("data too short for payload")
+)
+
 // The `VerificationState` is the result of applying transfer verification to the transaction associated with the `MessagePublication`.
 // While this could likely be extended to additional security controls in the future, it is only used for `txverifier` at present.
 // Consequently, its status should be set to `NotVerified` or `NotApplicable` for all messages that aren't token transfers.
@@ -246,11 +264,11 @@ func (msg *MessagePublication) MarshalBinary() ([]byte, error) {
 	// Check preconditions
 	txIDLen := len(msg.TxID)
 	if txIDLen > TxIDSizeMax {
-		return nil, ErrInputSize{msg: "TxID too long"}
+		return nil, ErrInputSize{Msg: "TxID too long"}
 	}
 
 	if txIDLen < TxIDLenMin {
-		return nil, ErrInputSize{msg: "TxID too short"}
+		return nil, ErrInputSize{Msg: "TxID too short"}
 	}
 
 	payloadLen := len(msg.Payload)
@@ -407,7 +425,7 @@ func (m *MessagePublication) UnmarshalBinary(data []byte) error {
 	// Calculate minimum required length for the fixed portion
 	// (excluding variable-length fields: TxID and Payload)
 	if len(data) < marshaledMsgLenMin {
-		return ErrInputSize{msg: "data too short", got: len(data)}
+		return ErrInputSize{Msg: "data too short", Got: len(data)}
 	}
 
 	mp := &MessagePublication{}
@@ -423,7 +441,7 @@ func (m *MessagePublication) UnmarshalBinary(data []byte) error {
 	// Bounds checks. TxID length should be at least TxIDLenMin, but not larger than the length of the data.
 	// The second check is to avoid panics.
 	if int(txIDLen) < TxIDLenMin || int(txIDLen) > len(data) {
-		return ErrInputSize{msg: "TxID length is invalid"}
+		return ErrInputSize{Msg: "TxID length is invalid"}
 	}
 
 	// Read TxID
@@ -435,7 +453,7 @@ func (m *MessagePublication) UnmarshalBinary(data []byte) error {
 	// Concretely, we're checking that the data is at least long enough to contain information for all of
 	// the fields except for the Payload itself.
 	if len(data)-pos < fixedFieldsLen {
-		return ErrInputSize{msg: "data too short after reading TxID", got: len(data)}
+		return ErrInputSize{Msg: "data too short after reading TxID", Got: len(data)}
 	}
 
 	// Timestamp
@@ -497,13 +515,13 @@ func (m *MessagePublication) UnmarshalBinary(data []byte) error {
 	// exceed this limit and cause a runtime panic when passed to make([]byte, payloadLen).
 	// This bounds check prevents such panics by rejecting oversized payload lengths early.
 	if payloadLen > PayloadLenMax {
-		return ErrInputSize{msg: "payload length too large", got: len(data)}
+		return ErrInputSize{Msg: "payload length too large", Got: len(data)}
 	}
 
 	// Check if we have enough data for the payload
 	// #nosec G115 -- payloadLen is read from data, bounds checked above
 	if len(data) < pos+int(payloadLen) {
-		return ErrInputSize{msg: "invalid payload length"}
+		return ErrInputSize{Msg: "invalid payload length"}
 	}
 
 	// Read payload

+ 239 - 0
node/pkg/common/pendingmessage.go

@@ -0,0 +1,239 @@
+package common
+
+import (
+	"bytes"
+	"cmp"
+	"container/heap"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"slices"
+	"sync"
+	"time"
+
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+)
+
+const (
+	// marshaledPendingMsgLenMin is the minimum length of a marshaled pending message.
+	// It includes 8 bytes for the timestamp.
+	marshaledPendingMsgLenMin = 8 + marshaledMsgLenMin
+)
+
+// PendingMessage is a wrapper type around a [MessagePublication] that includes the time for which it
+// should be released.
+type PendingMessage struct {
+	ReleaseTime time.Time
+	Msg         MessagePublication
+}
+
+func (p PendingMessage) Compare(other PendingMessage) int {
+	return cmp.Compare(p.ReleaseTime.Unix(), other.ReleaseTime.Unix())
+}
+
+// MarshalBinary implements BinaryMarshaler for [PendingMessage].
+func (p *PendingMessage) MarshalBinary() ([]byte, error) {
+	buf := new(bytes.Buffer)
+
+	// Compare with [PendingTransfer.Marshal].
+	// #nosec G115  -- int64 and uint64 have the same number of bytes, and Unix time won't be negative.
+	vaa.MustWrite(buf, binary.BigEndian, uint64(p.ReleaseTime.Unix()))
+
+	bz, err := p.Msg.MarshalBinary()
+	if err != nil {
+		return nil, fmt.Errorf("marshal pending message: %w", err)
+	}
+
+	vaa.MustWrite(buf, binary.BigEndian, bz)
+
+	return buf.Bytes(), nil
+}
+
+// UnmarshalBinary implements BinaryUnmarshaler for [PendingMessage].
+func (p *PendingMessage) UnmarshalBinary(data []byte) error {
+
+	if len(data) < marshaledPendingMsgLenMin {
+		return ErrInputSize{Msg: "pending message too short"}
+	}
+
+	// Compare with [UnmarshalPendingTransfer].
+	p.ReleaseTime = time.Unix(
+		// #nosec G115  -- int64 and uint64 have the same number of bytes, and Unix time won't be negative.
+		int64(binary.BigEndian.Uint64(data[0:8])),
+		0,
+	)
+
+	err := p.Msg.UnmarshalBinary(data[8:])
+
+	if err != nil {
+		return fmt.Errorf("unmarshal pending message: %w", err)
+	}
+
+	return nil
+}
+
+// A pendingMessageHeap is a min-heap of [PendingMessage] and uses the heap interface
+// by implementing the methods below.
+// As a result:
+// - The heap is ordered by ReleaseTime (heap order; the slice itself is not fully sorted).
+// - The oldest (smallest) ReleaseTime is always at index 0.
+// This allows us to pop from the heap in order to get the oldest timestamp. If
+// that value is greater than whatever time threshold we specify, we know that
+// there are no other messages that need to be released because their
+// timestamps must be greater. This should allow for constant-time lookups when
+// looking for messages to release.
+//
+// See: https://pkg.go.dev/container/heap#Interface
+type pendingMessageHeap []*PendingMessage
+
+func (h pendingMessageHeap) Len() int {
+	return len(h)
+}
+func (h pendingMessageHeap) Less(i, j int) bool {
+	return h[i].ReleaseTime.Before(h[j].ReleaseTime)
+}
+func (h pendingMessageHeap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}
+
+// Push dangerously pushes a value to the heap.
+func (h *pendingMessageHeap) Push(x any) {
+	// Push and Pop use pointer receivers because they modify the slice's length,
+	// not just its contents.
+	item, ok := x.(*PendingMessage)
+	if !ok {
+		panic("PendingMessageHeap: cannot push non-*PendingMessage")
+	}
+
+	// Null check
+	if item == nil {
+		panic("PendingMessageHeap: cannot push nil *PendingMessage")
+	}
+
+	*h = append(*h, item)
+}
+
+// Pop dangerously pops a value from the heap.
+func (h *pendingMessageHeap) Pop() any {
+	old := *h
+	n := len(old)
+	if n == 0 {
+		panic("PendingMessageHeap: cannot Pop from empty heap")
+	}
+	last := old[n-1]
+	*h = old[0 : n-1]
+	return last
+}
+
+// PendingMessageQueue is a thread-safe min-heap that orders [PendingMessage] by ascending ReleaseTime.
+// It also prevents duplicate [MessagePublication]s from being added to the queue.
+type PendingMessageQueue struct {
+	// pendingMessageHeap exposes dangerous APIs as a necessary consequence of implementing [heap.Interface].
+	// Wrap it and expose only a safe API.
+	heap pendingMessageHeap
+	mu   sync.RWMutex
+}
+
+func NewPendingMessageQueue() *PendingMessageQueue {
+	q := &PendingMessageQueue{heap: pendingMessageHeap{}}
+	heap.Init(&q.heap)
+	return q
+}
+
+// Push adds an element to the heap. If pMsg is nil or already in the queue, nothing is added.
+func (q *PendingMessageQueue) Push(pMsg *PendingMessage) {
+	// noop if the message is nil or already in the queue.
+	if pMsg == nil || q.ContainsMessagePublication(&pMsg.Msg) {
+		return
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	heap.Push(&q.heap, pMsg)
+}
+
+// Pop removes the element with the earliest ReleaseTime from the heap and returns its value.
+// Returns nil if the heap is empty or if the value is not a *[PendingMessage].
+func (q *PendingMessageQueue) Pop() *PendingMessage {
+	if q.heap.Len() == 0 {
+		return nil
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	last, ok := heap.Pop(&q.heap).(*PendingMessage)
+	if !ok {
+		return nil
+	}
+
+	return last
+}
+
+func (q *PendingMessageQueue) Len() int {
+	return q.heap.Len()
+}
+
+// Peek returns the element at the top of the heap without removing it.
+func (q *PendingMessageQueue) Peek() *PendingMessage {
+	if q.heap.Len() == 0 {
+		return nil
+	}
+	// container/heap stores the "next" element at the first offset.
+	last := *q.heap[0]
+	return &last
+}
+
+// RemoveItem removes target MessagePublication from the heap. Returns the element that was removed or nil
+// if the item was not found. No error is returned if the item was not found.
+func (q *PendingMessageQueue) RemoveItem(target *MessagePublication) (*PendingMessage, error) {
+	if target == nil {
+		return nil, errors.New("pendingmessage: nil argument for RemoveItem")
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	var removed *PendingMessage
+	for i, item := range q.heap {
+		// Assumption: MsgIDs are unique across MessagePublications.
+		if bytes.Equal(item.Msg.MessageID(), target.MessageID()) {
+			pMsg, ok := heap.Remove(&q.heap, i).(*PendingMessage)
+			if !ok {
+				return nil, errors.New("pendingmessage: item removed from heap is not PendingMessage")
+			}
+			removed = pMsg
+			break
+		}
+	}
+
+	return removed, nil
+}
+
+// Contains determines whether the queue contains a [PendingMessage].
+func (q *PendingMessageQueue) Contains(pMsg *PendingMessage) bool {
+	if pMsg == nil {
+		return false
+	}
+
+	q.mu.RLock()
+	defer q.mu.RUnlock()
+
+	return slices.Contains(q.heap, pMsg)
+}
+
+// ContainsMessagePublication determines whether the queue contains a [MessagePublication] (not a [PendingMessage]).
+func (q *PendingMessageQueue) ContainsMessagePublication(msgPub *MessagePublication) bool {
+	if msgPub == nil {
+		return false
+	}
+
+	q.mu.RLock()
+	defer q.mu.RUnlock()
+
+	// Relies on MessageID being unique.
+	return slices.ContainsFunc(q.heap, func(pMsg *PendingMessage) bool {
+		return bytes.Equal(pMsg.Msg.MessageID(), msgPub.MessageID())
+	})
+}

+ 396 - 0
node/pkg/common/pendingmessage_test.go

@@ -0,0 +1,396 @@
+package common_test
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+	"math/big"
+	"math/rand/v2"
+	"slices"
+	"testing"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	eth_common "github.com/ethereum/go-ethereum/common"
+	"github.com/stretchr/testify/require"
+	"github.com/wormhole-foundation/wormhole/sdk"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+)
+
+func TestPendingMessage_RoundTripMarshal(t *testing.T) {
+	orig := makeUniquePendingMessage(t)
+	var loaded common.PendingMessage
+
+	bz, writeErr := orig.MarshalBinary()
+	require.NoError(t, writeErr)
+
+	readErr := loaded.UnmarshalBinary(bz)
+	require.NoError(t, readErr)
+
+	require.Equal(t, *orig, loaded)
+}
+
+func TestPendingMessage_MarshalError(t *testing.T) {
+
+	type test struct {
+		label string
+		input common.MessagePublication
+		err   error
+	}
+
+	// Set up.
+	var (
+		longTxID = bytes.NewBuffer(make([]byte, math.MaxUint8+1))
+	)
+	emitter, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	require.NoError(t, err)
+
+	tests := []test{
+		{
+			label: "txID too long",
+			input: common.MessagePublication{
+				TxID: longTxID.Bytes(),
+			},
+			err: common.ErrInputSize{Msg: "TxID too long"},
+		},
+		{
+			label: "txID too short",
+			input: common.MessagePublication{
+				TxID:             []byte{},
+				Timestamp:        time.Unix(int64(1654516425), 0),
+				Nonce:            123456,
+				Sequence:         789101112131415,
+				EmitterChain:     vaa.ChainIDEthereum,
+				EmitterAddress:   emitter,
+				Payload:          []byte{},
+				ConsistencyLevel: 32,
+				Unreliable:       true,
+				IsReobservation:  true,
+			},
+			err: common.ErrInputSize{Msg: "TxID too short"},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.label, func(t *testing.T) {
+			pMsg := &common.PendingMessage{
+				ReleaseTime: time.Now(),
+				Msg:         tc.input,
+			}
+
+			bz, writeErr := pMsg.MarshalBinary()
+			require.Error(t, writeErr)
+			require.True(t, errors.Is(writeErr, tc.err), fmt.Sprintf("got wrong error type: %v", writeErr))
+			require.Nil(t, bz)
+		})
+	}
+
+}
+
+func TestPendingMessageQueue_NoDuplicates(t *testing.T) {
+	q := common.NewPendingMessageQueue()
+
+	// Create two messages with the same sequence number.
+	msg1 := *makeUniquePendingMessage(t)
+	msg2 := *makeUniquePendingMessage(t)
+	msg2.Msg.Sequence = msg1.Msg.Sequence
+
+	q.Push(&msg1)
+	require.Equal(t, 1, q.Len())
+
+	msg2.ReleaseTime = msg1.ReleaseTime.Add(time.Hour)
+	require.True(t, msg1.ReleaseTime.Before(msg2.ReleaseTime))
+
+	// Pushing the same message twice should not add it to the queue.
+	q.Push(&msg2)
+	require.Equal(t, 1, q.Len())
+}
+
+func TestPendingMessage_HeapInvariants(t *testing.T) {
+
+	msg1 := *makeUniquePendingMessage(t)
+	msg2 := *makeUniquePendingMessage(t)
+	msg3 := *makeUniquePendingMessage(t)
+
+	// Modify release times, in ascending (past-to-future) order: msg1 < msg2 < msg3
+	msg2.ReleaseTime = msg1.ReleaseTime.Add(time.Hour)
+	msg3.ReleaseTime = msg1.ReleaseTime.Add(time.Hour * 2)
+
+	require.True(t, msg1.ReleaseTime.Before(msg2.ReleaseTime))
+	require.True(t, msg2.ReleaseTime.Before(msg3.ReleaseTime))
+
+	tests := []struct {
+		name string // description of this test case
+		// Named input parameters for target function.
+		order []*common.PendingMessage
+	}{
+		{
+			"ascending order",
+			[]*common.PendingMessage{&msg1, &msg2, &msg3},
+		},
+		{
+			"mixed order A",
+			[]*common.PendingMessage{&msg2, &msg3, &msg1},
+		},
+		{
+			"mixed order B",
+			[]*common.PendingMessage{&msg3, &msg1, &msg2},
+		},
+	}
+
+	// Try different variations of adding messages to the heap.
+	// After each variation, the first element returned should be equal
+	// to the smallest/oldest message publication, which is msg1.
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Set-up
+			q := common.NewPendingMessageQueue()
+			for pMsg := range slices.Values(tt.order) {
+				q.Push(pMsg)
+			}
+			require.Equal(t, len(tt.order), q.Len())
+
+			res := consumeHeapAndAssertOrdering(t, q)
+			require.Equal(t, 0, q.Len())
+			require.True(t, &msg1 == res[0])
+			assertSliceOrdering(t, res)
+		})
+	}
+
+	// Ensure that calling RemoveItem doesn't change the ordering.
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Set-up
+			q := common.NewPendingMessageQueue()
+			for pMsg := range slices.Values(tt.order) {
+				q.Push(pMsg)
+			}
+			require.Equal(t, len(tt.order), q.Len())
+
+			removed, err := q.RemoveItem(&msg2.Msg)
+			require.NoError(t, err)
+			require.NotNil(t, removed)
+			require.Equal(t, &msg2, removed, "removed message does not match expected message")
+
+			require.Equal(t, len(tt.order)-1, q.Len())
+
+			res := consumeHeapAndAssertOrdering(t, q)
+			require.Equal(t, 0, q.Len())
+			require.True(t, &msg1 == res[0])
+			assertSliceOrdering(t, res)
+		})
+	}
+
+}
+
+func TestPendingMessageQueue_Peek(t *testing.T) {
+	q := common.NewPendingMessageQueue()
+
+	msg1 := *makeUniquePendingMessage(t)
+	msg2 := *makeUniquePendingMessage(t)
+	msg3 := *makeUniquePendingMessage(t)
+
+	// Modify release times, in ascending (past-to-future) order: msg1 < msg2 < msg3
+	msg2.ReleaseTime = msg1.ReleaseTime.Add(time.Hour)
+	msg3.ReleaseTime = msg1.ReleaseTime.Add(time.Hour * 2)
+
+	require.True(t, msg1.ReleaseTime.Before(msg2.ReleaseTime))
+	require.True(t, msg2.ReleaseTime.Before(msg3.ReleaseTime))
+
+	// Push elements in an arbitrary order.
+	// Assert that Peek() returns msg1 because it is the smallest.
+	q.Push(&msg2)
+	q.Push(&msg3)
+	q.Push(&msg1)
+	require.Equal(t, 3, q.Len())
+	require.Equal(t, &msg1, q.Peek())
+
+}
+
+func TestPendingMessageQueue_RemoveItem(t *testing.T) {
+	msgInQueue := makeUniquePendingMessage(t).Msg
+	msgNotInQueue := makeUniquePendingMessage(t).Msg
+	msgNotInQueue.TxID = []byte{0xff}
+
+	tests := []struct {
+		name string // description of this test case
+		// Named input parameters for target function.
+		target *common.MessagePublication
+		want   *common.PendingMessage
+	}{
+		{
+			"successful removal",
+			&msgInQueue,
+			&common.PendingMessage{Msg: msgInQueue},
+		},
+		{
+			"remove an item that is not in the queue",
+			&msgNotInQueue,
+			nil,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+
+			q := common.NewPendingMessageQueue()
+
+			q.Push(&common.PendingMessage{Msg: msgInQueue})
+
+			got, gotErr := q.RemoveItem(tt.target)
+			require.NoError(t, gotErr)
+
+			if tt.want != nil {
+				require.NotNil(t, got)
+				require.Equal(t, 0, q.Len())
+				// The RemoveItem function relies on comparing TxIDs.
+				require.Equal(t, tt.want.Msg.TxID, got.Msg.TxID)
+			} else {
+				require.Nil(t, got)
+				require.Equal(t, 1, q.Len(), "item should not have been removed from queue")
+			}
+
+		})
+	}
+}
+
+// TestPendingMessageQueue_DangerousOperations ensures that dangerous operations
+// on the queue do not panic or cause unexpected behavior.
+func TestPendingMessageQueue_DangerousOperations(t *testing.T) {
+	q := common.NewPendingMessageQueue()
+
+	// Popping an empty queue should not panic or alter the queue.
+	element := q.Pop()
+	require.Nil(t, element)
+	require.Equal(t, 0, q.Len())
+
+	// Peeking an empty queue should not panic or alter the queue.
+	element = q.Peek()
+	require.Nil(t, element)
+	require.Equal(t, 0, q.Len())
+
+	// Build some state for the next test.
+	msg1 := *makeUniquePendingMessage(t)
+	msg2 := *makeUniquePendingMessage(t)
+	msg3 := *makeUniquePendingMessage(t)
+
+	q.Push(&msg1)
+	q.Push(&msg2)
+	q.Push(&msg3)
+	require.Equal(t, 3, q.Len())
+
+	// Add nil to the queue and ensure that it is ignored.
+	q.Push(nil)
+	require.Equal(t, 3, q.Len())
+}
+
+func assertSliceOrdering(t *testing.T, s []*common.PendingMessage) {
+	for i := range len(s) - 1 {
+		require.True(t, s[i].ReleaseTime.Before(s[i+1].ReleaseTime))
+	}
+}
+
+// consumeHeapAndAssertOrdering takes heap and pops every element, ensuring that
+// each popped element is smaller than the next one on the heap.
+// Returns the elements in order of when they were popped. This should result
+// in a slice of strictly ascending values.
+func consumeHeapAndAssertOrdering(t *testing.T, q *common.PendingMessageQueue) []*common.PendingMessage {
+	require.True(t, q.Len() > 0, "programming error: can't process empty queue")
+
+	res := make([]*common.PendingMessage, 0, q.Len())
+
+	// Pop all entries from the heap. Ensure that the element on top of the heap
+	// is always the earliest (smallest timestamp).
+	for q.Len() > 0 {
+		// length changes automatically after popping.
+		earliest := q.Pop()
+		res = append(res, earliest)
+
+		next := q.Peek()
+
+		// Expect next to not be nil unless we just popped the last element.
+		if q.Len() > 0 {
+			require.NotNil(t, next)
+		}
+
+		if next == nil {
+			continue
+		}
+
+		require.True(t, earliest.ReleaseTime.Before(q.Peek().ReleaseTime))
+	}
+	return res
+}
+
+func encodePayloadBytes(payload *vaa.TransferPayloadHdr) []byte {
+	bz := make([]byte, 101)
+	bz[0] = payload.Type
+
+	amtBytes := payload.Amount.Bytes()
+	if len(amtBytes) > 32 {
+		panic("amount will not fit in 32 bytes!")
+	}
+	copy(bz[33-len(amtBytes):33], amtBytes)
+
+	copy(bz[33:65], payload.OriginAddress.Bytes())
+	binary.BigEndian.PutUint16(bz[65:67], uint16(payload.OriginChain))
+	copy(bz[67:99], payload.TargetAddress.Bytes())
+	binary.BigEndian.PutUint16(bz[99:101], uint16(payload.TargetChain))
+	return bz
+}
+
+// Helper function that returns a valid PendingMessage. It creates identical message publications
+// with different sequence numbers.
+func makeUniquePendingMessage(t *testing.T) *common.PendingMessage {
+	t.Helper()
+
+	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec
+	require.NoError(t, err)
+
+	targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	// Required as the Notary checks the emitter address.
+	tokenBridge := sdk.KnownTokenbridgeEmitters[vaa.ChainIDEthereum]
+	tokenBridgeAddress := vaa.Address(tokenBridge)
+	require.NoError(t, err)
+
+	payload := &vaa.TransferPayloadHdr{
+		Type:          0x01,
+		Amount:        big.NewInt(27000000000),
+		OriginAddress: originAddress,
+		OriginChain:   vaa.ChainIDEthereum,
+		TargetAddress: targetAddress,
+		TargetChain:   vaa.ChainIDPolygon,
+	}
+
+	payloadBytes := encodePayloadBytes(payload)
+
+	// Should be unique for each test with high probability.
+	// #nosec: G404 -- Cryptographically secure pseudo-random number generator not needed.
+	var sequence = rand.Uint64()
+	msgpub := &common.MessagePublication{
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
+		Timestamp:        time.Unix(int64(1654516425), 0),
+		Nonce:            123456,
+		Sequence:         sequence,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddress,
+		Payload:          payloadBytes,
+		ConsistencyLevel: 32,
+		Unreliable:       true,
+		IsReobservation:  true,
+	}
+	setErr := msgpub.SetVerificationState(common.Anomalous)
+	require.NoError(t, setErr)
+
+	// The nanoseconds are not important to us and are not serialized.
+	releaseTime := time.Unix(int64(1654516425), 0)
+	return &common.PendingMessage{
+		ReleaseTime: releaseTime,
+		Msg:         *msgpub,
+	}
+}

+ 5 - 0
node/pkg/db/db.go

@@ -236,3 +236,8 @@ func (d *Database) FindEmitterSequenceGap(prefix VAAID) (resp []uint64, firstSeq
 	}
 	return
 }
+
+// Conn returns a pointer to the underlying database connection.
+func (d *Database) Conn() *badger.DB {
+	return d.db
+}

+ 231 - 0
node/pkg/db/notary.go

@@ -0,0 +1,231 @@
+// SECURITY: The calling code is responsible for handling mutex operations when
+// working with this package.
+package db
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/dgraph-io/badger/v3"
+	"go.uber.org/zap"
+)
+
+type NotaryDBInterface interface {
+	StoreBlackholed(m *common.MessagePublication) error
+	StoreDelayed(p *common.PendingMessage) error
+	DeleteBlackholed(m *common.MessagePublication) error
+	DeleteDelayed(p *common.PendingMessage) error
+	LoadAll(logger *zap.Logger) (*NotaryLoadResult, error)
+}
+
+// NotaryDB is a wrapper struct for a database connection.
+// Its main purpose is to provide some separation between the Notary's functionality
+// and the general functioning of db.Database.
+type NotaryDB struct {
+	db *badger.DB
+}
+
+func NewNotaryDB(dbConn *badger.DB) *NotaryDB {
+	return &NotaryDB{
+		db: dbConn,
+	}
+}
+
+// Define prefixes used to isolate different message publications stored in the database.
+const (
+	delayedPrefix   = "NOTARY:DELAY:V1:"
+	blackholePrefix = "NOTARY:BLACKHOLE:V1:"
+)
+
+// The type of data stored in the Notary's database.
+type dataType string
+
+const (
+	Unknown    dataType = "unknown"
+	Delayed    dataType = "delayed"
+	Blackholed dataType = "blackholed"
+)
+
+var (
+	ErrMarshal   = errors.New("notary: marshal")
+	ErrUnmarshal = errors.New("notary: unmarshal")
+)
+
+// Operation represents a database operation type
+type Operation string
+
+const (
+	OpRead   Operation = "read"
+	OpUpdate Operation = "update"
+	OpDelete Operation = "delete"
+)
+
+type DBError struct {
+	Op  Operation
+	Key []byte
+	Err error
+}
+
+func (e *DBError) Unwrap() error {
+	return e.Err
+}
+
+func (e *DBError) Error() string {
+	return fmt.Sprintf("notary database: %s key: %x error: %v", e.Op, e.Key, e.Err)
+}
+
+func (d *NotaryDB) StoreDelayed(p *common.PendingMessage) error {
+	b, marshalErr := p.MarshalBinary()
+
+	if marshalErr != nil {
+		return errors.Join(ErrMarshal, marshalErr)
+	}
+
+	key := delayKey(p)
+	if updateErr := d.update(key, b); updateErr != nil {
+		return updateErr // update already wraps the failure in a *DBError
+	}
+
+	return nil
+}
+
+func (d *NotaryDB) StoreBlackholed(m *common.MessagePublication) error {
+	b, marshalErr := m.MarshalBinary()
+
+	if marshalErr != nil {
+		return errors.Join(ErrMarshal, marshalErr)
+	}
+
+	key := blackholeKey(m)
+	if updateErr := d.update(key, b); updateErr != nil {
+		return updateErr // update already wraps the failure in a *DBError
+	}
+	return nil
+}
+
+func (d *NotaryDB) DeleteDelayed(p *common.PendingMessage) error {
+	return d.deleteEntry(delayKey(p))
+}
+
+func (d *NotaryDB) DeleteBlackholed(m *common.MessagePublication) error {
+	return d.deleteEntry(blackholeKey(m))
+}
+
+type NotaryLoadResult struct {
+	Delayed    []*common.PendingMessage
+	Blackholed []*common.MessagePublication
+}
+
+// LoadAll iterates over every key in the database and returns the Notary's delayed and blackholed entries.
+func (d *NotaryDB) LoadAll(logger *zap.Logger) (*NotaryLoadResult, error) {
+	result := NotaryLoadResult{
+		Delayed:    make([]*common.PendingMessage, 0),
+		Blackholed: make([]*common.MessagePublication, 0),
+	}
+	viewErr := d.db.View(func(txn *badger.Txn) error {
+		opts := badger.DefaultIteratorOptions
+		opts.PrefetchSize = 10
+		it := txn.NewIterator(opts)
+		defer it.Close()
+		for it.Rewind(); it.Valid(); it.Next() {
+			item := it.Item()
+			key := item.Key()
+			data, copyErr := item.ValueCopy(nil)
+			if copyErr != nil {
+				return copyErr
+			}
+
+			switch dbDataType(key) {
+			case Blackholed:
+				var msgPub common.MessagePublication
+				unmarshalErr := msgPub.UnmarshalBinary(data)
+				if unmarshalErr != nil {
+					return errors.Join(
+						ErrUnmarshal,
+						unmarshalErr,
+					)
+				}
+				result.Blackholed = append(result.Blackholed, &msgPub)
+			case Delayed:
+				var pMsg common.PendingMessage
+				unmarshalErr := pMsg.UnmarshalBinary(data)
+				if unmarshalErr != nil {
+					return errors.Join(
+						ErrUnmarshal,
+						unmarshalErr,
+					)
+				}
+				result.Delayed = append(result.Delayed, &pMsg)
+			case Unknown:
+				// The key-value store is shared across other modules and message types (e.g. Governor, Accountant).
+				// If another key is discovered, just ignore it.
+				logger.Debug("notary: load database ignoring unknown key type", zap.String("key", string(key)))
+				continue
+			}
+
+		}
+		return nil
+	})
+
+	if viewErr != nil {
+		// No key provided here since the View function is iterating over every entry.
+		return nil, &DBError{Op: OpRead, Err: viewErr}
+	}
+
+	return &result, nil
+}
+
+// dbDataType returns the data type for an entry in the database based on its key.
+func dbDataType(key []byte) dataType {
+	if strings.HasPrefix(string(key), blackholePrefix) {
+		return Blackholed
+	}
+	if strings.HasPrefix(string(key), delayedPrefix) {
+		return Delayed
+	}
+	return Unknown
+
+}
+
+func (d *NotaryDB) update(key []byte, data []byte) error {
+	updateErr := d.db.Update(func(txn *badger.Txn) error {
+		if setErr := txn.Set(key, data); setErr != nil {
+			return setErr
+		}
+		return nil
+	})
+
+	if updateErr != nil {
+		return &DBError{Op: OpUpdate, Key: key, Err: updateErr}
+	}
+
+	return nil
+}
+
+func (d *NotaryDB) deleteEntry(key []byte) error {
+	if updateErr := d.db.Update(func(txn *badger.Txn) error {
+		deleteErr := txn.Delete(key)
+		return deleteErr
+	}); updateErr != nil {
+		return &DBError{Op: OpDelete, Key: key, Err: updateErr}
+	}
+
+	return nil
+}
+
+// delayKey returns the unique key under which a pending message is stored in the Notary's database.
+func delayKey(p *common.PendingMessage) []byte {
+	return key(delayedPrefix, p.Msg.MessageIDString())
+}
+
+// blackholeKey returns the unique key under which a blackholed message publication is stored in the Notary's database.
+func blackholeKey(m *common.MessagePublication) []byte {
+	return key(blackholePrefix, m.MessageIDString())
+}
+
+// key builds a database key by joining a data-type prefix with a message ID.
+func key(prefix string, msgID string) (key []byte) {
+	return fmt.Appendf(key, "%v%v", prefix, msgID)
+}

+ 105 - 0
node/pkg/db/notary_test.go

@@ -0,0 +1,105 @@
+package db
+
+import (
+	"encoding/hex"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/stretchr/testify/require"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+	"go.uber.org/zap"
+)
+
+func TestStoreAndReloadData(t *testing.T) {
+	// Set-up.
+	dbPath := t.TempDir()
+	database := OpenDb(zap.NewNop(), &dbPath)
+	defer database.Close()
+	defer os.Remove(dbPath)
+	nDB := NotaryDB{db: database.db}
+
+	// Build messages.
+	msg1 := makeNewMsgPub(t)
+	msg2 := *msg1
+	pendingMsg := makeNewPendingMsg(t, msg1)
+
+	// Store messages.
+	delayErr := nDB.StoreDelayed(pendingMsg)
+	require.NoError(t, delayErr, fmt.Sprintf("failed to store delayed message: %v", delayErr))
+	blackholeErr := nDB.StoreBlackholed(&msg2)
+	require.NoError(t, blackholeErr, fmt.Sprintf("failed to store blackholed message: %v", blackholeErr))
+
+	// Retrieve both messages and ensure they're equal to what was stored.
+	res, loadErr := nDB.LoadAll(zap.NewNop())
+	require.NoError(t, loadErr)
+	require.Equal(t, 1, len(res.Delayed))
+	require.Equal(t, 1, len(res.Blackholed))
+	require.Equal(t, pendingMsg, res.Delayed[0])
+	require.Equal(t, &msg2, res.Blackholed[0])
+}
+
+func TestKeysForStoredMessagesV1(t *testing.T) {
+	msg1 := makeNewMsgPub(t)
+	pMsg := makeNewPendingMsg(t, msg1)
+
+	require.Equal(
+		t,
+		[]byte("NOTARY:DELAY:V1:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+		delayKey(pMsg),
+	)
+
+	require.Equal(
+		t,
+		[]byte("NOTARY:BLACKHOLE:V1:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"),
+		blackholeKey(msg1),
+	)
+}
+
+// nowSeconds is a helper function that returns time.Now() with the nanoseconds truncated.
+// The nanoseconds are not important to us and are not serialized.
+func nowSeconds() time.Time {
+	return time.Unix(time.Now().Unix(), 0)
+}
+
+// makeNewMsgPub returns a MessagePublication that has a token transfer payload
+// but otherwise has default values.
+func makeNewMsgPub(t *testing.T) *common.MessagePublication {
+	t.Helper()
+
+	ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
+	require.NoError(t, err)
+
+	validTxID, err := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8")
+	require.NoError(t, err)
+
+	msg := &common.MessagePublication{
+		TxID:            validTxID,
+		Timestamp:       nowSeconds(),
+		Nonce:           1,
+		Sequence:        789101112131415,
+		EmitterChain:    vaa.ChainIDEthereum,
+		EmitterAddress:  ethereumTokenBridgeAddr,
+		Unreliable:      false,
+		IsReobservation: false,
+		Payload:         []byte{0x01},
+	}
+
+	err = msg.SetVerificationState(common.Anomalous)
+	require.NoError(t, err)
+
+	return msg
+}
+
+// makeNewPendingMsg wraps a message publication and adds a release time to create a PendingMessage
+func makeNewPendingMsg(t *testing.T, msg *common.MessagePublication) *common.PendingMessage {
+	t.Helper()
+
+	return &common.PendingMessage{
+		// The nanoseconds are not important to us and are not serialized.
+		ReleaseTime: nowSeconds().Add(24 * time.Hour),
+		Msg:         *msg,
+	}
+}

+ 2 - 2
node/pkg/governor/governor.go

@@ -744,8 +744,8 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked(
 	return true, ce, token, payload, nil
 }
 
-// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing
-// queued transfers.
+// CheckPending is a wrapper method for CheckPendingForTime that uses time.Now as the release time.
+// Returns a slice of MessagePublications that are ready to be published.
 func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) {
 	return gov.CheckPendingForTime(time.Now())
 }

+ 9 - 0
node/pkg/node/node.go

@@ -11,6 +11,7 @@ import (
 	"github.com/certusone/wormhole/node/pkg/governor"
 	"github.com/certusone/wormhole/node/pkg/guardiansigner"
 	"github.com/certusone/wormhole/node/pkg/gwrelayer"
+	"github.com/certusone/wormhole/node/pkg/notary"
 	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
 	"github.com/certusone/wormhole/node/pkg/query"
 	"github.com/certusone/wormhole/node/pkg/supervisor"
@@ -90,6 +91,7 @@ type G struct {
 	gst                *common.GuardianSetState
 	acct               *accountant.Accountant
 	gov                *governor.ChainGovernor
+	notary             *notary.Notary
 	gatewayRelayer     *gwrelayer.GatewayRelayer
 	queryHandler       *query.QueryHandler
 	publicrpcServer    *grpc.Server
@@ -235,6 +237,13 @@ func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) su
 			}
 		}
 
+		if g.notary != nil {
+			logger.Info("starting notary")
+			if err := g.notary.Run(); err != nil {
+				logger.Fatal("failed to start notary", zap.Error(err))
+			}
+		}
+
 		if g.gatewayRelayer != nil {
 			logger.Info("Starting gateway relayer")
 			if err := g.gatewayRelayer.Start(ctx); err != nil {

+ 1 - 0
node/pkg/node/node_test.go

@@ -189,6 +189,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
 			GuardianOptionWatchers(watcherConfigs, nil),
 			GuardianOptionNoAccountant(), // disable accountant
 			GuardianOptionGovernor(true, false, ""),
+			GuardianOptionNotary(true),
 			GuardianOptionGatewayRelayer("", nil), // disable gateway relayer
 			GuardianOptionQueryHandler(false, ""), // disable queries
 			GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail),

+ 20 - 2
node/pkg/node/options.go

@@ -13,6 +13,7 @@ import (
 	guardianDB "github.com/certusone/wormhole/node/pkg/db"
 	"github.com/certusone/wormhole/node/pkg/governor"
 	"github.com/certusone/wormhole/node/pkg/gwrelayer"
+	"github.com/certusone/wormhole/node/pkg/notary"
 	"github.com/certusone/wormhole/node/pkg/p2p"
 	"github.com/certusone/wormhole/node/pkg/processor"
 	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
@@ -254,6 +255,22 @@ func GuardianOptionGovernor(governorEnabled bool, flowCancelEnabled bool, coinGe
 		}}
 }
 
+// GuardianOptionNotary enables or disables the Notary.
+// Dependencies: db
+func GuardianOptionNotary(notaryEnabled bool) *GuardianOption {
+	return &GuardianOption{
+		name:         "notary",
+		dependencies: []string{"db"},
+		f: func(ctx context.Context, logger *zap.Logger, g *G) error {
+			if notaryEnabled {
+				g.notary = notary.NewNotary(ctx, logger, g.db, g.env)
+			} else {
+				logger.Info("notary is disabled")
+			}
+			return nil
+		}}
+}
+
 // GuardianOptionGatewayRelayer configures the Gateway Relayer module. If the gateway relayer smart contract is configured, we will instantiate
 // the GatewayRelayer and signed VAAs will be passed to it for processing when they are published. It will forward payload three transfers destined
 // for the specified contract on wormchain to that contract.
@@ -614,8 +631,8 @@ func GuardianOptionAlternatePublisher(guardianAddr []byte, configs []string) *Gu
 func GuardianOptionProcessor(networkId string) *GuardianOption {
 	return &GuardianOption{
 		name: "processor",
-		// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
-		dependencies: []string{"accountant", "alternate-publisher", "db", "gateway-relayer", "governor"},
+		// governor, accountant, and notary may be set to nil, but that choice needs to be made before the processor is configured
+		dependencies: []string{"accountant", "alternate-publisher", "db", "gateway-relayer", "governor", "notary"},
 
 		f: func(ctx context.Context, logger *zap.Logger, g *G) error {
 
@@ -633,6 +650,7 @@ func GuardianOptionProcessor(networkId string) *GuardianOption {
 				g.gov,
 				g.acct,
 				g.acctC.readC,
+				g.notary,
 				g.gatewayRelayer,
 				networkId,
 				g.alternatePublisher,

+ 521 - 0
node/pkg/notary/notary.go

@@ -0,0 +1,521 @@
+// Notary evaluates the status of [common.MessagePublication]s and makes decisions regarding
+// how they should be processed.
+//
+// Currently, it returns one of three possible verdicts:
+// 1. Approve
+//   - Messages should pass through normally.
+//   - This verdict is used for any message that has a non-error status.
+//
+// 2. Delay
+//   - Messages should be delayed.
+//   - This verdict is used for Anomalous messages.
+//
+// 3. Blackhole
+//   - Messages should be blocked from publication permanently, including for reobservation pathways.
+//   - This verdict is reserved for messages with a Rejected status.
+//
+// The Notary does not modify message publications nor does it stop them from
+// being processed. It only informs other code what to do. When a message is
+// Delayed or Blackholed, the Notary will track it in a database.
+//
+// Delayed messages are stored with a timestamp indicating when they should be
+// released. After the timestamp expires, they can be removed from the
+// database.
+//
+// Because Blackholed messages are meant to be blocked permanently, they should
+// be stored in the database forever. In practice, messages will be marked as
+// Rejected only in very extreme circumstances, so the database should always
+// be small.
+package notary
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"slices"
+	"sync"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/certusone/wormhole/node/pkg/db"
+	"github.com/wormhole-foundation/wormhole/sdk"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+
+	"go.uber.org/zap"
+)
+
+type (
+	// Verdict is an enum that reports what action the Notary has taken after processing a message.
+	Verdict uint8
+)
+
+const (
+	Unknown Verdict = iota
+	// Approve means a message should be processed normally. All messages that are not Token Transfers
+	// must always be Approve, as the Notary does not support other message types.
+	Approve
+	// Delay means a message should be temporarily delayed so that it can be manually inspected.
+	Delay
+	// Blackhole means a message should be permanently blocked from being processed.
+	Blackhole
+)
+
+func (v Verdict) String() string {
+	switch v {
+	case Approve:
+		return "Approve"
+	case Delay:
+		return "Delay"
+	case Blackhole:
+		return "Blackhole"
+	case Unknown:
+		return "Unknown"
+	default:
+		return "Unknown"
+	}
+}
+
+const (
+	// How long a message should be held in the pending list before being processed.
+	// The value should be long enough to allow for manual review and classification
+	// by the Guardians.
+	DelayFor = time.Hour * 24 * 4
+)
+
+var (
+	ErrAlreadyInitialized = errors.New("notary: message queues already initialized during database load")
+	ErrAlreadyBlackholed  = errors.New("notary: message is already blackholed")
+	ErrCannotRelease      = errors.New("notary: could not release message")
+	ErrInvalidMsg         = errors.New("notary: message is invalid")
+)
+
+type (
+	// A set corresponding to message publications. The elements of the set must be the results of
+	// the function [common.MessagePublication.VAAHash].
+	msgPubSet struct {
+		elements map[string]struct{}
+	}
+
+	Notary struct {
+		ctx    context.Context
+		logger *zap.Logger
+		mutex  sync.RWMutex
+		// database persists information about delayed and black-holed messages.
+		// Must be guarded by a read-write mutex.
+		database db.NotaryDBInterface
+
+		// Min-heap queue of delayed messages (MessagePublication + Timestamp for release)
+		delayed *common.PendingMessageQueue
+
+		// All of the messages that have been black-holed due to being rejected by the Transfer Verifier.
+		// [msgPubSet] is not thread-safe so this field must be guarded by a read-write mutex.
+		blackholed *msgPubSet
+
+		// env reports whether the guardian is running in production or a test environment.
+		env common.Environment
+	}
+)
+
+func NewNotary(
+	ctx context.Context,
+	logger *zap.Logger,
+	guardianDB *db.Database,
+	env common.Environment,
+) *Notary {
+	return &Notary{
+		ctx:    ctx,
+		logger: logger,
+		mutex:  sync.RWMutex{},
+		// Get the underlying database connection from the Guardian.
+		database:   db.NewNotaryDB(guardianDB.Conn()),
+		delayed:    common.NewPendingMessageQueue(),
+		blackholed: NewSet(),
+		env:        env,
+	}
+}
+
+func (n *Notary) Run() error {
+	if n.env != common.GoTest {
+		n.logger.Info("loading notary data from database")
+		if err := n.loadFromDB(n.logger); err != nil {
+			return err
+		}
+	}
+
+	n.logger.Info("notary ready")
+
+	return nil
+}
+
+func (n *Notary) ProcessMsg(msg *common.MessagePublication) (v Verdict, err error) {
+
+	n.logger.Debug("notary: processing message", msg.ZapFields()...)
+
+	// NOTE: Only token transfers originated on Ethereum are currently considered.
+	// For the initial implementation, the Notary only rules on messages based
+	// on the Transfer Verifier. However, there is no technical barrier to
+	// supporting other message types.
+	if msg.EmitterChain != vaa.ChainIDEthereum {
+		n.logger.Debug("notary: automatically approving message publication because it is not from Ethereum", msg.ZapFields()...)
+		return Approve, nil
+	}
+
+	if !vaa.IsTransfer(msg.Payload) {
+		n.logger.Debug("notary: automatically approving message publication because it is not a token transfer", msg.ZapFields()...)
+		return Approve, nil
+	}
+
+	if tokenBridge, ok := sdk.KnownTokenbridgeEmitters[msg.EmitterChain]; !ok {
+		// Return Unknown if the token bridge is not registered in the SDK.
+		n.logger.Error("notary: unknown token bridge emitter", msg.ZapFields()...)
+		return Unknown, errors.New("unknown token bridge emitter")
+	} else {
+		// Approve if the token transfer is not from the token bridge.
+		// For now, the notary only rules on token transfers from the token bridge.
+		if !bytes.Equal(msg.EmitterAddress.Bytes(), tokenBridge) {
+			n.logger.Debug("notary: automatically approving message publication because it is not from the token bridge", msg.ZapFields()...)
+			return Approve, nil
+		}
+	}
+
+	// Return early if the message has already been blackholed. This is important in case a message
+	// is reobserved or otherwise processed here more than once. An Anomalous message that becomes
+	// delayed and later blackholed should not be able to be re-added to the Delayed queue.
+	if n.IsBlackholed(msg) {
+		n.logger.Warn("notary: got message publication that is already blackholed",
+			msg.ZapFields(zap.String("verdict", Blackhole.String()))...,
+		)
+		return Blackhole, nil
+	}
+
+	switch msg.VerificationState() {
+	case common.Anomalous:
+		err = n.delay(msg, DelayFor)
+		v = Delay
+	case common.Rejected:
+		err = n.blackhole(msg)
+		v = Blackhole
+	case common.Valid:
+		v = Approve
+	case common.CouldNotVerify, common.NotVerified, common.NotApplicable:
+		// NOTE: All other statuses are simply approved for now. In the future, it may be
+		// desirable to log a warning if a [common.NotVerified] message is handled here, with
+		// the idea that messages handled by the Notary must already have a non-default
+		// status.
+		n.logger.Debug("notary: got unexpected verification status for token transfer", msg.ZapFields()...)
+		v = Approve
+	}
+
+	n.logger.Debug("notary result",
+		msg.ZapFields(zap.String("verdict", v.String()))...,
+	)
+	return
+}
+
+// ReleaseReadyMessages removes messages from the database and the delayed queue if they are ready to
+// be released. Returns the messages that are ready to be published.
+func (n *Notary) ReleaseReadyMessages() []*common.MessagePublication {
+	if n == nil || n.delayed == nil {
+		return nil
+	}
+
+	n.logger.Debug(
+		"notary: begin process ready message",
+		zap.Int("delayedCount", n.delayed.Len()),
+	)
+	var (
+		readyMsgs = make([]*common.MessagePublication, 0, n.delayed.Len())
+		now       = time.Now()
+	)
+
+	// Pop elements from the queue until the release time is after the current time.
+	// If errors occur, continue instead of returning early so that other messages
+	// can still be processed.
+	for n.delayed.Len() != 0 {
+		next := n.delayed.Peek()
+		if next == nil || next.ReleaseTime.After(now) {
+			break // No more messages to process or next message not ready
+		}
+
+		// Pop reduces the length of n.delayed
+		pMsg := n.delayed.Pop()
+		if pMsg == nil {
+			n.logger.Error("nil message after pop")
+			continue // Skip if Pop returns nil (shouldn't happen if Peek worked)
+		}
+
+		// Update database. Do this before adding the message to the ready list so that we don't
+		// accidentally add the same message twice if deleting the message from the database fails.
+		err := n.database.DeleteDelayed(pMsg)
+		if err != nil {
+			n.logger.Error("delete pending message from notary database", pMsg.Msg.ZapFields(zap.Error(err))...)
+			continue
+		}
+
+		// If the message is in the delayed queue, it should not be in the blackholed set.
+		// This is a sanity check to ensure that a blackholed message is never published.
+		// It should never trigger in practice.
+		if n.IsBlackholed(&pMsg.Msg) {
+			n.logger.Error("notary: got blackholed message in delayed queue", pMsg.Msg.ZapFields()...)
+			continue
+		}
+
+		// Append return value.
+		readyMsgs = append(readyMsgs, &pMsg.Msg)
+
+	}
+
+	n.logger.Debug(
+		"notary: finish process ready message",
+		zap.Int("readyCount", len(readyMsgs)),
+		zap.Int("delayedCount", n.delayed.Len()),
+	)
+
+	return readyMsgs
+}
+
+// delay stores a MessagePublication in the database and populates its in-memory
+// representation in the Notary.
+// Acquires the mutex lock and unlocks when complete.
+func (n *Notary) delay(msg *common.MessagePublication, dur time.Duration) error {
+	if msg == nil {
+		return ErrInvalidMsg
+	}
+
+	n.mutex.Lock()
+	defer n.mutex.Unlock()
+
+	// Ensure that the message can't be added to the delayed list or database if it's already blackholed.
+	if n.blackholed.Contains(msg.VAAHash()) {
+		return ErrAlreadyBlackholed
+	}
+
+	// Remove nanoseconds from time.Now(). They are not serialized in the binary
+	// representation. If we don't truncate nanoseconds here, then testing
+	// message equality before and after loading to the database will fail.
+	release := time.Unix(time.Now().Unix(), 0)
+
+	pMsg := &common.PendingMessage{
+		Msg:         *msg,
+		ReleaseTime: release.Add(dur),
+	}
+
+	// Store in the in-memory queue. This should happen even if a database error occurs.
+	n.delayed.Push(pMsg)
+
+	// Store in database.
+	dbErr := n.database.StoreDelayed(pMsg)
+	if dbErr != nil {
+		return dbErr
+	}
+
+	n.logger.Info("notary: delayed message", msg.ZapFields()...)
+
+	return nil
+}
+
+// blackhole adds a message publication to the blackholed in-memory set and stores it in the database.
+// It also removes the message from the delayed list and database, if present.
+// Acquires the mutex and unlocks when complete.
+func (n *Notary) blackhole(msg *common.MessagePublication) error {
+
+	if msg == nil {
+		return ErrInvalidMsg
+	}
+	n.mutex.Lock()
+
+	// Store in the in-memory set. This should happen even if a database error occurs.
+	n.blackholed.Add(msg.VAAHash())
+
+	// Store in database.
+	dbErr := n.database.StoreBlackholed(msg)
+	if dbErr != nil {
+		// Ensure the mutex is unlocked before returning.
+		// Not using defer for unlocking here because removeDelayed acquires the mutex.
+		n.mutex.Unlock()
+		return dbErr
+	}
+	// Unlock mutex before calling removeDelayed, which also acquires the mutex.
+	n.mutex.Unlock()
+
+	// When a message is blackholed, it should be removed from the delayed list and database.
+	err := n.removeDelayed(msg)
+	if err != nil {
+		return err
+	}
+
+	n.logger.Info("notary: blackholed message", msg.ZapFields()...)
+
+	return nil
+}
+
+// forget removes a message from the database and from the delayed and blackholed lists.
+func (n *Notary) forget(msg *common.MessagePublication) error {
+	if msg == nil {
+		return ErrInvalidMsg
+	}
+
+	// Both of the following methods lock and unlock the mutex.
+
+	err := n.removeDelayed(msg)
+	if err != nil {
+		return err
+	}
+
+	err = n.removeBlackholed(msg)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// IsBlackholed returns true if the message is in the blackholed list.
+func (n *Notary) IsBlackholed(msg *common.MessagePublication) bool {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
+	return n.blackholed.Contains(msg.VAAHash())
+}
+
+// removeBlackholed removes a message from the blackholed list and database.
+// Acquires the mutex and unlocks when complete.
+func (n *Notary) removeBlackholed(msg *common.MessagePublication) error {
+	n.mutex.Lock()
+	defer n.mutex.Unlock()
+	if msg == nil {
+		return ErrInvalidMsg
+	}
+
+	n.blackholed.Remove(msg.VAAHash())
+
+	err := n.database.DeleteBlackholed(msg)
+	if err != nil {
+		return err
+	}
+
+	n.logger.Info("notary: removed blackholed message", msg.ZapFields()...)
+
+	return nil
+}
+
+func (n *Notary) IsDelayed(msg *common.MessagePublication) bool {
+	// The notary's mutex is not used here because the pending message queue
+	// uses its own read mutex for this method.
+	return n.delayed.ContainsMessagePublication(msg)
+}
+
+// removeDelayed removes a message from the delayed list and database.
+func (n *Notary) removeDelayed(msg *common.MessagePublication) error {
+	n.mutex.Lock()
+	defer n.mutex.Unlock()
+	if msg == nil {
+		return ErrInvalidMsg
+	}
+	removed, err := n.delayed.RemoveItem(msg)
+	if err != nil {
+		return err
+	}
+
+	if removed != nil {
+		err := n.database.DeleteDelayed(removed)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// loadFromDB populates the in-memory delayed queue and blackholed set from the database.
+func (n *Notary) loadFromDB(logger *zap.Logger) error {
+	n.mutex.Lock()
+	defer n.mutex.Unlock()
+
+	result, err := n.database.LoadAll(logger)
+	if err != nil {
+		n.logger.Error(
+			"notary: LoadAll call returned error",
+			zap.Error(err),
+		)
+		return err
+	}
+	if result == nil {
+		n.logger.Error(
+			"notary: LoadAll call produced nil result",
+		)
+		return errors.New("nil result from database")
+	}
+
+	n.logger.Info(
+		"loaded notary data from database",
+		zap.Int("delayedMsgs", len(result.Delayed)),
+		zap.Int("blackholedMsgs", len(result.Blackholed)),
+	)
+
+	// Avoid overwriting data by mistake.
+	if n.delayed != nil && n.delayed.Len() > 0 {
+		return ErrAlreadyInitialized
+	}
+
+	var (
+		delayed    = common.NewPendingMessageQueue()
+		blackholed = NewSet()
+	)
+
+	if len(result.Delayed) > 0 {
+		for entry := range slices.Values(result.Delayed) {
+			delayed.Push(entry)
+		}
+	}
+
+	if len(result.Blackholed) > 0 {
+		for entry := range slices.Values(result.Blackholed) {
+			blackholed.Add(entry.VAAHash())
+		}
+	}
+
+	n.blackholed = blackholed
+	n.delayed = delayed
+	n.logger.Info(
+		"initialized notary",
+		zap.Int("delayedMsgs", n.delayed.Len()),
+		zap.Int("blackholedMsgs", n.blackholed.Len()),
+	)
+
+	return nil
+}
+
+// NewSet creates and initializes an empty msgPubSet.
+func NewSet() *msgPubSet {
+	return &msgPubSet{
+		elements: make(map[string]struct{}),
+	}
+}
+
+func (s *msgPubSet) Len() int {
+	return len(s.elements)
+}
+
+// Add adds an element to the set
+func (s *msgPubSet) Add(element string) {
+	if s == nil {
+		return // Protect against nil receiver
+	}
+	s.elements[element] = struct{}{}
+}
+
+// Contains checks if an element is in the set
+func (s *msgPubSet) Contains(element string) bool {
+	if s == nil {
+		return false // Protect against nil receiver
+	}
+	_, exists := s.elements[element]
+	return exists
+}
+
+// Remove removes an element from the set
+func (s *msgPubSet) Remove(element string) {
+	if s == nil {
+		return // Protect against nil receiver
+	}
+	delete(s.elements, element)
+}

+ 489 - 0
node/pkg/notary/notary_test.go

@@ -0,0 +1,489 @@
+package notary
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"math/big"
+	"math/rand/v2"
+	"slices"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/certusone/wormhole/node/pkg/db"
+	"github.com/stretchr/testify/require"
+	"github.com/wormhole-foundation/wormhole/sdk"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+	"go.uber.org/zap"
+
+	eth_common "github.com/ethereum/go-ethereum/common"
+)
+
+type MockNotaryDB struct{}
+
+func (md MockNotaryDB) StoreBlackholed(m *common.MessagePublication) error  { return nil }
+func (md MockNotaryDB) StoreDelayed(p *common.PendingMessage) error         { return nil }
+func (md MockNotaryDB) DeleteBlackholed(m *common.MessagePublication) error { return nil }
+func (md MockNotaryDB) DeleteDelayed(p *common.PendingMessage) error        { return nil }
+func (md MockNotaryDB) LoadAll(l *zap.Logger) (*db.NotaryLoadResult, error) { return nil, nil }
+
+func makeTestNotary(t *testing.T) *Notary {
+	t.Helper()
+
+	return &Notary{
+		ctx:        context.Background(),
+		logger:     zap.NewNop(),
+		mutex:      sync.RWMutex{},
+		database:   MockNotaryDB{},
+		delayed:    &common.PendingMessageQueue{},
+		blackholed: NewSet(),
+		env:        common.GoTest,
+	}
+}
+
+func TestNotary_ProcessMessageCorrectVerdict(t *testing.T) {
+
+	// NOTE: This test should be exhaustive over VerificationState variants.
+	tests := map[string]struct {
+		verificationState common.VerificationState
+		verdict           Verdict
+	}{
+		"approve N/A": {
+			common.NotApplicable,
+			Approve,
+		},
+		"approve not verified": {
+			common.NotVerified,
+			Approve,
+		},
+		"approve valid": {
+			common.Valid,
+			Approve,
+		},
+		"approve could not verify": {
+			common.CouldNotVerify,
+			Approve,
+		},
+		"blackhole rejected": {
+			common.Rejected,
+			Blackhole,
+		},
+		"delay anomalous": {
+			common.Anomalous,
+			Delay,
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			n := makeTestNotary(t)
+			msg := makeUniqueMessagePublication(t)
+
+			err := msg.SetVerificationState(test.verificationState)
+			if test.verificationState != common.NotVerified {
+				// SetVerificationState fails if the old status is equal to the new one.
+				require.NoError(t, err)
+			}
+
+			require.True(t, vaa.IsTransfer(msg.Payload))
+
+			verdict, err := n.ProcessMsg(msg)
+			require.NoError(t, err)
+			require.Equal(
+				t,
+				test.verdict,
+				verdict,
+				fmt.Sprintf("verificationState=%s verdict=%s", msg.VerificationState().String(), verdict.String()),
+			)
+		})
+	}
+}
+func TestNotary_ProcessMsgUpdatesCollections(t *testing.T) {
+
+	// NOTE: This test should be exhaustive over VerificationState variants.
+	type expectedSizes struct {
+		delayed    int
+		blackholed int
+	}
+	tests := map[string]struct {
+		verificationState common.VerificationState
+		expectedSizes
+	}{
+		"Valid has no effect": {
+			common.Valid,
+			expectedSizes{},
+		},
+		"NotVerified has no effect": {
+			common.NotVerified,
+			expectedSizes{},
+		},
+		"NotApplicable has no effect": {
+			common.NotApplicable,
+			expectedSizes{},
+		},
+		"CouldNotVerify has no effect": {
+			common.CouldNotVerify,
+			expectedSizes{},
+		},
+		"Anomalous gets delayed": {
+			common.Anomalous,
+			expectedSizes{
+				delayed:    1,
+				blackholed: 0,
+			},
+		},
+		"Rejected gets blackholed": {
+			common.Rejected,
+			expectedSizes{
+				delayed:    0,
+				blackholed: 1,
+			},
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			// Set-up
+			var (
+				n   = makeTestNotary(t)
+				msg = makeUniqueMessagePublication(t)
+				err = msg.SetVerificationState(test.verificationState)
+			)
+			if test.verificationState != common.NotVerified {
+				// SetVerificationState fails if the old status is equal to the new one.
+				require.NoError(t, err)
+			}
+			require.Equal(t, test.verificationState, msg.VerificationState())
+			require.True(t, vaa.IsTransfer(msg.Payload))
+
+			// Ensure that the collections are properly updated.
+			_, err = n.ProcessMsg(msg)
+			require.NoError(t, err)
+			require.Equal(
+				t,
+				test.expectedSizes.delayed,
+				n.delayed.Len(),
+				fmt.Sprintf("delayed count did not match. verificationState %s", msg.VerificationState().String()),
+			)
+			require.Equal(
+				t,
+				test.expectedSizes.blackholed,
+				n.blackholed.Len(),
+				fmt.Sprintf("blackholed count did not match. verificationState %s", msg.VerificationState().String()),
+			)
+
+		})
+	}
+}
+
+func TestNotary_ProcessMessageAlwaysApprovesNonTokenTransfers(t *testing.T) {
+	n := makeTestNotary(t)
+
+	// NOTE: This test should be exhaustive over VerificationState variants.
+	tests := map[string]struct {
+		verificationState common.VerificationState
+	}{
+		"approve non-token transfer: NotVerified": {
+			common.NotVerified,
+		},
+		"approve non-token transfer: CouldNotVerify": {
+			common.CouldNotVerify,
+		},
+		"approve non-token transfer: Anomalous": {
+			common.Anomalous,
+		},
+		"approve non-token transfer: Rejected": {
+			common.Rejected,
+		},
+		"approve non-token transfer: NotApplicable": {
+			common.NotApplicable,
+		},
+		"approve non-token transfer: Valid": {
+			common.Valid,
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			msg := makeUniqueMessagePublication(t)
+
+			// Change the payload to something other than a token transfer.
+			msg.Payload = []byte{0x02}
+			require.False(t, vaa.IsTransfer(msg.Payload))
+
+			if test.verificationState != common.NotVerified {
+				// SetVerificationState fails if the old status is equal to the new one.
+				err := msg.SetVerificationState(test.verificationState)
+				require.NoError(t, err)
+			}
+
+			verdict, err := n.ProcessMsg(msg)
+			require.NoError(t, err)
+			require.Equal(t, Approve, verdict)
+		})
+	}
+}
+
+func TestNotary_ProcessReadyMessages(t *testing.T) {
+
+	tests := []struct {
+		name               string                   // description of this test case
+		delayed            []*common.PendingMessage // initial messages in delayed queue
+		expectedDelayCount int
+		expectedReadyCount int
+	}{
+		{
+			"no messages ready",
+			[]*common.PendingMessage{
+				{
+					ReleaseTime: time.Now().Add(time.Hour),
+					Msg:         *makeUniqueMessagePublication(t),
+				},
+			},
+			1,
+			0,
+		},
+		{
+			"some messages ready",
+			[]*common.PendingMessage{
+				{
+					ReleaseTime: time.Now().Add(-2 * time.Hour),
+					Msg:         *makeUniqueMessagePublication(t),
+				},
+				{
+					ReleaseTime: time.Now().Add(time.Hour),
+					Msg:         *makeUniqueMessagePublication(t),
+				},
+				{
+					ReleaseTime: time.Now().Add(-time.Hour),
+					Msg:         *makeUniqueMessagePublication(t),
+				},
+				{
+					ReleaseTime: time.Now().Add(2 * time.Hour),
+					Msg:         *makeUniqueMessagePublication(t),
+				},
+			},
+			2,
+			2,
+		},
+		{
+			"all messages ready",
+			[]*common.PendingMessage{
+				{
+					ReleaseTime: time.Now().Add(-2 * time.Hour),
+					Msg:         *makeUniqueMessagePublication(t),
+				},
+				{
+					ReleaseTime: time.Now().Add(-1 * time.Hour),
+					Msg:         *makeUniqueMessagePublication(t),
+				},
+			},
+			0,
+			2,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Set-up
+			n := makeTestNotary(t)
+			n.delayed = common.NewPendingMessageQueue()
+
+			currentLength := n.delayed.Len()
+			for pMsg := range slices.Values(tt.delayed) {
+				require.NotNil(t, pMsg)
+				n.delayed.Push(pMsg)
+				// Ensure that the queue grows after each push.
+				require.Greater(t, n.delayed.Len(), currentLength)
+				currentLength = n.delayed.Len()
+			}
+			require.Equal(t, len(tt.delayed), n.delayed.Len())
+
+			readyMsgs := n.ReleaseReadyMessages()
+			require.Equal(t, tt.expectedReadyCount, len(readyMsgs), "ready length does not match")
+			require.Equal(t, tt.expectedDelayCount, n.delayed.Len(), "delayed length does not match")
+		})
+	}
+}
+
+func TestNotary_Forget(t *testing.T) {
+	tests := []struct {
+		name               string // description of this test case
+		msg                *common.MessagePublication
+		expectedDelayCount int
+		expectedBlackholed int
+	}{
+		{
+			"remove from delayed list",
+			makeUniqueMessagePublication(t),
+			0,
+			0,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Set-up
+			n := makeTestNotary(t)
+			n.delayed = common.NewPendingMessageQueue()
+			n.blackholed = NewSet()
+
+			require.Equal(t, 0, n.delayed.Len())
+			require.Equal(t, 0, n.blackholed.Len())
+
+			err := n.delay(tt.msg, time.Hour)
+			require.NoError(t, err)
+
+			require.Equal(t, 1, n.delayed.Len())
+			require.Equal(t, 0, n.blackholed.Len())
+
+			// Modify the set manually because calling the blackhole function will remove the message from the delayed list.
+			n.blackholed.Add(tt.msg.VAAHash())
+
+			require.Equal(t, 1, n.delayed.Len())
+			require.Equal(t, 1, n.blackholed.Len())
+
+			err = n.forget(tt.msg)
+			require.NoError(t, err)
+
+			require.Equal(t, tt.expectedDelayCount, n.delayed.Len())
+			require.Equal(t, tt.expectedBlackholed, n.blackholed.Len())
+		})
+	}
+}
+
+func TestNotary_BlackholeRemovesFromDelayedList(t *testing.T) {
+	tests := []struct {
+		name               string // description of this test case
+		msg                *common.MessagePublication
+		expectedDelayCount int
+		expectedBlackholed int
+	}{
+		{
+			"remove from delayed list",
+			makeUniqueMessagePublication(t),
+			0,
+			1,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Set-up
+			n := makeTestNotary(t)
+			n.delayed = common.NewPendingMessageQueue()
+			n.blackholed = NewSet()
+
+			require.Equal(t, 0, n.delayed.Len())
+			require.Equal(t, 0, n.blackholed.Len())
+
+			err := n.delay(tt.msg, time.Hour)
+			require.NoError(t, err)
+
+			require.Equal(t, 1, n.delayed.Len())
+			require.Equal(t, 0, n.blackholed.Len())
+
+			err = n.blackhole(tt.msg)
+			require.NoError(t, err)
+
+			require.Equal(t, 0, n.delayed.Len())
+			require.Equal(t, 1, n.blackholed.Len())
+		})
+	}
+}
+
+func TestNotary_DelayFailsIfMessageAlreadyBlackholed(t *testing.T) {
+	tests := []struct {
+		name string // description of this test case
+		msg  *common.MessagePublication
+	}{
+		{
+			"delay fails if message is already blackholed",
+			makeUniqueMessagePublication(t),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Set-up
+			n := makeTestNotary(t)
+			n.delayed = common.NewPendingMessageQueue()
+			n.blackholed = NewSet()
+
+			require.Equal(t, 0, n.delayed.Len())
+			require.Equal(t, 0, n.blackholed.Len())
+
+			err := n.blackhole(tt.msg)
+			require.NoError(t, err)
+
+			require.Equal(t, 0, n.delayed.Len())
+			require.Equal(t, 1, n.blackholed.Len())
+
+			err = n.delay(tt.msg, time.Hour)
+			require.ErrorIs(t, err, ErrAlreadyBlackholed)
+
+			require.Equal(t, 0, n.delayed.Len())
+			require.Equal(t, 1, n.blackholed.Len())
+		})
+	}
+}
+
+// Helper function that returns a valid MessagePublication. It creates otherwise identical message
+// publications with randomly generated sequence numbers.
+func makeUniqueMessagePublication(t *testing.T) *common.MessagePublication {
+	t.Helper()
+
+	originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec
+	require.NoError(t, err)
+
+	targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
+	require.NoError(t, err)
+
+	// Required as the Notary checks the emitter address.
+	tokenBridge := sdk.KnownTokenbridgeEmitters[vaa.ChainIDEthereum]
+	tokenBridgeAddress := vaa.Address(tokenBridge)
+	require.NoError(t, err)
+
+	payload := &vaa.TransferPayloadHdr{
+		Type:          0x01,
+		Amount:        big.NewInt(27000000000),
+		OriginAddress: originAddress,
+		OriginChain:   vaa.ChainIDEthereum,
+		TargetAddress: targetAddress,
+		TargetChain:   vaa.ChainIDPolygon,
+	}
+	payloadBytes := encodePayloadBytes(payload)
+
+	// #nosec: G404 -- Cryptographically secure pseudo-random number generator not needed.
+	var sequence = rand.Uint64()
+	msgpub := &common.MessagePublication{
+		TxID:             eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
+		Timestamp:        time.Unix(int64(1654516425), 0),
+		Nonce:            123456,
+		Sequence:         sequence,
+		EmitterChain:     vaa.ChainIDEthereum,
+		EmitterAddress:   tokenBridgeAddress,
+		Payload:          payloadBytes,
+		ConsistencyLevel: 32,
+		Unreliable:       true,
+		IsReobservation:  true,
+		// verificationState is set to NotVerified by default.
+	}
+
+	return msgpub
+}
+
+func encodePayloadBytes(payload *vaa.TransferPayloadHdr) []byte {
+	bz := make([]byte, 101)
+	bz[0] = payload.Type
+
+	amtBytes := payload.Amount.Bytes()
+	if len(amtBytes) > 32 {
+		panic("amount will not fit in 32 bytes!")
+	}
+	copy(bz[33-len(amtBytes):33], amtBytes)
+
+	copy(bz[33:65], payload.OriginAddress.Bytes())
+	binary.BigEndian.PutUint16(bz[65:67], uint16(payload.OriginChain))
+	copy(bz[67:99], payload.TargetAddress.Bytes())
+	binary.BigEndian.PutUint16(bz[99:101], uint16(payload.TargetChain))
+	return bz
+}

+ 109 - 6
node/pkg/processor/processor.go

@@ -11,6 +11,7 @@ import (
 	guardianDB "github.com/certusone/wormhole/node/pkg/db"
 	"github.com/certusone/wormhole/node/pkg/governor"
 	"github.com/certusone/wormhole/node/pkg/guardiansigner"
+	guardianNotary "github.com/certusone/wormhole/node/pkg/notary"
 	"github.com/certusone/wormhole/node/pkg/p2p"
 
 	ethcommon "github.com/ethereum/go-ethereum/common"
@@ -28,7 +29,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
 
-var GovInterval = time.Minute
+var PollInterval = time.Minute
 var CleanupInterval = time.Second * 30
 
 type (
@@ -149,6 +150,7 @@ type Processor struct {
 	governor       *governor.ChainGovernor
 	acct           *accountant.Accountant
 	acctReadC      <-chan *common.MessagePublication
+	notary         *guardianNotary.Notary
 	pythnetVaas    map[string]PythNetVaaEntry
 	gatewayRelayer *gwrelayer.GatewayRelayer
 	updateVAALock  sync.Mutex
@@ -225,6 +227,7 @@ func NewProcessor(
 	g *governor.ChainGovernor,
 	acct *accountant.Accountant,
 	acctReadC <-chan *common.MessagePublication,
+	notary *guardianNotary.Notary,
 	gatewayRelayer *gwrelayer.GatewayRelayer,
 	networkID string,
 	alternatePublisher *altpub.AlternatePublisher,
@@ -249,6 +252,7 @@ func NewProcessor(
 		governor:       g,
 		acct:           acct,
 		acctReadC:      acctReadC,
+		notary:         notary,
 		pythnetVaas:    make(map[string]PythNetVaaEntry),
 		gatewayRelayer: gatewayRelayer,
 		batchObsvPubC:  make(chan *gossipv1.Observation, batchObsvPubChanSize),
@@ -269,7 +273,7 @@ func (p *Processor) Run(ctx context.Context) error {
 	cleanup := time.NewTicker(CleanupInterval)
 
 	// Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that.
-	govTimer := time.NewTimer(GovInterval)
+	pollTimer := time.NewTimer(PollInterval)
 
 	for {
 		select {
@@ -286,17 +290,73 @@ func (p *Processor) Run(ctx context.Context) error {
 			)
 			p.gst.Set(p.gs)
 		case k := <-p.msgC:
+			// This is the main message processing loop. It is responsible for handling messages that are
+			// received on the message channel. Depending on the configuration, a message may be processed
+			// by the Notary, the Governor, and/or the Accountant.
+			// This loop effectively causes each of these components to process messages in a modular
+			// manner. The Notary, Governor, and Accountant can be enabled or disabled independently.
+			// As a consequence of this loop, each of these components updates its internal state, tracking
+			// whether a message is ready to be processed from its perspective. This state is used by the
+			// processor to determine whether a message should be processed or not. This occurs elsewhere
+			// in the processor code.
+
+			p.logger.Debug("processor: received new message publication on message channel", k.ZapFields()...)
+
+			// Notary: check whether a message is well-formed.
+			// Send messages to the Notary first. If messages are not approved, they should not continue
+			// to the Governor or the Accountant.
+			if p.notary != nil {
+				p.logger.Debug("processor: sending message to notary for evaluation", k.ZapFields()...)
+
+				// NOTE: Always returns Approve for messages that are not token transfers.
+				verdict, err := p.notary.ProcessMsg(k)
+				if err != nil {
+					// TODO: The error is deliberately ignored so that the processor does not panic and restart.
+					// In contrast, the Accountant does not ignore the error and restarts the processor if it fails.
+					// The error-handling strategy can be revisited once the Notary is considered stable.
+					p.logger.Error("notary failed to process message", zap.Error(err), zap.String("messageID", k.MessageIDString()))
+					continue
+				}
+
+				// Based on the verdict, we can decide what to do with the message.
+				switch verdict {
+				case guardianNotary.Blackhole, guardianNotary.Delay:
+					p.logger.Error("notary evaluated message as threatening", k.ZapFields(zap.String("verdict", verdict.String()))...)
+					if verdict == guardianNotary.Blackhole {
+						// Black-holed messages should not be processed.
+						p.logger.Error("message will not be processed", k.ZapFields(zap.String("verdict", verdict.String()))...)
+					} else {
+						// Delayed messages are added to a separate queue and processed elsewhere.
+						p.logger.Error("message will be delayed", k.ZapFields(zap.String("verdict", verdict.String()))...)
+					}
+					// We're done processing the message.
+					continue
+				case guardianNotary.Unknown:
+					p.logger.Error("notary returned Unknown verdict", k.ZapFields(zap.String("verdict", verdict.String()))...)
+				case guardianNotary.Approve:
+					// no-op: process normally
+					p.logger.Debug("notary evaluated message as approved", k.ZapFields(zap.String("verdict", verdict.String()))...)
+				default:
+					p.logger.Error("notary returned unrecognized verdict", k.ZapFields(zap.String("verdict", verdict.String()))...)
+				}
+			}
+
+			// Governor: check if a message is ready to be published.
 			if p.governor != nil {
 				if !p.governor.ProcessMsg(k) {
+					// We're done processing the message.
 					continue
 				}
 			}
+
+			// Accountant: check if a message is ready to be published (i.e. if it has enough observations).
 			if p.acct != nil {
 				shouldPub, err := p.acct.SubmitObservation(k)
 				if err != nil {
-					return fmt.Errorf("failed to process message `%s`: %w", k.MessageIDString(), err)
+					return fmt.Errorf("accountant: failed to process message `%s`: %w", k.MessageIDString(), err)
 				}
 				if !shouldPub {
+					// We're done processing the message.
 					continue
 				}
 			}
@@ -318,7 +378,49 @@ func (p *Processor) Run(ctx context.Context) error {
 			p.handleInboundSignedVAAWithQuorum(m)
 		case <-cleanup.C:
 			p.handleCleanup(ctx)
-		case <-govTimer.C:
+		case <-pollTimer.C:
+			// Poll the pending lists for messages that can be released. Both the Notary and the Governor
+			// can delay messages.
+			// As each of the Notary, Governor, and Accountant can be enabled separately, each must
+			// be processed in a modular way.
+			// When more than one of these features is enabled, messages should be processed
+			// serially in the order: Notary -> Governor -> Accountant.
+			// NOTE: The Accountant can signal to a channel that it is ready to publish a message via
+			// writing to acctReadC so it is not handled here.
+			if p.notary != nil {
+				readyMsgs := p.notary.ReleaseReadyMessages()
+
+				// Iterate over all ready messages. Hand-off to the Governor or the Accountant
+				// if they're enabled. If not, publish.
+				for _, msg := range readyMsgs {
+					// TODO: Much of this is duplicated from the msgC branch. It might be a good
+					// idea to refactor how we handle combinations of Notary, Governor, and Accountant being
+					// enabled.
+
+					// Hand-off to governor
+					if p.governor != nil {
+						if !p.governor.ProcessMsg(msg) {
+							continue
+						}
+					}
+
+					// Hand-off to accountant. If we get here, both the Notary and the Governor
+					// have signalled that the message is OK to publish.
+					if p.acct != nil {
+						shouldPub, err := p.acct.SubmitObservation(msg)
+						if err != nil {
+							return fmt.Errorf("accountant: failed to process message `%s`: %w", msg.MessageIDString(), err)
+						}
+						if !shouldPub {
+							continue
+						}
+					}
+
+					// Notary, Governor, and Accountant have all approved.
+					p.handleMessage(ctx, msg)
+				}
+			}
+
 			if p.governor != nil {
 				toBePublished, err := p.governor.CheckPending()
 				if err != nil {
@@ -345,8 +447,9 @@ func (p *Processor) Run(ctx context.Context) error {
 					}
 				}
 			}
-			if (p.governor != nil) || (p.acct != nil) {
-				govTimer.Reset(GovInterval)
+
+			if (p.notary != nil) || (p.governor != nil) || (p.acct != nil) {
+				pollTimer.Reset(PollInterval)
 			}
 		}
 	}