Bläddra i källkod

Add publicrpc endpoint for external clients.

- Distribute raw heartbeats via new proto package publicrpc

- Manage channel subscription on client req/close.

- Expose publicprc endpoint in devnet Service.

Change-Id: Ic96d624733961aa56e00b03c3b5cff6af11523a4
jschuldt 4 år sedan
förälder
incheckning
16157d339d

+ 15 - 1
bridge/cmd/guardiand/bridge.go

@@ -26,6 +26,7 @@ import (
 	"github.com/certusone/wormhole/bridge/pkg/p2p"
 	"github.com/certusone/wormhole/bridge/pkg/processor"
 	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
+	"github.com/certusone/wormhole/bridge/pkg/publicrpc"
 	"github.com/certusone/wormhole/bridge/pkg/readiness"
 	solana "github.com/certusone/wormhole/bridge/pkg/solana"
 	"github.com/certusone/wormhole/bridge/pkg/supervisor"
@@ -71,6 +72,8 @@ var (
 	unsafeDevMode   *bool
 	devNumGuardians *uint
 	nodeName        *string
+
+	publicRPC *string
 )
 
 func init() {
@@ -108,6 +111,8 @@ func init() {
 	unsafeDevMode = BridgeCmd.Flags().Bool("unsafeDevMode", false, "Launch node in unsafe, deterministic devnet mode")
 	devNumGuardians = BridgeCmd.Flags().Uint("devNumGuardians", 5, "Number of devnet guardians to include in guardian set")
 	nodeName = BridgeCmd.Flags().String("nodeName", "", "Node name to announce in gossip heartbeats")
+
+	publicRPC = BridgeCmd.Flags().String("publicRPC", "", "Listen address for public gRPC interface")
 }
 
 var (
@@ -380,10 +385,13 @@ func runBridge(cmd *cobra.Command, args []string) {
 		logger.Fatal("failed to create admin service socket", zap.Error(err))
 	}
 
+	// subscriber channel multiplexing for public gPRC streams
+	rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger)
+
 	// Run supervisor.
 	supervisor.New(rootCtx, logger, func(ctx context.Context) error {
 		if err := supervisor.Run(ctx, "p2p", p2p.Run(
-			obsvC, sendC, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
+			obsvC, sendC, rawHeartbeatListeners, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
 			return err
 		}
 
@@ -436,6 +444,12 @@ func runBridge(cmd *cobra.Command, args []string) {
 		if err := supervisor.Run(ctx, "admin", adminService); err != nil {
 			return err
 		}
+		if *publicRPC != "" {
+			if err := supervisor.Run(ctx, "publicrpc",
+				publicrpc.PublicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners)); err != nil {
+				return err
+			}
+		}
 
 		logger.Info("Started internal services")
 

+ 5 - 0
bridge/pkg/p2p/p2p.go

@@ -25,6 +25,7 @@ import (
 	"google.golang.org/protobuf/proto"
 
 	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
+	"github.com/certusone/wormhole/bridge/pkg/publicrpc"
 	"github.com/certusone/wormhole/bridge/pkg/supervisor"
 )
 
@@ -54,6 +55,7 @@ func init() {
 
 func Run(obsvC chan *gossipv1.SignedObservation,
 	sendC chan []byte,
+	rawHeartbeatListeners *publicrpc.PublicRawHeartbeatConnections,
 	priv crypto.PrivKey,
 	port uint,
 	networkID string,
@@ -203,6 +205,8 @@ func Run(obsvC chan *gossipv1.SignedObservation,
 							GuardianAddr: DefaultRegistry.guardianAddress,
 						}}}
 
+					rawHeartbeatListeners.PublishHeartbeat(msg.GetHeartbeat())
+
 					b, err := proto.Marshal(&msg)
 					if err != nil {
 						panic(err)
@@ -268,6 +272,7 @@ func Run(obsvC chan *gossipv1.SignedObservation,
 				logger.Debug("heartbeat received",
 					zap.Any("value", m.Heartbeat),
 					zap.String("from", envelope.GetFrom().String()))
+				rawHeartbeatListeners.PublishHeartbeat(msg.GetHeartbeat())
 				p2pMessagesReceived.WithLabelValues("heartbeat").Inc()
 			case *gossipv1.GossipMessage_SignedObservation:
 				obsvC <- m.SignedObservation

+ 57 - 0
bridge/pkg/publicrpc/publicrpcserver.go

@@ -0,0 +1,57 @@
+package publicrpc
+
+import (
+	"fmt"
+	"net"
+
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+
+	publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
+	"github.com/certusone/wormhole/bridge/pkg/supervisor"
+)
+
+// gRPC server & method for handling streaming proto connection
+type publicrpcServer struct {
+	publicrpcv1.UnimplementedPublicrpcServer
+	rawHeartbeatListeners *PublicRawHeartbeatConnections
+	logger                *zap.Logger
+}
+
+func (s *publicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequest, stream publicrpcv1.Publicrpc_GetRawHeartbeatsServer) error {
+	s.logger.Info("gRPC heartbeat stream opened by client")
+
+	// create a channel and register it for heartbeats
+	receiveChan := make(chan *publicrpcv1.Heartbeat, 50)
+	// clientId is the reference to the subscription that we will use for unsubscribing when the client disconnects.
+	clientId := s.rawHeartbeatListeners.subscribeHeartbeats(receiveChan)
+
+	for {
+		select {
+		// Exit on stream context done
+		case <-stream.Context().Done():
+			s.logger.Info("raw heartbeat stream closed by client", zap.Int("clientId", clientId))
+			s.rawHeartbeatListeners.unsubscribeHeartbeats(clientId)
+			return stream.Context().Err()
+		case msg := <-receiveChan:
+			stream.Send(msg)
+		}
+	}
+}
+
+func PublicrpcServiceRunnable(logger *zap.Logger, listenAddr string, rawHeartbeatListeners *PublicRawHeartbeatConnections) supervisor.Runnable {
+	l, err := net.Listen("tcp", listenAddr)
+	if err != nil {
+		logger.Fatal("failed to listen for publicrpc service", zap.Error(err))
+	}
+	logger.Info(fmt.Sprintf("publicrpc server listening on %s", listenAddr))
+
+	rpcServer := &publicrpcServer{
+		rawHeartbeatListeners: rawHeartbeatListeners,
+		logger:                logger.Named("publicrpcserver"),
+	}
+
+	grpcServer := grpc.NewServer()
+	publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer)
+	return supervisor.GRPCServer(grpcServer, l, false)
+}

+ 87 - 0
bridge/pkg/publicrpc/rawheartbeats.go

@@ -0,0 +1,87 @@
+package publicrpc
+
+import (
+	"math/rand"
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/zap"
+
+	publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
+)
+
+// track the number of active connections
+var (
+	currentPublicHeartbeatStreamsOpen = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "wormhole_publicrpc_rawheartbeat_connections",
+			Help: "Current number of clients consuming gRPC raw heartbeat streams",
+		})
+)
+
+func init() {
+	prometheus.MustRegister(currentPublicHeartbeatStreamsOpen)
+}
+
+// multiplexing to distribute heartbeat messages to all the open connections
+type PublicRawHeartbeatConnections struct {
+	mu     sync.RWMutex
+	subs   map[int]chan<- *publicrpcv1.Heartbeat
+	logger *zap.Logger
+}
+
+func HeartbeatStreamMultiplexer(logger *zap.Logger) *PublicRawHeartbeatConnections {
+	ps := &PublicRawHeartbeatConnections{
+		subs:   map[int]chan<- *publicrpcv1.Heartbeat{},
+		logger: logger.Named("heartbeatmultiplexer"),
+	}
+	return ps
+}
+
+// getUniqueClientId loops to generate & test integers for existence as key of map. returns an int that is not a key in map.
+func (ps *PublicRawHeartbeatConnections) getUniqueClientId() int {
+	clientId := rand.Intn(1e6)
+	found := false
+	for found {
+		clientId = rand.Intn(1e6)
+		_, found = ps.subs[clientId]
+	}
+	return clientId
+}
+
+// subscribeHeartbeats adds a channel to the subscriber map, keyed by arbitary clientId
+func (ps *PublicRawHeartbeatConnections) subscribeHeartbeats(ch chan *publicrpcv1.Heartbeat) int {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	clientId := ps.getUniqueClientId()
+	ps.logger.Info("subscribeHeartbeats for client", zap.Int("client", clientId))
+	ps.subs[clientId] = ch
+	currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs)))
+	return clientId
+}
+
+// PublishHeartbeat sends a message to all channels in the subscription map
+func (ps *PublicRawHeartbeatConnections) PublishHeartbeat(msg *publicrpcv1.Heartbeat) {
+	ps.mu.RLock()
+	defer ps.mu.RUnlock()
+
+	for client, ch := range ps.subs {
+		select {
+		case ch <- msg:
+			ps.logger.Debug("published message to client", zap.Int("client", client))
+		default:
+			ps.logger.Debug("buffer overrrun when attempting to publish message", zap.Int("client", client))
+		}
+	}
+}
+
+// unsubscribeHeartbeats removes the client's channel from the subscription map
+func (ps *PublicRawHeartbeatConnections) unsubscribeHeartbeats(clientId int) {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	ps.logger.Debug("unsubscribeHeartbeats for client", zap.Int("clientId", clientId))
+	delete(ps.subs, clientId)
+	currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs)))
+}

