瀏覽代碼

node: add reobservation requests for Eth chains

commit-id:e4fa6adf
Leo 3 年之前
父節點
當前提交
c28b492e03

+ 46 - 15
node/cmd/guardiand/node.go

@@ -3,6 +3,7 @@ package guardiand
 import (
 	"context"
 	"encoding/base64"
+	"encoding/hex"
 	"fmt"
 	"github.com/certusone/wormhole/node/pkg/db"
 	"github.com/certusone/wormhole/node/pkg/notify/discord"
@@ -521,7 +522,7 @@ func runNode(cmd *cobra.Command, args []string) {
 	// Inbound signed VAAs
 	signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
 
-	// Inbound observation requests
+	// Inbound observation requests from the p2p service (for all chains)
 	obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
 
 	// Outbound observation requests
@@ -533,6 +534,38 @@ func runNode(cmd *cobra.Command, args []string) {
 	// Guardian set state managed by processor
 	gst := common.NewGuardianSetState()
 
+	// Per-chain observation requests
+	chainObsvReqC := make(map[vaa.ChainID]chan *gossipv1.ObservationRequest)
+
+	// Observation request channel for each chain supporting observation requests.
+	chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest)
+	chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest)
+	chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest)
+	chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest)
+	chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest)
+	chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest)
+	if *testnetMode {
+		chainObsvReqC[vaa.ChainIDEthereumRopsten] = make(chan *gossipv1.ObservationRequest)
+	}
+
+	// Multiplex observation requests to the appropriate chain
+	go func() {
+		for {
+			select {
+			case <-rootCtx.Done():
+				return
+			case req := <-obsvReqC:
+				if channel, ok := chainObsvReqC[vaa.ChainID(req.ChainId)]; ok {
+					channel <- req
+				} else {
+					logger.Error("unknown chain ID for reobservation request",
+						zap.Uint32("chain_id", req.ChainId),
+						zap.String("tx_hash", hex.EncodeToString(req.TxHash)))
+				}
+			}
+		}
+	}()
+
 	var notifier *discord.DiscordNotifier
 	if *discordToken != "" {
 		notifier, err = discord.NewDiscordNotifier(*discordToken, *discordChannel, logger)
@@ -624,38 +657,36 @@ func runNode(cmd *cobra.Command, args []string) {
 		}
 
 		if err := supervisor.Run(ctx, "ethwatch",
-			ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1).Run); err != nil {
+			ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum]).Run); err != nil {
 			return err
 		}
 
 		if err := supervisor.Run(ctx, "bscwatch",
-			ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1).Run); err != nil {
+			ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC]).Run); err != nil {
 			return err
 		}
 
 		if err := supervisor.Run(ctx, "polygonwatch",
-			ethereum.NewEthWatcher(
-				*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil,
-				// Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
-				//
-				// Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect
-				// developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon
-				// as specific public guidance exists for Polygon developers.
-				512).Run); err != nil {
+			ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, 512, chainObsvReqC[vaa.ChainIDPolygon]).Run); err != nil {
+			// Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
+			//
+			// Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect
+			// developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon
+			// as specific public guidance exists for Polygon developers.
 			return err
 		}
 		if err := supervisor.Run(ctx, "avalanchewatch",
-			ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1).Run); err != nil {
+			ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche]).Run); err != nil {
 			return err
 		}
 		if err := supervisor.Run(ctx, "oasiswatch",
-			ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1).Run); err != nil {
+			ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis]).Run); err != nil {
 			return err
 		}
 
 		if *testnetMode {
 			if err := supervisor.Run(ctx, "ethropstenwatch",
-				ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, setC, 1).Run); err != nil {
+				ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten]).Run); err != nil {
 				return err
 			}
 		}
@@ -680,7 +711,7 @@ func runNode(cmd *cobra.Command, args []string) {
 		}
 
 		if err := supervisor.Run(ctx, "solwatch-finalized",
-			solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, obsvReqC, rpc.CommitmentFinalized).Run); err != nil {
+			solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized).Run); err != nil {
 			return err
 		}
 

+ 2 - 1
node/hack/parse_eth_tx/parse_eth_tx.go

