query_server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
// Note: To generate a signer key file, run: guardiand keygen --block-type "CCQ SERVER SIGNING KEY" /path/to/key/file
// You will then need to add this key to ccqAllowedRequesters in the guardian configs.
  3. package ccq
  4. import (
  5. "context"
  6. "crypto/ecdsa"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "syscall"
  12. "time"
  13. "github.com/certusone/wormhole/node/pkg/common"
  14. "github.com/certusone/wormhole/node/pkg/p2p"
  15. "github.com/certusone/wormhole/node/pkg/telemetry"
  16. promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
  17. "github.com/certusone/wormhole/node/pkg/version"
  18. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  19. ipfslog "github.com/ipfs/go-log/v2"
  20. "github.com/libp2p/go-libp2p/core/crypto"
  21. "github.com/spf13/cobra"
  22. "go.uber.org/zap"
  23. )
  24. const CCQ_SERVER_SIGNING_KEY = "CCQ SERVER SIGNING KEY"
  25. var (
  26. envStr *string
  27. p2pNetworkID *string
  28. p2pPort *uint
  29. p2pBootstrap *string
  30. protectedPeers []string
  31. listenAddr *string
  32. nodeKeyPath *string
  33. signerKeyPath *string
  34. permFile *string
  35. ethRPC *string
  36. ethContract *string
  37. logLevel *string
  38. telemetryLokiURL *string
  39. telemetryNodeName *string
  40. statusAddr *string
  41. promRemoteURL *string
  42. shutdownDelay1 *uint
  43. shutdownDelay2 *uint
  44. monitorPeers *bool
  45. gossipAdvertiseAddress *string
  46. verifyPermissions *bool
  47. )
  48. const DEV_NETWORK_ID = "/wormhole/dev"
  49. func init() {
  50. envStr = QueryServerCmd.Flags().String("env", "", "environment (devnet, testnet, mainnet)")
  51. p2pNetworkID = QueryServerCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)")
  52. p2pPort = QueryServerCmd.Flags().Uint("port", 8995, "P2P UDP listener port")
  53. p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
  54. QueryServerCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")
  55. nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
  56. signerKeyPath = QueryServerCmd.Flags().String("signerKey", "", "Path to key used to sign unsigned queries")
  57. listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)")
  58. permFile = QueryServerCmd.Flags().String("permFile", "", "JSON file containing permissions configuration")
  59. ethRPC = QueryServerCmd.Flags().String("ethRPC", "", "Ethereum RPC for fetching current guardian set")
  60. ethContract = QueryServerCmd.Flags().String("ethContract", "", "Ethereum core bridge address for fetching current guardian set")
  61. logLevel = QueryServerCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
  62. telemetryLokiURL = QueryServerCmd.Flags().String("telemetryLokiURL", "", "Loki cloud logging URL")
  63. telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
  64. statusAddr = QueryServerCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
  65. promRemoteURL = QueryServerCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)")
  66. monitorPeers = QueryServerCmd.Flags().Bool("monitorPeers", false, "Should monitor bootstrap peers and attempt to reconnect")
  67. gossipAdvertiseAddress = QueryServerCmd.Flags().String("gossipAdvertiseAddress", "", "External IP to advertize on P2P (use if behind a NAT or running in k8s)")
  68. verifyPermissions = QueryServerCmd.Flags().Bool("verifyPermissions", false, `parse and verify the permissions file and then exit with 0 if success, 1 if failure`)
  69. // The default health check monitoring is every five seconds, with a five second timeout, and you have to miss two, for 20 seconds total.
  70. shutdownDelay1 = QueryServerCmd.Flags().Uint("shutdownDelay1", 25, "Seconds to delay after disabling health check on shutdown")
  71. // The guardians will wait up to 60 seconds before giving up on a request.
  72. shutdownDelay2 = QueryServerCmd.Flags().Uint("shutdownDelay2", 65, "Seconds to wait after delay1 for pending requests to complete")
  73. }
  74. var QueryServerCmd = &cobra.Command{
  75. Use: "query-server",
  76. Short: "Run the cross-chain query server",
  77. Run: runQueryServer,
  78. }
  79. func runQueryServer(cmd *cobra.Command, args []string) {
  80. env, err := common.ParseEnvironment(*envStr)
  81. if err != nil || (env != common.UnsafeDevNet && env != common.TestNet && env != common.MainNet) {
  82. if *envStr == "" {
  83. fmt.Println("Please specify --env")
  84. } else {
  85. fmt.Println("Invalid value for --env, should be devnet, testnet or mainnet", zap.String("val", *envStr))
  86. }
  87. os.Exit(1)
  88. }
  89. if *verifyPermissions {
  90. _, err := parseConfigFile(*permFile, env)
  91. if err != nil {
  92. fmt.Println(err)
  93. os.Exit(1)
  94. }
  95. os.Exit(0)
  96. }
  97. common.SetRestrictiveUmask()
  98. // Setup logging
  99. lvl, err := ipfslog.LevelFromString(*logLevel)
  100. if err != nil {
  101. fmt.Println("Invalid log level")
  102. os.Exit(1)
  103. }
  104. logger := ipfslog.Logger("query-server").Desugar()
  105. ipfslog.SetAllLoggers(lvl)
  106. if *p2pNetworkID == "" {
  107. *p2pNetworkID = p2p.GetNetworkId(env)
  108. } else if env != common.UnsafeDevNet {
  109. logger.Warn("overriding default p2p network ID", zap.String("p2pNetworkID", *p2pNetworkID))
  110. }
  111. if *p2pNetworkID == DEV_NETWORK_ID && env != common.UnsafeDevNet {
  112. logger.Fatal("May not set --network to dev unless --env is also dev", zap.String("network", *p2pNetworkID), zap.String("env", *envStr))
  113. }
  114. networkID := *p2pNetworkID + "/ccq"
  115. if *p2pBootstrap == "" {
  116. *p2pBootstrap, err = p2p.GetCcqBootstrapPeers(env)
  117. if err != nil {
  118. logger.Fatal("failed to determine the bootstrap peers from the environment", zap.String("env", string(env)), zap.Error(err))
  119. }
  120. } else if env != common.UnsafeDevNet {
  121. logger.Warn("overriding default p2p bootstrap peers", zap.String("p2pBootstrap", *p2pBootstrap))
  122. }
  123. if *telemetryLokiURL != "" {
  124. logger.Info("Using Loki telemetry logger")
  125. if *telemetryNodeName == "" {
  126. logger.Fatal("if --telemetryLokiURL is specified --telemetryNodeName must be specified")
  127. }
  128. labels := map[string]string{
  129. "network": *p2pNetworkID,
  130. "node_name": *telemetryNodeName,
  131. "version": version.Version(),
  132. }
  133. tm, err := telemetry.NewLokiCloudLogger(context.Background(), logger, *telemetryLokiURL, "ccq_server", true, labels)
  134. if err != nil {
  135. logger.Fatal("Failed to initialize telemetry", zap.Error(err))
  136. }
  137. defer tm.Close()
  138. logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger
  139. }
  140. // Verify flags
  141. if *nodeKeyPath == "" {
  142. logger.Fatal("Please specify --nodeKey")
  143. }
  144. if *p2pBootstrap == "" {
  145. logger.Fatal("Please specify --bootstrap")
  146. }
  147. if *permFile == "" {
  148. logger.Fatal("Please specify --permFile")
  149. }
  150. if *ethRPC == "" {
  151. logger.Fatal("Please specify --ethRPC")
  152. }
  153. if *ethContract == "" {
  154. logger.Fatal("Please specify --ethContract")
  155. }
  156. permissions, err := NewPermissions(*permFile, env)
  157. if err != nil {
  158. logger.Fatal("Failed to load permissions file", zap.String("permFile", *permFile), zap.Error(err))
  159. }
  160. loggingMap := NewLoggingMap()
  161. // Load p2p private key
  162. var priv crypto.PrivKey
  163. priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
  164. if err != nil {
  165. logger.Fatal("Failed to load node key", zap.Error(err))
  166. }
  167. var signerKey *ecdsa.PrivateKey
  168. if *signerKeyPath != "" {
  169. signerKey, err = common.LoadArmoredKey(*signerKeyPath, CCQ_SERVER_SIGNING_KEY, false)
  170. if err != nil {
  171. logger.Fatal("Failed to loader signer key", zap.Error(err))
  172. }
  173. logger.Info("will sign unsigned requests if api key supports it", zap.Stringer("signingKey", ethCrypto.PubkeyToAddress(signerKey.PublicKey)))
  174. }
  175. ctx, cancel := context.WithCancel(context.Background())
  176. defer cancel()
  177. // Run p2p
  178. pendingResponses := NewPendingResponses(logger)
  179. p2pSub, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress, protectedPeers)
  180. if err != nil {
  181. logger.Fatal("Failed to start p2p", zap.Error(err))
  182. }
  183. // Start the HTTP server
  184. go func() {
  185. s := NewHTTPServer(*listenAddr, p2pSub.topic_req, permissions, signerKey, pendingResponses, logger, env, loggingMap)
  186. logger.Sugar().Infof("Server listening on %s", *listenAddr)
  187. err := s.ListenAndServe()
  188. if err != nil && err != http.ErrServerClosed {
  189. logger.Fatal("Server closed unexpectedly", zap.Error(err))
  190. }
  191. }()
  192. // Start the status server
  193. var statServer *statusServer
  194. if *statusAddr != "" {
  195. statServer = NewStatusServer(*statusAddr, logger, env)
  196. go func() {
  197. logger.Sugar().Infof("Status server listening on %s", *statusAddr)
  198. err := statServer.httpServer.ListenAndServe()
  199. if err != nil && err != http.ErrServerClosed {
  200. logger.Fatal("Status server closed unexpectedly", zap.Error(err))
  201. }
  202. }()
  203. }
  204. // Start the Prometheus scraper
  205. usingPromRemoteWrite := *promRemoteURL != ""
  206. if usingPromRemoteWrite {
  207. var info promremotew.PromTelemetryInfo
  208. info.PromRemoteURL = *promRemoteURL
  209. info.Labels = map[string]string{
  210. "node_name": *telemetryNodeName,
  211. "network": *p2pNetworkID,
  212. "version": version.Version(),
  213. "product": "ccq_server",
  214. }
  215. err := RunPrometheusScraper(ctx, logger, info)
  216. if err != nil {
  217. logger.Fatal("Failed to start prometheus scraper", zap.Error(err))
  218. }
  219. }
  220. // Handle SIGTERM
  221. sigterm := make(chan os.Signal, 1)
  222. signal.Notify(sigterm, syscall.SIGTERM)
  223. go func() {
  224. <-sigterm
  225. if statServer != nil && *shutdownDelay1 != 0 {
  226. logger.Info("Received sigterm. disabling health checks and pausing.")
  227. statServer.disableHealth()
  228. time.Sleep(time.Duration(*shutdownDelay1) * time.Second) // #nosec G115 -- Defaults to 25 seconds, overflowing is infeasible
  229. numPending := 0
  230. logger.Info("Waiting for any outstanding requests to complete before shutting down.")
  231. // #nosec G115 -- Defaults to 65 seconds, overflowing is infeasible
  232. for count := 0; count < int(*shutdownDelay2); count++ {
  233. time.Sleep(time.Second)
  234. numPending = pendingResponses.NumPending()
  235. if numPending == 0 {
  236. break
  237. }
  238. }
  239. if numPending == 0 {
  240. logger.Info("Done waiting. shutting down.")
  241. } else {
  242. logger.Error("Gave up waiting for pending requests to finish. shutting down anyway.", zap.Int("numStillPending", numPending))
  243. }
  244. } else {
  245. logger.Info("Received sigterm. exiting.")
  246. }
  247. cancel()
  248. }()
  249. // Start watching for permissions file updates.
  250. errC := make(chan error)
  251. permissions.StartWatcher(ctx, logger, errC)
  252. // Star logging cleanup process.
  253. loggingMap.Start(ctx, logger, errC)
  254. // Wait for either a shutdown or a fatal error from the permissions watcher.
  255. select {
  256. case <-ctx.Done():
  257. logger.Info("Context cancelled, exiting...")
  258. break
  259. case err := <-errC:
  260. logger.Error("Encountered an error, exiting", zap.Error(err))
  261. break
  262. }
  263. // Stop the permissions file watcher.
  264. permissions.StopWatcher()
  265. // Shutdown p2p. Without this the same host won't properly discover peers until some timeout
  266. p2pSub.sub.Cancel()
  267. if err := p2pSub.topic_req.Close(); err != nil {
  268. logger.Error("Error closing the request topic", zap.Error(err))
  269. }
  270. if err := p2pSub.topic_resp.Close(); err != nil {
  271. logger.Error("Error closing the response topic", zap.Error(err))
  272. }
  273. if err := p2pSub.host.Close(); err != nil {
  274. logger.Error("Error closing the host", zap.Error(err))
  275. }
  276. }