node.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. package guardiand
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "github.com/certusone/wormhole/node/pkg/db"
  7. "github.com/certusone/wormhole/node/pkg/notify/discord"
  8. "github.com/certusone/wormhole/node/pkg/telemetry"
  9. "github.com/certusone/wormhole/node/pkg/version"
  10. "github.com/gagliardetto/solana-go/rpc"
  11. "go.uber.org/zap/zapcore"
  12. "log"
  13. "net/http"
  14. _ "net/http/pprof"
  15. "os"
  16. "path"
  17. "strings"
  18. solana_types "github.com/gagliardetto/solana-go"
  19. "github.com/gorilla/mux"
  20. "github.com/prometheus/client_golang/prometheus/promhttp"
  21. "github.com/certusone/wormhole/node/pkg/common"
  22. "github.com/certusone/wormhole/node/pkg/devnet"
  23. "github.com/certusone/wormhole/node/pkg/ethereum"
  24. "github.com/certusone/wormhole/node/pkg/p2p"
  25. "github.com/certusone/wormhole/node/pkg/processor"
  26. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  27. "github.com/certusone/wormhole/node/pkg/readiness"
  28. "github.com/certusone/wormhole/node/pkg/reporter"
  29. solana "github.com/certusone/wormhole/node/pkg/solana"
  30. "github.com/certusone/wormhole/node/pkg/supervisor"
  31. "github.com/certusone/wormhole/node/pkg/vaa"
  32. eth_common "github.com/ethereum/go-ethereum/common"
  33. ethcrypto "github.com/ethereum/go-ethereum/crypto"
  34. "github.com/libp2p/go-libp2p-core/crypto"
  35. "github.com/libp2p/go-libp2p-core/peer"
  36. "github.com/spf13/cobra"
  37. "go.uber.org/zap"
  38. "github.com/certusone/wormhole/node/pkg/terra"
  39. "github.com/certusone/wormhole/node/pkg/algorand"
  40. ipfslog "github.com/ipfs/go-log/v2"
  41. )
  42. var (
  43. p2pNetworkID *string
  44. p2pPort *uint
  45. p2pBootstrap *string
  46. nodeKeyPath *string
  47. adminSocketPath *string
  48. dataDir *string
  49. statusAddr *string
  50. guardianKeyPath *string
  51. solanaContract *string
  52. ethRPC *string
  53. ethContract *string
  54. bscRPC *string
  55. bscContract *string
  56. polygonRPC *string
  57. polygonContract *string
  58. ethRopstenRPC *string
  59. ethRopstenContract *string
  60. avalancheRPC *string
  61. avalancheContract *string
  62. oasisRPC *string
  63. oasisContract *string
  64. terraWS *string
  65. terraLCD *string
  66. terraContract *string
  67. algorandRPC *string
  68. algorandToken *string
  69. algorandContract *string
  70. solanaWsRPC *string
  71. solanaRPC *string
  72. logLevel *string
  73. unsafeDevMode *bool
  74. testnetMode *bool
  75. devNumGuardians *uint
  76. nodeName *string
  77. publicRPC *string
  78. publicWeb *string
  79. tlsHostname *string
  80. tlsProdEnv *bool
  81. disableHeartbeatVerify *bool
  82. disableTelemetry *bool
  83. telemetryKey *string
  84. discordToken *string
  85. discordChannel *string
  86. bigTablePersistenceEnabled *bool
  87. bigTableGCPProject *string
  88. bigTableInstanceName *string
  89. bigTableTableName *string
  90. bigTableTopicName *string
  91. bigTableKeyPath *string
  92. )
  93. func init() {
  94. p2pNetworkID = NodeCmd.Flags().String("network", "/wormhole/dev", "P2P network identifier")
  95. p2pPort = NodeCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
  96. p2pBootstrap = NodeCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
  97. statusAddr = NodeCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
  98. nodeKeyPath = NodeCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
  99. adminSocketPath = NodeCmd.Flags().String("adminSocket", "", "Admin gRPC service UNIX domain socket path")
  100. dataDir = NodeCmd.Flags().String("dataDir", "", "Data directory")
  101. guardianKeyPath = NodeCmd.Flags().String("guardianKey", "", "Path to guardian key (required)")
  102. solanaContract = NodeCmd.Flags().String("solanaContract", "", "Address of the Solana program (required)")
  103. ethRPC = NodeCmd.Flags().String("ethRPC", "", "Ethereum RPC URL")
  104. ethContract = NodeCmd.Flags().String("ethContract", "", "Ethereum contract address")
  105. bscRPC = NodeCmd.Flags().String("bscRPC", "", "Binance Smart Chain RPC URL")
  106. bscContract = NodeCmd.Flags().String("bscContract", "", "Binance Smart Chain contract address")
  107. polygonRPC = NodeCmd.Flags().String("polygonRPC", "", "Polygon RPC URL")
  108. polygonContract = NodeCmd.Flags().String("polygonContract", "", "Polygon contract address")
  109. ethRopstenRPC = NodeCmd.Flags().String("ethRopstenRPC", "", "Ethereum Ropsten RPC URL")
  110. ethRopstenContract = NodeCmd.Flags().String("ethRopstenContract", "", "Ethereum Ropsten contract address")
  111. avalancheRPC = NodeCmd.Flags().String("avalancheRPC", "", "Avalanche RPC URL")
  112. avalancheContract = NodeCmd.Flags().String("avalancheContract", "", "Avalanche contract address")
  113. oasisRPC = NodeCmd.Flags().String("oasisRPC", "", "Oasis RPC URL")
  114. oasisContract = NodeCmd.Flags().String("oasisContract", "", "Oasis contract address")
  115. terraWS = NodeCmd.Flags().String("terraWS", "", "Path to terrad root for websocket connection")
  116. terraLCD = NodeCmd.Flags().String("terraLCD", "", "Path to LCD service root for http calls")
  117. terraContract = NodeCmd.Flags().String("terraContract", "", "Wormhole contract address on Terra blockchain")
  118. algorandRPC = NodeCmd.Flags().String("algorandRPC", "", "Algorand RPC URL")
  119. algorandToken = NodeCmd.Flags().String("algorandToken", "", "Algorand access token")
  120. algorandContract = NodeCmd.Flags().String("algorandContract", "", "Algorand contract")
  121. solanaWsRPC = NodeCmd.Flags().String("solanaWS", "", "Solana Websocket URL (required")
  122. solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required")
  123. logLevel = NodeCmd.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
  124. unsafeDevMode = NodeCmd.Flags().Bool("unsafeDevMode", false, "Launch node in unsafe, deterministic devnet mode")
  125. testnetMode = NodeCmd.Flags().Bool("testnetMode", false, "Launch node in testnet mode (enables testnet-only features like Ropsten)")
  126. devNumGuardians = NodeCmd.Flags().Uint("devNumGuardians", 5, "Number of devnet guardians to include in guardian set")
  127. nodeName = NodeCmd.Flags().String("nodeName", "", "Node name to announce in gossip heartbeats")
  128. publicRPC = NodeCmd.Flags().String("publicRPC", "", "Listen address for public gRPC interface")
  129. publicWeb = NodeCmd.Flags().String("publicWeb", "", "Listen address for public REST and gRPC Web interface")
  130. tlsHostname = NodeCmd.Flags().String("tlsHostname", "", "If set, serve publicWeb as TLS with this hostname using Let's Encrypt")
  131. tlsProdEnv = NodeCmd.Flags().Bool("tlsProdEnv", false,
  132. "Use the production Let's Encrypt environment instead of staging")
  133. disableHeartbeatVerify = NodeCmd.Flags().Bool("disableHeartbeatVerify", false,
  134. "Disable heartbeat signature verification (useful during network startup)")
  135. disableTelemetry = NodeCmd.Flags().Bool("disableTelemetry", false,
  136. "Disable telemetry")
  137. telemetryKey = NodeCmd.Flags().String("telemetryKey", "",
  138. "Telemetry write key")
  139. discordToken = NodeCmd.Flags().String("discordToken", "", "Discord bot token (optional)")
  140. discordChannel = NodeCmd.Flags().String("discordChannel", "", "Discord channel name (optional)")
  141. bigTablePersistenceEnabled = NodeCmd.Flags().Bool("bigTablePersistenceEnabled", false, "Turn on forwarding events to BigTable")
  142. bigTableGCPProject = NodeCmd.Flags().String("bigTableGCPProject", "", "Google Cloud project ID for storing events")
  143. bigTableInstanceName = NodeCmd.Flags().String("bigTableInstanceName", "", "BigTable instance name for storing events")
  144. bigTableTableName = NodeCmd.Flags().String("bigTableTableName", "", "BigTable table name to store events in")
  145. bigTableTopicName = NodeCmd.Flags().String("bigTableTopicName", "", "GCP topic name to publish to")
  146. bigTableKeyPath = NodeCmd.Flags().String("bigTableKeyPath", "", "Path to json Service Account key")
  147. }
  148. var (
  149. rootCtx context.Context
  150. rootCtxCancel context.CancelFunc
  151. )
  152. // "Why would anyone do this?" are famous last words.
  153. //
  154. // We already forcibly override RPC URLs and keys in dev mode to prevent security
  155. // risks from operator error, but an extra warning won't hurt.
  156. const devwarning = `
  157. +++++++++++++++++++++++++++++++++++++++++++++++++++
  158. | NODE IS RUNNING IN INSECURE DEVELOPMENT MODE |
  159. | |
  160. | Do not use -unsafeDevMode in prod. |
  161. +++++++++++++++++++++++++++++++++++++++++++++++++++
  162. `
  163. // NodeCmd represents the node command
  164. var NodeCmd = &cobra.Command{
  165. Use: "node",
  166. Short: "Run the guardiand node",
  167. Run: runNode,
  168. }
  169. func runNode(cmd *cobra.Command, args []string) {
  170. if *unsafeDevMode {
  171. fmt.Print(devwarning)
  172. }
  173. common.LockMemory()
  174. common.SetRestrictiveUmask()
  175. // Refuse to run as root in production mode.
  176. if !*unsafeDevMode && os.Geteuid() == 0 {
  177. fmt.Println("can't run as uid 0")
  178. os.Exit(1)
  179. }
  180. // Set up logging. The go-log zap wrapper that libp2p uses is compatible with our
  181. // usage of zap in supervisor, which is nice.
  182. lvl, err := ipfslog.LevelFromString(*logLevel)
  183. if err != nil {
  184. fmt.Println("Invalid log level")
  185. os.Exit(1)
  186. }
  187. logger := zap.New(zapcore.NewCore(
  188. consoleEncoder{zapcore.NewConsoleEncoder(
  189. zap.NewDevelopmentEncoderConfig())},
  190. zapcore.AddSync(zapcore.Lock(os.Stderr)),
  191. zap.NewAtomicLevelAt(zapcore.Level(lvl))))
  192. if *unsafeDevMode {
  193. // Use the hostname as nodeName. For production, we don't want to do this to
  194. // prevent accidentally leaking sensitive hostnames.
  195. hostname, err := os.Hostname()
  196. if err != nil {
  197. panic(err)
  198. }
  199. *nodeName = hostname
  200. // Put node name into the log for development.
  201. logger = logger.Named(*nodeName)
  202. }
  203. // Override the default go-log config, which uses a magic environment variable.
  204. ipfslog.SetAllLoggers(lvl)
  205. // Register components for readiness checks.
  206. readiness.RegisterComponent(common.ReadinessEthSyncing)
  207. readiness.RegisterComponent(common.ReadinessSolanaSyncing)
  208. readiness.RegisterComponent(common.ReadinessTerraSyncing)
  209. if *unsafeDevMode {
  210. readiness.RegisterComponent(common.ReadinessAlgorandSyncing)
  211. }
  212. readiness.RegisterComponent(common.ReadinessBSCSyncing)
  213. readiness.RegisterComponent(common.ReadinessPolygonSyncing)
  214. readiness.RegisterComponent(common.ReadinessAvalancheSyncing)
  215. readiness.RegisterComponent(common.ReadinessOasisSyncing)
  216. if *testnetMode {
  217. readiness.RegisterComponent(common.ReadinessEthRopstenSyncing)
  218. }
  219. if *statusAddr != "" {
  220. // Use a custom routing instead of using http.DefaultServeMux directly to avoid accidentally exposing packages
  221. // that register themselves with it by default (like pprof).
  222. router := mux.NewRouter()
  223. // pprof server. NOT necessarily safe to expose publicly - only enable it in dev mode to avoid exposing it by
  224. // accident. There's benefit to having pprof enabled on production nodes, but we would likely want to expose it
  225. // via a dedicated port listening on localhost, or via the admin UNIX socket.
  226. if *unsafeDevMode {
  227. // Pass requests to http.DefaultServeMux, which pprof automatically registers with as an import side-effect.
  228. router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  229. }
  230. // Simple endpoint exposing node readiness (safe to expose to untrusted clients)
  231. router.HandleFunc("/readyz", readiness.Handler)
  232. // Prometheus metrics (safe to expose to untrusted clients)
  233. router.Handle("/metrics", promhttp.Handler())
  234. go func() {
  235. logger.Info("status server listening on [::]:6060")
  236. logger.Error("status server crashed", zap.Error(http.ListenAndServe(*statusAddr, router)))
  237. }()
  238. }
  239. // In devnet mode, we automatically set a number of flags that rely on deterministic keys.
  240. if *unsafeDevMode {
  241. g0key, err := peer.IDFromPrivateKey(devnet.DeterministicP2PPrivKeyByIndex(0))
  242. if err != nil {
  243. panic(err)
  244. }
  245. // Use the first guardian node as bootstrap
  246. *p2pBootstrap = fmt.Sprintf("/dns4/guardian-0.guardian/udp/%d/quic/p2p/%s", *p2pPort, g0key.String())
  247. // Deterministic ganache ETH devnet address.
  248. *ethContract = devnet.GanacheWormholeContractAddress.Hex()
  249. *bscContract = devnet.GanacheWormholeContractAddress.Hex()
  250. *polygonContract = devnet.GanacheWormholeContractAddress.Hex()
  251. *avalancheContract = devnet.GanacheWormholeContractAddress.Hex()
  252. *oasisContract = devnet.GanacheWormholeContractAddress.Hex()
  253. }
  254. // Verify flags
  255. if *nodeKeyPath == "" && !*unsafeDevMode { // In devnet mode, keys are deterministically generated.
  256. logger.Fatal("Please specify --nodeKey")
  257. }
  258. if *guardianKeyPath == "" {
  259. logger.Fatal("Please specify --guardianKey")
  260. }
  261. if *adminSocketPath == "" {
  262. logger.Fatal("Please specify --adminSocket")
  263. }
  264. if *dataDir == "" {
  265. logger.Fatal("Please specify --dataDir")
  266. }
  267. if *ethRPC == "" {
  268. logger.Fatal("Please specify --ethRPC")
  269. }
  270. if *ethContract == "" {
  271. logger.Fatal("Please specify --ethContract")
  272. }
  273. if *bscRPC == "" {
  274. logger.Fatal("Please specify --bscRPC")
  275. }
  276. if *bscContract == "" {
  277. logger.Fatal("Please specify --bscContract")
  278. }
  279. if *polygonRPC == "" {
  280. logger.Fatal("Please specify --polygonRPC")
  281. }
  282. if *polygonContract == "" {
  283. logger.Fatal("Please specify --polygonContract")
  284. }
  285. if *avalancheRPC == "" {
  286. logger.Fatal("Please specify --avalancheRPC")
  287. }
  288. if *oasisRPC == "" {
  289. logger.Fatal("Please specify --oasisRPC")
  290. }
  291. if *testnetMode {
  292. if *ethRopstenRPC == "" {
  293. logger.Fatal("Please specify --ethRopstenRPC")
  294. }
  295. if *ethRopstenContract == "" {
  296. logger.Fatal("Please specify --ethRopstenContract")
  297. }
  298. } else {
  299. if *ethRopstenRPC != "" {
  300. logger.Fatal("Please do not specify --ethRopstenRPC in non-testnet mode")
  301. }
  302. if *ethRopstenContract != "" {
  303. logger.Fatal("Please do not specify --ethRopstenContract in non-testnet mode")
  304. }
  305. }
  306. if *nodeName == "" {
  307. logger.Fatal("Please specify --nodeName")
  308. }
  309. if *solanaContract == "" {
  310. logger.Fatal("Please specify --solanaContract")
  311. }
  312. if *solanaWsRPC == "" {
  313. logger.Fatal("Please specify --solanaWsUrl")
  314. }
  315. if *solanaRPC == "" {
  316. logger.Fatal("Please specify --solanaUrl")
  317. }
  318. if *terraWS == "" {
  319. logger.Fatal("Please specify --terraWS")
  320. }
  321. if *terraLCD == "" {
  322. logger.Fatal("Please specify --terraLCD")
  323. }
  324. if *terraContract == "" {
  325. logger.Fatal("Please specify --terraContract")
  326. }
  327. if *unsafeDevMode {
  328. if *algorandRPC == "" {
  329. logger.Fatal("Please specify --algorandRPC")
  330. }
  331. if *algorandToken == "" {
  332. logger.Fatal("Please specify --algorandToken")
  333. }
  334. if *algorandContract == "" {
  335. logger.Fatal("Please specify --algorandContract")
  336. }
  337. }
  338. if *bigTablePersistenceEnabled {
  339. if *bigTableGCPProject == "" {
  340. logger.Fatal("Please specify --bigTableGCPProject")
  341. }
  342. if *bigTableInstanceName == "" {
  343. logger.Fatal("Please specify --bigTableInstanceName")
  344. }
  345. if *bigTableTableName == "" {
  346. logger.Fatal("Please specify --bigTableTableName")
  347. }
  348. if *bigTableTopicName == "" {
  349. logger.Fatal("Please specify --bigTableTopicName")
  350. }
  351. if *bigTableKeyPath == "" {
  352. logger.Fatal("Please specify --bigTableKeyPath")
  353. }
  354. }
  355. // Complain about Infura on mainnet.
  356. //
  357. // As it turns out, Infura has a bug where it would sometimes incorrectly round
  358. // block timestamps, which causes consensus issues - the timestamp is part of
  359. // the VAA and nodes using Infura would sometimes derive an incorrect VAA,
  360. // accidentally attacking the network by signing a conflicting VAA.
  361. //
  362. // Node operators do not usually rely on Infura in the first place - doing
  363. // so is insecure, since nodes blindly trust the connected nodes to verify
  364. // on-chain message proofs. However, node operators sometimes used
  365. // Infura during migrations where their primary node was offline, causing
  366. // the aforementioned consensus oddities which were eventually found to
  367. // be Infura-related. This is generally to the detriment of network security
  368. // and a judgement call made by individual operators. In the case of Infura,
  369. // we know it's actively dangerous so let's make an opinionated argument.
  370. //
  371. // Insert "I'm a sign, not a cop" meme.
  372. //
  373. if strings.Contains(*ethRPC, "mainnet.infura.io") ||
  374. strings.Contains(*polygonRPC, "polygon-mainnet.infura.io") {
  375. logger.Fatal("Infura is known to send incorrect blocks - please use your own nodes")
  376. }
  377. ethContractAddr := eth_common.HexToAddress(*ethContract)
  378. bscContractAddr := eth_common.HexToAddress(*bscContract)
  379. polygonContractAddr := eth_common.HexToAddress(*polygonContract)
  380. ethRopstenContractAddr := eth_common.HexToAddress(*ethRopstenContract)
  381. avalancheContractAddr := eth_common.HexToAddress(*avalancheContract)
  382. oasisContractAddr := eth_common.HexToAddress(*oasisContract)
  383. solAddress, err := solana_types.PublicKeyFromBase58(*solanaContract)
  384. if err != nil {
  385. logger.Fatal("invalid Solana contract address", zap.Error(err))
  386. }
  387. // In devnet mode, we generate a deterministic guardian key and write it to disk.
  388. if *unsafeDevMode {
  389. gk, err := generateDevnetGuardianKey()
  390. if err != nil {
  391. logger.Fatal("failed to generate devnet guardian key", zap.Error(err))
  392. }
  393. err = writeGuardianKey(gk, "auto-generated deterministic devnet key", *guardianKeyPath, true)
  394. if err != nil {
  395. logger.Fatal("failed to write devnet guardian key", zap.Error(err))
  396. }
  397. }
  398. // Database
  399. dbPath := path.Join(*dataDir, "db")
  400. if err := os.MkdirAll(dbPath, 0700); err != nil {
  401. logger.Fatal("failed to create database directory", zap.Error(err))
  402. }
  403. db, err := db.Open(dbPath)
  404. if err != nil {
  405. logger.Fatal("failed to open database", zap.Error(err))
  406. }
  407. defer db.Close()
  408. // Guardian key
  409. gk, err := loadGuardianKey(*guardianKeyPath)
  410. if err != nil {
  411. logger.Fatal("failed to load guardian key", zap.Error(err))
  412. }
  413. guardianAddr := ethcrypto.PubkeyToAddress(gk.PublicKey).String()
  414. logger.Info("Loaded guardian key", zap.String(
  415. "address", guardianAddr))
  416. p2p.DefaultRegistry.SetGuardianAddress(guardianAddr)
  417. // Node's main lifecycle context.
  418. rootCtx, rootCtxCancel = context.WithCancel(context.Background())
  419. defer rootCtxCancel()
  420. // Ethereum lock event channel
  421. lockC := make(chan *common.MessagePublication)
  422. // Ethereum incoming guardian set updates
  423. setC := make(chan *common.GuardianSet)
  424. // Outbound gossip message queue
  425. sendC := make(chan []byte)
  426. // Inbound observations
  427. obsvC := make(chan *gossipv1.SignedObservation, 50)
  428. // Inbound signed VAAs
  429. signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
  430. // Inbound observation requests
  431. obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
  432. // Outbound observation requests
  433. obsvReqSendC := make(chan *gossipv1.ObservationRequest)
  434. // Injected VAAs (manually generated rather than created via observation)
  435. injectC := make(chan *vaa.VAA)
  436. // Guardian set state managed by processor
  437. gst := common.NewGuardianSetState()
  438. var notifier *discord.DiscordNotifier
  439. if *discordToken != "" {
  440. notifier, err = discord.NewDiscordNotifier(*discordToken, *discordChannel, logger)
  441. if err != nil {
  442. logger.Error("failed to initialize Discord bot", zap.Error(err))
  443. }
  444. }
  445. // Load p2p private key
  446. var priv crypto.PrivKey
  447. if *unsafeDevMode {
  448. idx, err := devnet.GetDevnetIndex()
  449. if err != nil {
  450. logger.Fatal("Failed to parse hostname - are we running in devnet?")
  451. }
  452. priv = devnet.DeterministicP2PPrivKeyByIndex(int64(idx))
  453. } else {
  454. priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
  455. if err != nil {
  456. logger.Fatal("Failed to load node key", zap.Error(err))
  457. }
  458. }
  459. // Enable unless it is disabled. For devnet, only when --telemetryKey is set.
  460. if !*disableTelemetry && (!*unsafeDevMode || *unsafeDevMode && *telemetryKey != "") {
  461. logger.Info("Telemetry enabled")
  462. if *telemetryKey == "" {
  463. logger.Fatal("Please specify --telemetryKey")
  464. }
  465. creds, err := decryptTelemetryServiceAccount()
  466. if err != nil {
  467. logger.Fatal("Failed to decrypt telemetry service account", zap.Error(err))
  468. }
  469. // Get libp2p peer ID from private key
  470. pk := priv.GetPublic()
  471. peerID, err := peer.IDFromPublicKey(pk)
  472. if err != nil {
  473. logger.Fatal("Failed to get peer ID from private key", zap.Error(err))
  474. }
  475. tm, err := telemetry.New(context.Background(), telemetryProject, creds, map[string]string{
  476. "node_name": *nodeName,
  477. "node_key": peerID.Pretty(),
  478. "guardian_addr": guardianAddr,
  479. "network": *p2pNetworkID,
  480. "version": version.Version(),
  481. })
  482. if err != nil {
  483. logger.Fatal("Failed to initialize telemetry", zap.Error(err))
  484. }
  485. defer tm.Close()
  486. logger = tm.WrapLogger(logger)
  487. } else {
  488. logger.Info("Telemetry disabled")
  489. }
  490. // Redirect ipfs logs to plain zap
  491. ipfslog.SetPrimaryCore(logger.Core())
  492. // provides methods for reporting progress toward message attestation, and channels for receiving attestation lifecyclye events.
  493. attestationEvents := reporter.EventListener(logger)
  494. publicrpcService, publicrpcServer, err := publicrpcServiceRunnable(logger, *publicRPC, db, gst)
  495. if err != nil {
  496. log.Fatal("failed to create publicrpc service socket", zap.Error(err))
  497. }
  498. // local admin service socket
  499. adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, obsvReqSendC, db, gst)
  500. if err != nil {
  501. logger.Fatal("failed to create admin service socket", zap.Error(err))
  502. }
  503. publicwebService, err := publicwebServiceRunnable(logger, *publicWeb, *adminSocketPath, publicrpcServer,
  504. *tlsHostname, *tlsProdEnv, path.Join(*dataDir, "autocert"))
  505. if err != nil {
  506. log.Fatal("failed to create publicrpc service socket", zap.Error(err))
  507. }
  508. // Run supervisor.
  509. supervisor.New(rootCtx, logger, func(ctx context.Context) error {
  510. if err := supervisor.Run(ctx, "p2p", p2p.Run(
  511. obsvC, obsvReqC, obsvReqSendC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil {
  512. return err
  513. }
  514. if err := supervisor.Run(ctx, "ethwatch",
  515. ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1).Run); err != nil {
  516. return err
  517. }
  518. if err := supervisor.Run(ctx, "bscwatch",
  519. ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1).Run); err != nil {
  520. return err
  521. }
  522. if err := supervisor.Run(ctx, "polygonwatch",
  523. ethereum.NewEthWatcher(
  524. *polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil,
  525. // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
  526. //
  527. // Hardcode the minimum number of confirmations to 256 regardless of what the smart contract specifies to protect
  528. // developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon
  529. // as specific public guidance exists for Polygon developers.
  530. 256).Run); err != nil {
  531. return err
  532. }
  533. if err := supervisor.Run(ctx, "avalanchewatch",
  534. ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1).Run); err != nil {
  535. return err
  536. }
  537. if err := supervisor.Run(ctx, "oasiswatch",
  538. ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1).Run); err != nil {
  539. return err
  540. }
  541. if *testnetMode {
  542. if err := supervisor.Run(ctx, "ethropstenwatch",
  543. ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, setC, 1).Run); err != nil {
  544. return err
  545. }
  546. }
  547. // Start Terra watcher only if configured
  548. logger.Info("Starting Terra watcher")
  549. if err := supervisor.Run(ctx, "terrawatch",
  550. terra.NewWatcher(*terraWS, *terraLCD, *terraContract, lockC, setC).Run); err != nil {
  551. return err
  552. }
  553. if *unsafeDevMode {
  554. if err := supervisor.Run(ctx, "algorandwatch",
  555. algorand.NewWatcher(*algorandRPC, *algorandToken, *algorandContract, lockC, setC).Run); err != nil {
  556. return err
  557. }
  558. }
  559. if err := supervisor.Run(ctx, "solwatch-confirmed",
  560. solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, nil, rpc.CommitmentConfirmed).Run); err != nil {
  561. return err
  562. }
  563. if err := supervisor.Run(ctx, "solwatch-finalized",
  564. solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, obsvReqC, rpc.CommitmentFinalized).Run); err != nil {
  565. return err
  566. }
  567. p := processor.NewProcessor(ctx,
  568. db,
  569. lockC,
  570. setC,
  571. sendC,
  572. obsvC,
  573. injectC,
  574. signedInC,
  575. gk,
  576. gst,
  577. *unsafeDevMode,
  578. *devNumGuardians,
  579. *ethRPC,
  580. *terraLCD,
  581. *terraContract,
  582. attestationEvents,
  583. notifier,
  584. )
  585. if err := supervisor.Run(ctx, "processor", p.Run); err != nil {
  586. return err
  587. }
  588. if err := supervisor.Run(ctx, "admin", adminService); err != nil {
  589. return err
  590. }
  591. if *publicRPC != "" {
  592. if err := supervisor.Run(ctx, "publicrpc", publicrpcService); err != nil {
  593. return err
  594. }
  595. }
  596. if *publicWeb != "" {
  597. if err := supervisor.Run(ctx, "publicweb", publicwebService); err != nil {
  598. return err
  599. }
  600. }
  601. if *bigTablePersistenceEnabled {
  602. bigTableConnection := &reporter.BigTableConnectionConfig{
  603. GcpProjectID: *bigTableGCPProject,
  604. GcpInstanceName: *bigTableInstanceName,
  605. TableName: *bigTableTableName,
  606. TopicName: *bigTableTopicName,
  607. GcpKeyFilePath: *bigTableKeyPath,
  608. }
  609. if err := supervisor.Run(ctx, "bigtable", reporter.BigTableWriter(attestationEvents, bigTableConnection)); err != nil {
  610. return err
  611. }
  612. }
  613. logger.Info("Started internal services")
  614. <-ctx.Done()
  615. return nil
  616. },
  617. // It's safer to crash and restart the process in case we encounter a panic,
  618. // rather than attempting to reschedule the runnable.
  619. supervisor.WithPropagatePanic)
  620. <-rootCtx.Done()
  621. logger.Info("root context cancelled, exiting...")
  622. // TODO: wait for things to shut down gracefully
  623. }
  624. func decryptTelemetryServiceAccount() ([]byte, error) {
  625. // Decrypt service account credentials
  626. key, err := base64.StdEncoding.DecodeString(*telemetryKey)
  627. if err != nil {
  628. return nil, fmt.Errorf("failed to decode: %w", err)
  629. }
  630. ciphertext, err := base64.StdEncoding.DecodeString(telemetryServiceAccount)
  631. if err != nil {
  632. panic(err)
  633. }
  634. creds, err := common.DecryptAESGCM(ciphertext, key)
  635. if err != nil {
  636. return nil, fmt.Errorf("failed to decrypt: %w", err)
  637. }
  638. return creds, err
  639. }