@@ -32,7 +32,7 @@ func main() {
 	contractAddr := common.HexToAddress(*flagContractAddr)
 	transactionHash := common.HexToHash(*flagTx)
 
-	msgs, err := ethereum.MessageEventsForTransaction(ctx, c, contractAddr, vaa.ChainIDEthereum, transactionHash)
+	block, msgs, err := ethereum.MessageEventsForTransaction(ctx, c, contractAddr, vaa.ChainIDEthereum, transactionHash)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -52,6 +52,7 @@ func main() {
 		}
 
 		log.Println("------------------------------------------------------")
+		log.Printf("Block: %d", block)
 		log.Printf("Message ID: %s", v.MessageID())
 		log.Printf("Digest: %s", v.HexDigest())
 		log.Printf("VAA: %+v", v)

+ 8 - 6
node/pkg/ethereum/by_transaction.go

@@ -18,28 +18,30 @@ var (
 	logMessagePublishedTopic = eth_common.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2")
 )
 
+// MessageEventsForTransaction returns the lockup events for a given transaction.
+// Returns the block number and a list of MessagePublication events.
 func MessageEventsForTransaction(
 	ctx context.Context,
 	c *ethclient.Client,
 	contract eth_common.Address,
 	chainId vaa.ChainID,
-	tx eth_common.Hash) ([]*common.MessagePublication, error) {
+	tx eth_common.Hash) (uint64, []*common.MessagePublication, error) {
 
 	f, err := abi.NewAbiFilterer(contract, c)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create ABI filterer: %w", err)
+		return 0, nil, fmt.Errorf("failed to create ABI filterer: %w", err)
 	}
 
 	// Get transactions logs from transaction
 	receipt, err := c.TransactionReceipt(ctx, tx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get transaction receipt: %w", err)
+		return 0, nil, fmt.Errorf("failed to get transaction receipt: %w", err)
 	}
 
 	// Get block
 	block, err := c.BlockByHash(ctx, receipt.BlockHash)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get block: %w", err)
+		return 0, nil, fmt.Errorf("failed to get block: %w", err)
 	}
 
 	msgs := make([]*common.MessagePublication, 0, len(receipt.Logs))
