|
|
@@ -0,0 +1,322 @@
|
|
|
+package spy
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "encoding/hex"
|
|
|
+ "fmt"
|
|
|
+ "github.com/certusone/wormhole/node/pkg/common"
|
|
|
+ "github.com/certusone/wormhole/node/pkg/p2p"
|
|
|
+ gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
|
|
|
+ "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
|
|
|
+ "github.com/certusone/wormhole/node/pkg/supervisor"
|
|
|
+ "github.com/certusone/wormhole/node/pkg/vaa"
|
|
|
+ "github.com/google/uuid"
|
|
|
+ "github.com/gorilla/mux"
|
|
|
+ ipfslog "github.com/ipfs/go-log/v2"
|
|
|
+ "github.com/libp2p/go-libp2p-core/crypto"
|
|
|
+ "github.com/prometheus/client_golang/prometheus/promhttp"
|
|
|
+ "github.com/spf13/cobra"
|
|
|
+ "go.uber.org/zap"
|
|
|
+ "google.golang.org/grpc"
|
|
|
+ "google.golang.org/grpc/codes"
|
|
|
+ "google.golang.org/grpc/status"
|
|
|
+ "net"
|
|
|
+ "net/http"
|
|
|
+ "os"
|
|
|
+ "sync"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ rootCtx context.Context
|
|
|
+ rootCtxCancel context.CancelFunc
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ p2pNetworkID *string
|
|
|
+ p2pPort *uint
|
|
|
+ p2pBootstrap *string
|
|
|
+
|
|
|
+ statusAddr *string
|
|
|
+
|
|
|
+ nodeKeyPath *string
|
|
|
+
|
|
|
+ logLevel *string
|
|
|
+
|
|
|
+ spyRPC *string
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ p2pNetworkID = SpyCmd.Flags().String("network", "/wormhole/dev", "P2P network identifier")
|
|
|
+ p2pPort = SpyCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
|
|
|
+ p2pBootstrap = SpyCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
|
|
|
+
|
|
|
+ statusAddr = SpyCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
|
|
|
+
|
|
|
+ nodeKeyPath = SpyCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
|
|
|
+
|
|
|
+ logLevel = SpyCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
|
|
|
+
|
|
|
+ spyRPC = SpyCmd.Flags().String("spyRPC", "", "Listen address for gRPC interface")
|
|
|
+}
|
|
|
+
|
|
|
+// SpyCmd represents the node command
|
|
|
+var SpyCmd = &cobra.Command{
|
|
|
+ Use: "spy",
|
|
|
+ Short: "Run gossip spy client",
|
|
|
+ Run: runSpy,
|
|
|
+}
|
|
|
+
|
|
|
+type spyServer struct {
|
|
|
+ spyv1.UnimplementedSpyRPCServiceServer
|
|
|
+ logger *zap.Logger
|
|
|
+ subs map[string]*subscription
|
|
|
+ subsMu sync.Mutex
|
|
|
+}
|
|
|
+
|
|
|
+type message struct {
|
|
|
+ vaaBytes []byte
|
|
|
+}
|
|
|
+
|
|
|
+type filter struct {
|
|
|
+ chainId vaa.ChainID
|
|
|
+ emitterAddr vaa.Address
|
|
|
+}
|
|
|
+
|
|
|
+type subscription struct {
|
|
|
+ filters []filter
|
|
|
+ ch chan message
|
|
|
+}
|
|
|
+
|
|
|
+func subscriptionId() string {
|
|
|
+ return uuid.New().String()
|
|
|
+}
|
|
|
+
|
|
|
+func decodeEmitterAddr(hexAddr string) (vaa.Address, error) {
|
|
|
+ address, err := hex.DecodeString(hexAddr)
|
|
|
+ if err != nil {
|
|
|
+ return vaa.Address{}, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err))
|
|
|
+ }
|
|
|
+ if len(address) != 32 {
|
|
|
+ return vaa.Address{}, status.Error(codes.InvalidArgument, "address must be 32 bytes")
|
|
|
+ }
|
|
|
+
|
|
|
+ addr := vaa.Address{}
|
|
|
+ copy(addr[:], address)
|
|
|
+
|
|
|
+ return addr, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (s *spyServer) Publish(vaaBytes []byte) error {
|
|
|
+ s.subsMu.Lock()
|
|
|
+ defer s.subsMu.Unlock()
|
|
|
+
|
|
|
+ var v *vaa.VAA
|
|
|
+
|
|
|
+ for _, sub := range s.subs {
|
|
|
+ if len(sub.filters) == 0 {
|
|
|
+ sub.ch <- message{vaaBytes: vaaBytes}
|
|
|
+ } else {
|
|
|
+ if v == nil {
|
|
|
+ var err error
|
|
|
+ v, err = vaa.Unmarshal(vaaBytes)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, fi := range sub.filters {
|
|
|
+ if fi.chainId == v.EmitterChain && fi.emitterAddr == v.EmitterAddress {
|
|
|
+ sub.ch <- message{vaaBytes: vaaBytes}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (s *spyServer) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
|
|
|
+ var fi []filter
|
|
|
+ if req.Filters != nil {
|
|
|
+ for _, f := range req.Filters {
|
|
|
+ switch t := f.Filter.(type) {
|
|
|
+ case *spyv1.FilterEntry_EmitterFilter:
|
|
|
+ addr, err := decodeEmitterAddr(t.EmitterFilter.EmitterAddress)
|
|
|
+ if err != nil {
|
|
|
+ return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode emitter address: %v", err))
|
|
|
+ }
|
|
|
+ fi = append(fi, filter{
|
|
|
+ chainId: vaa.ChainID(t.EmitterFilter.ChainId),
|
|
|
+ emitterAddr: addr,
|
|
|
+ })
|
|
|
+ default:
|
|
|
+ return status.Error(codes.InvalidArgument, "unsupported filter type")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ s.subsMu.Lock()
|
|
|
+ id := subscriptionId()
|
|
|
+ sub := &subscription{
|
|
|
+ ch: make(chan message, 1),
|
|
|
+ filters: fi,
|
|
|
+ }
|
|
|
+ s.subs[id] = sub
|
|
|
+ s.subsMu.Unlock()
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ s.subsMu.Lock()
|
|
|
+ defer s.subsMu.Unlock()
|
|
|
+ delete(s.subs, id)
|
|
|
+ }()
|
|
|
+
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-resp.Context().Done():
|
|
|
+ return resp.Context().Err()
|
|
|
+ case msg := <-sub.ch:
|
|
|
+ if err := resp.Send(&spyv1.SubscribeSignedVAAResponse{
|
|
|
+ VaaBytes: msg.vaaBytes,
|
|
|
+ }); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func newSpyServer(logger *zap.Logger) *spyServer {
|
|
|
+ return &spyServer{
|
|
|
+ logger: logger.Named("spyserver"),
|
|
|
+ subs: make(map[string]*subscription),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func spyServerRunnable(s *spyServer, logger *zap.Logger, listenAddr string) (supervisor.Runnable, *grpc.Server, error) {
|
|
|
+ l, err := net.Listen("tcp", listenAddr)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, fmt.Errorf("failed to listen: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String()))
|
|
|
+
|
|
|
+ grpcServer := common.NewInstrumentedGRPCServer(logger)
|
|
|
+ spyv1.RegisterSpyRPCServiceServer(grpcServer, s)
|
|
|
+
|
|
|
+ return supervisor.GRPCServer(grpcServer, l, false), grpcServer, nil
|
|
|
+}
|
|
|
+
|
|
|
+func runSpy(cmd *cobra.Command, args []string) {
|
|
|
+ common.SetRestrictiveUmask()
|
|
|
+
|
|
|
+ lvl, err := ipfslog.LevelFromString(*logLevel)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("Invalid log level")
|
|
|
+ os.Exit(1)
|
|
|
+ }
|
|
|
+
|
|
|
+ logger := ipfslog.Logger("wormhole-spy").Desugar()
|
|
|
+
|
|
|
+ ipfslog.SetAllLoggers(lvl)
|
|
|
+
|
|
|
+ // Status server
|
|
|
+ if *statusAddr != "" {
|
|
|
+ router := mux.NewRouter()
|
|
|
+
|
|
|
+ router.Handle("/metrics", promhttp.Handler())
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ logger.Info("status server listening on [::]:6060")
|
|
|
+ logger.Error("status server crashed", zap.Error(http.ListenAndServe(*statusAddr, router)))
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ // Verify flags
|
|
|
+
|
|
|
+ if *nodeKeyPath == "" {
|
|
|
+ logger.Fatal("Please specify --nodeKey")
|
|
|
+ }
|
|
|
+ if *p2pBootstrap == "" {
|
|
|
+ logger.Fatal("Please specify --bootstrap")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Node's main lifecycle context.
|
|
|
+ rootCtx, rootCtxCancel = context.WithCancel(context.Background())
|
|
|
+ defer rootCtxCancel()
|
|
|
+
|
|
|
+ // Outbound gossip message queue
|
|
|
+ sendC := make(chan []byte)
|
|
|
+
|
|
|
+ // Inbound observations
|
|
|
+ obsvC := make(chan *gossipv1.SignedObservation, 50)
|
|
|
+
|
|
|
+ // Inbound signed VAAs
|
|
|
+ signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
|
|
|
+
|
|
|
+ // Guardian set state managed by processor
|
|
|
+ gst := common.NewGuardianSetState()
|
|
|
+
|
|
|
+ // RPC server
|
|
|
+ s := newSpyServer(logger)
|
|
|
+ rpcSvc, _, err := spyServerRunnable(s, logger, *spyRPC)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatal("failed to start RPC server", zap.Error(err))
|
|
|
+ }
|
|
|
+
|
|
|
+ // Ignore observations
|
|
|
+ go func() {
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-rootCtx.Done():
|
|
|
+ return
|
|
|
+ case <-obsvC:
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // Log signed VAAs
|
|
|
+ go func() {
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-rootCtx.Done():
|
|
|
+ return
|
|
|
+ case v := <-signedInC:
|
|
|
+ logger.Info("Received signed VAA",
|
|
|
+ zap.Any("vaa", v.Vaa))
|
|
|
+ if err := s.Publish(v.Vaa); err != nil {
|
|
|
+ logger.Error("failed to publish signed VAA", zap.Error(err))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // Load p2p private key
|
|
|
+ var priv crypto.PrivKey
|
|
|
+ priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatal("Failed to load node key", zap.Error(err))
|
|
|
+ }
|
|
|
+
|
|
|
+ // Run supervisor.
|
|
|
+ supervisor.New(rootCtx, logger, func(ctx context.Context) error {
|
|
|
+ if err := supervisor.Run(ctx, "p2p", p2p.Run(
|
|
|
+ obsvC, sendC, signedInC, priv, nil, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, "", false, rootCtxCancel)); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := supervisor.Run(ctx, "spyrpc", rpcSvc); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.Info("Started internal services")
|
|
|
+
|
|
|
+ <-ctx.Done()
|
|
|
+ return nil
|
|
|
+ },
|
|
|
+ // It's safer to crash and restart the process in case we encounter a panic,
|
|
|
+ // rather than attempting to reschedule the runnable.
|
|
|
+ supervisor.WithPropagatePanic)
|
|
|
+
|
|
|
+ <-rootCtx.Done()
|
|
|
+ logger.Info("root context cancelled, exiting...")
|
|
|
+ // TODO: wait for things to shut down gracefully
|
|
|
+}
|