Quellcode durchsuchen

Node: Gossip Topic Split (#4000)

* WIP: topic split

* Add cutover support

* Remove measurements that were moved to PR#3988

* Code review rework

* Code review rework

---------

Co-authored-by: Evan Gray <battledingo@gmail.com>
bruce-riley vor 1 Jahr
Ursprung
Commit
d3533aa2ca

+ 15 - 5
node/pkg/node/node.go

@@ -20,8 +20,14 @@ import (
 )
 
 const (
-	// gossipSendBufferSize configures the size of the gossip network send buffer
-	gossipSendBufferSize = 5000
+	// gossipControlSendBufferSize configures the size of the outbound gossip send buffer for control messages
+	gossipControlSendBufferSize = 100
+
+	// gossipAttestationSendBufferSize configures the size of the outbound gossip send buffer for attestation messages
+	gossipAttestationSendBufferSize = 5000
+
+	// gossipVaaSendBufferSize configures the size of the outbound gossip send buffer for VAA messages
+	gossipVaaSendBufferSize = 5000
 
 	// inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians.
 	// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
@@ -69,8 +75,10 @@ type G struct {
 	runnables             map[string]supervisor.Runnable
 
 	// various channels
-	// Outbound gossip message queue (needs to be read/write because p2p needs read/write)
-	gossipSendC chan []byte
+	// Outbound gossip message queues (need to be read/write because p2p needs read/write)
+	gossipControlSendC     chan []byte
+	gossipAttestationSendC chan []byte
+	gossipVaaSendC         chan []byte
 	// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
 	obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
 	// Finalized guardian observations aggregated across all chains
@@ -109,7 +117,9 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
 	g.rootCtxCancel = rootCtxCancel
 
 	// Setup various channels...
-	g.gossipSendC = make(chan []byte, gossipSendBufferSize)
+	g.gossipControlSendC = make(chan []byte, gossipControlSendBufferSize)
+	g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
+	g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
 	g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
 	g.msgC = makeChannelPair[*common.MessagePublication](0)
 	g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.

+ 5 - 2
node/pkg/node/options.go

@@ -67,7 +67,9 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers,
 					g.obsvC,
 					g.signedInC.writeC,
 					g.obsvReqC.writeC,
-					g.gossipSendC,
+					g.gossipControlSendC,
+					g.gossipAttestationSendC,
+					g.gossipVaaSendC,
 					g.obsvReqSendC.readC,
 					g.acct,
 					g.gov,
@@ -564,7 +566,8 @@ func GuardianOptionProcessor() *GuardianOption {
 				g.db,
 				g.msgC.readC,
 				g.setC.readC,
-				g.gossipSendC,
+				g.gossipAttestationSendC,
+				g.gossipVaaSendC,
 				g.obsvC,
 				g.obsvReqSendC.writeC,
 				g.signedInC.readC,

+ 87 - 0
node/pkg/p2p/gossip_cutover.go

@@ -0,0 +1,87 @@
+package p2p
+
+import (
+	"fmt"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr!
+const mainnetCutOverTimeStr = ""
+const testnetCutOverTimeStr = ""
+const devnetCutOverTimeStr = ""
+const cutOverFmtStr = "2006-01-02T15:04:05-0700"
+
+// gossipCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish only on the new topics.
+var gossipCutoverCompleteFlag atomic.Bool
+
+// GossipCutoverComplete reports the current value of gossipCutoverCompleteFlag. A true result means the
+// cutover time has passed and messages should be published on the new topics.
+func GossipCutoverComplete() bool {
+	cutoverDone := gossipCutoverCompleteFlag.Load()
+	return cutoverDone
+}
+
+// evaluateGossipCutOver looks up the cut over time for the given network, evaluates it against the current
+// time and stores the result in the global cutover flag. If a non-zero delay is reported, it also starts a
+// goroutine that sleeps for that delay and then sets the flag. An error is returned only if the evaluation
+// itself fails.
+func evaluateGossipCutOver(logger *zap.Logger, networkID string) error {
+	cutOverTimeStr := getCutOverTimeStr(networkID)
+
+	complete, wait, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, time.Now())
+	if err != nil {
+		return err
+	}
+
+	gossipCutoverCompleteFlag.Store(complete)
+	logger.Info("evaluated cutover flag",
+		zap.Bool("cutOverFlag", GossipCutoverComplete()),
+		zap.String("cutOverTime", cutOverTimeStr),
+		zap.String("component", "p2pco"),
+	)
+
+	// Nothing to wait for when no delay was reported.
+	if wait == time.Duration(0) {
+		return nil
+	}
+
+	// The cut over is still pending, so flip the flag in the background once the delay has elapsed.
+	go func() {
+		time.Sleep(wait)
+		logger.Info("time to cut over to new gossip topics",
+			zap.String("cutOverTime", cutOverTimeStr),
+			zap.String("component", "p2pco"),
+		)
+		gossipCutoverCompleteFlag.Store(true)
+	}()
+
+	return nil
+}
+
+// evaluateGossipCutOverImpl performs the actual cut over check. It is a separate function for testing purposes.
+//
+// It returns:
+//   - (false, 0, nil) if cutOverTimeStr is empty.
+//   - (false, 0, err) if cutOverTimeStr cannot be parsed using cutOverFmtStr.
+//   - (true, 0, nil) if the cut over time is at or before now.
+//   - (false, delay, nil) if the cut over time is after now, where delay is the time remaining until it.
+func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) {
+	if cutOverTimeStr == "" {
+		return false, 0, nil
+	}
+
+	cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr)
+	if err != nil {
+		return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err)
+	}
+
+	// Treat a cut over time equal to now as already passed. Checking only Before(now) would fall through with
+	// a zero delay in that case, returning (false, 0, nil), which is the same result as for an empty time string.
+	if !cutOverTime.After(now) {
+		logger.Info("cut over time has passed, should use new gossip topics", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "p2pco"))
+		return true, 0, nil
+	}
+
+	// If we get here, the cut over time is strictly in the future, so delay is always positive.
+	delay := cutOverTime.Sub(now)
+	logger.Info("still waiting for cut over time",
+		zap.Stringer("cutOverTime", cutOverTime),
+		zap.String("now", now.Format(cutOverFmtStr)),
+		zap.Stringer("delay", delay),
+		zap.String("component", "p2pco"))
+
+	return false, delay, nil
+}
+
+// getCutOverTimeStr maps a network ID to its configured cut over time string. IDs containing "/mainnet/" or
+// "/testnet/" select the matching constant; anything else falls back to the devnet value.
+func getCutOverTimeStr(networkID string) string { //nolint:unparam
+	switch {
+	case strings.Contains(networkID, "/mainnet/"):
+		return mainnetCutOverTimeStr
+	case strings.Contains(networkID, "/testnet/"):
+		return testnetCutOverTimeStr
+	default:
+		return devnetCutOverTimeStr
+	}
+}

+ 81 - 0
node/pkg/p2p/gossip_cutover_test.go

