Browse Source

Node/P2P: Add protected peers list (#4292)

* Node/P2P: Add protected peers list

* Add support for spy and CCQ proxy

* Add tests
bruce-riley 8 months ago
parent
commit
d36a16cac3

+ 8 - 0
node/cmd/ccq/p2p.go

@@ -17,6 +17,7 @@ import (
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
@@ -52,6 +53,7 @@ func runP2P(
 	monitorPeers bool,
 	loggingMap *LoggingMap,
 	gossipAdvertiseAddress string,
+	protectedPeers []string,
 ) (*P2PSub, error) {
 	// p2p setup
 	components := p2p.DefaultComponents()
@@ -63,6 +65,12 @@ func runP2P(
 		return nil, err
 	}
 
+	if len(protectedPeers) != 0 {
+		for _, peerId := range protectedPeers {
+			components.ConnMgr.Protect(peer.ID(peerId), "configured")
+		}
+	}
+
 	topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
 	topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
 

+ 3 - 1
node/cmd/ccq/query_server.go

@@ -32,6 +32,7 @@ var (
 	p2pNetworkID           *string
 	p2pPort                *uint
 	p2pBootstrap           *string
+	protectedPeers         []string
 	listenAddr             *string
 	nodeKeyPath            *string
 	signerKeyPath          *string
@@ -57,6 +58,7 @@ func init() {
 	p2pNetworkID = QueryServerCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)")
 	p2pPort = QueryServerCmd.Flags().Uint("port", 8995, "P2P UDP listener port")
 	p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
+	QueryServerCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")
 	nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
 	signerKeyPath = QueryServerCmd.Flags().String("signerKey", "", "Path to key used to sign unsigned queries")
 	listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)")
@@ -204,7 +206,7 @@ func runQueryServer(cmd *cobra.Command, args []string) {
 
 	// Run p2p
 	pendingResponses := NewPendingResponses(logger)
-	p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress)
+	p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress, protectedPeers)
 	if err != nil {
 		logger.Fatal("Failed to start p2p", zap.Error(err))
 	}

+ 8 - 4
node/cmd/guardiand/node.go

@@ -50,9 +50,10 @@ import (
 )
 
 var (
-	p2pNetworkID *string
-	p2pPort      *uint
-	p2pBootstrap *string
+	p2pNetworkID   *string
+	p2pPort        *uint
+	p2pBootstrap   *string
+	protectedPeers []string
 
 	nodeKeyPath *string
 
@@ -262,6 +263,7 @@ var (
 	ccqAllowedRequesters *string
 	ccqP2pPort           *uint
 	ccqP2pBootstrap      *string
+	ccqProtectedPeers    []string
 	ccqAllowedPeers      *string
 	ccqBackfillCache     *bool
 
@@ -282,6 +284,7 @@ func init() {
 	p2pNetworkID = NodeCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)")
 	p2pPort = NodeCmd.Flags().Uint("port", p2p.DefaultPort, "P2P UDP listener port")
 	p2pBootstrap = NodeCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)")
+	NodeCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")
 
 	statusAddr = NodeCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
 
@@ -491,6 +494,7 @@ func init() {
 	ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries")
 	ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port")
 	ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)")
+	NodeCmd.Flags().StringSliceVarP(&ccqProtectedPeers, "ccqProtectedPeers", "", []string{}, "")
 	ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)")
 	ccqBackfillCache = NodeCmd.Flags().Bool("ccqBackfillCache", true, "Should EVM chains backfill CCQ timestamp cache on startup")
 	gossipAdvertiseAddress = NodeCmd.Flags().String("gossipAdvertiseAddress", "", "External IP to advertize on Guardian and CCQ p2p (use if behind a NAT or running in k8s)")
@@ -1777,7 +1781,7 @@ func runNode(cmd *cobra.Command, args []string) {
 		node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn),
 		node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters),
 		node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
-		node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
+		node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures, protectedPeers, ccqProtectedPeers),
 		node.GuardianOptionStatusServer(*statusAddr),
 		node.GuardianOptionProcessor(*p2pNetworkID),
 	}

+ 6 - 3
node/cmd/spy/spy.go

@@ -36,9 +36,10 @@ var (
 var (
 	envStr *string
 
-	p2pNetworkID *string
-	p2pPort      *uint
-	p2pBootstrap *string
+	p2pNetworkID   *string
+	p2pPort        *uint
+	p2pBootstrap   *string
+	protectedPeers []string
 
 	statusAddr *string
 
@@ -59,6 +60,7 @@ func init() {
 	p2pNetworkID = SpyCmd.Flags().String("network", "", "P2P network identifier (optional for testnet or mainnet, overrides default, required for devnet)")
 	p2pPort = SpyCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
 	p2pBootstrap = SpyCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
+	SpyCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")
 
 	statusAddr = SpyCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
 
@@ -396,6 +398,7 @@ func runSpy(cmd *cobra.Command, args []string) {
 			rootCtxCancel,
 			p2p.WithSignedVAAListener(signedInC),
 			p2p.WithComponents(components),
+			p2p.WithProtectedPeers(protectedPeers),
 		)
 		if err != nil {
 			return err

+ 1 - 1
node/pkg/node/node_test.go

@@ -190,7 +190,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
 			GuardianOptionNoAccountant(), // disable accountant
 			GuardianOptionGovernor(true, false, ""),
 			GuardianOptionGatewayRelayer("", nil), // disable gateway relayer
-			GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }),
+			GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }, []string{}, []string{}),
 			GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail),
 			GuardianOptionPublicrpcTcpService(cfg.publicRpc, publicRpcLogDetail),
 			GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),

