query_server.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Note: To generate a signer key file do: guardiand keygen --block-type "CCQ SERVER SIGNING KEY" /path/to/key/file
  2. // You will need to add this key to ccqAllowedRequesters in the guardian configs.
  3. package ccq
  4. import (
  5. "context"
  6. "crypto/ecdsa"
  7. "fmt"
  8. "net/http"
  9. "os"
  10. "os/signal"
  11. "syscall"
  12. "github.com/certusone/wormhole/node/pkg/common"
  13. "github.com/certusone/wormhole/node/pkg/telemetry"
  14. promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
  15. "github.com/certusone/wormhole/node/pkg/version"
  16. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  17. ipfslog "github.com/ipfs/go-log/v2"
  18. "github.com/libp2p/go-libp2p/core/crypto"
  19. "github.com/spf13/cobra"
  20. "go.uber.org/zap"
  21. )
  22. const CCQ_SERVER_SIGNING_KEY = "CCQ SERVER SIGNING KEY"
  23. var (
  24. envStr *string
  25. p2pNetworkID *string
  26. p2pPort *uint
  27. p2pBootstrap *string
  28. listenAddr *string
  29. nodeKeyPath *string
  30. signerKeyPath *string
  31. permFile *string
  32. ethRPC *string
  33. ethContract *string
  34. logLevel *string
  35. telemetryLokiURL *string
  36. telemetryNodeName *string
  37. statusAddr *string
  38. promRemoteURL *string
  39. )
  40. const DEV_NETWORK_ID = "/wormhole/dev"
  41. func init() {
  42. envStr = QueryServerCmd.Flags().String("env", "", "environment (dev, test, prod)")
  43. p2pNetworkID = QueryServerCmd.Flags().String("network", DEV_NETWORK_ID, "P2P network identifier")
  44. p2pPort = QueryServerCmd.Flags().Uint("port", 8995, "P2P UDP listener port")
  45. p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
  46. nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
  47. signerKeyPath = QueryServerCmd.Flags().String("signerKey", "", "Path to key used to sign unsigned queries")
  48. listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)")
  49. permFile = QueryServerCmd.Flags().String("permFile", "", "JSON file containing permissions configuration")
  50. ethRPC = QueryServerCmd.Flags().String("ethRPC", "", "Ethereum RPC for fetching current guardian set")
  51. ethContract = QueryServerCmd.Flags().String("ethContract", "", "Ethereum core bridge address for fetching current guardian set")
  52. logLevel = QueryServerCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
  53. telemetryLokiURL = QueryServerCmd.Flags().String("telemetryLokiURL", "", "Loki cloud logging URL")
  54. telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
  55. statusAddr = QueryServerCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
  56. promRemoteURL = QueryServerCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)")
  57. }
  // QueryServerCmd is the cobra command definition for "query-server"; its Run
  // handler is runQueryServer and its flags are registered in init.
  58. var QueryServerCmd = &cobra.Command{
  59. Use: "query-server",
  60. Short: "Run the cross-chain query server",
  61. Run: runQueryServer,
  62. }
  63. func runQueryServer(cmd *cobra.Command, args []string) {
  64. common.SetRestrictiveUmask()
  65. networkID := *p2pNetworkID + "/ccq"
  66. // Setup logging
  67. lvl, err := ipfslog.LevelFromString(*logLevel)
  68. if err != nil {
  69. fmt.Println("Invalid log level")
  70. os.Exit(1)
  71. }
  72. logger := ipfslog.Logger("query-server").Desugar()
  73. ipfslog.SetAllLoggers(lvl)
  74. if *telemetryLokiURL != "" {
  75. logger.Info("Using Loki telemetry logger")
  76. if *telemetryNodeName == "" {
  77. logger.Fatal("if --telemetryLokiURL is specified --telemetryNodeName must be specified")
  78. }
  79. labels := map[string]string{
  80. "network": *p2pNetworkID,
  81. "node_name": *telemetryNodeName,
  82. "version": version.Version(),
  83. }
  84. tm, err := telemetry.NewLokiCloudLogger(context.Background(), logger, *telemetryLokiURL, "ccq_server", true, labels)
  85. if err != nil {
  86. logger.Fatal("Failed to initialize telemetry", zap.Error(err))
  87. }
  88. defer tm.Close()
  89. logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger
  90. }
  91. env, err := common.ParseEnvironment(*envStr)
  92. if err != nil || (env != common.UnsafeDevNet && env != common.TestNet && env != common.MainNet) {
  93. if *envStr == "" {
  94. logger.Fatal("Please specify --env")
  95. }
  96. logger.Fatal("Invalid value for --env, must be dev, test or prod", zap.String("val", *envStr))
  97. }
  98. if *p2pNetworkID == DEV_NETWORK_ID && env != common.UnsafeDevNet {
  99. logger.Fatal("May not set --network to dev unless --env is also dev", zap.String("network", *p2pNetworkID), zap.String("env", *envStr))
  100. }
  101. // Verify flags
  102. if *nodeKeyPath == "" {
  103. logger.Fatal("Please specify --nodeKey")
  104. }
  105. if *p2pBootstrap == "" {
  106. logger.Fatal("Please specify --bootstrap")
  107. }
  108. if *permFile == "" {
  109. logger.Fatal("Please specify --permFile")
  110. }
  111. if *ethRPC == "" {
  112. logger.Fatal("Please specify --ethRPC")
  113. }
  114. if *ethContract == "" {
  115. logger.Fatal("Please specify --ethContract")
  116. }
  117. permissions, err := parseConfigFile(*permFile)
  118. if err != nil {
  119. logger.Fatal("Failed to load permissions file", zap.String("permFile", *permFile), zap.Error(err))
  120. }
  121. // Load p2p private key
  122. var priv crypto.PrivKey
  123. priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
  124. if err != nil {
  125. logger.Fatal("Failed to load node key", zap.Error(err))
  126. }
  127. var signerKey *ecdsa.PrivateKey
  128. if *signerKeyPath != "" {
  129. signerKey, err = common.LoadArmoredKey(*signerKeyPath, CCQ_SERVER_SIGNING_KEY, false)
  130. if err != nil {
  131. logger.Fatal("Failed to loader signer key", zap.Error(err))
  132. }
  133. logger.Info("will sign unsigned requests if api key supports it", zap.Stringer("signingKey", ethCrypto.PubkeyToAddress(signerKey.PublicKey)))
  134. }
  135. ctx, cancel := context.WithCancel(context.Background())
  136. defer cancel()
  137. // Run p2p
  138. pendingResponses := NewPendingResponses()
  139. p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger)
  140. if err != nil {
  141. logger.Fatal("Failed to start p2p", zap.Error(err))
  142. }
  143. // Start the HTTP server
  144. go func() {
  145. s := NewHTTPServer(*listenAddr, p2p.topic_req, permissions, signerKey, pendingResponses, logger, env)
  146. logger.Sugar().Infof("Server listening on %s", *listenAddr)
  147. err := s.ListenAndServe()
  148. if err != nil && err != http.ErrServerClosed {
  149. logger.Fatal("Server closed unexpectedly", zap.Error(err))
  150. }
  151. }()
  152. // Start the status server
  153. if *statusAddr != "" {
  154. go func() {
  155. ss := NewStatusServer(*statusAddr, logger, env)
  156. logger.Sugar().Infof("Status server listening on %s", *statusAddr)
  157. err := ss.ListenAndServe()
  158. if err != nil && err != http.ErrServerClosed {
  159. logger.Fatal("Status server closed unexpectedly", zap.Error(err))
  160. }
  161. }()
  162. }
  163. // Start the Prometheus scraper
  164. usingPromRemoteWrite := *promRemoteURL != ""
  165. if usingPromRemoteWrite {
  166. var info promremotew.PromTelemetryInfo
  167. info.PromRemoteURL = *promRemoteURL
  168. info.Labels = map[string]string{
  169. "node_name": *telemetryNodeName,
  170. "network": *p2pNetworkID,
  171. "version": version.Version(),
  172. "product": "ccq_server",
  173. }
  174. err := RunPrometheusScraper(ctx, logger, info)
  175. if err != nil {
  176. logger.Fatal("Failed to start prometheus scraper", zap.Error(err))
  177. }
  178. }
  179. // Handle SIGTERM
  180. sigterm := make(chan os.Signal, 1)
  181. signal.Notify(sigterm, syscall.SIGTERM)
  182. go func() {
  183. <-sigterm
  184. logger.Info("Received sigterm. exiting.")
  185. cancel()
  186. }()
  187. <-ctx.Done()
  188. logger.Info("Context cancelled, exiting...")
  189. // Cleanly shutdown
  190. // Without this the same host won't properly discover peers until some timeout
  191. p2p.sub.Cancel()
  192. if err := p2p.topic_req.Close(); err != nil {
  193. logger.Error("Error closing the request topic", zap.Error(err))
  194. }
  195. if err := p2p.topic_resp.Close(); err != nil {
  196. logger.Error("Error closing the response topic", zap.Error(err))
  197. }
  198. if err := p2p.host.Close(); err != nil {
  199. logger.Error("Error closing the host", zap.Error(err))
  200. }
  201. }