@@ -0,0 +1,81 @@
+package p2p
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+)
+
+// TestVerifyCutOverTime checks that every non-empty configured cut over time string parses with cutOverFmtStr.
+func TestVerifyCutOverTime(t *testing.T) {
+	for _, timeStr := range []string{mainnetCutOverTimeStr, testnetCutOverTimeStr, devnetCutOverTimeStr} {
+		if timeStr == "" {
+			// An empty string is skipped; only configured values must parse.
+			continue
+		}
+		_, err := time.Parse(cutOverFmtStr, timeStr)
+		require.NoError(t, err)
+	}
+}
+
+// TestGetCutOverTimeStr checks that each network ID selects the cut over time constant for its network.
+func TestGetCutOverTimeStr(t *testing.T) {
+	cases := []struct {
+		networkID string
+		want      string
+	}{
+		{"blah/blah/mainnet/blah", mainnetCutOverTimeStr},
+		{"blah/blah/testnet/blah", testnetCutOverTimeStr},
+		{"blah/blah/devnet/blah", devnetCutOverTimeStr},
+	}
+
+	for _, tc := range cases {
+		assert.Equal(t, tc.want, getCutOverTimeStr(tc.networkID))
+	}
+}
+
+// TestCutOverDisabled verifies that an empty cut over time string yields no cut over, no delay and no error.
+func TestCutOverDisabled(t *testing.T) {
+	now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
+	require.NoError(t, err)
+
+	done, wait, err := evaluateGossipCutOverImpl(zap.NewNop(), "", now)
+	require.NoError(t, err)
+	assert.False(t, done)
+	assert.Equal(t, time.Duration(0), wait)
+}
+
+func TestCutOverInvalidTime(t *testing.T) {
+	logger := zap.NewNop()
+
+	cutOverTimeStr := "Hello World"
+	now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
+	require.NoError(t, err)
+
+	_, _, err = evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
+	require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`)
+}
+
+func TestCutOverAlreadyHappened(t *testing.T) {
+	logger := zap.NewNop()
+
+	cutOverTimeStr := "2023-10-06T18:18:00-0000"
+	now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
+	require.NoError(t, err)
+
+	cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
+	require.NoError(t, err)
+	assert.True(t, cuttingOver)
+	assert.Equal(t, time.Duration(0), delay)
+}
+
+func TestCutOverDelayRequired(t *testing.T) {
+	logger := zap.NewNop()
+
+	cutOverTimeStr := "2023-10-06T18:18:00-0000"
+	now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000")
+	require.NoError(t, err)
+
+	cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
+	require.NoError(t, err)
+	assert.False(t, cuttingOver)
+	assert.Equal(t, time.Duration(60*time.Minute), delay)
+}

+ 616 - 258
node/pkg/p2p/p2p.go

@@ -54,11 +54,11 @@ var (
 			Name: "wormhole_p2p_heartbeats_sent_total",
 			Help: "Total number of p2p heartbeats sent",
 		})
-	p2pMessagesSent = promauto.NewCounter(
+	p2pMessagesSent = promauto.NewCounterVec(
 		prometheus.CounterOpts{
 			Name: "wormhole_p2p_broadcast_messages_sent_total",
 			Help: "Total number of p2p pubsub broadcast messages sent",
-		})
+		}, []string{"type"})
 	p2pMessagesReceived = promauto.NewCounterVec(
 		prometheus.CounterOpts{
 			Name: "wormhole_p2p_broadcast_messages_received_total",
@@ -110,6 +110,7 @@ type Components struct {
 	// is only accessed by a single routine at any given time in a running Guardian.
 	ProtectedHostByGuardianKeyLock sync.Mutex
 	// WarnChannelOverflow: If true, errors due to overflowing channels will produce logger.Warn
+	// WARNING: This should not be enabled in production. It is only used in node tests to watch for overflows.
 	WarnChannelOverflow bool
 	// SignedHeartbeatLogLevel is the log level at which SignedHeartbeatReceived events will be logged.
 	SignedHeartbeatLogLevel zapcore.Level
@@ -301,12 +302,25 @@ func Run(params *RunParams) func(ctx context.Context) error {
 	}
 
 	return func(ctx context.Context) error {
+		p2pMessagesSent.WithLabelValues("control").Add(0)
+		p2pMessagesSent.WithLabelValues("attestation").Add(0)
+		p2pMessagesSent.WithLabelValues("vaa").Add(0)
 		p2pReceiveChannelOverflow.WithLabelValues("observation").Add(0)
 		p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Add(0)
 		p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Add(0)
 
 		logger := supervisor.Logger(ctx)
 
+		// Evaluate the gossip cutover time. If it has passed, then the flag will be set to make us publish on the new topics.
+		// If not, a routine will be started to wait for that time before starting to publish on the new topics.
+		cutoverErr := evaluateGossipCutOver(logger, params.networkID)
+		if cutoverErr != nil {
+			panic(cutoverErr)
+		}
+
+		// If the cutover has not happened yet, we need to join and subscribe to the VAA topic because it is also the old topic.
+		needOldTopic := !GossipCutoverComplete()
+
 		defer func() {
 			// TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted.
 			// But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background.
@@ -330,8 +344,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
 			panic(err)
 		}
 
-		topic := fmt.Sprintf("%s/%s", params.networkID, "broadcast")
-
 		bootstrappers, bootstrapNode := BootstrapAddrs(logger, params.bootstrapPeers, h.ID())
 
 		if bootstrapNode {
@@ -342,7 +354,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
 			}
 		}
 
-		logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
+		logger.Info("connecting to pubsub")
 		ourTracer := &traceHandler{}
 		ps, err := pubsub.NewGossipSub(ctx, h,
 			pubsub.WithValidateQueueSize(P2P_VALIDATE_QUEUE_SIZE),
@@ -355,24 +367,84 @@ func Run(params *RunParams) func(ctx context.Context) error {
 			panic(err)
 		}
 
-		th, err := ps.Join(topic)
-		if err != nil {
-			return fmt.Errorf("failed to join topic: %w", err)
+		// These will only be non-nil if the application plans to listen for or publish to that topic.
+		var controlPubsubTopic, attestationPubsubTopic, vaaPubsubTopic *pubsub.Topic
+		var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription
+
+		// Set up the control channel. ////////////////////////////////////////////////////////////////////
+		if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil {
+			controlTopic := fmt.Sprintf("%s/%s", params.networkID, "control")
+			logger.Info("joining the control topic", zap.String("topic", controlTopic))
+			controlPubsubTopic, err = ps.Join(controlTopic)
+			if err != nil {
+				return fmt.Errorf("failed to join the control topic: %w", err)
+			}
+
+			defer func() {
+				if err := controlPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) {
+					logger.Error("Error closing the control topic", zap.Error(err))
+				}
+			}()
+
+			if params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil {
+				logger.Info("subscribing to the control topic", zap.String("topic", controlTopic))
+				controlSubscription, err = controlPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
+				if err != nil {
+					return fmt.Errorf("failed to subscribe to the control topic: %w", err)
+				}
+				defer controlSubscription.Cancel()
+			}
 		}
 
-		defer func() {
-			if err := th.Close(); err != nil && !errors.Is(err, context.Canceled) {
-				logger.Error("Error closing the topic", zap.Error(err))
+		// Set up the attestation channel. ////////////////////////////////////////////////////////////////////
+		if params.gossipAttestationSendC != nil || params.obsvRecvC != nil {
+			attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation")
+			logger.Info("joining the attestation topic", zap.String("topic", attestationTopic))
+			attestationPubsubTopic, err = ps.Join(attestationTopic)
+			if err != nil {
+				return fmt.Errorf("failed to join the attestation topic: %w", err)
 			}
-		}()
 
-		// Increase the buffer size to prevent failed delivery
-		// to slower subscribers
-		sub, err := th.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
-		if err != nil {
-			return fmt.Errorf("failed to subscribe topic: %w", err)
+			defer func() {
+				if err := attestationPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) {
+					logger.Error("Error closing the attestation topic", zap.Error(err))
+				}
+			}()
+
+			if params.obsvRecvC != nil {
+				logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic))
+				attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
+				if err != nil {
+					return fmt.Errorf("failed to subscribe to the attestation topic: %w", err)
+				}
+				defer attestationSubscription.Cancel()
+			}
+		}
+
+		// Set up the VAA channel. ////////////////////////////////////////////////////////////////////
+		if params.gossipVaaSendC != nil || params.signedIncomingVaaRecvC != nil || needOldTopic {
+			vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast")
+			logger.Info("joining the vaa topic", zap.String("topic", vaaTopic))
+			vaaPubsubTopic, err = ps.Join(vaaTopic)
+			if err != nil {
+				return fmt.Errorf("failed to join the vaa topic: %w", err)
+			}
+
+			defer func() {
+				if err := vaaPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) {
+					logger.Error("Error closing the vaa topic", zap.Error(err))
+				}
+			}()
+
+			if params.signedIncomingVaaRecvC != nil || needOldTopic {
+				logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic))
+				vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
+				if err != nil {
+					return fmt.Errorf("failed to subscribe to the vaa topic: %w", err)
+				}
+				defer vaaSubscription.Cancel()
+			}
 		}
-		defer sub.Cancel()
 
 		// Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI
 		// as peer discovery can take a long time).
@@ -426,119 +498,169 @@ func Run(params *RunParams) func(ctx context.Context) error {
 			}
 		}()
 
-		go func() {
-			// Disable heartbeat when no node name is provided (spy mode)
-			if params.nodeName == "" {
-				return
-			}
-			ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey)
+		// Start up heartbeating if it is enabled.
+		if params.nodeName != "" {
+			go func() {
+				ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey)
 
-			ctr := int64(0)
-			// Guardians should send out their first heartbeat immediately to speed up test runs.
-			// But we also want to wait a little bit such that network connections can be established by then.
-			timer := time.NewTimer(time.Second * 2)
-			defer timer.Stop()
+				ctr := int64(0)
+				// Guardians should send out their first heartbeat immediately to speed up test runs.
+				// But we also want to wait a little bit such that network connections can be established by then.
+				timer := time.NewTimer(time.Second * 2)
+				defer timer.Stop()
 
-			for {
-				select {
-				case <-ctx.Done():
-					return
-				case <-timer.C:
-					timer.Reset(15 * time.Second)
-
-					// create a heartbeat
-					b := func() []byte {
-						DefaultRegistry.mu.Lock()
-						defer DefaultRegistry.mu.Unlock()
-						networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats))
-						for _, v := range DefaultRegistry.networkStats {
-							errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id))
-							v.ErrorCount = errCtr
-							networks = append(networks, v)
-						}
+				for {
+					select {
+					case <-ctx.Done():
+						return
+					case <-timer.C:
+						timer.Reset(15 * time.Second)
+
+						// create a heartbeat
+						b := func() []byte {
+							DefaultRegistry.mu.Lock()
+							defer DefaultRegistry.mu.Unlock()
+							networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats))
+							for _, v := range DefaultRegistry.networkStats {
+								errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id))
+								v.ErrorCount = errCtr
+								networks = append(networks, v)
+							}
 
-						features := make([]string, 0)
-						if params.gov != nil {
-							if params.gov.IsFlowCancelEnabled() {
-								features = append(features, "governor:fc")
-							} else {
-								features = append(features, "governor")
+							features := make([]string, 0)
+							if params.gov != nil {
+								if params.gov.IsFlowCancelEnabled() {
+									features = append(features, "governor:fc")
+								} else {
+									features = append(features, "governor")
+								}
 							}
-						}
-						if params.acct != nil {
-							features = append(features, params.acct.FeatureString())
-						}
-						if params.ibcFeaturesFunc != nil {
-							ibcFlags := params.ibcFeaturesFunc()
-							if ibcFlags != "" {
-								features = append(features, ibcFlags)
+							if params.acct != nil {
+								features = append(features, params.acct.FeatureString())
+							}
+							if params.ibcFeaturesFunc != nil {
+								ibcFlags := params.ibcFeaturesFunc()
+								if ibcFlags != "" {
+									features = append(features, ibcFlags)
+								}
+							}
+							if params.gatewayRelayerEnabled {
+								features = append(features, "gwrelayer")
+							}
+							if params.ccqEnabled {
+								features = append(features, "ccq")
 							}
-						}
-						if params.gatewayRelayerEnabled {
-							features = append(features, "gwrelayer")
-						}
-						if params.ccqEnabled {
-							features = append(features, "ccq")
-						}
 
-						heartbeat := &gossipv1.Heartbeat{
-							NodeName:      params.nodeName,
-							Counter:       ctr,
-							Timestamp:     time.Now().UnixNano(),
-							Networks:      networks,
-							Version:       version.Version(),
-							GuardianAddr:  ourAddr.String(),
-							BootTimestamp: bootTime.UnixNano(),
-							Features:      features,
-						}
+							heartbeat := &gossipv1.Heartbeat{
+								NodeName:      params.nodeName,
+								Counter:       ctr,
+								Timestamp:     time.Now().UnixNano(),
+								Networks:      networks,
+								Version:       version.Version(),
+								GuardianAddr:  ourAddr.String(),
+								BootTimestamp: bootTime.UnixNano(),
+								Features:      features,
+							}
 
-						if params.components.P2PIDInHeartbeat {
-							heartbeat.P2PNodeId = nodeIdBytes
-						}
+							if params.components.P2PIDInHeartbeat {
+								heartbeat.P2PNodeId = nodeIdBytes
+							}
 
-						if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
-							panic(err)
-						}
-						collectNodeMetrics(ourAddr, h.ID(), heartbeat)
+							if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
+								panic(err)
+							}
+							collectNodeMetrics(ourAddr, h.ID(), heartbeat)
 
-						if params.gov != nil {
-							params.gov.CollectMetrics(heartbeat, params.gossipSendC, params.gk, ourAddr)
-						}
+							if params.gov != nil {
+								params.gov.CollectMetrics(heartbeat, params.gossipControlSendC, params.gk, ourAddr)
+							}
 
-						msg := gossipv1.GossipMessage{
-							Message: &gossipv1.GossipMessage_SignedHeartbeat{
-								SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat),
-							},
-						}
+							msg := gossipv1.GossipMessage{
+								Message: &gossipv1.GossipMessage_SignedHeartbeat{
+									SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat),
+								},
+							}
 
-						b, err := proto.Marshal(&msg)
-						if err != nil {
-							panic(err)
+							b, err := proto.Marshal(&msg)
+							if err != nil {
+								panic(err)
+							}
+							return b
+						}()
+
+						if GossipCutoverComplete() {
+							if controlPubsubTopic == nil {
+								panic("controlPubsubTopic should not be nil when nodeName is set")
+							}
+							err = controlPubsubTopic.Publish(ctx, b)
+							p2pMessagesSent.WithLabelValues("control").Inc()
+							if err != nil {
+								logger.Warn("failed to publish heartbeat message", zap.Error(err))
+							}
+						} else if vaaPubsubTopic != nil {
+							err = vaaPubsubTopic.Publish(ctx, b)
+							p2pMessagesSent.WithLabelValues("old_control").Inc()
+							if err != nil {
+								logger.Warn("failed to publish heartbeat message to old topic", zap.Error(err))
+							}
 						}
-						return b
-					}()
 
-					err = th.Publish(ctx, b)
-					if err != nil {
-						logger.Warn("failed to publish heartbeat message", zap.Error(err))
+						p2pHeartbeatsSent.Inc()
+						ctr += 1
 					}
-
-					p2pHeartbeatsSent.Inc()
-					ctr += 1
 				}
-			}
-		}()
+			}()
+		}
 
+		// This routine processes messages received from the internal channels and publishes them to gossip. ///////////////////
+		// NOTE: Per the Go specification, receiving from a nil channel is safe; it simply blocks forever.
 		go func() {
 			for {
 				select {
 				case <-ctx.Done():
 					return
-				case msg := <-params.gossipSendC:
-					err := th.Publish(ctx, msg)
-					p2pMessagesSent.Inc()
+				case msg := <-params.gossipControlSendC:
+					if GossipCutoverComplete() {
+						if controlPubsubTopic == nil {
+							panic("controlPubsubTopic should not be nil when gossipControlSendC is set")
+						}
+						err := controlPubsubTopic.Publish(ctx, msg)
+						p2pMessagesSent.WithLabelValues("control").Inc()
+						if err != nil {
+							logger.Error("failed to publish message from control queue", zap.Error(err))
+						}
+					} else if vaaPubsubTopic != nil {
+						err := vaaPubsubTopic.Publish(ctx, msg)
+						p2pMessagesSent.WithLabelValues("old_control").Inc()
+						if err != nil {
+							logger.Error("failed to publish message from control queue to old topic", zap.Error(err))
+						}
+					}
+				case msg := <-params.gossipAttestationSendC:
+					if GossipCutoverComplete() {
+						if attestationPubsubTopic == nil {
+							panic("attestationPubsubTopic should not be nil when gossipAttestationSendC is set")
+						}
+						err := attestationPubsubTopic.Publish(ctx, msg)
+						p2pMessagesSent.WithLabelValues("attestation").Inc()
+						if err != nil {
+							logger.Error("failed to publish message from attestation queue", zap.Error(err))
+						}
+					} else if vaaPubsubTopic != nil {
+						err := vaaPubsubTopic.Publish(ctx, msg)
+						p2pMessagesSent.WithLabelValues("old_attestation").Inc()
+						if err != nil {
+							logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err))
+						}
+					}
+				case msg := <-params.gossipVaaSendC:
+					if vaaPubsubTopic == nil {
+						panic("vaaPubsubTopic should not be nil when gossipVaaSendC is set")
+					}
+					err := vaaPubsubTopic.Publish(ctx, msg)
+					p2pMessagesSent.WithLabelValues("vaa").Inc()
 					if err != nil {
-						logger.Error("failed to publish message from queue", zap.Error(err))
+						logger.Error("failed to publish message from vaa queue", zap.Error(err))
 					}
 				case msg := <-params.obsvReqSendC:
 					b, err := proto.Marshal(msg)
@@ -569,193 +691,429 @@ func Run(params *RunParams) func(ctx context.Context) error {
 					}
 
 					// Send to local observation request queue (the loopback message is ignored)
-					if params.obsvReqC != nil {
-						params.obsvReqC <- msg
+					if params.obsvReqRecvC != nil {
+						params.obsvReqRecvC <- msg
 					}
 
-					err = th.Publish(ctx, b)
-					p2pMessagesSent.Inc()
-					if err != nil {
-						logger.Error("failed to publish observation request", zap.Error(err))
-					} else {
-						logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
+					if GossipCutoverComplete() {
+						if controlPubsubTopic == nil {
+							panic("controlPubsubTopic should not be nil when obsvReqSendC is set")
+						}
+						err = controlPubsubTopic.Publish(ctx, b)
+						p2pMessagesSent.WithLabelValues("control").Inc()
+						if err != nil {
+							logger.Error("failed to publish observation request", zap.Error(err))
+						} else {
+							logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
+						}
+					} else if vaaPubsubTopic != nil {
+						err = vaaPubsubTopic.Publish(ctx, b)
+						p2pMessagesSent.WithLabelValues("old_control").Inc()
+						if err != nil {
+							logger.Error("failed to publish observation request to old topic", zap.Error(err))
+						}
 					}
 				}
 			}
 		}()
 
-		for {
-			envelope, err := sub.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
-			if err != nil {
-				return fmt.Errorf("failed to receive pubsub message: %w", err)
-			}
+		errC := make(chan error)
 
-			var msg gossipv1.GossipMessage
-			err = proto.Unmarshal(envelope.Data, &msg)
-			if err != nil {
-				logger.Info("received invalid message",
-					zap.Binary("data", envelope.Data),
-					zap.String("from", envelope.GetFrom().String()))
-				p2pMessagesReceived.WithLabelValues("invalid").Inc()
-				continue
-			}
+		// This routine processes control messages received from gossip. //////////////////////////////////////////////
+		if controlSubscription != nil {
+			go func() {
+				for {
+					envelope, err := controlSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
+					if err != nil {
+						errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err)
+						return
+					}
 
-			if envelope.GetFrom() == h.ID() {
-				if logger.Level().Enabled(zapcore.DebugLevel) {
-					logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message))
-				}
-				p2pMessagesReceived.WithLabelValues("loopback").Inc()
-				continue
-			}
+					var msg gossipv1.GossipMessage
+					err = proto.Unmarshal(envelope.Data, &msg)
+					if err != nil {
+						logger.Info("received invalid message on control topic",
+							zap.Binary("data", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
+						p2pMessagesReceived.WithLabelValues("invalid").Inc()
+						continue
+					}
 
-			if logger.Level().Enabled(zapcore.DebugLevel) {
-				logger.Debug("received message",
-					zap.Any("payload", msg.Message),
-					zap.Binary("raw", envelope.Data),
-					zap.String("from", envelope.GetFrom().String()))
-			}
+					if envelope.GetFrom() == h.ID() {
+						if logger.Level().Enabled(zapcore.DebugLevel) {
+							logger.Debug("received message from ourselves on control topic, ignoring", zap.Any("payload", msg.Message))
+						}
+						p2pMessagesReceived.WithLabelValues("loopback").Inc()
+						continue
+					}
 
-			switch m := msg.Message.(type) {
-			case *gossipv1.GossipMessage_SignedHeartbeat:
-				s := m.SignedHeartbeat
-				gs := params.gst.Get()
-				if gs == nil {
-					// No valid guardian set yet - dropping heartbeat
-					logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set",
-						zap.Any("value", s),
-						zap.String("from", envelope.GetFrom().String()))
-					break
-				}
-				if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil {
-					p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
-					logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received",
-						zap.Error(err),
-						zap.Any("payload", msg.Message),
-						zap.Any("value", s),
-						zap.Binary("raw", envelope.Data),
-						zap.String("from", envelope.GetFrom().String()))
-				} else {
-					p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
-					logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received",
-						zap.Any("value", heartbeat),
-						zap.String("from", envelope.GetFrom().String()))
-
-					func() {
-						if len(heartbeat.P2PNodeId) != 0 {
-							params.components.ProtectedHostByGuardianKeyLock.Lock()
-							defer params.components.ProtectedHostByGuardianKeyLock.Unlock()
-							var peerId peer.ID
-							if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil {
-								logger.Error("p2p_node_id_in_heartbeat_invalid",
-									zap.Any("payload", msg.Message),
-									zap.Any("value", s),
-									zap.Binary("raw", envelope.Data),
-									zap.String("from", envelope.GetFrom().String()))
-							} else {
-								guardianAddr := eth_common.BytesToAddress(s.GuardianAddr)
-								if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) {
-									prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr]
-									if ok {
-										if prevPeerId != peerId {
-											logger.Info("p2p_guardian_peer_changed",
-												zap.String("guardian_addr", guardianAddr.String()),
-												zap.String("prevPeerId", prevPeerId.String()),
-												zap.String("newPeerId", peerId.String()),
-											)
-											params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat")
-											params.components.ConnMgr.Protect(peerId, "heartbeat")
-											params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
-										}
+					if logger.Level().Enabled(zapcore.DebugLevel) {
+						logger.Debug("received message on control topic",
+							zap.Any("payload", msg.Message),
+							zap.Binary("raw", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
+					}
+
+					switch m := msg.Message.(type) {
+					case *gossipv1.GossipMessage_SignedHeartbeat:
+						s := m.SignedHeartbeat
+						gs := params.gst.Get()
+						if gs == nil {
+							// No valid guardian set yet - dropping heartbeat
+							logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set",
+								zap.Any("value", s),
+								zap.String("from", envelope.GetFrom().String()))
+							break
+						}
+						if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil {
+							p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
+							logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received",
+								zap.Error(err),
+								zap.Any("payload", msg.Message),
+								zap.Any("value", s),
+								zap.Binary("raw", envelope.Data),
+								zap.String("from", envelope.GetFrom().String()))
+						} else {
+							p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
+							logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received",
+								zap.Any("value", heartbeat),
+								zap.String("from", envelope.GetFrom().String()))
+
+							func() {
+								if len(heartbeat.P2PNodeId) != 0 {
+									params.components.ProtectedHostByGuardianKeyLock.Lock()
+									defer params.components.ProtectedHostByGuardianKeyLock.Unlock()
+									var peerId peer.ID
+									if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil {
+										logger.Error("p2p_node_id_in_heartbeat_invalid",
+											zap.Any("payload", msg.Message),
+											zap.Any("value", s),
+											zap.Binary("raw", envelope.Data),
+											zap.String("from", envelope.GetFrom().String()))
 									} else {
-										params.components.ConnMgr.Protect(peerId, "heartbeat")
-										params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
+										guardianAddr := eth_common.BytesToAddress(s.GuardianAddr)
+										if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) {
+											prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr]
+											if ok {
+												if prevPeerId != peerId {
+													logger.Info("p2p_guardian_peer_changed",
+														zap.String("guardian_addr", guardianAddr.String()),
+														zap.String("prevPeerId", prevPeerId.String()),
+														zap.String("newPeerId", peerId.String()),
+													)
+													params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat")
+													params.components.ConnMgr.Protect(peerId, "heartbeat")
+													params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
+												}
+											} else {
+												params.components.ConnMgr.Protect(peerId, "heartbeat")
+												params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
+											}
+										}
+									}
+								} else {
+									if logger.Level().Enabled(zapcore.DebugLevel) {
+										logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName))
 									}
 								}
+							}()
+						}
+					case *gossipv1.GossipMessage_SignedObservationRequest:
+						if params.obsvReqRecvC != nil {
+							s := m.SignedObservationRequest
+							gs := params.gst.Get()
+							if gs == nil {
+								if logger.Level().Enabled(zapcore.DebugLevel) {
+									logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String()))
+								}
+								break
 							}
-						} else {
-							if logger.Level().Enabled(zapcore.DebugLevel) {
-								logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName))
+							r, err := processSignedObservationRequest(s, gs)
+							if err != nil {
+								p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
+								if logger.Level().Enabled(zapcore.DebugLevel) {
+									logger.Debug("invalid signed observation request received",
+										zap.Error(err),
+										zap.Any("payload", msg.Message),
+										zap.Any("value", s),
+										zap.Binary("raw", envelope.Data),
+										zap.String("from", envelope.GetFrom().String()))
+								}
+							} else {
+								if logger.Level().Enabled(zapcore.DebugLevel) {
+									logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String()))
+								}
+
+								select {
+								case params.obsvReqRecvC <- r:
+									p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
+								default:
+									p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
+								}
 							}
 						}
-					}()
-				}
-			case *gossipv1.GossipMessage_SignedObservation:
-				if params.obsvC != nil {
-					if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil {
-						p2pMessagesReceived.WithLabelValues("observation").Inc()
-					} else {
-						if params.components.WarnChannelOverflow {
-							logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
+					case *gossipv1.GossipMessage_SignedChainGovernorConfig:
+						if params.signedGovCfgRecvC != nil {
+							params.signedGovCfgRecvC <- m.SignedChainGovernorConfig
+						}
+					case *gossipv1.GossipMessage_SignedChainGovernorStatus:
+						if params.signedGovStatusRecvC != nil {
+							params.signedGovStatusRecvC <- m.SignedChainGovernorStatus
 						}
-						p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
+					default:
+						p2pMessagesReceived.WithLabelValues("unknown").Inc()
+						logger.Warn("received unknown message type on control topic (running outdated software?)",
+							zap.Any("payload", msg.Message),
+							zap.Binary("raw", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
 					}
 				}
-			case *gossipv1.GossipMessage_SignedVaaWithQuorum:
-				if params.signedInC != nil {
-					select {
-					case params.signedInC <- m.SignedVaaWithQuorum:
-						p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
-					default:
-						if params.components.WarnChannelOverflow {
-							// TODO do not log this in production
-							var hexStr string
-							if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil {
-								hexStr = vaa.HexDigest()
+			}()
+		}
+
+		// This routine processes attestation messages received from gossip. //////////////////////////////////////////////
+		if attestationSubscription != nil {
+			go func() {
+				for {
+					envelope, err := attestationSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
+					if err != nil {
+						errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err)
+						return
+					}
+
+					var msg gossipv1.GossipMessage
+					err = proto.Unmarshal(envelope.Data, &msg)
+					if err != nil {
+						logger.Info("received invalid message on attestation topic",
+							zap.Binary("data", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
+						p2pMessagesReceived.WithLabelValues("invalid").Inc()
+						continue
+					}
+
+					if envelope.GetFrom() == h.ID() {
+						if logger.Level().Enabled(zapcore.DebugLevel) {
+							logger.Debug("received message from ourselves on attestation topic, ignoring", zap.Any("payload", msg.Message))
+						}
+						p2pMessagesReceived.WithLabelValues("loopback").Inc()
+						continue
+					}
+
+					if logger.Level().Enabled(zapcore.DebugLevel) {
+						logger.Debug("received message on attestation topic",
+							zap.Any("payload", msg.Message),
+							zap.Binary("raw", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
+					}
+
+					switch m := msg.Message.(type) {
+					case *gossipv1.GossipMessage_SignedObservation:
+						if params.obsvRecvC != nil {
+							if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil {
+								p2pMessagesReceived.WithLabelValues("observation").Inc()
+							} else {
+								if params.components.WarnChannelOverflow {
+									logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr)))
+								}
+								p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
 							}
-							logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr))
 						}
-						p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc()
+					default:
+						p2pMessagesReceived.WithLabelValues("unknown").Inc()
+						logger.Warn("received unknown message type on attestation topic (running outdated software?)",
+							zap.Any("payload", msg.Message),
+							zap.Binary("raw", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
 					}
 				}
-			case *gossipv1.GossipMessage_SignedObservationRequest:
-				if params.obsvReqC != nil {
-					s := m.SignedObservationRequest
-					gs := params.gst.Get()
-					if gs == nil {
-						if logger.Level().Enabled(zapcore.DebugLevel) {
-							logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String()))
-						}
-						break
+			}()
+		}
+
+		// This routine processes signed VAA messages received from gossip. //////////////////////////////////////////////
+		if vaaSubscription != nil {
+			go func() {
+				for {
+					envelope, err := vaaSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
+					if err != nil {
+						errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err)
+						return
 					}
-					r, err := processSignedObservationRequest(s, gs)
+
+					var msg gossipv1.GossipMessage
+					err = proto.Unmarshal(envelope.Data, &msg)
 					if err != nil {
-						p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
+						logger.Info("received invalid message on vaa topic",
+							zap.Binary("data", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
+						p2pMessagesReceived.WithLabelValues("invalid").Inc()
+						continue
+					}
+
+					if envelope.GetFrom() == h.ID() {
 						if logger.Level().Enabled(zapcore.DebugLevel) {
-							logger.Debug("invalid signed observation request received",
+							logger.Debug("received message from ourselves on vaa topic, ignoring", zap.Any("payload", msg.Message))
+						}
+						p2pMessagesReceived.WithLabelValues("loopback").Inc()
+						continue
+					}
+
+					if logger.Level().Enabled(zapcore.DebugLevel) {
+						logger.Debug("received message on vaa topic",
+							zap.Any("payload", msg.Message),
+							zap.Binary("raw", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
+					}
+
+					switch m := msg.Message.(type) {
+					case *gossipv1.GossipMessage_SignedHeartbeat: // TODO: Get rid of this after the cutover.
+						s := m.SignedHeartbeat
+						gs := params.gst.Get()
+						if gs == nil {
+							// No valid guardian set yet - dropping heartbeat
+							logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set",
+								zap.Any("value", s),
+								zap.String("from", envelope.GetFrom().String()))
+							break
+						}
+						if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil {
+							p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
+							logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received",
 								zap.Error(err),
 								zap.Any("payload", msg.Message),
 								zap.Any("value", s),
 								zap.Binary("raw", envelope.Data),
 								zap.String("from", envelope.GetFrom().String()))
+						} else {
+							p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
+							logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received",
+								zap.Any("value", heartbeat),
+								zap.String("from", envelope.GetFrom().String()))
+
+							func() {
+								if len(heartbeat.P2PNodeId) != 0 {
+									params.components.ProtectedHostByGuardianKeyLock.Lock()
+									defer params.components.ProtectedHostByGuardianKeyLock.Unlock()
+									var peerId peer.ID
+									if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil {
+										logger.Error("p2p_node_id_in_heartbeat_invalid",
+											zap.Any("payload", msg.Message),
+											zap.Any("value", s),
+											zap.Binary("raw", envelope.Data),
+											zap.String("from", envelope.GetFrom().String()))
+									} else {
+										guardianAddr := eth_common.BytesToAddress(s.GuardianAddr)
+										if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) {
+											prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr]
+											if ok {
+												if prevPeerId != peerId {
+													logger.Info("p2p_guardian_peer_changed",
+														zap.String("guardian_addr", guardianAddr.String()),
+														zap.String("prevPeerId", prevPeerId.String()),
+														zap.String("newPeerId", peerId.String()),
+													)
+													params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat")
+													params.components.ConnMgr.Protect(peerId, "heartbeat")
+													params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
+												}
+											} else {
+												params.components.ConnMgr.Protect(peerId, "heartbeat")
+												params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
+											}
+										}
+									}
+								} else {
+									if logger.Level().Enabled(zapcore.DebugLevel) {
+										logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName))
+									}
+								}
+							}()
 						}
-					} else {
-						if logger.Level().Enabled(zapcore.DebugLevel) {
-							logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String()))
+					case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover.
+						if params.obsvRecvC != nil {
+							if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil {
+								p2pMessagesReceived.WithLabelValues("observation").Inc()
+							} else {
+								if params.components.WarnChannelOverflow {
+									logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
+								}
+								p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
+							}
 						}
+					case *gossipv1.GossipMessage_SignedVaaWithQuorum:
+						if params.signedIncomingVaaRecvC != nil {
+							select {
+							case params.signedIncomingVaaRecvC <- m.SignedVaaWithQuorum:
+								p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
+							default:
+								if params.components.WarnChannelOverflow {
+									var hexStr string
+									if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil {
+										hexStr = vaa.HexDigest()
+									}
+									logger.Warn("Ignoring SignedVaaWithQuorum because signedIncomingVaaRecvC full", zap.String("hash", hexStr))
+								}
+								p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc()
+							}
+						}
+					case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover.
+						if params.obsvReqRecvC != nil {
+							s := m.SignedObservationRequest
+							gs := params.gst.Get()
+							if gs == nil {
+								if logger.Level().Enabled(zapcore.DebugLevel) {
+									logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String()))
+								}
+								break
+							}
+							r, err := processSignedObservationRequest(s, gs)
+							if err != nil {
+								p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
+								if logger.Level().Enabled(zapcore.DebugLevel) {
+									logger.Debug("invalid signed observation request received",
+										zap.Error(err),
+										zap.Any("payload", msg.Message),
+										zap.Any("value", s),
+										zap.Binary("raw", envelope.Data),
+										zap.String("from", envelope.GetFrom().String()))
+								}
+							} else {
+								if logger.Level().Enabled(zapcore.DebugLevel) {
+									logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String()))
+								}
 
-						select {
-						case params.obsvReqC <- r:
-							p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
-						default:
-							p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
+								select {
+								case params.obsvReqRecvC <- r:
+									p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
+								default:
+									p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
+								}
+							}
 						}
+					case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover.
+						if params.signedGovCfgRecvC != nil {
+							params.signedGovCfgRecvC <- m.SignedChainGovernorConfig
+						}
+					case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover.
+						if params.signedGovStatusRecvC != nil {
+							params.signedGovStatusRecvC <- m.SignedChainGovernorStatus
+						}
+					default:
+						p2pMessagesReceived.WithLabelValues("unknown").Inc()
+						logger.Warn("received unknown message type on vaa topic (running outdated software?)",
+							zap.Any("payload", msg.Message),
+							zap.Binary("raw", envelope.Data),
+							zap.String("from", envelope.GetFrom().String()))
 					}
 				}
-			case *gossipv1.GossipMessage_SignedChainGovernorConfig:
-				if params.signedGovCfg != nil {
-					params.signedGovCfg <- m.SignedChainGovernorConfig
-				}
-			case *gossipv1.GossipMessage_SignedChainGovernorStatus:
-				if params.signedGovSt != nil {
-					params.signedGovSt <- m.SignedChainGovernorStatus
-				}
-			default:
-				p2pMessagesReceived.WithLabelValues("unknown").Inc()
-				logger.Warn("received unknown message type (running outdated software?)",
-					zap.Any("payload", msg.Message),
-					zap.Binary("raw", envelope.Data),
-					zap.String("from", envelope.GetFrom().String()))
-			}
+			}()
+		}
+
+		// Wait for either a shutdown or a fatal error from a pubsub subscription.
+		select {
+		case <-ctx.Done():
+			return nil
+		case err := <-errC:
+			return err
 		}
 	}
 }

+ 49 - 43
node/pkg/p2p/run_params.go

@@ -23,40 +23,42 @@ type (
 		gst            *common.GuardianSetState
 		rootCtxCancel  context.CancelFunc
 
-		// obsvC is optional and can be set with `WithSignedObservationListener`.
-		obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]
+		// obsvRecvC is optional and can be set with `WithSignedObservationListener`.
+		obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]
 
-		// obsvReqC is optional and can be set with `WithObservationRequestListener`.
-		obsvReqC chan<- *gossipv1.ObservationRequest
+		// obsvReqRecvC is optional and can be set with `WithObservationRequestListener`.
+		obsvReqRecvC chan<- *gossipv1.ObservationRequest
 
-		// signedInC is optional and can be set with `WithSignedVAAListener`.
-		signedInC chan<- *gossipv1.SignedVAAWithQuorum
+		// signedIncomingVaaRecvC is optional and can be set with `WithSignedVAAListener`.
+		signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum
 
-		// signedGovCfg is optional and can be set with `WithChainGovernorConfigListener`.
-		signedGovCfg chan *gossipv1.SignedChainGovernorConfig
+		// signedGovCfgRecvC is optional and can be set with `WithChainGovernorConfigListener`.
+		signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig
 
-		// WithChainGovernorStatusListener is optional and can be set with `WithChainGovernorStatusListener`.
-		signedGovSt chan *gossipv1.SignedChainGovernorStatus
+		// signedGovStatusRecvC is optional and can be set with `WithChainGovernorStatusListener`.
+		signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus
 
 		// disableHeartbeatVerify is optional and can be set with `WithDisableHeartbeatVerify` or `WithGuardianOptions`.
 		disableHeartbeatVerify bool
 
 		// The following options are guardian specific. Set with `WithGuardianOptions`.
-		nodeName              string
-		gk                    *ecdsa.PrivateKey
-		gossipSendC           chan []byte
-		obsvReqSendC          <-chan *gossipv1.ObservationRequest
-		acct                  *accountant.Accountant
-		gov                   *governor.ChainGovernor
-		components            *Components
-		ibcFeaturesFunc       func() string
-		gatewayRelayerEnabled bool
-		ccqEnabled            bool
-		signedQueryReqC       chan<- *gossipv1.SignedQueryRequest
-		queryResponseReadC    <-chan *query.QueryResponsePublication
-		ccqBootstrapPeers     string
-		ccqPort               uint
-		ccqAllowedPeers       string
+		nodeName               string
+		gk                     *ecdsa.PrivateKey
+		gossipControlSendC     chan []byte
+		gossipAttestationSendC chan []byte
+		gossipVaaSendC         chan []byte
+		obsvReqSendC           <-chan *gossipv1.ObservationRequest
+		acct                   *accountant.Accountant
+		gov                    *governor.ChainGovernor
+		components             *Components
+		ibcFeaturesFunc        func() string
+		gatewayRelayerEnabled  bool
+		ccqEnabled             bool
+		signedQueryReqC        chan<- *gossipv1.SignedQueryRequest
+		queryResponseReadC     <-chan *query.QueryResponsePublication
+		ccqBootstrapPeers      string
+		ccqPort                uint
+		ccqAllowedPeers        string
 	}
 
 	// RunOpt is used to specify optional parameters.
@@ -96,41 +98,41 @@ func NewRunParams(
 }
 
 // WithSignedObservationListener is used to set the channel to receive `SignedObservation` messages.
-func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
+func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
 	return func(p *RunParams) error {
-		p.obsvC = obsvC
+		p.obsvRecvC = obsvRecvC
 		return nil
 	}
 }
 
 // WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum` messages.
-func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
+func WithSignedVAAListener(signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
 	return func(p *RunParams) error {
-		p.signedInC = signedInC
+		p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
 		return nil
 	}
 }
 
 // WithObservationRequestListener is used to set the channel to receive `ObservationRequest` messages.
-func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt {
+func WithObservationRequestListener(obsvReqRecvC chan<- *gossipv1.ObservationRequest) RunOpt {
 	return func(p *RunParams) error {
-		p.obsvReqC = obsvReqC
+		p.obsvReqRecvC = obsvReqRecvC
 		return nil
 	}
 }
 
 // WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig` messages.
-func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt {
+func WithChainGovernorConfigListener(signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig) RunOpt {
 	return func(p *RunParams) error {
-		p.signedGovCfg = signedGovCfg
+		p.signedGovCfgRecvC = signedGovCfgRecvC
 		return nil
 	}
 }
 
 // WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus` messages.
-func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt {
+func WithChainGovernorStatusListener(signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus) RunOpt {
 	return func(p *RunParams) error {
-		p.signedGovSt = signedGovSt
+		p.signedGovStatusRecvC = signedGovStatusRecvC
 		return nil
 	}
 }
@@ -147,10 +149,12 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
 func WithGuardianOptions(
 	nodeName string,
 	gk *ecdsa.PrivateKey,
-	obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
-	signedInC chan<- *gossipv1.SignedVAAWithQuorum,
-	obsvReqC chan<- *gossipv1.ObservationRequest,
-	gossipSendC chan []byte,
+	obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
+	signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum,
+	obsvReqRecvC chan<- *gossipv1.ObservationRequest,
+	gossipControlSendC chan []byte,
+	gossipAttestationSendC chan []byte,
+	gossipVaaSendC chan []byte,
 	obsvReqSendC <-chan *gossipv1.ObservationRequest,
 	acct *accountant.Accountant,
 	gov *governor.ChainGovernor,
@@ -168,10 +172,12 @@ func WithGuardianOptions(
 	return func(p *RunParams) error {
 		p.nodeName = nodeName
 		p.gk = gk
-		p.obsvC = obsvC
-		p.signedInC = signedInC
-		p.obsvReqC = obsvReqC
-		p.gossipSendC = gossipSendC
+		p.obsvRecvC = obsvRecvC
+		p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
+		p.obsvReqRecvC = obsvReqRecvC
+		p.gossipControlSendC = gossipControlSendC
+		p.gossipAttestationSendC = gossipAttestationSendC
+		p.gossipVaaSendC = gossipVaaSendC
 		p.obsvReqSendC = obsvReqSendC
 		p.acct = acct
 		p.gov = gov

+ 12 - 6
node/pkg/p2p/run_params_test.go

@@ -143,7 +143,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
 	obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
 	signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
 	obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
-	gossipSendC := make(chan []byte, 42)
+	gossipControlSendC := make(chan []byte, 42)
+	gossipAttestationSendC := make(chan []byte, 42)
+	gossipVaaSendC := make(chan []byte, 42)
 	obsvReqSendC := make(<-chan *gossipv1.ObservationRequest, 42)
 
 	acct := &accountant.Accountant{}
@@ -172,7 +174,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
 			obsvC,
 			signedInC,
 			obsvReqC,
-			gossipSendC,
+			gossipControlSendC,
+			gossipAttestationSendC,
+			gossipVaaSendC,
 			obsvReqSendC,
 			acct,
 			gov,
@@ -191,10 +195,12 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
 	require.NoError(t, err)
 	require.NotNil(t, params)
 	assert.Equal(t, nodeName, params.nodeName)
-	assert.Equal(t, obsvC, params.obsvC)
-	assert.Equal(t, signedInC, params.signedInC)
-	assert.Equal(t, obsvReqC, params.obsvReqC)
-	assert.Equal(t, gossipSendC, params.gossipSendC)
+	assert.Equal(t, obsvC, params.obsvRecvC)
+	assert.Equal(t, signedInC, params.signedIncomingVaaRecvC)
+	assert.Equal(t, obsvReqC, params.obsvReqRecvC)
+	assert.Equal(t, gossipControlSendC, params.gossipControlSendC)
+	assert.Equal(t, gossipAttestationSendC, params.gossipAttestationSendC)
+	assert.Equal(t, gossipVaaSendC, params.gossipVaaSendC)
 	assert.Equal(t, obsvReqSendC, params.obsvReqSendC)
 	assert.Equal(t, acct, params.acct)
 	assert.Equal(t, gov, params.gov)

+ 12 - 4
node/pkg/p2p/watermark_test.go

@@ -30,7 +30,9 @@ type G struct {
 	obsvC                  chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
 	obsvReqC               chan *gossipv1.ObservationRequest
 	obsvReqSendC           chan *gossipv1.ObservationRequest
-	sendC                  chan []byte
+	controlSendC           chan []byte
+	attestationSendC       chan []byte
+	vaaSendC               chan []byte
 	signedInC              chan *gossipv1.SignedVAAWithQuorum
 	priv                   p2pcrypto.PrivKey
 	gk                     *ecdsa.PrivateKey
@@ -67,7 +69,9 @@ func NewG(t *testing.T, nodeName string) *G {
 		obsvC:                  make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
 		obsvReqC:               make(chan *gossipv1.ObservationRequest, cs),
 		obsvReqSendC:           make(chan *gossipv1.ObservationRequest, cs),
-		sendC:                  make(chan []byte, cs),
+		controlSendC:           make(chan []byte, cs),
+		attestationSendC:       make(chan []byte, cs),
+		vaaSendC:               make(chan []byte, cs),
 		signedInC:              make(chan *gossipv1.SignedVAAWithQuorum, cs),
 		priv:                   p2ppriv,
 		gk:                     guardianpriv,
@@ -91,7 +95,9 @@ func NewG(t *testing.T, nodeName string) *G {
 		case <-g.signedInC:
 		case <-g.signedGovCfg:
 		case <-g.signedGovSt:
-		case <-g.sendC:
+		case <-g.controlSendC:
+		case <-g.attestationSendC:
+		case <-g.vaaSendC:
 		}
 	}()
 
@@ -178,7 +184,9 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
 			g.obsvC,
 			g.signedInC,
 			g.obsvReqC,
-			g.sendC,
+			g.controlSendC,
+			g.attestationSendC,
+			g.vaaSendC,
 			g.obsvReqSendC,
 			g.acct,
 			g.gov,

+ 2 - 2
node/pkg/processor/broadcast.go

@@ -57,7 +57,7 @@ func (p *Processor) broadcastSignature(
 	}
 
 	// Broadcast the observation.
-	p.gossipSendC <- msg
+	p.gossipAttestationSendC <- msg
 	observationsBroadcast.Inc()
 
 	hash := hex.EncodeToString(digest.Bytes())
@@ -106,7 +106,7 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
 	}
 
 	// Broadcast the signed VAA.
-	p.gossipSendC <- msg
+	p.gossipVaaSendC <- msg
 	signedVAAsBroadcast.Inc()
 
 	if p.gatewayRelayer != nil {

+ 2 - 1
node/pkg/processor/cleanup.go

@@ -228,7 +228,8 @@ func (p *Processor) handleCleanup(ctx context.Context) {
 					if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
 						p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
 					}
-					p.gossipSendC <- s.ourMsg
+
+					p.gossipAttestationSendC <- s.ourMsg
 					s.retryCtr++
 					s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
 					aggregationStateRetries.Inc()

+ 19 - 12
node/pkg/processor/processor.go

@@ -103,8 +103,13 @@ type Processor struct {
 	msgC <-chan *common.MessagePublication
 	// setC is a channel of guardian set updates
 	setC <-chan *common.GuardianSet
-	// gossipSendC is a channel of outbound messages to broadcast on p2p
-	gossipSendC chan<- []byte
+
+	// gossipAttestationSendC is a channel of outbound observation messages to broadcast on p2p
+	gossipAttestationSendC chan<- []byte
+
+	// gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p
+	gossipVaaSendC chan<- []byte
+
 	// obsvC is a channel of inbound decoded observations from p2p
 	obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
 
@@ -162,7 +167,8 @@ func NewProcessor(
 	db *db.Database,
 	msgC <-chan *common.MessagePublication,
 	setC <-chan *common.GuardianSet,
-	gossipSendC chan<- []byte,
+	gossipAttestationSendC chan<- []byte,
+	gossipVaaSendC chan<- []byte,
 	obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation],
 	obsvReqSendC chan<- *gossipv1.ObservationRequest,
 	signedInC <-chan *gossipv1.SignedVAAWithQuorum,
@@ -175,15 +181,16 @@ func NewProcessor(
 ) *Processor {
 
 	return &Processor{
-		msgC:         msgC,
-		setC:         setC,
-		gossipSendC:  gossipSendC,
-		obsvC:        obsvC,
-		obsvReqSendC: obsvReqSendC,
-		signedInC:    signedInC,
-		gk:           gk,
-		gst:          gst,
-		db:           db,
+		msgC:                   msgC,
+		setC:                   setC,
+		gossipAttestationSendC: gossipAttestationSendC,
+		gossipVaaSendC:         gossipVaaSendC,
+		obsvC:                  obsvC,
+		obsvReqSendC:           obsvReqSendC,
+		signedInC:              signedInC,
+		gk:                     gk,
+		gst:                    gst,
+		db:                     db,
 
 		logger:         supervisor.Logger(ctx),
 		state:          &aggregationState{observationMap{}},