options.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691
  1. package node
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "time"
  8. "github.com/certusone/wormhole/node/pkg/accountant"
  9. "github.com/certusone/wormhole/node/pkg/altpub"
  10. "github.com/certusone/wormhole/node/pkg/common"
  11. guardianDB "github.com/certusone/wormhole/node/pkg/db"
  12. "github.com/certusone/wormhole/node/pkg/governor"
  13. "github.com/certusone/wormhole/node/pkg/gwrelayer"
  14. "github.com/certusone/wormhole/node/pkg/notary"
  15. "github.com/certusone/wormhole/node/pkg/p2p"
  16. "github.com/certusone/wormhole/node/pkg/processor"
  17. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  18. "github.com/certusone/wormhole/node/pkg/query"
  19. "github.com/certusone/wormhole/node/pkg/readiness"
  20. "github.com/certusone/wormhole/node/pkg/supervisor"
  21. "github.com/certusone/wormhole/node/pkg/watchers"
  22. "github.com/certusone/wormhole/node/pkg/watchers/ibc"
  23. "github.com/certusone/wormhole/node/pkg/wormconn"
  24. "github.com/gorilla/mux"
  25. libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto"
  26. "github.com/prometheus/client_golang/prometheus/promhttp"
  27. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  28. "go.uber.org/zap"
  29. "go.uber.org/zap/zapcore"
  30. )
