spy.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. package spy
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/certusone/wormhole/node/pkg/common"
  12. "github.com/certusone/wormhole/node/pkg/p2p"
  13. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  14. spyv1 "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
  15. "github.com/certusone/wormhole/node/pkg/supervisor"
  16. "github.com/google/uuid"
  17. "github.com/gorilla/mux"
  18. ipfslog "github.com/ipfs/go-log/v2"
  19. "github.com/libp2p/go-libp2p/core/crypto"
  20. "github.com/prometheus/client_golang/prometheus/promhttp"
  21. "github.com/spf13/cobra"
  22. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  23. "go.uber.org/zap"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/status"
  27. )
  // Process lifecycle. rootCtx is created in runSpy; rootCtxCancel is deferred
  // there and also handed to p2p.Run so the p2p layer can trigger shutdown.
  var rootCtx context.Context
  var rootCtxCancel context.CancelFunc

  // Flags selecting and identifying us on the gossip network. Each pointer is
  // bound to a SpyCmd flag in init and dereferenced after flag parsing.
  var (
  	envStr       *string // --env
  	p2pNetworkID *string // --network
  	p2pBootstrap *string // --bootstrap
  	p2pPort      *uint   // --port
  	nodeKeyPath  *string // --nodeKey
  )

  // Flags for locally exposed services and logging.
  var (
  	statusAddr  *string        // --statusAddr
  	spyRPC      *string        // --spyRPC
  	sendTimeout *time.Duration // --sendTimeout
  	logLevel    *string        // --logLevel
  )

  // Flags for optional VAA signature verification against Ethereum.
  var (
  	ethRPC      *string // --ethRPC
  	ethContract *string // --ethContract
  )
  45. func init() {
  46. envStr = SpyCmd.Flags().String("env", "", `environment (may be "testnet" or "mainnet", required unless "--bootstrap" is specified)`)
  47. p2pNetworkID = SpyCmd.Flags().String("network", "", "P2P network identifier (optional for testnet or mainnet, overrides default, required for devnet)")
  48. p2pPort = SpyCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
  49. p2pBootstrap = SpyCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
  50. statusAddr = SpyCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
  51. nodeKeyPath = SpyCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
  52. logLevel = SpyCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
  53. spyRPC = SpyCmd.Flags().String("spyRPC", "", "Listen address for gRPC interface")
  54. sendTimeout = SpyCmd.Flags().Duration("sendTimeout", 5*time.Second, "Timeout for sending a message to a subscriber")
  55. ethRPC = SpyCmd.Flags().String("ethRPC", "", "Ethereum RPC for verifying VAAs (optional)")
  56. ethContract = SpyCmd.Flags().String("ethContract", "", "Ethereum core bridge address for verifying VAAs (required if ethRPC is specified)")
  57. }
  58. // SpyCmd represents the node command
  59. var SpyCmd = &cobra.Command{
  60. Use: "spy",
  61. Short: "Run gossip spy client",
  62. Run: runSpy,
  63. }
  64. type spyServer struct {
  65. spyv1.UnimplementedSpyRPCServiceServer
  66. logger *zap.Logger
  67. subsSignedVaa map[string]*subscriptionSignedVaa
  68. subsSignedVaaMu sync.Mutex
  69. vaaVerifier *VaaVerifier
  70. }
  71. type message struct {
  72. vaaBytes []byte
  73. }
  74. type filterSignedVaa struct {
  75. chainId vaa.ChainID
  76. emitterAddr vaa.Address
  77. }
  78. type subscriptionSignedVaa struct {
  79. filters []filterSignedVaa
  80. ch chan message
  81. }
  82. func subscriptionId() string {
  83. return uuid.New().String()
  84. }
  85. func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
  86. s.subsSignedVaaMu.Lock()
  87. defer s.subsSignedVaaMu.Unlock()
  88. var v *vaa.VAA
  89. var err error
  90. verified := s.vaaVerifier == nil
  91. for _, sub := range s.subsSignedVaa {
  92. if len(sub.filters) == 0 {
  93. if !verified {
  94. verified = true
  95. v, err = s.verifyVAA(v, vaaBytes)
  96. if err != nil {
  97. return err
  98. }
  99. }
  100. sub.ch <- message{vaaBytes: vaaBytes}
  101. continue
  102. }
  103. if v == nil {
  104. v, err = vaa.Unmarshal(vaaBytes)
  105. if err != nil {
  106. return err
  107. }
  108. }
  109. for _, fi := range sub.filters {
  110. if fi.chainId == v.EmitterChain && fi.emitterAddr == v.EmitterAddress {
  111. if !verified {
  112. verified = true
  113. v, err = s.verifyVAA(v, vaaBytes)
  114. if err != nil {
  115. return err
  116. }
  117. }
  118. sub.ch <- message{vaaBytes: vaaBytes}
  119. }
  120. }
  121. }
  122. return nil
  123. }
  124. func (s *spyServer) verifyVAA(v *vaa.VAA, vaaBytes []byte) (*vaa.VAA, error) {
  125. if s.vaaVerifier == nil {
  126. panic("verifier is nil")
  127. }
  128. if v == nil {
  129. var err error
  130. v, err = vaa.Unmarshal(vaaBytes)
  131. if err != nil {
  132. return v, fmt.Errorf(`failed to unmarshal VAA: %w`, err)
  133. }
  134. }
  135. valid, err := s.vaaVerifier.VerifySignatures(v)
  136. if err != nil {
  137. return v, fmt.Errorf(`failed to verify VAA: %w`, err)
  138. }
  139. if !valid {
  140. return v, errors.New(`invalid VAA signature`)
  141. }
  142. return v, nil
  143. }
  144. func (s *spyServer) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
  145. var fi []filterSignedVaa
  146. if req.Filters != nil {
  147. for _, f := range req.Filters {
  148. switch t := f.Filter.(type) {
  149. case *spyv1.FilterEntry_EmitterFilter:
  150. addr, err := vaa.StringToAddress(t.EmitterFilter.EmitterAddress)
  151. if err != nil {
  152. return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode emitter address: %v", err))
  153. }
  154. fi = append(fi, filterSignedVaa{
  155. chainId: vaa.ChainID(t.EmitterFilter.ChainId),
  156. emitterAddr: addr,
  157. })
  158. default:
  159. return status.Error(codes.InvalidArgument, "unsupported filter type")
  160. }
  161. }
  162. }
  163. s.subsSignedVaaMu.Lock()
  164. id := subscriptionId()
  165. sub := &subscriptionSignedVaa{
  166. ch: make(chan message, 1),
  167. filters: fi,
  168. }
  169. s.subsSignedVaa[id] = sub
  170. s.subsSignedVaaMu.Unlock()
  171. defer func() {
  172. for {
  173. // The channel sender locks the subscription mutex before sending to the channel.
  174. // If the channel is full, then the sender will block and we'll never be able to lock the mutex (resulting in deadlock).
  175. // So we empty the channel before trying acquire the lock.
  176. _ = DoWithTimeout(func() error { <-sub.ch; return nil }, time.Millisecond)
  177. if s.subsSignedVaaMu.TryLock() {
  178. delete(s.subsSignedVaa, id)
  179. s.subsSignedVaaMu.Unlock()
  180. return
  181. }
  182. }
  183. }()
  184. for {
  185. select {
  186. case <-resp.Context().Done():
  187. return resp.Context().Err()
  188. case msg := <-sub.ch:
  189. if err := DoWithTimeout(func() error {
  190. return resp.Send(&spyv1.SubscribeSignedVAAResponse{VaaBytes: msg.vaaBytes})
  191. }, *sendTimeout); err != nil {
  192. return err
  193. }
  194. }
  195. }
  196. }
  197. func newSpyServer(logger *zap.Logger) *spyServer {
  198. return &spyServer{
  199. logger: logger.Named("spyserver"),
  200. subsSignedVaa: make(map[string]*subscriptionSignedVaa),
  201. }
  202. }
  203. // DoWithTimeout runs f and returns its error. If the deadline d elapses first,
  204. // it returns a grpc DeadlineExceeded error instead.
  205. func DoWithTimeout(f func() error, d time.Duration) error {
  206. errChan := make(chan error, 1)
  207. go func() {
  208. errChan <- f()
  209. close(errChan)
  210. }()
  211. t := time.NewTimer(d)
  212. select {
  213. case <-t.C:
  214. return status.Errorf(codes.DeadlineExceeded, "too slow")
  215. case err := <-errChan:
  216. if !t.Stop() {
  217. <-t.C
  218. }
  219. return err
  220. }
  221. }
  222. func spyServerRunnable(s *spyServer, logger *zap.Logger, listenAddr string) (supervisor.Runnable, *grpc.Server, error) {
  223. l, err := net.Listen("tcp", listenAddr)
  224. if err != nil {
  225. return nil, nil, fmt.Errorf("failed to listen: %w", err)
  226. }
  227. logger.Info("spy server listening", zap.String("addr", l.Addr().String()))
  228. grpcServer := common.NewInstrumentedGRPCServer(logger, common.GrpcLogDetailFull)
  229. spyv1.RegisterSpyRPCServiceServer(grpcServer, s)
  230. return supervisor.GRPCServer(grpcServer, l, false), grpcServer, nil
  231. }
  232. func runSpy(cmd *cobra.Command, args []string) {
  233. common.SetRestrictiveUmask()
  234. lvl, err := ipfslog.LevelFromString(*logLevel)
  235. if err != nil {
  236. fmt.Println("Invalid log level")
  237. os.Exit(1)
  238. }
  239. logger := ipfslog.Logger("wormhole-spy").Desugar()
  240. ipfslog.SetAllLoggers(lvl)
  241. if *envStr != "" {
  242. // If they specify --env then use the defaults for the network parameters and don't allow them to override them.
  243. if *p2pNetworkID != "" || *p2pBootstrap != "" {
  244. logger.Fatal(`If "--env" is specified, "--network" and "--bootstrap" may not be specified`)
  245. }
  246. env, err := common.ParseEnvironment(*envStr)
  247. if err != nil || (env != common.MainNet && env != common.TestNet) {
  248. logger.Fatal(`Invalid value for "--env", should be "mainnet" or "testnet"`)
  249. }
  250. *p2pNetworkID = p2p.GetNetworkId(env)
  251. *p2pBootstrap, err = p2p.GetBootstrapPeers(env)
  252. if err != nil {
  253. logger.Fatal("failed to determine p2p bootstrap peers", zap.String("env", string(env)), zap.Error(err))
  254. }
  255. } else {
  256. // If they don't specify --env, then --network and --bootstrap are required.
  257. if *p2pNetworkID == "" {
  258. logger.Fatal(`If "--env" is not specified, "--network" must be specified`)
  259. }
  260. if *p2pBootstrap == "" {
  261. logger.Fatal(`If "--env" is not specified, "--bootstrap" must be specified`)
  262. }
  263. }
  264. // Status server
  265. if *statusAddr != "" {
  266. router := mux.NewRouter()
  267. router.Handle("/metrics", promhttp.Handler())
  268. go func() {
  269. logger.Info("status server listening on [::]:6060")
  270. logger.Error("status server crashed", zap.Error(http.ListenAndServe(*statusAddr, router))) // #nosec G114 local status server not vulnerable to DoS attack
  271. }()
  272. }
  273. // Verify flags
  274. if *nodeKeyPath == "" {
  275. logger.Fatal("Please specify --nodeKey")
  276. }
  277. if *p2pBootstrap == "" {
  278. logger.Fatal("Please specify --bootstrap")
  279. }
  280. // Node's main lifecycle context.
  281. rootCtx, rootCtxCancel = context.WithCancel(context.Background())
  282. defer rootCtxCancel()
  283. // Outbound gossip message queue
  284. sendC := make(chan []byte)
  285. // Inbound observations
  286. obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
  287. // Inbound observation requests
  288. obsvReqC := make(chan *gossipv1.ObservationRequest, 1024)
  289. // Inbound signed VAAs
  290. signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 1024)
  291. // Guardian set state managed by processor
  292. gst := common.NewGuardianSetState(nil)
  293. // RPC server
  294. s := newSpyServer(logger)
  295. rpcSvc, _, err := spyServerRunnable(s, logger, *spyRPC)
  296. if err != nil {
  297. logger.Fatal("failed to start RPC server", zap.Error(err))
  298. }
  299. // VAA verifier (optional)
  300. if *ethRPC != "" {
  301. if *ethContract == "" {
  302. logger.Fatal(`If "--ethRPC" is specified, "--ethContract" must also be specified`)
  303. }
  304. s.vaaVerifier = NewVaaVerifier(logger, *ethRPC, *ethContract)
  305. if err := s.vaaVerifier.GetInitialGuardianSet(); err != nil {
  306. logger.Fatal(`Failed to read initial guardian set for VAA verification`, zap.Error(err))
  307. }
  308. }
  309. // Ignore observations
  310. go func() {
  311. for {
  312. select {
  313. case <-rootCtx.Done():
  314. return
  315. case <-obsvC:
  316. }
  317. }
  318. }()
  319. // Ignore observation requests
  320. // Note: without this, the whole program hangs on observation requests
  321. go func() {
  322. for {
  323. select {
  324. case <-rootCtx.Done():
  325. return
  326. case <-obsvReqC:
  327. }
  328. }
  329. }()
  330. // Log signed VAAs
  331. go func() {
  332. for {
  333. select {
  334. case <-rootCtx.Done():
  335. return
  336. case v := <-signedInC:
  337. logger.Info("Received signed VAA",
  338. zap.Any("vaa", v.Vaa))
  339. if err := s.PublishSignedVAA(v.Vaa); err != nil {
  340. logger.Error("failed to publish signed VAA", zap.Error(err), zap.Any("vaa", v.Vaa))
  341. }
  342. }
  343. }
  344. }()
  345. // Load p2p private key
  346. var priv crypto.PrivKey
  347. priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
  348. if err != nil {
  349. logger.Fatal("Failed to load node key", zap.Error(err))
  350. }
  351. // Run supervisor.
  352. supervisor.New(rootCtx, logger, func(ctx context.Context) error {
  353. components := p2p.DefaultComponents()
  354. components.Port = *p2pPort
  355. if err := supervisor.Run(ctx,
  356. "p2p",
  357. p2p.Run(obsvC,
  358. obsvReqC,
  359. nil,
  360. sendC,
  361. signedInC,
  362. priv,
  363. nil,
  364. gst,
  365. *p2pNetworkID,
  366. *p2pBootstrap,
  367. "",
  368. false,
  369. rootCtxCancel,
  370. nil,
  371. nil,
  372. nil,
  373. nil,
  374. components,
  375. nil, // ibc feature string
  376. false, // gateway relayer enabled
  377. false, // ccqEnabled
  378. nil, // query requests
  379. nil, // query responses
  380. "", // query bootstrap peers
  381. 0, // query port
  382. "", // query allow list
  383. )); err != nil {
  384. return err
  385. }
  386. if err := supervisor.Run(ctx, "spyrpc", rpcSvc); err != nil {
  387. return err
  388. }
  389. logger.Info("Started internal services")
  390. <-ctx.Done()
  391. return nil
  392. },
  393. // It's safer to crash and restart the process in case we encounter a panic,
  394. // rather than attempting to reschedule the runnable.
  395. supervisor.WithPropagatePanic)
  396. <-rootCtx.Done()
  397. logger.Info("root context cancelled, exiting...")
  398. // TODO: wait for things to shut down gracefully
  399. }