spy.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package spy
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "github.com/certusone/wormhole/node/pkg/common"
  7. "github.com/certusone/wormhole/node/pkg/p2p"
  8. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  9. "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
  10. "github.com/certusone/wormhole/node/pkg/supervisor"
  11. "github.com/certusone/wormhole/node/pkg/vaa"
  12. "github.com/google/uuid"
  13. "github.com/gorilla/mux"
  14. ipfslog "github.com/ipfs/go-log/v2"
  15. "github.com/libp2p/go-libp2p-core/crypto"
  16. "github.com/prometheus/client_golang/prometheus/promhttp"
  17. "github.com/spf13/cobra"
  18. "go.uber.org/zap"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/codes"
  21. "google.golang.org/grpc/status"
  22. "net"
  23. "net/http"
  24. "os"
  25. "sync"
  26. )
  27. var (
  28. rootCtx context.Context
  29. rootCtxCancel context.CancelFunc
  30. )
  31. var (
  32. p2pNetworkID *string
  33. p2pPort *uint
  34. p2pBootstrap *string
  35. statusAddr *string
  36. nodeKeyPath *string
  37. logLevel *string
  38. spyRPC *string
  39. )
  40. func init() {
  41. p2pNetworkID = SpyCmd.Flags().String("network", "/wormhole/dev", "P2P network identifier")
  42. p2pPort = SpyCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
  43. p2pBootstrap = SpyCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
  44. statusAddr = SpyCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
  45. nodeKeyPath = SpyCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
  46. logLevel = SpyCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
  47. spyRPC = SpyCmd.Flags().String("spyRPC", "", "Listen address for gRPC interface")
  48. }
  49. // SpyCmd represents the node command
  50. var SpyCmd = &cobra.Command{
  51. Use: "spy",
  52. Short: "Run gossip spy client",
  53. Run: runSpy,
  54. }
  55. type spyServer struct {
  56. spyv1.UnimplementedSpyRPCServiceServer
  57. logger *zap.Logger
  58. subs map[string]*subscription
  59. subsMu sync.Mutex
  60. }
  61. type message struct {
  62. vaaBytes []byte
  63. }
  64. type filter struct {
  65. chainId vaa.ChainID
  66. emitterAddr vaa.Address
  67. }
  68. type subscription struct {
  69. filters []filter
  70. ch chan message
  71. }
  72. func subscriptionId() string {
  73. return uuid.New().String()
  74. }
  75. func decodeEmitterAddr(hexAddr string) (vaa.Address, error) {
  76. address, err := hex.DecodeString(hexAddr)
  77. if err != nil {
  78. return vaa.Address{}, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err))
  79. }
  80. if len(address) != 32 {
  81. return vaa.Address{}, status.Error(codes.InvalidArgument, "address must be 32 bytes")
  82. }
  83. addr := vaa.Address{}
  84. copy(addr[:], address)
  85. return addr, nil
  86. }
  87. func (s *spyServer) Publish(vaaBytes []byte) error {
  88. s.subsMu.Lock()
  89. defer s.subsMu.Unlock()
  90. var v *vaa.VAA
  91. for _, sub := range s.subs {
  92. if len(sub.filters) == 0 {
  93. sub.ch <- message{vaaBytes: vaaBytes}
  94. } else {
  95. if v == nil {
  96. var err error
  97. v, err = vaa.Unmarshal(vaaBytes)
  98. if err != nil {
  99. return err
  100. }
  101. }
  102. for _, fi := range sub.filters {
  103. if fi.chainId == v.EmitterChain && fi.emitterAddr == v.EmitterAddress {
  104. sub.ch <- message{vaaBytes: vaaBytes}
  105. }
  106. }
  107. }
  108. }
  109. return nil
  110. }
  111. func (s *spyServer) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
  112. var fi []filter
  113. if req.Filters != nil {
  114. for _, f := range req.Filters {
  115. switch t := f.Filter.(type) {
  116. case *spyv1.FilterEntry_EmitterFilter:
  117. addr, err := decodeEmitterAddr(t.EmitterFilter.EmitterAddress)
  118. if err != nil {
  119. return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode emitter address: %v", err))
  120. }
  121. fi = append(fi, filter{
  122. chainId: vaa.ChainID(t.EmitterFilter.ChainId),
  123. emitterAddr: addr,
  124. })
  125. default:
  126. return status.Error(codes.InvalidArgument, "unsupported filter type")
  127. }
  128. }
  129. }
  130. s.subsMu.Lock()
  131. id := subscriptionId()
  132. sub := &subscription{
  133. ch: make(chan message, 1),
  134. filters: fi,
  135. }
  136. s.subs[id] = sub
  137. s.subsMu.Unlock()
  138. defer func() {
  139. s.subsMu.Lock()
  140. defer s.subsMu.Unlock()
  141. delete(s.subs, id)
  142. }()
  143. for {
  144. select {
  145. case <-resp.Context().Done():
  146. return resp.Context().Err()
  147. case msg := <-sub.ch:
  148. if err := resp.Send(&spyv1.SubscribeSignedVAAResponse{
  149. VaaBytes: msg.vaaBytes,
  150. }); err != nil {
  151. return err
  152. }
  153. }
  154. }
  155. }
  156. func newSpyServer(logger *zap.Logger) *spyServer {
  157. return &spyServer{
  158. logger: logger.Named("spyserver"),
  159. subs: make(map[string]*subscription),
  160. }
  161. }
  162. func spyServerRunnable(s *spyServer, logger *zap.Logger, listenAddr string) (supervisor.Runnable, *grpc.Server, error) {
  163. l, err := net.Listen("tcp", listenAddr)
  164. if err != nil {
  165. return nil, nil, fmt.Errorf("failed to listen: %w", err)
  166. }
  167. logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String()))
  168. grpcServer := common.NewInstrumentedGRPCServer(logger)
  169. spyv1.RegisterSpyRPCServiceServer(grpcServer, s)
  170. return supervisor.GRPCServer(grpcServer, l, false), grpcServer, nil
  171. }
  172. func runSpy(cmd *cobra.Command, args []string) {
  173. common.SetRestrictiveUmask()
  174. lvl, err := ipfslog.LevelFromString(*logLevel)
  175. if err != nil {
  176. fmt.Println("Invalid log level")
  177. os.Exit(1)
  178. }
  179. logger := ipfslog.Logger("wormhole-spy").Desugar()
  180. ipfslog.SetAllLoggers(lvl)
  181. // Status server
  182. if *statusAddr != "" {
  183. router := mux.NewRouter()
  184. router.Handle("/metrics", promhttp.Handler())
  185. go func() {
  186. logger.Info("status server listening on [::]:6060")
  187. logger.Error("status server crashed", zap.Error(http.ListenAndServe(*statusAddr, router)))
  188. }()
  189. }
  190. // Verify flags
  191. if *nodeKeyPath == "" {
  192. logger.Fatal("Please specify --nodeKey")
  193. }
  194. if *p2pBootstrap == "" {
  195. logger.Fatal("Please specify --bootstrap")
  196. }
  197. // Node's main lifecycle context.
  198. rootCtx, rootCtxCancel = context.WithCancel(context.Background())
  199. defer rootCtxCancel()
  200. // Outbound gossip message queue
  201. sendC := make(chan []byte)
  202. // Inbound observations
  203. obsvC := make(chan *gossipv1.SignedObservation, 50)
  204. // Inbound signed VAAs
  205. signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
  206. // Guardian set state managed by processor
  207. gst := common.NewGuardianSetState()
  208. // RPC server
  209. s := newSpyServer(logger)
  210. rpcSvc, _, err := spyServerRunnable(s, logger, *spyRPC)
  211. if err != nil {
  212. logger.Fatal("failed to start RPC server", zap.Error(err))
  213. }
  214. // Ignore observations
  215. go func() {
  216. for {
  217. select {
  218. case <-rootCtx.Done():
  219. return
  220. case <-obsvC:
  221. }
  222. }
  223. }()
  224. // Log signed VAAs
  225. go func() {
  226. for {
  227. select {
  228. case <-rootCtx.Done():
  229. return
  230. case v := <-signedInC:
  231. logger.Info("Received signed VAA",
  232. zap.Any("vaa", v.Vaa))
  233. if err := s.Publish(v.Vaa); err != nil {
  234. logger.Error("failed to publish signed VAA", zap.Error(err))
  235. }
  236. }
  237. }
  238. }()
  239. // Load p2p private key
  240. var priv crypto.PrivKey
  241. priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
  242. if err != nil {
  243. logger.Fatal("Failed to load node key", zap.Error(err))
  244. }
  245. // Run supervisor.
  246. supervisor.New(rootCtx, logger, func(ctx context.Context) error {
  247. if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, nil, nil, sendC, signedInC, priv, nil, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, "", false, rootCtxCancel)); err != nil {
  248. return err
  249. }
  250. if err := supervisor.Run(ctx, "spyrpc", rpcSvc); err != nil {
  251. return err
  252. }
  253. logger.Info("Started internal services")
  254. <-ctx.Done()
  255. return nil
  256. },
  257. // It's safer to crash and restart the process in case we encounter a panic,
  258. // rather than attempting to reschedule the runnable.
  259. supervisor.WithPropagatePanic)
  260. <-rootCtx.Done()
  261. logger.Info("root context cancelled, exiting...")
  262. // TODO: wait for things to shut down gracefully
  263. }