// GuardianOption is a named configuration step for the guardian node. Each option carries the function that
// initializes one component on *G, together with the names of the options that must have run before it.
type GuardianOption struct {
	// name identifies this option; other options refer to it through their `dependencies`.
	name string
	// Array of other option's `name`. These options need to be configured before this option. Dependencies are enforced at runtime.
	dependencies []string
	// Function that is run by the constructor to initialize this component.
	f func(context.Context, *zap.Logger, *G) error
}
  36. // GuardianOptionP2P configures p2p networking.
  37. // Dependencies: See below.
  38. func GuardianOptionP2P(
  39. p2pKey libp2p_crypto.PrivKey,
  40. networkId string,
  41. bootstrapPeers string,
  42. nodeName string,
  43. subscribeToVAAs bool,
  44. disableHeartbeatVerify bool,
  45. port uint,
  46. ccqBootstrapPeers string,
  47. ccqPort uint,
  48. ccqAllowedPeers string,
  49. gossipAdvertiseAddress string,
  50. ibcEnabled bool,
  51. protectedPeers []string,
  52. ccqProtectedPeers []string,
  53. featureFlags []string,
  54. ) *GuardianOption {
  55. return &GuardianOption{
  56. name: "p2p",
  57. dependencies: []string{"accountant", "alternate-publisher", "gateway-relayer", "governor", "query"},
  58. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  59. components := p2p.DefaultComponents()
  60. components.Port = port
  61. var signedInC chan<- *gossipv1.SignedVAAWithQuorum
  62. if subscribeToVAAs {
  63. logger.Info("subscribing to incoming signed VAAs")
  64. signedInC = g.signedInC.writeC
  65. }
  66. if g.env == common.GoTest {
  67. components.WarnChannelOverflow = true
  68. components.SignedHeartbeatLogLevel = zapcore.InfoLevel
  69. }
  70. // Add the gossip advertisement address
  71. components.GossipAdvertiseAddress = gossipAdvertiseAddress
  72. // Get the static feature flags and add them to what was passed in.
  73. featureFlags = getStaticFeatureFlags(g, featureFlags)
  74. // Create the list of dynamic feature flag functions.
  75. featureFlagFuncs := []func() string{}
  76. if ibcEnabled {
  77. // IBC has a dynamic feature flag because it reports the Wormchain version.
  78. featureFlagFuncs = append(featureFlagFuncs, ibc.GetFeatures)
  79. }
  80. params, err := p2p.NewRunParams(
  81. bootstrapPeers,
  82. networkId,
  83. p2pKey,
  84. g.gst,
  85. g.rootCtxCancel,
  86. p2p.WithGuardianOptions(
  87. nodeName,
  88. g.guardianSigner,
  89. g.batchObsvC.writeC,
  90. signedInC,
  91. g.obsvReqC.writeC,
  92. g.gossipControlSendC,
  93. g.gossipAttestationSendC,
  94. g.gossipVaaSendC,
  95. g.obsvReqSendC.readC,
  96. g.acct,
  97. g.gov,
  98. disableHeartbeatVerify,
  99. components,
  100. (g.queryHandler != nil), // ccqEnabled,
  101. g.signedQueryReqC.writeC,
  102. g.queryResponsePublicationC.readC,
  103. ccqBootstrapPeers,
  104. ccqPort,
  105. ccqAllowedPeers,
  106. protectedPeers,
  107. ccqProtectedPeers,
  108. featureFlags,
  109. featureFlagFuncs,
  110. ),
  111. )
  112. if err != nil {
  113. return err
  114. }
  115. g.runnables["p2p"] = p2p.Run(
  116. params,
  117. )
  118. return nil
  119. }}
  120. }
  121. // GuardianOptionQueryHandler configures the Cross Chain Query module.
  122. func GuardianOptionQueryHandler(ccqEnabled bool, allowedRequesters string) *GuardianOption {
  123. return &GuardianOption{
  124. name: "query",
  125. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  126. if !ccqEnabled {
  127. logger.Info("ccq: cross chain query is disabled", zap.String("component", "ccq"))
  128. return nil
  129. }
  130. g.queryHandler = query.NewQueryHandler(
  131. logger,
  132. g.env,
  133. allowedRequesters,
  134. g.signedQueryReqC.readC,
  135. g.chainQueryReqC,
  136. g.queryResponseC.readC,
  137. g.queryResponsePublicationC.writeC,
  138. )
  139. return nil
  140. }}
  141. }
  142. // GuardianOptionNoAccountant disables the accountant. It is a shorthand for GuardianOptionAccountant("", "", false, nil)
  143. // Dependencies: none
  144. func GuardianOptionNoAccountant() *GuardianOption {
  145. return &GuardianOption{
  146. name: "accountant",
  147. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  148. logger.Info("accountant is disabled", zap.String("component", "gacct"))
  149. return nil
  150. }}
  151. }
  152. // GuardianOptionAccountant configures the Accountant module.
  153. // Dependencies: db
  154. func GuardianOptionAccountant(
  155. websocket string,
  156. contract string,
  157. enforcing bool,
  158. wormchainConn *wormconn.ClientConn,
  159. nttContract string,
  160. nttWormchainConn *wormconn.ClientConn,
  161. ) *GuardianOption {
  162. return &GuardianOption{
  163. name: "accountant",
  164. dependencies: []string{"db"},
  165. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  166. // Set up the accountant. If the accountant smart contract is configured, we will instantiate the accountant and VAAs
  167. // will be passed to it for processing. It will forward all token bridge transfers to the accountant contract.
  168. // If accountantCheckEnabled is set to true, token bridge transfers will not be signed and published until they
  169. // are approved by the accountant smart contract.
  170. if contract == "" && nttContract == "" {
  171. logger.Info("accountant is disabled", zap.String("component", "gacct"))
  172. return nil
  173. }
  174. if websocket == "" {
  175. return errors.New("if either accountantContract or accountantNttContract is specified, accountantWS is required")
  176. }
  177. if contract != "" {
  178. if wormchainConn == nil {
  179. return errors.New("if accountantContract is specified, the wormchain sending connection must be enabled before")
  180. }
  181. if enforcing {
  182. logger.Info("accountant is enabled and will be enforced", zap.String("component", "gacct"))
  183. } else {
  184. logger.Info("accountant is enabled but will not be enforced", zap.String("component", "gacct"))
  185. }
  186. }
  187. if nttContract != "" {
  188. if nttWormchainConn == nil {
  189. return errors.New("if accountantNttContract is specified, the NTT wormchain sending connection must be enabled")
  190. }
  191. logger.Info("NTT accountant is enabled", zap.String("component", "gacct"))
  192. }
  193. g.acct = accountant.NewAccountant(
  194. ctx,
  195. logger,
  196. g.db,
  197. g.obsvReqC.writeC,
  198. contract,
  199. websocket,
  200. wormchainConn,
  201. enforcing,
  202. nttContract,
  203. nttWormchainConn,
  204. g.guardianSigner,
  205. g.gst,
  206. g.acctC.writeC,
  207. g.env,
  208. )
  209. return nil
  210. }}
  211. }
  212. // GuardianOptionGovernor enables or disables the governor.
  213. // Dependencies: db
  214. func GuardianOptionGovernor(governorEnabled bool, flowCancelEnabled bool, coinGeckoApiKey string) *GuardianOption {
  215. return &GuardianOption{
  216. name: "governor",
  217. dependencies: []string{"db"},
  218. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  219. if governorEnabled {
  220. if flowCancelEnabled {
  221. logger.Info("chain governor is enabled with flow cancel enabled")
  222. } else {
  223. logger.Info("chain governor is enabled without flow cancel")
  224. }
  225. if coinGeckoApiKey != "" {
  226. logger.Info("coingecko pro API key in use")
  227. }
  228. g.gov = governor.NewChainGovernor(logger, g.db, g.env, flowCancelEnabled, coinGeckoApiKey)
  229. } else {
  230. logger.Info("chain governor is disabled")
  231. }
  232. return nil
  233. }}
  234. }
  235. // GuardianOptionNotary enables or disables the Notary.
  236. // Dependencies: db
  237. func GuardianOptionNotary(notaryEnabled bool) *GuardianOption {
  238. return &GuardianOption{
  239. name: "notary",
  240. dependencies: []string{"db"},
  241. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  242. if notaryEnabled {
  243. g.notary = notary.NewNotary(ctx, logger, g.db, g.env)
  244. } else {
  245. logger.Info("notary is disabled")
  246. }
  247. return nil
  248. }}
  249. }
  250. // GuardianOptionGatewayRelayer configures the Gateway Relayer module. If the gateway relayer smart contract is configured, we will instantiate
  251. // the GatewayRelayer and signed VAAs will be passed to it for processing when they are published. It will forward payload three transfers destined
  252. // for the specified contract on wormchain to that contract.
  253. func GuardianOptionGatewayRelayer(gatewayRelayerContract string, wormchainConn *wormconn.ClientConn) *GuardianOption {
  254. return &GuardianOption{
  255. name: "gateway-relayer",
  256. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  257. g.gatewayRelayer = gwrelayer.NewGatewayRelayer(
  258. ctx,
  259. logger,
  260. gatewayRelayerContract,
  261. wormchainConn,
  262. g.env,
  263. )
  264. return nil
  265. }}
  266. }
  267. // GuardianOptionStatusServer configures the status server, including /readyz and /metrics.
  268. // If g.env == common.UnsafeDevNet || g.env == common.GoTest, pprof will be enabled under /debug/pprof/
  269. // Dependencies: none
  270. func GuardianOptionStatusServer(statusAddr string) *GuardianOption {
  271. return &GuardianOption{
  272. name: "status-server",
  273. f: func(_ context.Context, _ *zap.Logger, g *G) error {
  274. if statusAddr != "" {
  275. // Use a custom routing instead of using http.DefaultServeMux directly to avoid accidentally exposing packages
  276. // that register themselves with it by default (like pprof).
  277. router := mux.NewRouter()
  278. // pprof server. NOT necessarily safe to expose publicly - only enable it in dev mode to avoid exposing it by
  279. // accident. There's benefit to having pprof enabled on production nodes, but we would likely want to expose it
  280. // via a dedicated port listening on localhost, or via the admin UNIX socket.
  281. if g.env == common.UnsafeDevNet || g.env == common.GoTest {
  282. // Pass requests to http.DefaultServeMux, which pprof automatically registers with as an import side-effect.
  283. router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
  284. }
  285. // Simple endpoint exposing node readiness (safe to expose to untrusted clients)
  286. router.HandleFunc("/readyz", readiness.Handler)
  287. // Prometheus metrics (safe to expose to untrusted clients)
  288. router.Handle("/metrics", promhttp.Handler())
  289. // SECURITY: If making changes, ensure that we always do `router := mux.NewRouter()` before this to avoid accidentally exposing pprof
  290. server := &http.Server{
  291. Addr: statusAddr,
  292. Handler: router,
  293. ReadHeaderTimeout: time.Second, // SECURITY defense against Slowloris Attack
  294. ReadTimeout: time.Second,
  295. WriteTimeout: time.Second,
  296. }
  297. g.runnables["status-server"] = func(ctx context.Context) error {
  298. logger := supervisor.Logger(ctx)
  299. go func() {
  300. if err := server.ListenAndServe(); err != http.ErrServerClosed {
  301. logger.Error("status server crashed", zap.Error(err))
  302. }
  303. }()
  304. logger.Info("status server listening", zap.String("status_addr", statusAddr))
  305. <-ctx.Done()
  306. //nolint:contextcheck // We use context.Background() instead of ctx here because ctx is already canceled at this point and Shutdown would not work then.
  307. if err := server.Shutdown(context.Background()); err != nil {
  308. logger := supervisor.Logger(ctx)
  309. logger.Error("error while shutting down status server: ", zap.Error(err))
  310. }
  311. return nil
  312. }
  313. }
  314. return nil
  315. }}
  316. }