+ 6 - 1
node/pkg/node/options.go

@@ -52,6 +52,8 @@ func GuardianOptionP2P(
 	ccqAllowedPeers string,
 	gossipAdvertiseAddress string,
 	ibcFeaturesFunc func() string,
+	protectedPeers []string,
+	ccqProtectedPeers []string,
 ) *GuardianOption {
 	return &GuardianOption{
 		name:         "p2p",
@@ -101,7 +103,10 @@ func GuardianOptionP2P(
 					g.queryResponsePublicationC.readC,
 					ccqBootstrapPeers,
 					ccqPort,
-					ccqAllowedPeers),
+					ccqAllowedPeers,
+					protectedPeers,
+					ccqProtectedPeers,
+				),
 				p2p.WithProcessorFeaturesFunc(processor.GetFeatures),
 			)
 			if err != nil {

+ 7 - 0
node/pkg/p2p/ccq_p2p.go

@@ -76,6 +76,7 @@ func (ccq *ccqP2p) run(
 	port uint,
 	signedQueryReqC chan<- *gossipv1.SignedQueryRequest,
 	queryResponseReadC <-chan *query.QueryResponsePublication,
+	protectedPeers []string,
 	errC chan error,
 ) error {
 	networkID := p2pNetworkID + "/ccq"
@@ -95,6 +96,12 @@ func (ccq *ccqP2p) run(
 		return fmt.Errorf("failed to create p2p: %w", err)
 	}
 
+	if len(protectedPeers) != 0 {
+		for _, peerId := range protectedPeers {
+			components.ConnMgr.Protect(peer.ID(peerId), "configured")
+		}
+	}
+
 	// Build a map of bootstrap peers so we can always allow subscribe requests from them.
 	bootstrapPeersMap := map[string]struct{}{}
 	bootstrappers, _ := BootstrapAddrs(ccq.logger, bootstrapPeers, ccq.h.ID())

+ 8 - 1
node/pkg/p2p/p2p.go

@@ -335,6 +335,13 @@ func Run(params *RunParams) func(ctx context.Context) error {
 			}
 		}()
 
+		if len(params.protectedPeers) != 0 {
+			for _, peerId := range params.protectedPeers {
+				logger.Info("protecting peer", zap.String("peerId", peerId))
+				params.components.ConnMgr.Protect(peer.ID(peerId), "configured")
+			}
+		}
+
 		nodeIdBytes, err := h.ID().Marshal()
 		if err != nil {
 			panic(err)
@@ -462,7 +469,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
 		if params.ccqEnabled {
 			ccqErrC := make(chan error)
 			ccq := newCcqRunP2p(logger, params.ccqAllowedPeers, params.components)
-			if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, ccqErrC); err != nil {
+			if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, params.ccqProtectedPeers, ccqErrC); err != nil {
 				return fmt.Errorf("failed to start p2p for CCQ: %w", err)
 			}
 			defer ccq.close()

+ 22 - 0
node/pkg/p2p/run_params.go

@@ -60,6 +60,8 @@ type (
 		ccqBootstrapPeers      string
 		ccqPort                uint
 		ccqAllowedPeers        string
+		protectedPeers         []string
+		ccqProtectedPeers      []string
 	}
 
 	// RunOpt is used to specify optional parameters.
@@ -162,6 +164,22 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
 	}
 }
 