+ 8 - 0
devnet/bridge.yaml

@@ -10,6 +10,9 @@ spec:
     - port: 8999
       name: p2p
       protocol: UDP
+    - port: 7070
+      name: public-grpc
+      protocol: TCP
   clusterIP: None
   selector:
     app: guardian
@@ -84,6 +87,8 @@ spec:
             - --unsafeDevMode
             - --bridgeKey
             - /tmp/bridge.key
+            - --publicRPC
+            - "[::]:7070"
             - --adminSocket
             - /tmp/admin.sock
 #            - --logLevel=debug
@@ -103,6 +108,9 @@ spec:
             - containerPort: 6060
               name: pprof
               protocol: TCP
+            - containerPort: 7070
+              name: public-grpc
+              protocol: TCP
         - name: agent
           image: solana-agent
           volumeMounts:

+ 2 - 1
proto/gossip/v1/gossip.proto

@@ -2,7 +2,8 @@ syntax = "proto3";
 
 package gossip.v1;
 
-option go_package = "proto/gossip/v1;gossipv1";
+// full path of the resulting Go file is required in order to import in whisper.proto
+option go_package = "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1;gossipv1";
 
 message GossipMessage {
   oneof message {

+ 22 - 0
proto/publicrpc/v1/publicrpc.proto

@@ -0,0 +1,22 @@
+syntax = "proto3";
+
+// only relevant for protobuf namespace
+package publicrpc.v1;
+
+// only relevant for Go namespace
+option go_package = "proto/publicrpc/v1;publicrpcv1";
+
+// public import will include the required types in the Go output
+import public "gossip/v1/gossip.proto";
+
+// Publicrpc service exposes endpoints to be consumed externally; GUIs, historical record keeping, etc.
+service Publicrpc {
+  // GetRawHeartbeats rpc endpoint returns a stream of the p2p heartbeat messages received.
+  // The GetRawHeartbeats stream will include all messages received by the guardian,
+  // without any filtering or verification of message content.
+  rpc GetRawHeartbeats (GetRawHeartbeatsRequest) returns (stream gossip.v1.Heartbeat);
+}
+
+// GetRawHeartbeatsRequest is an empty request, sent as part of a request to start a stream.
+message GetRawHeartbeatsRequest {
+}