
Node: Channel writes without blocking (#4276)

* Node: Channel writes without blocking

* Fix a few more things
bruce-riley 4 months ago
parent commit dd94e09e16
37 changed files with 136 additions and 81 deletions
  1. +3 -3   node/cmd/spy/spy.go
  2. +2 -2   node/hack/evm_test/wstest.go
  3. +2 -2   node/pkg/adminrpc/adminserver.go
  4. +21 -0  node/pkg/common/channel_utils.go
  5. +14 -0  node/pkg/common/channel_utils_test.go
  6. +1 -1   node/pkg/common/guardianset.go
  7. +3 -2   node/pkg/governor/governor_monitoring.go
  8. +1 -1   node/pkg/governor/governor_prices.go
  9. +5 -1   node/pkg/node/node.go
  10. +2 -2  node/pkg/node/options.go
  11. +2 -2  node/pkg/node/publicwebRunnable.go
  12. +6 -6  node/pkg/p2p/p2p.go
  13. +8 -3  node/pkg/processor/broadcast.go
  14. +5 -1  node/pkg/processor/cleanup.go
  15. +6 -0  node/pkg/processor/processor.go
  16. +1 -1  node/pkg/supervisor/supervisor.go
  17. +1 -1  node/pkg/supervisor/supervisor_node.go
  18. +3 -3  node/pkg/supervisor/supervisor_processor.go
  19. +1 -1  node/pkg/supervisor/supervisor_support.go
  20. +4 -4  node/pkg/telemetry/loki.go
  21. +3 -3  node/pkg/txverifier/evmtypes.go
  22. +1 -1  node/pkg/watchers/algorand/watcher.go
  23. +1 -1  node/pkg/watchers/aptos/watcher.go
  24. +4 -4  node/pkg/watchers/cosmwasm/watcher.go
  25. +8 -8  node/pkg/watchers/evm/connectors/batch_poller.go
  26. +1 -1  node/pkg/watchers/evm/connectors/common.go
  27. +3 -3  node/pkg/watchers/evm/connectors/instant_finality.go
  28. +7 -7  node/pkg/watchers/evm/watcher.go
  29. +1 -1  node/pkg/watchers/ibc/watcher.go
  30. +3 -3  node/pkg/watchers/mock/watcher.go
  31. +1 -1  node/pkg/watchers/near/finalizer.go
  32. +2 -2  node/pkg/watchers/near/poll.go
  33. +2 -2  node/pkg/watchers/near/tx_processing.go
  34. +2 -2  node/pkg/watchers/near/watcher.go
  35. +4 -4  node/pkg/watchers/solana/client.go
  36. +1 -1  node/pkg/watchers/solana/shim.go
  37. +1 -1  node/pkg/watchers/sui/watcher.go

+ 3 - 3
node/cmd/spy/spy.go

@@ -126,7 +126,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
 					return err
 				}
 			}
-			sub.ch <- message{vaaBytes: vaaBytes}
+			sub.ch <- message{vaaBytes: vaaBytes} //nolint:channelcheck // Don't want to drop incoming VAAs
 			continue
 		}
 
@@ -146,7 +146,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
 						return err
 					}
 				}
-				sub.ch <- message{vaaBytes: vaaBytes}
+				sub.ch <- message{vaaBytes: vaaBytes} //nolint:channelcheck // Don't want to drop incoming VAAs
 			}
 		}
 
@@ -252,7 +252,7 @@ func newSpyServer(logger *zap.Logger) *spyServer {
 func DoWithTimeout(f func() error, d time.Duration) error {
 	errChan := make(chan error, 1)
 	go func() {
-		errChan <- f()
+		errChan <- f() //nolint:channelcheck // Has timeout below
 		close(errChan)
 	}()
 	t := time.NewTimer(d)
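
Note: the hunk above ends before the select in DoWithTimeout, which is what the new nolint annotation ("Has timeout below") refers to. A minimal sketch of that pattern, using the names already visible in the hunk; the exact remaining lines are outside the hunk, so this is an illustration rather than the committed code:

	select {
	case err := <-errChan:
		t.Stop()
		return err
	case <-t.C:
		return fmt.Errorf("timed out after %v", d)
	}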

+ 2 - 2
node/hack/evm_test/wstest.go

@@ -77,7 +77,7 @@ func main() {
 				case <-ctx.Done():
 					return
 				case err := <-headerSubscription.Err():
-					errC <- fmt.Errorf("block subscription failed: %w", err)
+					errC <- fmt.Errorf("block subscription failed: %w", err) //nolint:channelcheck // The watcher will exit anyway
 					return
 				case block := <-headSink:
 					// These two pointers should have been checked before the event was placed on the channel, but just being safe.
@@ -114,7 +114,7 @@ func main() {
 				case <-ctx.Done():
 					return
 				case err := <-messageSub.Err():
-					errC <- fmt.Errorf("message subscription failed: %w", err)
+					errC <- fmt.Errorf("message subscription failed: %w", err) //nolint:channelcheck // The watcher will exit anyway
 					return
 				case ev := <-messageC:
 					logger.Info("Received a log event from the contract", zap.Any("ev", ev))

+ 2 - 2
node/pkg/adminrpc/adminserver.go

@@ -840,7 +840,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
 
 		vaaInjectionsTotal.Inc()
 
-		s.injectC <- &common.MessagePublication{
+		s.injectC <- &common.MessagePublication{ //nolint:channelcheck // Only blocks this command
 			TxID:             ethcommon.Hash{}.Bytes(),
 			Timestamp:        v.Timestamp,
 			Nonce:            v.Nonce,
@@ -940,7 +940,7 @@ func (s *nodePrivilegedService) fetchMissing(
 			// Inject into the gossip signed VAA receive path.
 			// This has the same effect as if the VAA was received from the network
 			// (verifying signature, storing in local DB...).
-			s.signedInC <- &gossipv1.SignedVAAWithQuorum{
+			s.signedInC <- &gossipv1.SignedVAAWithQuorum{ //nolint:channelcheck // Only blocks this command
 				Vaa: vaaBytes,
 			}
 

+ 21 - 0
node/pkg/common/channel_utils.go

@@ -2,8 +2,29 @@ package common
 
 import (
 	"context"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	channelWriteDrops = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "wormhole_channel_write_drops",
+			Help: "Total number of channel writes that were dropped due to channel overflow",
+		}, []string{"channel_id"})
 )
 
+// WriteToChannelWithoutBlocking attempts to write the specified event to the specified channel. If the write would block,
+// the event is dropped and the `channelWriteDrops` metric is incremented for the specified channel ID.
+func WriteToChannelWithoutBlocking[T any](channel chan<- T, evt T, label string) {
+	select {
+	case channel <- evt:
+	default:
+		channelWriteDrops.WithLabelValues(label).Inc()
+	}
+}
+
 // ReadFromChannelWithTimeout reads events from the channel until a timeout occurs or the max maxCount is reached.
 func ReadFromChannelWithTimeout[T any](ctx context.Context, ch <-chan T, maxCount int) ([]T, error) {
 	out := make([]T, 0, maxCount)
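
The new helper replaces bare channel sends in places where dropping is acceptable. A minimal usage sketch (the channel and label names here are made up for illustration):

	eventC := make(chan int, 8) // hypothetical buffered channel
	common.WriteToChannelWithoutBlocking(eventC, 42, "example_events")
	// If eventC is full, the value is dropped and
	// wormhole_channel_write_drops{channel_id="example_events"} is incremented.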

+ 14 - 0
node/pkg/common/channel_utils_test.go

@@ -78,3 +78,17 @@ func TestReadFromChannelWithTimeout_TooMuchData(t *testing.T) {
 	require.Equal(t, 1, len(observations))
 	assert.Equal(t, 3, observations[0])
 }
+
+func TestWriteToChannelWithoutBlocking(t *testing.T) {
+	myChan := make(chan int, 1)
+	assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
+	WriteToChannelWithoutBlocking(myChan, 42, "numbers")
+	assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
+	WriteToChannelWithoutBlocking(myChan, 43, "numbers")
+	assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "numbers"))
+	WriteToChannelWithoutBlocking(myChan, 44, "numbers")
+	assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
+	WriteToChannelWithoutBlocking(myChan, 44, "different_label")
+	assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "different_label"))
+	assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
+}
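
The test relies on a getCounterValue helper that is not part of this diff. Assuming it reads one labeled counter out of the CounterVec, it could look roughly like the sketch below, built on prometheus's testutil package (the real helper may differ):

	import (
		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/client_golang/prometheus/testutil"
	)

	// getCounterValue returns the current value of the counter with the given label.
	func getCounterValue(counterVec *prometheus.CounterVec, label string) float64 {
		return testutil.ToFloat64(counterVec.WithLabelValues(label))
	}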

+ 1 - 1
node/pkg/common/guardianset.go

@@ -177,7 +177,7 @@ func (st *GuardianSetState) SetHeartbeat(addr common.Address, peerId peer.ID, hb
 
 	v[peerId] = hb
 	if st.updateC != nil {
-		st.updateC <- hb
+		WriteToChannelWithoutBlocking(st.updateC, hb, "heartbeat")
 	}
 	return nil
 }

+ 3 - 2
node/pkg/governor/governor_monitoring.go

@@ -81,6 +81,7 @@ import (
 	"sort"
 	"time"
 
+	"github.com/certusone/wormhole/node/pkg/common"
 	"github.com/certusone/wormhole/node/pkg/guardiansigner"
 	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
 	publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
@@ -628,7 +629,7 @@ func (gov *ChainGovernor) publishConfig(ctx context.Context, hb *gossipv1.Heartb
 		panic(err)
 	}
 
-	sendC <- b
+	common.WriteToChannelWithoutBlocking(sendC, b, "gov_config_gossip_out")
 }
 
 func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, guardianSigner guardiansigner.GuardianSigner, ourAddr ethCommon.Address) {
@@ -713,5 +714,5 @@ func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartb
 		panic(err)
 	}
 
-	sendC <- b
+	common.WriteToChannelWithoutBlocking(sendC, b, "gov_status_gossip_out")
 }

+ 1 - 1
node/pkg/governor/governor_prices.go

@@ -158,7 +158,7 @@ func (gov *ChainGovernor) queryCoinGecko(ctx context.Context) error {
 		for {
 			select {
 			case <-ticker.C:
-				throttle <- 1
+				throttle <- 1 //nolint:channelcheck // We want this to block for throttling
 			case <-ctx.Done():
 				return
 			}

+ 5 - 1
node/pkg/node/node.go

@@ -36,6 +36,10 @@ const (
 	// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
 	inboundBatchObservationBufferSize = 1000
 
+	// inboundMessageBufferSize configures the size of the msgC channel used to publish new observations from the watcher to the processor.
+	// This channel is shared across all the watchers so we don't want to hang up other watchers while the processor is handling an observation from one.
+	inboundMessageBufferSize = 1000
+
 	// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
 	// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
 	// So in the worst case the entire queue can be processed in 2s.
@@ -143,7 +147,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
 	g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
 	g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
 	g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
-	g.msgC = makeChannelPair[*common.MessagePublication](0)
+	g.msgC = makeChannelPair[*common.MessagePublication](inboundMessageBufferSize)
 	g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
 	g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
 	g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)
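
makeChannelPair and the readC/writeC fields referenced in options.go are existing helpers that this diff does not show. A sketch of the likely shape, assuming a simple wrapper that exposes both ends of a single buffered channel (an assumption based on how g.msgC.writeC is used below):

	// channelPair wraps one buffered channel and exposes its read and write ends separately.
	type channelPair[T any] struct {
		readC  <-chan T
		writeC chan<- T
	}

	func makeChannelPair[T any](capacity int) channelPair[T] {
		out := make(chan T, capacity)
		return channelPair[T]{readC: out, writeC: out}
	}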

+ 2 - 2
node/pkg/node/options.go

@@ -398,7 +398,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
 									zap.String("txID", msg.TxIDString()),
 									zap.Time("timestamp", msg.Timestamp))
 							} else {
-								g.msgC.writeC <- msg
+								g.msgC.writeC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 							}
 						}
 					}
@@ -424,7 +424,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
 									zap.Stringer("watcherChainId", chainId),
 								)
 							}
-							g.queryResponseC.writeC <- response
+							g.queryResponseC.writeC <- response //nolint:channelcheck // This channel is buffered, if it backs up we'll stop processing queries until it clears
 						}
 					}
 				}(chainQueryResponseC[chainId], chainId)

+ 2 - 2
node/pkg/node/publicwebRunnable.go

@@ -147,9 +147,9 @@ func publicwebServiceRunnable(
 		go func() {
 			logger.Info("publicweb server listening", zap.String("addr", srv.Addr))
 			if tlsHostname != "" {
-				errC <- srv.ServeTLS(listener, "", "")
+				errC <- srv.ServeTLS(listener, "", "") //nolint:channelcheck // Only does one write
 			} else {
-				errC <- srv.Serve(listener)
+				errC <- srv.Serve(listener) //nolint:channelcheck // Only does one write
 			}
 		}()
 		select {

+ 6 - 6
node/pkg/p2p/p2p.go

@@ -661,7 +661,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
 
 					// Send to local observation request queue (the loopback message is ignored)
 					if params.obsvReqRecvC != nil {
-						params.obsvReqRecvC <- msg
+						common.WriteToChannelWithoutBlocking(params.obsvReqRecvC, msg, "obs_req_internal")
 					}
 
 					if controlPubsubTopic == nil {
@@ -686,7 +686,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
 				for {
 					envelope, err := controlSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
 					if err != nil {
-						errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err)
+						errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err) //nolint:channelcheck // The runnable will exit anyway
 						return
 					}
 
@@ -821,11 +821,11 @@ func Run(params *RunParams) func(ctx context.Context) error {
 						}
 					case *gossipv1.GossipMessage_SignedChainGovernorConfig:
 						if params.signedGovCfgRecvC != nil {
-							params.signedGovCfgRecvC <- m.SignedChainGovernorConfig
+							common.WriteToChannelWithoutBlocking(params.signedGovCfgRecvC, m.SignedChainGovernorConfig, "gov_config_gossip_internal")
 						}
 					case *gossipv1.GossipMessage_SignedChainGovernorStatus:
 						if params.signedGovStatusRecvC != nil {
-							params.signedGovStatusRecvC <- m.SignedChainGovernorStatus
+							common.WriteToChannelWithoutBlocking(params.signedGovStatusRecvC, m.SignedChainGovernorStatus, "gov_status_gossip_internal")
 						}
 					default:
 						p2pMessagesReceived.WithLabelValues("unknown").Inc()
@@ -844,7 +844,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
 				for {
 					envelope, err := attestationSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
 					if err != nil {
-						errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err)
+						errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err) //nolint:channelcheck // The runnable will exit anyway
 						return
 					}
 
@@ -902,7 +902,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
 				for {
 					envelope, err := vaaSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
 					if err != nil {
-						errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err)
+						errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err) //nolint:channelcheck // The runnable will exit anyway
 						return
 					}
 

+ 8 - 3
node/pkg/processor/broadcast.go

@@ -79,9 +79,14 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
 		panic(err)
 	}
 
-	// Broadcast the signed VAA.
-	p.gossipVaaSendC <- msg
-	signedVAAsBroadcast.Inc()
+	// Broadcast the signed VAA. The channel is buffered. If it overflows, just drop it and rely
+	// on a reobservation if necessary.
+	select {
+	case p.gossipVaaSendC <- msg:
+		signedVAAsBroadcast.Inc()
+	default:
+		vaaPublishChannelOverflow.Inc()
+	}
 
 	if p.gatewayRelayer != nil {
 		p.gatewayRelayer.SubmitVAA(v)

+ 5 - 1
node/pkg/processor/cleanup.go

@@ -231,7 +231,11 @@ func (p *Processor) handleCleanup(ctx context.Context) {
 					}
 					if s.ourMsg != nil {
 						// This is the case for immediately published messages (as well as anything still pending from before the cutover).
-						p.gossipAttestationSendC <- s.ourMsg
+						select {
+						case p.gossipAttestationSendC <- s.ourMsg:
+						default:
+							batchObservationChannelOverflow.WithLabelValues("gossipResend").Inc()
+						}
 					} else {
 						p.postObservationToBatch(s.ourObs)
 					}

+ 6 - 0
node/pkg/processor/processor.go

@@ -186,6 +186,12 @@ var (
 			Help: "Total number of times a write to the batch observation publish channel failed",
 		}, []string{"channel"})
 
+	vaaPublishChannelOverflow = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "wormhole_vaa_publish_channel_overflow",
+			Help: "Total number of times a write to the vaa publish channel failed",
+		})
+
 	timeToHandleObservation = promauto.NewHistogram(
 		prometheus.HistogramOpts{
 			Name:    "wormhole_time_to_handle_observation_us",

+ 1 - 1
node/pkg/supervisor/supervisor.go

@@ -110,7 +110,7 @@ func New(ctx context.Context, logger *zap.Logger, rootRunnable Runnable, opts ..
 
 	go sup.processor(ctx)
 
-	sup.pReq <- &processorRequest{
+	sup.pReq <- &processorRequest{ //nolint:channelcheck // Only does one write
 		schedule: &processorRequestSchedule{dn: "root"},
 	}
 

+ 1 - 1
node/pkg/supervisor/supervisor_node.go

@@ -238,7 +238,7 @@ func (n *node) runGroup(runnables map[string]Runnable) error {
 	// Schedule execution of group members.
 	go func() {
 		for name := range runnables {
-			n.sup.pReq <- &processorRequest{
+			n.sup.pReq <- &processorRequest{ //nolint:channelcheck // Will only block this go routine
 				schedule: &processorRequestSchedule{
 					dn: dns[name],
 				},

+ 3 - 3
node/pkg/supervisor/supervisor_processor.go

@@ -134,7 +134,7 @@ func (s *supervisor) processSchedule(r *processorRequestSchedule) {
 		if !s.propagatePanic {
 			defer func() {
 				if rec := recover(); rec != nil {
-					s.pReq <- &processorRequest{
+					s.pReq <- &processorRequest{ //nolint:channelcheck // Will only block this go routine
 						died: &processorRequestDied{
 							dn:  r.dn,
 							err: fmt.Errorf("panic: %v, stacktrace: %s", rec, string(debug.Stack())),
@@ -146,7 +146,7 @@ func (s *supervisor) processSchedule(r *processorRequestSchedule) {
 
 		res := n.runnable(n.ctx)
 
-		s.pReq <- &processorRequest{
+		s.pReq <- &processorRequest{ //nolint:channelcheck // Will only block this go routine
 			died: &processorRequestDied{
 				dn:  r.dn,
 				err: res,
@@ -387,7 +387,7 @@ func (s *supervisor) processGC() {
 		// Reschedule node runnable to run after backoff.
 		go func(n *node, bo time.Duration) {
 			time.Sleep(bo)
-			s.pReq <- &processorRequest{
+			s.pReq <- &processorRequest{ //nolint:channelcheck // Will only block this go routine
 				schedule: &processorRequestSchedule{dn: n.dn()},
 			}
 		}(n, bo)

+ 1 - 1
node/pkg/supervisor/supervisor_support.go

@@ -18,7 +18,7 @@ func GRPCServer(srv *grpc.Server, lis net.Listener, graceful bool) Runnable {
 		Signal(ctx, SignalHealthy)
 		errC := make(chan error)
 		go func() {
-			errC <- srv.Serve(lis)
+			errC <- srv.Serve(lis) //nolint:channelcheck // Will only block this go routine
 		}()
 		select {
 		case <-ctx.Done():

+ 4 - 4
node/pkg/telemetry/loki.go

@@ -209,7 +209,7 @@ func logWriter(ctx context.Context, logger *zap.Logger, localC chan api.Entry, w
 
 			// Write to Loki in a blocking manner unless we are signaled to shutdown.
 			select {
-			case c.Chan() <- entry:
+			case c.Chan() <- entry: //nolint:channelcheck // We want to block on the Loki client.
 				pendingEntry = nil
 			case <-ctx.Done():
 				// Time to shutdown. We probably failed to write this message, save it so we can try to flush it.
@@ -240,7 +240,7 @@ func flushLogsWithTimeout(localC chan api.Entry, c client.Client, pendingEntry *
 
 	if pendingEntry != nil {
 		select {
-		case c.Chan() <- *pendingEntry:
+		case c.Chan() <- *pendingEntry: //nolint:channelcheck // We want to block on the Loki client. The timeout will interrupt us.
 		case <-timeout.Done():
 			// If we timeout, we didn't write the pending one, so count that as remaining.
 			return (1 + len(localC)), errors.New("timeout writing pending entry")
@@ -250,7 +250,7 @@ func flushLogsWithTimeout(localC chan api.Entry, c client.Client, pendingEntry *
 	for len(localC) > 0 {
 		select {
 		case entry := <-localC:
-			c.Chan() <- entry
+			c.Chan() <- entry //nolint:channelcheck // We want to block on the Loki client. The timeout will interrupt us.
 		case <-timeout.Done():
 			// If we timeout, we didn't write the current one, so count that as remaining.
 			return (1 + len(localC)), errors.New("timeout flushing buffered entry")
@@ -282,7 +282,7 @@ func stopClientWithTimeout(c client.Client) error {
 	stopExitedC := make(chan struct{}, 1)
 	go func(c client.Client) {
 		c.StopNow()
-		stopExitedC <- struct{}{}
+		stopExitedC <- struct{}{} //nolint:channelcheck // We only do a single write.
 	}(c)
 
 	// Wait for the go routine to exit or the timer to expire. Using `time.After` since this is a one shot and we don't have the context.
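
The select that the comment above refers to sits outside the hunk; a minimal sketch of the pattern, with the timeout duration name assumed for illustration:

	select {
	case <-stopExitedC:
		return nil
	case <-time.After(stopTimeout): // stopTimeout is a hypothetical constant
		return errors.New("timed out waiting for the Loki client to stop")
	}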

+ 3 - 3
node/pkg/txverifier/evmtypes.go

@@ -236,8 +236,8 @@ func (s *Subscription) Subscribe(ctx context.Context) {
 				)
 
 				if err != nil {
-					s.errC <- fmt.Errorf("failed to subscribe to logs: %w", err)
-					time.Sleep(RECONNECT_DELAY) // Wait before retrying
+					s.errC <- fmt.Errorf("failed to subscribe to logs: %w", err) //nolint:channelcheck // Will only block this subscriber routine
+					time.Sleep(RECONNECT_DELAY)                                  // Wait before retrying
 					continue
 				}
 
@@ -247,7 +247,7 @@ func (s *Subscription) Subscribe(ctx context.Context) {
 				err = s.handleSubscription(ctx, subscription)
 
 				if err != nil {
-					s.errC <- err
+					s.errC <- err               //nolint:channelcheck // Will only block this subscriber routine
 					time.Sleep(RECONNECT_DELAY) // Wait before retrying
 				}
 			}

+ 1 - 1
node/pkg/watchers/algorand/watcher.go

@@ -185,7 +185,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.
 			zap.Uint8("consistency_level", observation.ConsistencyLevel),
 		)
 
-		e.msgC <- observation
+		e.msgC <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 	}
 }
 

+ 1 - 1
node/pkg/watchers/aptos/watcher.go

@@ -375,5 +375,5 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u
 		zap.Uint8("consistencyLevel", observation.ConsistencyLevel),
 	)
 
-	e.msgC <- observation
+	e.msgC <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 }

+ 4 - 4
node/pkg/watchers/cosmwasm/watcher.go

@@ -216,7 +216,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				blocksBody, err := io.ReadAll(resp.Body)
 				if err != nil {
 					logger.Error("query latest block response read error", zap.String("network", networkName), zap.Error(err))
-					errC <- err
+					errC <- err //nolint:channelcheck // The watcher will exit anyway
 					resp.Body.Close()
 					continue
 				}
@@ -307,7 +307,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, contractAddressLogKey, e.b64Encoded)
 				for _, msg := range msgs {
 					msg.IsReobservation = true
-					e.msgC <- msg
+					e.msgC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 					messagesConfirmed.WithLabelValues(networkName).Inc()
 					watchers.ReobservationsByChain.WithLabelValues(networkName, "std").Inc()
 				}
@@ -326,7 +326,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 					p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
 					connectionErrors.WithLabelValues(networkName, "channel_read_error").Inc()
 					logger.Error("error reading channel", zap.String("network", networkName), zap.Error(err))
-					errC <- err
+					errC <- err //nolint:channelcheck // The watcher will exit anyway
 					return nil
 				}
 
@@ -348,7 +348,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 
 				msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, e.contractAddressLogKey, e.b64Encoded)
 				for _, msg := range msgs {
-					e.msgC <- msg
+					e.msgC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 					messagesConfirmed.WithLabelValues(networkName).Inc()
 				}
 

+ 8 - 8
node/pkg/watchers/evm/connectors/batch_poller.go

@@ -85,11 +85,11 @@ func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan e
 	// Publish the initial finalized and safe blocks so we have a starting point for reobservation requests.
 	for idx, block := range lastBlocks {
 		b.logger.Info(fmt.Sprintf("publishing initial %s block", b.batchData[idx].finality), zap.Uint64("initial_block", block.Number.Uint64()))
-		sink <- block
+		sink <- block //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 		if b.generateSafe && b.batchData[idx].finality == Finalized {
 			safe := block.Copy(Safe)
 			b.logger.Info("publishing generated initial safe block", zap.Uint64("initial_block", safe.Number.Uint64()))
-			sink <- safe
+			sink <- safe //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 		}
 	}
 
@@ -106,7 +106,7 @@ func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan e
 					errCount++
 					b.logger.Error("batch polling encountered an error", zap.Int("errCount", errCount), zap.Error(err))
 					if errCount > 3 {
-						errC <- fmt.Errorf("polling encountered too many errors: %w", err)
+						errC <- fmt.Errorf("polling encountered too many errors: %w", err) //nolint:channelcheck // The watcher will exit anyway
 						return nil
 					}
 				} else if errCount != 0 {
@@ -122,7 +122,7 @@ func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan e
 					b.logger.Error("new latest header block number is nil")
 					continue
 				}
-				sink <- &NewBlock{
+				sink <- &NewBlock{ //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 					Number:   ev.Number,
 					Time:     ev.Time,
 					Hash:     ev.Hash(),
@@ -197,9 +197,9 @@ func (b *BatchPollConnector) pollBlocks(ctx context.Context, sink chan<- *NewBlo
 							errorFound = true
 							break
 						}
-						sink <- block
+						sink <- block //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 						if b.generateSafe && b.batchData[idx].finality == Finalized {
-							sink <- block.Copy(Safe)
+							sink <- block.Copy(Safe) //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 						}
 						lastPublishedBlock = block
 					}
@@ -210,9 +210,9 @@ func (b *BatchPollConnector) pollBlocks(ctx context.Context, sink chan<- *NewBlo
 
 			if !errorFound {
 				// The original value of newBlocks is still good.
-				sink <- newBlock
+				sink <- newBlock //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 				if b.generateSafe && b.batchData[idx].finality == Finalized {
-					sink <- newBlock.Copy(Safe)
+					sink <- newBlock.Copy(Safe) //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 				}
 			} else {
 				newBlocks[idx] = lastPublishedBlock

+ 1 - 1
node/pkg/watchers/evm/connectors/common.go

@@ -87,7 +87,7 @@ func (sub *PollSubscription) Err() <-chan error {
 func (sub *PollSubscription) Unsubscribe() {
 	sub.errOnce.Do(func() {
 		select {
-		case sub.quit <- ErrUnsubscribed:
+		case sub.quit <- ErrUnsubscribed: //nolint:channelcheck // We only do a single write.
 			<-sub.unsubDone
 		case <-sub.unsubDone:
 		}

+ 3 - 3
node/pkg/watchers/evm/connectors/instant_finality.go

@@ -54,9 +54,9 @@ func (c *InstantFinalityConnector) SubscribeForBlocks(ctx context.Context, errC
 					Hash:     ev.Hash(),
 					Finality: Finalized,
 				}
-				sink <- block
-				sink <- block.Copy(Safe)
-				sink <- block.Copy(Latest)
+				sink <- block              //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
+				sink <- block.Copy(Safe)   //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
+				sink <- block.Copy(Latest) //nolint:channelcheck // This channel is buffered, if it backs up, we will just stop polling until it clears
 			}
 		}
 	})

+ 7 - 7
node/pkg/watchers/evm/watcher.go

@@ -362,7 +362,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 				return nil
 			case <-t.C:
 				if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
-					errC <- fmt.Errorf("failed to request guardian set: %v", err)
+					errC <- fmt.Errorf("failed to request guardian set: %v", err) //nolint:channelcheck // The watcher will exit anyway
 					return nil
 				}
 			}
@@ -418,7 +418,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 				return nil
 			case err := <-messageSub.Err():
 				ethConnectionErrors.WithLabelValues(w.networkName, "subscription_error").Inc()
-				errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
+				errC <- fmt.Errorf("error while processing message publication subscription: %w", err) //nolint:channelcheck // The watcher will exit anyway
 				p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
 				return nil
 			case ev := <-messageC:
@@ -430,7 +430,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 						continue
 					}
 					p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
-					errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
+					errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) //nolint:channelcheck // The watcher will exit anyway
 					return nil
 				}
 
@@ -458,7 +458,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 			case err := <-headerSubscription.Err():
 				logger.Error("error while processing header subscription", zap.Error(err))
 				ethConnectionErrors.WithLabelValues(w.networkName, "header_subscription_error").Inc()
-				errC <- fmt.Errorf("error while processing header subscription: %w", err)
+				errC <- fmt.Errorf("error while processing header subscription: %w", err) //nolint:channelcheck // The watcher will exit anyway
 				p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
 				return nil
 			case ev := <-headSink:
@@ -683,7 +683,7 @@ func (w *Watcher) fetchAndUpdateGuardianSet(
 	w.currentGuardianSet = &idx
 
 	if w.setC != nil {
-		w.setC <- common.NewGuardianSet(gs.Keys, idx)
+		w.setC <- common.NewGuardianSet(gs.Keys, idx) //nolint:channelcheck // Will only block the guardian set update routine
 	}
 
 	return nil
@@ -900,7 +900,7 @@ func (w *Watcher) verifyAndPublish(
 		"publishing new message publication",
 		msg.ZapFields()...,
 	)
-	w.msgC <- msg
+	w.msgC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 	ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
 	if msg.IsReobservation {
 		watchers.ReobservationsByChain.WithLabelValues(w.chainID.String(), "std").Inc()
@@ -954,7 +954,7 @@ func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC
 			ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
 			if !canRetryGetBlockTime(err) {
 				p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
-				errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
+				errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) //nolint:channelcheck // The watcher will exit anyway
 				return
 			}
 			if retries >= MaxRetries {

+ 1 - 1
node/pkg/watchers/ibc/watcher.go

@@ -670,7 +670,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs
 		zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel),
 	)
 
-	ce.msgC <- evt.Msg
+	ce.msgC <- evt.Msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 	messagesConfirmed.WithLabelValues(ce.chainName).Inc()
 	if evt.Msg.IsReobservation {
 		watchers.ReobservationsByChain.WithLabelValues(evt.Msg.EmitterChain.String(), "std").Inc()

+ 3 - 3
node/pkg/watchers/mock/watcher.go

@@ -33,9 +33,9 @@ func NewWatcherRunnable(
 				return nil
 			case observation := <-c.MockObservationC:
 				logger.Info("message observed", observation.ZapFields(zap.String("digest", observation.CreateDigest()))...)
-				msgC <- observation
+				msgC <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 			case gs := <-c.MockSetC:
-				setC <- gs
+				setC <- gs //nolint:channelcheck // Will only block this mock watcher
 			case o := <-obsvReqC:
 				hash := eth_common.BytesToHash(o.TxHash)
 				logger.Info("Received obsv request", zap.String("log_msg_type", "obsv_req_received"), zap.String("tx_hash", hash.Hex()))
@@ -43,7 +43,7 @@ func NewWatcherRunnable(
 				if ok {
 					msg2 := *msg
 					msg2.IsReobservation = true
-					msgC <- &msg2
+					msgC <- &msg2 //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 				}
 			}
 		}

+ 1 - 1
node/pkg/watchers/near/finalizer.go

@@ -64,7 +64,7 @@ func (f Finalizer) isFinalized(logger *zap.Logger, ctx context.Context, queriedB
 	}
 
 	logger.Debug("block finalization cache miss", zap.String("method", "isFinalized"), zap.String("parameters", queriedBlockHash))
-	f.eventChan <- EVENT_FINALIZED_CACHE_MISS
+	f.eventChan <- EVENT_FINALIZED_CACHE_MISS //nolint:channelcheck // Only pauses this watcher
 
 	queriedBlock, err := f.nearAPI.GetBlock(ctx, queriedBlockHash)
 	if err != nil {

+ 2 - 2
node/pkg/watchers/near/poll.go

@@ -50,7 +50,7 @@ func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context
 	// we want to avoid going too far back because that would increase the likelihood of error somewhere in the recursion stack.
 	// If we go back too far, we just report the error and terminate early.
 	if recursionDepth > maxFallBehindBlocks {
-		e.eventChan <- EVENT_NEAR_WATCHER_TOO_FAR_BEHIND
+		e.eventChan <- EVENT_NEAR_WATCHER_TOO_FAR_BEHIND //nolint:channelcheck // Only pauses this watcher
 		return errors.New("recursivelyReadFinalizedBlocks: maxFallBehindBlocks")
 	}
 
@@ -71,7 +71,7 @@ func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context
 	chunks := startBlock.ChunkHashes()
 	// process chunks after recursion such that youngest chunks get processed first
 	for i := 0; i < len(chunks); i++ {
-		chunkSink <- chunks[i]
+		chunkSink <- chunks[i] //nolint:channelcheck // Only pauses this watcher
 	}
 	return nil
 }

+ 2 - 2
node/pkg/watchers/near/tx_processing.go

@@ -254,7 +254,7 @@ func (e *Watcher) processWormholeLog(logger *zap.Logger, _ context.Context, job
 	// tell everyone about it
 	job.hasWormholeMsg = true
 
-	e.eventChan <- EVENT_NEAR_MESSAGE_CONFIRMED
+	e.eventChan <- EVENT_NEAR_MESSAGE_CONFIRMED //nolint:channelcheck // Only pauses this watcher
 
 	logger.Info("message observed",
 		zap.String("log_msg_type", "wormhole_event_success"),
@@ -268,7 +268,7 @@ func (e *Watcher) processWormholeLog(logger *zap.Logger, _ context.Context, job
 		zap.Uint8("consistency_level", observation.ConsistencyLevel),
 	)
 
-	e.msgC <- observation
+	e.msgC <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 
 	return nil
 }

+ 2 - 2
node/pkg/watchers/near/watcher.go

@@ -266,7 +266,7 @@ func (e *Watcher) runTxProcessor(ctx context.Context) error {
 
 			if job.hasWormholeMsg {
 				// report how long it took to process this transaction
-				e.eventChanTxProcessedDuration <- time.Since(job.creationTime)
+				e.eventChanTxProcessedDuration <- time.Since(job.creationTime) //nolint:channelcheck // Only pauses this watcher
 			}
 		}
 
@@ -343,7 +343,7 @@ func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, d
 				select {
 				case <-ctx.Done():
 					return nil
-				case e.transactionProcessingQueue <- job:
+				case e.transactionProcessingQueue <- job: //nolint:channelcheck // Only blocking this go routine.
 				}
 			}
 			return nil

+ 4 - 4
node/pkg/watchers/solana/client.go

@@ -334,7 +334,7 @@ func (s *SolanaWatcher) setupWebSocket(ctx context.Context) error {
 					logger.Error("failed to read from account web socket", zap.Error(err))
 					return err
 				} else {
-					s.pumpData <- msg
+					s.pumpData <- msg //nolint:channelcheck // Only pauses this watcher
 				}
 			}
 		}
@@ -416,7 +416,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
 				if err != nil {
 					p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
 					solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "account_subscription_data").Inc()
-					s.errC <- err
+					s.errC <- err //nolint:channelcheck // The watcher will exit anyway
 					return err
 				}
 			case m := <-s.obsvReqC:
@@ -453,7 +453,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
 				if err != nil {
 					p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
 					solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "get_slot_error").Inc()
-					s.errC <- err
+					s.errC <- err //nolint:channelcheck // The watcher will exit anyway
 					return err
 				}
 
@@ -1088,7 +1088,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
 		)
 	}
 
-	s.msgC <- observation
+	s.msgC <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 	return 1
 }
 

+ 1 - 1
node/pkg/watchers/solana/shim.go

@@ -385,7 +385,7 @@ func (s *SolanaWatcher) shimProcessRest(
 		)
 	}
 
-	s.msgC <- observation
+	s.msgC <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 
 	return nil
 }

+ 1 - 1
node/pkg/watchers/sui/watcher.go

@@ -291,7 +291,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservatio
 		zap.Uint8("consistencyLevel", observation.ConsistencyLevel),
 	)
 
-	e.msgChan <- observation
+	e.msgChan <- observation //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
 
 	return nil
 }