Kaynağa Gözat

Node/spy: Adds optional VAA signature verification (#3854)

* Node/Spy: VAA Verifier

* Don't use parseAndVerifyVM

* Only verify if VAA will be published
bruce-riley 1 yıl önce
ebeveyn
işleme
2d680058cf
3 değiştirilmiş dosya ile 196 ekleme ve 2 silme
  1. 4 0
      devnet/spy.yaml
  2. 61 2
      node/cmd/spy/spy.go
  3. 131 0
      node/cmd/spy/vaa_verifier.go

+ 4 - 0
devnet/spy.yaml

@@ -48,6 +48,10 @@ spec:
             # Hardcoded devnet bootstrap (generated from deterministic key in guardiand)
             - --bootstrap
             - /dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
+            - --ethRPC
+            - http://eth-devnet:8545
+            - --ethContract
+            - "0xC89Ce4735882C9F0f0FE26686c53074E09B0D550"
             - --logLevel=warn
           ports:
             - containerPort: 7072

+ 61 - 2
node/cmd/spy/spy.go

@@ -2,6 +2,7 @@ package spy
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net"
 	"net/http"
@@ -48,6 +49,9 @@ var (
 	spyRPC *string
 
 	sendTimeout *time.Duration
+
+	ethRPC      *string
+	ethContract *string
 )
 
 func init() {
@@ -65,6 +69,9 @@ func init() {
 	spyRPC = SpyCmd.Flags().String("spyRPC", "", "Listen address for gRPC interface")
 
 	sendTimeout = SpyCmd.Flags().Duration("sendTimeout", 5*time.Second, "Timeout for sending a message to a subscriber")
+
+	ethRPC = SpyCmd.Flags().String("ethRPC", "", "Ethereum RPC for verifying VAAs (optional)")
+	ethContract = SpyCmd.Flags().String("ethContract", "", "Ethereum core bridge address for verifying VAAs (required if ethRPC is specified)")
 }
 
 // SpyCmd represents the node command
@@ -79,6 +86,7 @@ type spyServer struct {
 	logger          *zap.Logger
 	subsSignedVaa   map[string]*subscriptionSignedVaa
 	subsSignedVaaMu sync.Mutex
+	vaaVerifier     *VaaVerifier
 }
 
 type message struct {
@@ -103,15 +111,23 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
 	defer s.subsSignedVaaMu.Unlock()
 
 	var v *vaa.VAA
+	var err error
+	verified := s.vaaVerifier == nil
 
 	for _, sub := range s.subsSignedVaa {
 		if len(sub.filters) == 0 {
+			if !verified {
+				verified = true
+				v, err = s.verifyVAA(v, vaaBytes)
+				if err != nil {
+					return err
+				}
+			}
 			sub.ch <- message{vaaBytes: vaaBytes}
 			continue
 		}
 
 		if v == nil {
-			var err error
 			v, err = vaa.Unmarshal(vaaBytes)
 			if err != nil {
 				return err
@@ -120,6 +136,13 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
 
 		for _, fi := range sub.filters {
 			if fi.chainId == v.EmitterChain && fi.emitterAddr == v.EmitterAddress {
+				if !verified {
+					verified = true
+					v, err = s.verifyVAA(v, vaaBytes)
+					if err != nil {
+						return err
+					}
+				}
 				sub.ch <- message{vaaBytes: vaaBytes}
 			}
 		}
@@ -129,6 +152,31 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
 	return nil
 }
 
+func (s *spyServer) verifyVAA(v *vaa.VAA, vaaBytes []byte) (*vaa.VAA, error) {
+	if s.vaaVerifier == nil {
+		panic("verifier is nil")
+	}
+
+	if v == nil {
+		var err error
+		v, err = vaa.Unmarshal(vaaBytes)
+		if err != nil {
+			return v, fmt.Errorf(`failed to unmarshal VAA: %w`, err)
+		}
+	}
+
+	valid, err := s.vaaVerifier.VerifySignatures(v)
+	if err != nil {
+		return v, fmt.Errorf(`failed to verify VAA: %w`, err)
+	}
+
+	if !valid {
+		return v, errors.New(`invalid VAA signature`)
+	}
+
+	return v, nil
+}
+
 func (s *spyServer) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
 	var fi []filterSignedVaa
 	if req.Filters != nil {
@@ -311,6 +359,17 @@ func runSpy(cmd *cobra.Command, args []string) {
 		logger.Fatal("failed to start RPC server", zap.Error(err))
 	}
 
+	// VAA verifier (optional)
+	if *ethRPC != "" {
+		if *ethContract == "" {
+			logger.Fatal(`If "--ethRPC" is specified, "--ethContract" must also be specified`)
+		}
+		s.vaaVerifier = NewVaaVerifier(logger, *ethRPC, *ethContract)
+		if err := s.vaaVerifier.GetInitialGuardianSet(); err != nil {
+			logger.Fatal(`Failed to read initial guardian set for VAA verification`, zap.Error(err))
+		}
+	}
+
 	// Ignore observations
 	go func() {
 		for {
@@ -344,7 +403,7 @@ func runSpy(cmd *cobra.Command, args []string) {
 				logger.Info("Received signed VAA",
 					zap.Any("vaa", v.Vaa))
 				if err := s.PublishSignedVAA(v.Vaa); err != nil {
-					logger.Error("failed to publish signed VAA", zap.Error(err))
+					logger.Error("failed to publish signed VAA", zap.Error(err), zap.Any("vaa", v.Vaa))
 				}
 			}
 		}

+ 131 - 0
node/cmd/spy/vaa_verifier.go

@@ -0,0 +1,131 @@
+package spy
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+	"go.uber.org/zap"
+
+	ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
+	ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
+	ethCommon "github.com/ethereum/go-ethereum/common"
+	ethClient "github.com/ethereum/go-ethereum/ethclient"
+	ethRpc "github.com/ethereum/go-ethereum/rpc"
+)
+
+// VaaVerifier is an object that can be used to validate VAA signatures.
+// It reads the guardian set on chain whenever a new guardian set index is detected.
+type VaaVerifier struct {
+	logger       *zap.Logger
+	rpcUrl       string
+	coreAddr     ethCommon.Address
+	lock         sync.Mutex
+	guardianSets map[uint32]*common.GuardianSet
+}
+
+// RpcTimeout is the context timeout on RPC calls.
+const RpcTimeout = time.Second * 5
+
+// NewVaaVerifier creates a VaaVerifier.
+func NewVaaVerifier(logger *zap.Logger, rpcUrl string, coreAddr string) *VaaVerifier {
+	return &VaaVerifier{
+		logger:       logger,
+		rpcUrl:       rpcUrl,
+		coreAddr:     ethCommon.HexToAddress(coreAddr),
+		guardianSets: make(map[uint32]*common.GuardianSet),
+	}
+}
+
+// GetInitialGuardianSet gets the current guardian set and adds it to the map. It is not necessary
+// to call this function, but doing so will allow you to verify that the RPC endpoint works on start up,
+// rather than having it fail the first VAA is received.
+func (v *VaaVerifier) GetInitialGuardianSet() error {
+	timeout, cancel := context.WithTimeout(context.Background(), RpcTimeout)
+	defer cancel()
+
+	rawClient, err := ethRpc.DialContext(timeout, v.rpcUrl)
+	if err != nil {
+		return fmt.Errorf("failed to connect to ethereum: %w", err)
+	}
+
+	client := ethClient.NewClient(rawClient)
+	caller, err := ethAbi.NewAbiCaller(v.coreAddr, client)
+	if err != nil {
+		return fmt.Errorf("failed to create caller: %w", err)
+	}
+
+	gsIndex, err := caller.GetCurrentGuardianSetIndex(&ethBind.CallOpts{Context: timeout})
+	if err != nil {
+		return fmt.Errorf("error requesting current guardian set index: %w", err)
+	}
+
+	result, err := caller.GetGuardianSet(&ethBind.CallOpts{Context: timeout}, gsIndex)
+	if err != nil {
+		return fmt.Errorf("error requesting guardian set for index %d: %w", gsIndex, err)
+	}
+
+	gs := &common.GuardianSet{
+		Keys:  result.Keys,
+		Index: gsIndex,
+	}
+
+	v.logger.Warn("read current guardian set", zap.Uint32("index", gsIndex), zap.Any("gs", *gs))
+	v.guardianSets[gsIndex] = gs
+	return nil
+}
+
+// VerifySignatures verifies that the signature on a VAA is valid, based on the guardian set contained in the VAA.
+// If the guardian set is not currently in our map, it queries that guardian set and adds it.
+func (v *VaaVerifier) VerifySignatures(vv *vaa.VAA) (bool, error) {
+	v.lock.Lock()
+	defer v.lock.Unlock()
+
+	gs, exists := v.guardianSets[vv.GuardianSetIndex]
+	if !exists {
+		var err error
+		gs, err = v.fetchGuardianSet(vv.GuardianSetIndex)
+		if err != nil {
+			return false, fmt.Errorf("failed to fetch guardian set for index %d: %w", vv.GuardianSetIndex, err)
+		}
+
+		v.logger.Warn("read guardian set", zap.Uint32("index", gs.Index), zap.Any("gs", *gs))
+		v.guardianSets[gs.Index] = gs
+	}
+
+	if err := vv.Verify(gs.Keys); err != nil {
+		return false, nil
+	}
+
+	return true, nil
+}
+
+// fetchGuardianSet reads the guardian set for the index passed in.
+func (v *VaaVerifier) fetchGuardianSet(gsIndex uint32) (*common.GuardianSet, error) {
+	timeout, cancel := context.WithTimeout(context.Background(), RpcTimeout)
+	defer cancel()
+
+	rawClient, err := ethRpc.DialContext(timeout, v.rpcUrl)
+	if err != nil {
+		return nil, fmt.Errorf("failed to connect to ethereum: %w", err)
+	}
+
+	client := ethClient.NewClient(rawClient)
+	caller, err := ethAbi.NewAbiCaller(v.coreAddr, client)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create caller: %w", err)
+	}
+
+	gs, err := caller.GetGuardianSet(&ethBind.CallOpts{Context: timeout}, gsIndex)
+	if err != nil {
+		return nil, fmt.Errorf("error requesting current guardian set for index %d: %w", gsIndex, err)
+	}
+
+	return &common.GuardianSet{
+		Keys:  gs.Keys,
+		Index: gsIndex,
+	}, nil
+}