// IbcWatcherConfig holds the settings for the IBC watcher. GuardianOptionWatchers passes the four fields, in
// this order, to ibc.NewWatcher. A nil *IbcWatcherConfig means no IBC watcher is set up.
type IbcWatcherConfig struct {
	// Websocket is presumably the websocket endpoint the IBC watcher connects to - TODO confirm against ibc.NewWatcher.
	Websocket string
	// Lcd is presumably the LCD (REST) endpoint - TODO confirm against ibc.NewWatcher.
	Lcd string
	// BlockHeightURL is presumably the URL queried for the current block height - TODO confirm against ibc.NewWatcher.
	BlockHeightURL string
	// Contract is presumably the address of the contract the watcher monitors - TODO confirm against ibc.NewWatcher.
	Contract string
}
// GuardianOptionWatchers configures all normal watchers and all IBC watchers. They need to be all configured at the same time because they may depend on each other.
//
// For every chain returned by vaa.GetAllNetworkIDs() it creates a per-chain message channel and a per-chain query
// response channel, each drained by a goroutine that sanity-checks what the watcher produced before forwarding it
// to the shared g.msgC / g.queryResponseC channels. It then creates one runnable per entry of watcherConfigs
// (rejecting duplicate network IDs), optionally sets up the IBC watcher when ibcWatcherConfig is non-nil, and
// finally starts handleReobservationRequests to serve g.obsvReqC from the per-chain request channels.
//
// TODO: currently, IBC watchers are partially statically configured in ibc.ChainConfig. It might make sense to refactor this to instead provide this as a parameter here.
// Dependencies: none
func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherConfig *IbcWatcherConfig) *GuardianOption {
	return &GuardianOption{
		name: "watchers",
		f: func(ctx context.Context, logger *zap.Logger, g *G) error {
			// Per-chain observation request channels; entries are only created for chains that actually get a watcher.
			chainObsvReqC := make(map[vaa.ChainID]chan *gossipv1.ObservationRequest)

			// Per-chain message channels.
			chainMsgC := make(map[vaa.ChainID]chan *common.MessagePublication)

			// aggregate per-chain msgC into msgC.
			// SECURITY defense-in-depth: This way we enforce that a watcher must set the msg.EmitterChain to its chainId, which makes the code easier to audit
			for _, chainId := range vaa.GetAllNetworkIDs() {
				chainMsgC[chainId] = make(chan *common.MessagePublication)
				go func(c <-chan *common.MessagePublication, chainId vaa.ChainID) {
					zeroAddress := vaa.Address{}
					for {
						select {
						case <-ctx.Done():
							return
						case msg := <-c:
							// A message is only forwarded to the processor if it passes all three checks below.
							if msg.EmitterChain != chainId {
								level := zapcore.FatalLevel
								if g.env == common.GoTest {
									// If we're in gotest, we don't want to os.Exit() here because that's hard to catch.
									// Since continuing execution here doesn't have any side effects here, it's fine to have a
									// differing behavior in GoTest mode.
									level = zapcore.ErrorLevel
								}
								logger.Log(level, "SECURITY CRITICAL: Received observation from a chain that was not marked as originating from that chain",
									zap.String("tx", msg.TxIDString()),
									zap.Stringer("emitter_address", msg.EmitterAddress),
									zap.Uint64("sequence", msg.Sequence),
									zap.Stringer("msgChainId", msg.EmitterChain),
									zap.Stringer("watcherChainId", chainId),
								)
							} else if msg.EmitterAddress == zeroAddress {
								level := zapcore.FatalLevel
								if g.env == common.GoTest {
									// If we're in gotest, we don't want to os.Exit() here because that's hard to catch.
									// Since continuing execution here doesn't have any side effects here, it's fine to have a
									// differing behavior in GoTest mode.
									level = zapcore.ErrorLevel
								}
								logger.Log(level, "SECURITY ERROR: Received observation with EmitterAddress == 0x00",
									zap.String("tx", msg.TxIDString()),
									zap.Stringer("emitter_address", msg.EmitterAddress),
									zap.Uint64("sequence", msg.Sequence),
									zap.Stringer("msgChainId", msg.EmitterChain),
									zap.Stringer("watcherChainId", chainId),
								)
							} else if msg.EmitterAddress == vaa.GovernanceEmitter && msg.EmitterChain == vaa.GovernanceChain {
								// Unlike the two cases above this is logged at error level only; the message is dropped.
								logger.Error(
									"EMERGENCY: PLEASE REPORT THIS IMMEDIATELY! A Solana message was emitted from the governance emitter. This should never be possible.",
									zap.Stringer("emitter_chain", msg.EmitterChain),
									zap.Stringer("emitter_address", msg.EmitterAddress),
									zap.Uint32("nonce", msg.Nonce),
									zap.String("txID", msg.TxIDString()),
									zap.Time("timestamp", msg.Timestamp))
							} else {
								g.msgC.writeC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
							}
						}
					}
				}(chainMsgC[chainId], chainId)
			}

			// Per-chain query response channel
			chainQueryResponseC := make(map[vaa.ChainID]chan *query.PerChainQueryResponseInternal)

			// Aggregate the per-chain query response channels into g.queryResponseC, enforcing that a response
			// carries the chain ID of the channel it arrived on.
			for _, chainId := range vaa.GetAllNetworkIDs() {
				chainQueryResponseC[chainId] = make(chan *query.PerChainQueryResponseInternal, query.QueryResponseBufferSize)
				go func(c <-chan *query.PerChainQueryResponseInternal, chainId vaa.ChainID) {
					for {
						select {
						case <-ctx.Done():
							return
						case response := <-c:
							if response.ChainId != chainId {
								// SECURITY: This should never happen. If it does, a watcher has been compromised.
								// NOTE(review): there is no GoTest exception here, unlike the message path above.
								logger.Fatal("SECURITY CRITICAL: Received query response from a chain that was not marked as originating from that chain",
									zap.Uint16("responseChainId", uint16(response.ChainId)),
									zap.Stringer("watcherChainId", chainId),
								)
							}
							g.queryResponseC.writeC <- response //nolint:channelcheck // This channel is buffered, if it backs up we'll stop processing queries until it clears
						}
					}
				}(chainQueryResponseC[chainId], chainId)
			}

			// Set of network IDs that already have a watcher, used to reject duplicate configurations.
			configuredWatchers := make(map[watchers.NetworkID]struct{})

			for _, wc := range watcherConfigs {
				if _, ok := configuredWatchers[wc.GetNetworkID()]; ok {
					return fmt.Errorf("NetworkID already configured: %s", string(wc.GetNetworkID()))
				}

				watcherName := string(wc.GetNetworkID()) + "_watch"
				logger.Debug("Setting up watcher: " + watcherName)

				// The "-confirmed" network IDs skip readiness registration and channel creation; wc.Create below then
				// receives whatever the maps currently hold for that chain ID (presumably the channels created for
				// the other watcher of the same chain, if it was configured earlier - TODO confirm).
				if wc.GetNetworkID() != "solana-confirmed" && wc.GetNetworkID() != "fogo-confirmed" { // TODO this should not be a special case, see comment in common/readiness.go
					common.MustRegisterReadinessSyncing(wc.GetChainID())
					chainObsvReqC[wc.GetChainID()] = make(chan *gossipv1.ObservationRequest, observationRequestPerChainBufferSize)
					g.chainQueryReqC[wc.GetChainID()] = make(chan *query.PerChainQueryInternal, query.QueryRequestBufferSize)
				}

				runnable, reobserver, err := wc.Create(chainMsgC[wc.GetChainID()], chainObsvReqC[wc.GetChainID()], g.chainQueryReqC[wc.GetChainID()], chainQueryResponseC[wc.GetChainID()], g.setC.writeC, g.env)

				if err != nil {
					return fmt.Errorf("error creating watcher: %w", err)
				}

				g.runnablesWithScissors[watcherName] = runnable
				configuredWatchers[wc.GetNetworkID()] = struct{}{}

				// A watcher may or may not provide a reobserver; only non-nil ones are recorded.
				if reobserver != nil {
					g.reobservers[wc.GetChainID()] = reobserver
				}
			}

			if ibcWatcherConfig != nil {
				// Collect the IBC chains that do not already have a regular watcher.
				var chainConfig ibc.ChainConfig
				for _, chainID := range ibc.Chains {
					if _, exists := chainMsgC[chainID]; !exists {
						return fmt.Errorf("invalid IBC chain ID: %s", chainID.String())
					}

					if _, exists := chainObsvReqC[chainID]; exists {
						logger.Warn("not monitoring chain with IBC because it is already registered.", zap.Stringer("chainID", chainID))
						continue
					}

					chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestPerChainBufferSize)
					common.MustRegisterReadinessSyncing(chainID)

					chainConfig = append(chainConfig, ibc.ChainConfigEntry{
						ChainID:  chainID,
						MsgC:     chainMsgC[chainID],
						ObsvReqC: chainObsvReqC[chainID],
					})
				}

				if len(chainConfig) > 0 {
					logger.Info("Starting IBC watcher")
					readiness.RegisterComponent(common.ReadinessIBCSyncing)
					g.runnablesWithScissors["ibcwatch"] = ibc.NewWatcher(ibcWatcherConfig.Websocket, ibcWatcherConfig.Lcd, ibcWatcherConfig.BlockHeightURL, ibcWatcherConfig.Contract, chainConfig).Run
				} else {
					return errors.New("although IBC is enabled, there are no chains for it to monitor")
				}
			}

			// Serve incoming observation requests from g.obsvReqC using the per-chain request channels built above.
			clock := &CacheClock{}
			go handleReobservationRequests(ctx, clock, logger, g.obsvReqC.readC, chainObsvReqC)

			return nil
		}}
}
  463. // GuardianOptionAdminService enables the admin rpc service on a unix socket.
  464. // Dependencies: db, governor
  465. func GuardianOptionAdminService(socketPath string, ethRpc *string, ethContract *string, rpcMap map[string]string) *GuardianOption {
  466. return &GuardianOption{
  467. name: "admin-service",
  468. dependencies: []string{"governor", "db"},
  469. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  470. //nolint:contextcheck // Independent service that should not be affected by other services
  471. adminService, err := adminServiceRunnable(
  472. logger,
  473. socketPath,
  474. g.msgC.writeC,
  475. g.signedInC.writeC,
  476. g.obsvReqSendC.writeC,
  477. g.db,
  478. g.gst,
  479. g.gov,
  480. g.guardianSigner,
  481. ethRpc,
  482. ethContract,
  483. rpcMap,
  484. g.reobservers,
  485. )
  486. if err != nil {
  487. return fmt.Errorf("failed to create admin service: %w", err)
  488. }
  489. g.runnables["admin"] = adminService
  490. return nil
  491. }}
  492. }
  493. // GuardianOptionPublicRpcSocket enables the public rpc service on a unix socket
  494. // Dependencies: db, governor
  495. func GuardianOptionPublicRpcSocket(publicGRPCSocketPath string, publicRpcLogDetail common.GrpcLogDetail) *GuardianOption {
  496. return &GuardianOption{
  497. name: "publicrpcsocket",
  498. dependencies: []string{"db", "governor"},
  499. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  500. // local public grpc service socket
  501. //nolint:contextcheck // We use context.Background() instead of ctx here because ctx is already canceled at this point and Shutdown would not work then.
  502. publicrpcUnixService, publicrpcServer, err := publicrpcUnixServiceRunnable(logger, publicGRPCSocketPath, publicRpcLogDetail, g.db, g.gst, g.gov)
  503. if err != nil {
  504. return fmt.Errorf("failed to create publicrpc service: %w", err)
  505. }
  506. g.runnables["publicrpcsocket"] = publicrpcUnixService
  507. g.publicrpcServer = publicrpcServer
  508. return nil
  509. }}
  510. }
  511. // GuardianOptionPublicrpcTcpService enables the public gRPC service on TCP.
  512. // Dependencies: db, governor, publicrpcsocket
  513. func GuardianOptionPublicrpcTcpService(publicRpc string, publicRpcLogDetail common.GrpcLogDetail) *GuardianOption {
  514. return &GuardianOption{
  515. name: "publicrpc",
  516. dependencies: []string{"db", "governor", "publicrpcsocket"},
  517. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  518. publicrpcService := publicrpcTcpServiceRunnable(logger, publicRpc, publicRpcLogDetail, g.db, g.gst, g.gov)
  519. g.runnables["publicrpc"] = publicrpcService
  520. return nil
  521. }}
  522. }
  523. // GuardianOptionPublicWeb enables the public rpc service on http, i.e. gRPC-web and JSON-web.
  524. // Dependencies: db, governor, publicrpcsocket
  525. func GuardianOptionPublicWeb(listenAddr string, publicGRPCSocketPath string, tlsHostname string, tlsProdEnv bool, tlsCacheDir string) *GuardianOption {
  526. return &GuardianOption{
  527. name: "publicweb",
  528. dependencies: []string{"db", "governor", "publicrpcsocket"},
  529. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  530. publicwebService := publicwebServiceRunnable(logger, listenAddr, publicGRPCSocketPath, g.publicrpcServer,
  531. tlsHostname, tlsProdEnv, tlsCacheDir)
  532. g.runnables["publicweb"] = publicwebService
  533. return nil
  534. }}
  535. }
  536. // GuardianOptionDatabase configures the main database to be used for this guardian node.
  537. // Dependencies: none
  538. func GuardianOptionDatabase(db *guardianDB.Database) *GuardianOption {
  539. return &GuardianOption{
  540. name: "db",
  541. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  542. g.db = db
  543. return nil
  544. }}
  545. }
  546. // GuardianOptionAlternatePublisher enables the alternate publisher if it is configured.
  547. func GuardianOptionAlternatePublisher(guardianAddr []byte, configs []string) *GuardianOption {
  548. return &GuardianOption{
  549. name: "alternate-publisher",
  550. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  551. var err error
  552. g.alternatePublisher, err = altpub.NewAlternatePublisher(logger, guardianAddr, configs)
  553. if err != nil {
  554. return err
  555. }
  556. if g.alternatePublisher != nil {
  557. g.runnables["alternate-publisher"] = g.alternatePublisher.Run
  558. }
  559. return nil
  560. }}
  561. }
  562. // GuardianOptionProcessor enables the default processor, which is required to make consensus on messages.
  563. // Dependencies: See below.
  564. func GuardianOptionProcessor(networkId string) *GuardianOption {
  565. return &GuardianOption{
  566. name: "processor",
  567. // governor, accountant, and notary may be set to nil, but that choice needs to be made before the processor is configured
  568. dependencies: []string{"accountant", "alternate-publisher", "db", "gateway-relayer", "governor", "notary"},
  569. f: func(ctx context.Context, logger *zap.Logger, g *G) error {
  570. g.runnables["processor"] = processor.NewProcessor(ctx,
  571. g.db,
  572. g.msgC.readC,
  573. g.setC.readC,
  574. g.gossipAttestationSendC,
  575. g.gossipVaaSendC,
  576. g.batchObsvC.readC,
  577. g.obsvReqSendC.writeC,
  578. g.signedInC.readC,
  579. g.guardianSigner,
  580. g.gst,
  581. g.gov,
  582. g.acct,
  583. g.acctC.readC,
  584. g.notary,
  585. g.gatewayRelayer,
  586. networkId,
  587. g.alternatePublisher,
  588. ).Run
  589. return nil
  590. }}
  591. }
  592. // getStaticFeatureFlags creates the list of feature flags that do not change after initialization and adds them to the ones passed in.
  593. // Note: Any objects referenced here should be listed as dependencies in `GuardianOptionP2P`.
  594. func getStaticFeatureFlags(g *G, featureFlags []string) []string {
  595. if g.gov != nil {
  596. flag := "gov"
  597. if g.gov.IsFlowCancelEnabled() {
  598. flag = "gov:fc"
  599. }
  600. featureFlags = append(featureFlags, flag)
  601. }
  602. if g.acct != nil {
  603. featureFlags = append(featureFlags, g.acct.FeatureString())
  604. }
  605. if g.queryHandler != nil {
  606. featureFlags = append(featureFlags, "ccq")
  607. }
  608. if g.gatewayRelayer != nil {
  609. featureFlags = append(featureFlags, "gwrelayer")
  610. }
  611. if g.alternatePublisher != nil {
  612. featureFlags = append(featureFlags, g.alternatePublisher.GetFeatures())
  613. }
  614. return featureFlags
  615. }