@@ -61,7 +63,7 @@ func MessageEventsForTransaction(
 
 		ev, err := f.ParseLogMessagePublished(*l)
 		if err != nil {
-			return nil, fmt.Errorf("failed to parse log: %w", err)
+			return 0, nil, fmt.Errorf("failed to parse log: %w", err)
 		}
 
 		message := &common.MessagePublication{
@@ -78,5 +80,5 @@ func MessageEventsForTransaction(
 		msgs = append(msgs, message)
 	}
 
-	return msgs, nil
+	return receipt.BlockNumber.Uint64(), msgs, nil
 }

+ 96 - 1
node/pkg/ethereum/watcher.go

@@ -9,6 +9,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"math/big"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -87,6 +88,10 @@ type (
 		// the governance mechanism lives there),
 		setChan chan *common.GuardianSet
 
+		// Incoming re-observation requests from the network. Pre-filtered to only
+		// include requests for our chainID.
+		obsvReqC chan *gossipv1.ObservationRequest
+
 		pending   map[pendingKey]*pendingMessage
 		pendingMu sync.Mutex
 
@@ -118,7 +123,8 @@ func NewEthWatcher(
 	chainID vaa.ChainID,
 	messageEvents chan *common.MessagePublication,
 	setEvents chan *common.GuardianSet,
-	minConfirmations uint64) *Watcher {
+	minConfirmations uint64,
+	obsvReqC chan *gossipv1.ObservationRequest) *Watcher {
 	return &Watcher{
 		url:              url,
 		contract:         contract,
@@ -128,6 +134,7 @@ func NewEthWatcher(
 		chainID:          chainID,
 		msgChan:          messageEvents,
 		setChan:          setEvents,
+		obsvReqC:         obsvReqC,
 		pending:          map[pendingKey]*pendingMessage{}}
 }
 
@@ -193,6 +200,93 @@ func (e *Watcher) Run(ctx context.Context) error {
 		}
 	}()
 
+	// Track the current block number so we can compare it to the block number of
+	// the message publication for observation requests.
+	var currentBlockNumber uint64
+
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case r := <-e.obsvReqC:
+				// This can't happen unless there is a programming error - the caller
+				// is expected to send us only requests for our chainID.
+				if vaa.ChainID(r.ChainId) != e.chainID {
+					panic("invalid chain ID")
+				}
+
+				tx := eth_common.BytesToHash(r.TxHash)
+				logger.Info("received observation request",
+					zap.String("eth_network", e.networkName),
+					zap.String("tx_hash", tx.Hex()))
+
+				// SECURITY: Load the block number before requesting the transaction to avoid a
+				// race condition where requesting the tx succeeds and is then dropped due to a fork,
+				// but blockNumberU had already advanced beyond the required threshold.
+				//
+				// In the primary watcher flow, this is of no concern since we assume the node
+				// always sends the head before it sends the logs (implicit synchronization
+				// by relying on the same websocket connection).
+				blockNumberU := atomic.LoadUint64(&currentBlockNumber)
+				if blockNumberU == 0 {
+					logger.Error("no block number available, ignoring observation request",
+						zap.String("eth_network", e.networkName))
+					continue
+				}
+
+				timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
+				blockNumber, msgs, err := MessageEventsForTransaction(timeout, c, e.contract, e.chainID, tx)
+				cancel()
+
+				if err != nil {
+					logger.Error("failed to process observation request",
+						zap.Error(err), zap.String("eth_network", e.networkName))
+					continue
+				}
+
+				for _, msg := range msgs {
+					expectedConfirmations := uint64(msg.ConsistencyLevel)
+					if expectedConfirmations < e.minConfirmations {
+						expectedConfirmations = e.minConfirmations
+					}
+
+					// SECURITY: In the recovery flow, we already know which transaction to
+					// observe, and we can assume that it has reached the expected finality
+					// level a long time ago. Therefore, the logic is much simpler than the
+					// primary watcher, which has to wait for finality.
+					//
+					// Instead, we can simply check if the transaction's block number is in
+					// the past by more than the expected confirmation number.
+					//
+					// Ensure that the current block number is at least expectedConfirmations
+					// larger than the message observation's block number.
+					if blockNumber+expectedConfirmations <= blockNumberU {
+						logger.Info("re-observed message publication transaction",
+							zap.Stringer("tx", msg.TxHash),
+							zap.Stringer("emitter_address", msg.EmitterAddress),
+							zap.Uint64("sequence", msg.Sequence),
+							zap.Uint64("current_block", blockNumberU),
+							zap.Uint64("observed_block", blockNumber),
+							zap.String("eth_network", e.networkName),
+						)
+						e.msgChan <- msg
+					} else {
+						logger.Info("ignoring re-observed message publication transaction",
+							zap.Stringer("tx", msg.TxHash),
+							zap.Stringer("emitter_address", msg.EmitterAddress),
+							zap.Uint64("sequence", msg.Sequence),
+							zap.Uint64("current_block", blockNumberU),
+							zap.Uint64("observed_block", blockNumber),
+							zap.Uint64("expected_confirmations", expectedConfirmations),
+							zap.String("eth_network", e.networkName),
+						)
+					}
+				}
+			}
+		}
+	}()
+
 	errC := make(chan error)
 	go func() {
 		for {
@@ -288,6 +382,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 				e.pendingMu.Lock()
 
 				blockNumberU := ev.Number.Uint64()
+				atomic.StoreUint64(&currentBlockNumber, blockNumberU)
 
 				for key, pLock := range e.pending {
 					expectedConfirmations := uint64(pLock.message.ConsistencyLevel)

+ 0 - 6
node/pkg/p2p/p2p.go

@@ -491,12 +491,6 @@ func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *n
 		return nil, fmt.Errorf("failed to unmarshal observation request: %w", err)
 	}
 
-	// For now, this supports Solana only. Once we add more chains, we'll have to add a
-	// multiplexer/router in node.go.
-	if h.ChainId != uint32(vaa.ChainIDSolana) {
-		return nil, fmt.Errorf("unsupported chain id: %d", h.ChainId)
-	}
-
 	// TODO: implement per-guardian rate limiting
 
 	return &h, nil