Преглед изворни кода

Node/EVM: Remove obsolete field in logs (#3851)

* Node/EVM: Remove obsolete field in logs

* Rework

* More rework

* More rework
bruce-riley пре 1 година
родитељ
комит
21eea8c2ed

+ 1 - 1
node/pkg/watchers/evm/connectors/celo.go

@@ -56,7 +56,7 @@ func NewCeloConnector(ctx context.Context, networkName, rawUrl string, address e
 	return &CeloConnector{
 		networkName: networkName,
 		address:     address,
-		logger:      logger.With(zap.String("eth_network", networkName)),
+		logger:      logger,
 		client:      client,
 		rawClient:   rawClient,
 		filterer:    filterer,

+ 1 - 1
node/pkg/watchers/evm/connectors/ethereum.go

@@ -51,7 +51,7 @@ func NewEthereumBaseConnector(ctx context.Context, networkName, rawUrl string, a
 	return &EthereumBaseConnector{
 		networkName: networkName,
 		address:     address,
-		logger:      logger.With(zap.String("eth_network", networkName)),
+		logger:      logger,
 		client:      client,
 		filterer:    filterer,
 		caller:      caller,

+ 13 - 12
node/pkg/watchers/evm/connectors/finalizer_poller.go

@@ -22,18 +22,20 @@ type PollFinalizer interface {
 // FinalizerPollConnector polls for new blocks. It takes a finalizer which will be used to determine when a block is finalized.
 type FinalizerPollConnector struct {
 	Connector
+	logger    *zap.Logger
 	Delay     time.Duration
 	finalizer PollFinalizer
 	blockFeed ethEvent.Feed
 	errFeed   ethEvent.Feed
 }
 
-func NewFinalizerPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration) (*FinalizerPollConnector, error) {
+func NewFinalizerPollConnector(ctx context.Context, logger *zap.Logger, baseConnector Connector, finalizer PollFinalizer, delay time.Duration) (*FinalizerPollConnector, error) {
 	if finalizer == nil {
 		panic("finalizer must not be nil")
 	}
 	connector := &FinalizerPollConnector{
 		Connector: baseConnector,
+		logger:    logger,
 		Delay:     delay,
 		finalizer: finalizer,
 	}
@@ -74,13 +76,12 @@ func (b *FinalizerPollConnector) SubscribeForBlocks(ctx context.Context, errC ch
 }
 
 func (b *FinalizerPollConnector) runFromSupervisor(ctx context.Context) error {
-	logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName()))
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
-	return b.run(ctx, logger)
+	return b.run(ctx)
 }
 
-func (b *FinalizerPollConnector) run(ctx context.Context, logger *zap.Logger) error {
-	prevLatest, err := GetLatestBlock(ctx, logger, b.Connector)
+func (b *FinalizerPollConnector) run(ctx context.Context) error {
+	prevLatest, err := GetLatestBlock(ctx, b.logger, b.Connector)
 	if err != nil {
 		return err
 	}
@@ -101,10 +102,10 @@ func (b *FinalizerPollConnector) run(ctx context.Context, logger *zap.Logger) er
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-timer.C:
-			prevLatest, prevFinalized, err = b.pollBlock(ctx, logger, prevLatest, prevFinalized)
+			prevLatest, prevFinalized, err = b.pollBlock(ctx, prevLatest, prevFinalized)
 			if err != nil {
 				errCount++
-				logger.Error("polling encountered an error", zap.Int("errCount", errCount), zap.Error(err))
+				b.logger.Error("polling encountered an error", zap.Int("errCount", errCount), zap.Error(err))
 				if errCount > 3 {
 					b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err))
 					errCount = 0
@@ -120,8 +121,8 @@ func (b *FinalizerPollConnector) run(ctx context.Context, logger *zap.Logger) er
 
 // pollBlock poll for the latest block, compares them to the last one, and publishes any new ones.
 // In the case of an error, it returns the last block that were passed in, otherwise it returns the new block.
-func (b *FinalizerPollConnector) pollBlock(ctx context.Context, logger *zap.Logger, prevLatest *NewBlock, prevFinalized *NewBlock) (newLatest *NewBlock, newFinalized *NewBlock, err error) {
-	newLatest, err = GetLatestBlock(ctx, logger, b.Connector)
+func (b *FinalizerPollConnector) pollBlock(ctx context.Context, prevLatest *NewBlock, prevFinalized *NewBlock) (newLatest *NewBlock, newFinalized *NewBlock, err error) {
+	newLatest, err = GetLatestBlock(ctx, b.logger, b.Connector)
 	if err != nil {
 		err = fmt.Errorf("failed to get latest block: %w", err)
 		newLatest = prevLatest
@@ -135,7 +136,7 @@ func (b *FinalizerPollConnector) pollBlock(ctx context.Context, logger *zap.Logg
 		// If there is a gap between prev and new, we have to look up the hashes for the missing ones. Do that in batches.
 		newBlockNum := newLatest.Number.Uint64()
 		for blockNum := prevLatest.Number.Uint64() + 1; blockNum < newBlockNum; blockNum++ {
-			block, err = GetBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Latest)
+			block, err = GetBlockByNumberUint64(ctx, b.logger, b.Connector, blockNum, Latest)
 			if err != nil {
 				err = fmt.Errorf("failed to get gap block: %w", err)
 				newLatest = prevLatest
@@ -148,7 +149,7 @@ func (b *FinalizerPollConnector) pollBlock(ctx context.Context, logger *zap.Logg
 
 		b.blockFeed.Send(newLatest)
 	} else if newLatest.Number.Cmp(prevLatest.Number) < 0 {
-		logger.Debug("latest block number went backwards, ignoring it", zap.Any("newLatest", newLatest), zap.Any("prevLatest", prevLatest))
+		b.logger.Debug("latest block number went backwards, ignoring it", zap.Any("newLatest", newLatest), zap.Any("prevLatest", prevLatest))
 		newLatest = prevLatest
 	}
 
@@ -159,7 +160,7 @@ func (b *FinalizerPollConnector) pollBlock(ctx context.Context, logger *zap.Logg
 		// If there is a gap between prev and new, we have to look up the hashes for the missing ones. Do that in batches.
 		newBlockNum := newLatest.Number.Uint64()
 		for blockNum := prevFinalized.Number.Uint64() + 1; blockNum <= newBlockNum; blockNum++ {
-			block, err = GetBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Finalized)
+			block, err = GetBlockByNumberUint64(ctx, b.logger, b.Connector, blockNum, Finalized)
 			if err != nil {
 				err = fmt.Errorf("failed to get gap block: %w", err)
 				newLatest = prevLatest

+ 2 - 1
node/pkg/watchers/evm/connectors/finalizer_poller_test.go

@@ -206,6 +206,7 @@ func TestBlockPoller(t *testing.T) {
 
 	poller := &FinalizerPollConnector{
 		Connector: &baseConnector,
+		logger:    logger,
 		Delay:     1 * time.Millisecond,
 		finalizer: finalizer,
 	}
@@ -227,7 +228,7 @@ func TestBlockPoller(t *testing.T) {
 		mutex.Lock()
 		pollerStatus = pollerRunning
 		mutex.Unlock()
-		err := poller.run(ctx, logger)
+		err := poller.run(ctx)
 		require.NoError(t, err)
 		mutex.Lock()
 		pollerStatus = pollerExited

+ 11 - 8
node/pkg/watchers/evm/connectors/logpoller.go

@@ -23,6 +23,7 @@ import (
 // finalized message log events.
 type LogPollConnector struct {
 	Connector
+	logger      *zap.Logger
 	client      *ethClient.Client
 	messageFeed ethEvent.Feed
 	errFeed     ethEvent.Feed
@@ -30,8 +31,12 @@ type LogPollConnector struct {
 	prevBlockNum *big.Int
 }
 
-func NewLogPollConnector(ctx context.Context, baseConnector Connector, client *ethClient.Client) (*LogPollConnector, error) {
-	connector := &LogPollConnector{Connector: baseConnector, client: client}
+func NewLogPollConnector(ctx context.Context, logger *zap.Logger, baseConnector Connector, client *ethClient.Client) (*LogPollConnector, error) {
+	connector := &LogPollConnector{
+		Connector: baseConnector,
+		logger:    logger,
+		client:    client,
+	}
 	// The supervisor will keep the poller running
 	err := supervisor.Run(ctx, "logPoller", common.WrapWithScissors(connector.run, "logPoller"))
 	if err != nil {
@@ -41,8 +46,6 @@ func NewLogPollConnector(ctx context.Context, baseConnector Connector, client *e
 }
 
 func (l *LogPollConnector) run(ctx context.Context) error {
-	logger := supervisor.Logger(ctx).With(zap.String("eth_network", l.Connector.NetworkName()))
-
 	blockChan := make(chan *NewBlock)
 	errC := make(chan error)
 
@@ -62,7 +65,7 @@ func (l *LogPollConnector) run(ctx context.Context) error {
 		case err := <-errC:
 			return err
 		case block := <-blockChan:
-			if err := l.processBlock(ctx, logger, block); err != nil {
+			if err := l.processBlock(ctx, block); err != nil {
 				l.errFeed.Send(err.Error())
 			}
 		}
@@ -103,7 +106,7 @@ var (
 	logsLogMessageTopic = ethCommon.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2")
 )
 
-func (l *LogPollConnector) processBlock(ctx context.Context, logger *zap.Logger, block *NewBlock) error {
+func (l *LogPollConnector) processBlock(ctx context.Context, block *NewBlock) error {
 	if l.prevBlockNum == nil {
 		l.prevBlockNum = new(big.Int).Set(block.Number)
 	} else {
@@ -122,7 +125,7 @@ func (l *LogPollConnector) processBlock(ctx context.Context, logger *zap.Logger,
 	defer cancel()
 	logs, err := l.client.FilterLogs(tCtx, filter)
 	if err != nil {
-		logger.Error("GetLogsQuery: query of eth_getLogs failed",
+		l.logger.Error("GetLogsQuery: query of eth_getLogs failed",
 			zap.Stringer("FromBlock", filter.FromBlock),
 			zap.Stringer("ToBlock", filter.ToBlock),
 			zap.Error(err),
@@ -141,7 +144,7 @@ func (l *LogPollConnector) processBlock(ctx context.Context, logger *zap.Logger,
 		}
 		ev, err := l.ParseLogMessagePublished(log)
 		if err != nil {
-			logger.Error("GetLogsQuery: failed to parse log entry",
+			l.logger.Error("GetLogsQuery: failed to parse log entry",
 				zap.Stringer("FromBlock", filter.FromBlock),
 				zap.Stringer("ToBlock", filter.ToBlock),
 				zap.Error(err),

+ 16 - 33
node/pkg/watchers/evm/watcher.go

@@ -311,9 +311,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 				}
 
 				tx := eth_common.BytesToHash(r.TxHash)
-				logger.Info("received observation request",
-					zap.String("eth_network", w.networkName),
-					zap.String("tx_hash", tx.Hex()))
+				logger.Info("received observation request", zap.String("tx_hash", tx.Hex()))
 
 				// SECURITY: Load the block number before requesting the transaction to avoid a
 				// race condition where requesting the tx succeeds and is then dropped due to a fork,
@@ -330,9 +328,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 				cancel()
 
 				if err != nil {
-					logger.Error("failed to process observation request",
-						zap.Error(err), zap.String("eth_network", w.networkName),
-						zap.String("tx_hash", tx.Hex()))
+					logger.Error("failed to process observation request", zap.String("tx_hash", tx.Hex()), zap.Error(err))
 					continue
 				}
 
@@ -345,7 +341,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 							zap.Uint64("sequence", msg.Sequence),
 							zap.Uint64("current_block", blockNumberU),
 							zap.Uint64("observed_block", blockNumber),
-							zap.String("eth_network", w.networkName),
 						)
 						w.msgC <- msg
 						continue
@@ -353,8 +348,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 
 					if msg.ConsistencyLevel == vaa.ConsistencyLevelSafe {
 						if safeBlockNumberU == 0 {
-							logger.Error("no safe block number available, ignoring observation request",
-								zap.String("eth_network", w.networkName))
+							logger.Error("no safe block number available, ignoring observation request")
 							continue
 						}
 
@@ -365,7 +359,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 								zap.Uint64("sequence", msg.Sequence),
 								zap.Uint64("current_safe_block", safeBlockNumberU),
 								zap.Uint64("observed_block", blockNumber),
-								zap.String("eth_network", w.networkName),
 							)
 							w.msgC <- msg
 						} else {
@@ -375,7 +368,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 								zap.Uint64("sequence", msg.Sequence),
 								zap.Uint64("current_safe_block", safeBlockNumberU),
 								zap.Uint64("observed_block", blockNumber),
-								zap.String("eth_network", w.networkName),
 							)
 						}
 
@@ -383,8 +375,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					}
 
 					if blockNumberU == 0 {
-						logger.Error("no block number available, ignoring observation request",
-							zap.String("eth_network", w.networkName))
+						logger.Error("no block number available, ignoring observation request")
 						continue
 					}
 
@@ -404,7 +395,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 							zap.Uint64("sequence", msg.Sequence),
 							zap.Uint64("current_block", blockNumberU),
 							zap.Uint64("observed_block", blockNumber),
-							zap.String("eth_network", w.networkName),
 						)
 						w.msgC <- msg
 					} else {
@@ -414,7 +404,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 							zap.Uint64("sequence", msg.Sequence),
 							zap.Uint64("current_block", blockNumberU),
 							zap.Uint64("observed_block", blockNumber),
-							zap.String("eth_network", w.networkName),
 						)
 					}
 				}
@@ -479,11 +468,11 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 			case ev := <-headSink:
 				// These two pointers should have been checked before the event was placed on the channel, but just being safe.
 				if ev == nil {
-					logger.Error("new header event is nil", zap.String("eth_network", w.networkName))
+					logger.Error("new header event is nil")
 					continue
 				}
 				if ev.Number == nil {
-					logger.Error("new header block number is nil", zap.String("eth_network", w.networkName), zap.Stringer("finality", ev.Finality))
+					logger.Error("new header block number is nil", zap.Stringer("finality", ev.Finality))
 					continue
 				}
 
@@ -494,7 +483,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					zap.Uint64("block_time", ev.Time),
 					zap.Stringer("current_blockhash", currentHash),
 					zap.Stringer("finality", ev.Finality),
-					zap.String("eth_network", w.networkName))
+				)
 				readiness.SetReady(w.readinessSync)
 
 				blockNumberU := ev.Number.Uint64()
@@ -538,7 +527,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 							zap.Stringer("current_block", ev.Number),
 							zap.Stringer("finality", ev.Finality),
 							zap.Stringer("current_blockhash", currentHash),
-							zap.String("eth_network", w.networkName),
 						)
 						ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc()
 						delete(w.pending, key)
@@ -568,7 +556,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 								zap.Stringer("current_block", ev.Number),
 								zap.Stringer("finality", ev.Finality),
 								zap.Stringer("current_blockhash", currentHash),
-								zap.String("eth_network", w.networkName),
 								zap.Error(err))
 							delete(w.pending, key)
 							ethMessagesOrphaned.WithLabelValues(w.networkName, "not_found").Inc()
@@ -587,7 +574,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 								zap.Stringer("current_block", ev.Number),
 								zap.Stringer("finality", ev.Finality),
 								zap.Stringer("current_blockhash", currentHash),
-								zap.String("eth_network", w.networkName),
 								zap.Error(err))
 							delete(w.pending, key)
 							ethMessagesOrphaned.WithLabelValues(w.networkName, "tx_failed").Inc()
@@ -604,7 +590,6 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 								zap.Stringer("current_block", ev.Number),
 								zap.Stringer("finality", ev.Finality),
 								zap.Stringer("current_blockhash", currentHash),
-								zap.String("eth_network", w.networkName),
 								zap.Error(err))
 							continue
 						}
@@ -621,7 +606,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 								zap.Stringer("current_block", ev.Number),
 								zap.Stringer("finality", ev.Finality),
 								zap.Stringer("current_blockhash", currentHash),
-								zap.String("eth_network", w.networkName))
+							)
 							delete(w.pending, key)
 							ethMessagesOrphaned.WithLabelValues(w.networkName, "blockhash_mismatch").Inc()
 							continue
@@ -635,7 +620,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 							zap.Stringer("current_block", ev.Number),
 							zap.Stringer("finality", ev.Finality),
 							zap.Stringer("current_blockhash", currentHash),
-							zap.String("eth_network", w.networkName))
+						)
 						delete(w.pending, key)
 						w.msgC <- pLock.message
 						ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
@@ -648,7 +633,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
 					zap.Stringer("finality", ev.Finality),
 					zap.Stringer("current_blockhash", currentHash),
 					zap.Duration("took", time.Since(start)),
-					zap.String("eth_network", w.networkName))
+				)
 			}
 		}
 	})
@@ -687,9 +672,7 @@ func (w *Watcher) fetchAndUpdateGuardianSet(
 		return nil
 	}
 
-	logger.Info("updated guardian set found",
-		zap.Any("value", gs), zap.Uint32("index", idx),
-		zap.String("eth_network", w.networkName))
+	logger.Info("updated guardian set found", zap.Any("value", gs), zap.Uint32("index", idx))
 
 	w.currentGuardianSet = &idx
 
@@ -843,7 +826,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis
 			zap.Uint64("Sequence", ev.Sequence),
 			zap.Uint32("Nonce", ev.Nonce),
 			zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
-			zap.String("eth_network", w.networkName))
+		)
 
 		w.msgC <- message
 		ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
@@ -858,7 +841,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis
 		zap.Uint64("Sequence", ev.Sequence),
 		zap.Uint32("Nonce", ev.Nonce),
 		zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
-		zap.String("eth_network", w.networkName))
+	)
 
 	key := pendingKey{
 		TxHash:         message.TxHash,
@@ -890,7 +873,7 @@ func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC
 		zap.Uint64("Sequence", ev.Sequence),
 		zap.Uint32("Nonce", ev.Nonce),
 		zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
-		zap.String("eth_network", w.networkName))
+	)
 
 	const RetryInterval = 5 * time.Second
 	const MaxRetries = 3
@@ -915,7 +898,7 @@ func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC
 					zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
 					zap.Stringer("startTime", start),
 					zap.Int("retries", retries),
-					zap.String("eth_network", w.networkName))
+				)
 
 				w.postMessage(logger, ev, blockTime)
 				return
@@ -937,7 +920,7 @@ func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC
 					zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
 					zap.Stringer("startTime", start),
 					zap.Int("retries", retries),
-					zap.String("eth_network", w.networkName))
+				)
 
 				return
 			}