+// WithProtectedPeers is used to set the protected peers.
+func WithProtectedPeers(protectedPeers []string) RunOpt {
+	return func(p *RunParams) error {
+		p.protectedPeers = protectedPeers
+		return nil
+	}
+}
+
+// WithCcqProtectedPeers is used to set the protected peers for CCQ.
+func WithCcqProtectedPeers(ccqProtectedPeers []string) RunOpt {
+	return func(p *RunParams) error {
+		p.ccqProtectedPeers = ccqProtectedPeers
+		return nil
+	}
+}
+
 // WithGuardianOptions is used to set options that are only meaningful to the guardian.
 func WithGuardianOptions(
 	nodeName string,
@@ -185,6 +203,8 @@ func WithGuardianOptions(
 	ccqBootstrapPeers string,
 	ccqPort uint,
 	ccqAllowedPeers string,
+	protectedPeers []string,
+	ccqProtectedPeers []string,
 ) RunOpt {
 	return func(p *RunParams) error {
 		p.nodeName = nodeName
@@ -208,6 +228,8 @@ func WithGuardianOptions(
 		p.ccqBootstrapPeers = ccqBootstrapPeers
 		p.ccqPort = ccqPort
 		p.ccqAllowedPeers = ccqAllowedPeers
+		p.protectedPeers = protectedPeers
+		p.ccqProtectedPeers = ccqProtectedPeers
 		return nil
 	}
 }

+ 66 - 1
node/pkg/p2p/run_params_test.go

@@ -127,6 +127,57 @@ func TestRunParamsWithDisableHeartbeatVerify(t *testing.T) {
 	assert.True(t, params.disableHeartbeatVerify)
 }
 
+func TestRunParamsWithProtectedPeers(t *testing.T) {
+	priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
+	require.NoError(t, err)
+	gst := common.NewGuardianSetState(nil)
+	_, rootCtxCancel := context.WithCancel(context.Background())
+	defer rootCtxCancel()
+
+	protectedPeers := []string{"peer1", "peer2", "peer3"}
+	params, err := NewRunParams(
+		bootstrapPeers,
+		networkId,
+		priv,
+		gst,
+		rootCtxCancel,
+		WithProtectedPeers(protectedPeers),
+	)
+
+	require.NoError(t, err)
+	require.NotNil(t, params)
+
+	require.Equal(t, len(protectedPeers), len(params.protectedPeers))
+	assert.Equal(t, protectedPeers[0], params.protectedPeers[0])
+	assert.Equal(t, protectedPeers[1], params.protectedPeers[1])
+	assert.Equal(t, protectedPeers[2], params.protectedPeers[2])
+}
+
+func TestRunParamsWithCcqProtectedPeers(t *testing.T) {
+	priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
+	require.NoError(t, err)
+	gst := common.NewGuardianSetState(nil)
+	_, rootCtxCancel := context.WithCancel(context.Background())
+	defer rootCtxCancel()
+
+	ccqProtectedPeers := []string{"peerA", "peerB"}
+	params, err := NewRunParams(
+		bootstrapPeers,
+		networkId,
+		priv,
+		gst,
+		rootCtxCancel,
+		WithCcqProtectedPeers(ccqProtectedPeers),
+	)
+
+	require.NoError(t, err)
+	require.NotNil(t, params)
+
+	require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers))
+	assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0])
+	assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1])
+}
+
 func TestRunParamsWithGuardianOptions(t *testing.T) {
 	priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
 	require.NoError(t, err)
@@ -159,6 +210,8 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
 	ccqBootstrapPeers := "some bootstrap string"
 	ccqPort := uint(4242)
 	ccqAllowedPeers := "some allowed peers"
+	protectedPeers := []string{"peer1", "peer2", "peer3"}
+	ccqProtectedPeers := []string{"peerA", "peerB"}
 
 	params, err := NewRunParams(
 		bootstrapPeers,
@@ -187,7 +240,10 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
 			queryResponseReadC,
 			ccqBootstrapPeers,
 			ccqPort,
-			ccqAllowedPeers),
+			ccqAllowedPeers,
+			protectedPeers,
+			ccqProtectedPeers,
+		),
 	)
 
 	require.NoError(t, err)
@@ -210,4 +266,13 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
 	assert.Equal(t, ccqBootstrapPeers, params.ccqBootstrapPeers)
 	assert.Equal(t, ccqPort, params.ccqPort)
 	assert.Equal(t, ccqAllowedPeers, params.ccqAllowedPeers)
+
+	require.Equal(t, len(protectedPeers), len(params.protectedPeers))
+	assert.Equal(t, protectedPeers[0], params.protectedPeers[0])
+	assert.Equal(t, protectedPeers[1], params.protectedPeers[1])
+	assert.Equal(t, protectedPeers[2], params.protectedPeers[2])
+
+	require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers))
+	assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0])
+	assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1])
 }

+ 10 - 8
node/pkg/p2p/watermark_test.go

@@ -190,14 +190,16 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
 			g.gov,
 			g.disableHeartbeatVerify,
 			g.components,
-			nil,   //g.ibcFeaturesFunc,
-			false, // gateway relayer enabled
-			false, // ccqEnabled
-			nil,   // signed query request channel
-			nil,   // query response channel
-			"",    // query bootstrap peers
-			0,     // query port
-			"",    // query allowed peers),
+			nil,        //g.ibcFeaturesFunc,
+			false,      // gateway relayer enabled
+			false,      // ccqEnabled
+			nil,        // signed query request channel
+			nil,        // query response channel
+			"",         // query bootstrap peers
+			0,          // query port
+			"",         // query allowed peers),
+			[]string{}, // protected peers
+			[]string{}, // ccq protected peers
 		))
 	require.NoError(t, err)