watcher.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932
  1. package evm
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "math/big"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
  11. "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
  12. "github.com/certusone/wormhole/node/pkg/watchers/interfaces"
  13. "github.com/certusone/wormhole/node/pkg/p2p"
  14. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  15. "github.com/ethereum/go-ethereum/rpc"
  16. "github.com/prometheus/client_golang/prometheus/promauto"
  17. "github.com/prometheus/client_golang/prometheus"
  18. ethereum "github.com/ethereum/go-ethereum"
  19. eth_common "github.com/ethereum/go-ethereum/common"
  20. eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil"
  21. "go.uber.org/zap"
  22. "github.com/certusone/wormhole/node/pkg/common"
  23. "github.com/certusone/wormhole/node/pkg/query"
  24. "github.com/certusone/wormhole/node/pkg/readiness"
  25. "github.com/certusone/wormhole/node/pkg/supervisor"
  26. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  27. )
// Prometheus metrics exported by the EVM watcher. Every metric carries an
// "eth_network" label; within this file it is always filled with the
// watcher's networkName.
var (
	// ethConnectionErrors counts connection-level failures. The "reason" label
	// distinguishes the failing step (for example "dial_error",
	// "subscribe_error", "guardian_set_fetch_error").
	ethConnectionErrors = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_eth_connection_errors_total",
			Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
		}, []string{"eth_network", "reason"})

	// ethMessagesObserved counts messages seen before they are confirmed.
	ethMessagesObserved = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_eth_messages_observed_total",
			Help: "Total number of Eth messages observed (pre-confirmation)",
		}, []string{"eth_network"})

	// ethMessagesOrphaned counts pending messages that were dropped instead of
	// published. Run uses the "reason" values "timeout", "not_found",
	// "tx_failed" and "blockhash_mismatch".
	ethMessagesOrphaned = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_eth_messages_orphaned_total",
			Help: "Total number of Eth messages dropped (orphaned)",
		}, []string{"eth_network", "reason"})

	// ethMessagesConfirmed counts pending messages that passed the receipt
	// checks in Run and were forwarded on msgC.
	ethMessagesConfirmed = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_eth_messages_confirmed_total",
			Help: "Total number of Eth messages verified (post-confirmation)",
		}, []string{"eth_network"})

	// currentEthHeight is the number of the most recent "latest" block received.
	currentEthHeight = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "wormhole_eth_current_height",
			Help: "Current Ethereum block height",
		}, []string{"eth_network"})

	// currentEthSafeHeight is the number of the most recent "safe" block received.
	currentEthSafeHeight = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "wormhole_eth_current_safe_height",
			Help: "Current Ethereum safe block height",
		}, []string{"eth_network"})

	// currentEthFinalizedHeight is the number of the most recent "finalized"
	// block received.
	currentEthFinalizedHeight = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "wormhole_eth_current_finalized_height",
			Help: "Current Ethereum finalized block height",
		}, []string{"eth_network"})

	// queryLatency records, in seconds, how long individual request/response
	// calls take. The "operation" label names the call (for example
	// "get_guardian_set").
	queryLatency = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: "wormhole_eth_query_latency",
			Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
		}, []string{"eth_network", "operation"})
)
type (
	// Watcher observes a single EVM chain: it follows block headers, collects
	// message publications from the core contract, answers re-observation
	// requests and (optionally) tracks the guardian set.
	Watcher struct {
		// Ethereum RPC url
		url string
		// Address of the Eth contract
		contract eth_common.Address
		// Human-readable name of the Eth network, for logging and monitoring.
		networkName string
		// Readiness component
		readinessSync readiness.Component
		// VAA ChainID of the network we're connecting to.
		chainID vaa.ChainID

		// Channel to send new messages to.
		msgC chan<- *common.MessagePublication

		// Channel to send guardian set changes to.
		// setC can be set to nil if no guardian set changes are needed.
		//
		// We currently only fetch the guardian set from one primary chain; only
		// that chain's watcher should be given a non-nil setC.
		//
		// The current primary chain is Ethereum (a mostly arbitrary decision because it
		// has the best API - we might want to switch the primary chain to Solana once
		// the governance mechanism lives there),
		setC chan<- *common.GuardianSet

		// Incoming re-observation requests from the network. Pre-filtered to only
		// include requests for our chainID.
		obsvReqC <-chan *gossipv1.ObservationRequest

		// Incoming query requests from the network. Pre-filtered to only
		// include requests for our chainID.
		queryReqC <-chan *query.PerChainQueryInternal

		// Outbound query responses to query requests
		queryResponseC chan<- *query.PerChainQueryResponseInternal

		// Messages that have been observed but not yet published, keyed by
		// pendingKey. Run holds pendingMu while iterating and deleting entries.
		pending   map[pendingKey]*pendingMessage
		pendingMu sync.Mutex

		// 0 is a valid guardian set, so we need a nil value here
		currentGuardianSet *uint32

		// Interface to the chain specific ethereum library.
		ethConn connectors.Connector
		// When set, getFinality treats the chain as supporting both "finalized"
		// and "safe" regardless of chainID.
		unsafeDevMode bool

		// Most recent block numbers received per finality level. They are
		// written and read with sync/atomic.
		latestBlockNumber          uint64
		latestSafeBlockNumber      uint64
		latestFinalizedBlockNumber uint64

		// Layer one finalizer, installed through SetL1Finalizer.
		l1Finalizer interfaces.L1Finalizer

		// Cross-chain-query (CCQ) state.
		// ccqConfig is looked up from the chainID in NewEthWatcher.
		ccqConfig query.PerChainConfig
		// ccqMaxBlockNumber is initialised to math.MaxUint64 in NewEthWatcher.
		ccqMaxBlockNumber *big.Int
		// ccqTimestampCache is only allocated in Run when
		// ccqConfig.TimestampCacheSupported is true; otherwise it stays nil.
		ccqTimestampCache *BlocksByTimestamp
		// ccqBackfillChannel is created with a buffer of 50 in NewEthWatcher.
		ccqBackfillChannel chan *ccqBackfillRequest
		// NOTE(review): ccqBatchSize is not assigned anywhere in the visible
		// part of this file - confirm where it is set.
		ccqBatchSize int64
		// ccqBackfillCache is passed straight through from NewEthWatcher.
		ccqBackfillCache bool
		// ccqLogger is the Run logger tagged with component "ccqevm".
		ccqLogger *zap.Logger
	}

	// pendingKey identifies an entry in Watcher.pending.
	pendingKey struct {
		TxHash         eth_common.Hash
		BlockHash      eth_common.Hash
		EmitterAddress vaa.Address
		Sequence       uint64
	}

	// pendingMessage is a message waiting for its block to reach the required
	// finality level.
	pendingMessage struct {
		message *common.MessagePublication
		// height is compared against incoming safe/finalized block numbers in
		// Run; presumably the block in which the message was observed - confirm
		// against the code that populates Watcher.pending.
		height uint64
	}
)
// MaxWaitConfirmations is the maximum number of confirmations to wait before declaring a transaction abandoned.
// Run drops a pending message (counting it as orphaned with reason "timeout") once an incoming block of the
// matching finality level has a number of at least the message's height plus this value.
const MaxWaitConfirmations = 60
  134. func NewEthWatcher(
  135. url string,
  136. contract eth_common.Address,
  137. networkName string,
  138. chainID vaa.ChainID,
  139. msgC chan<- *common.MessagePublication,
  140. setC chan<- *common.GuardianSet,
  141. obsvReqC <-chan *gossipv1.ObservationRequest,
  142. queryReqC <-chan *query.PerChainQueryInternal,
  143. queryResponseC chan<- *query.PerChainQueryResponseInternal,
  144. unsafeDevMode bool,
  145. ccqBackfillCache bool,
  146. ) *Watcher {
  147. return &Watcher{
  148. url: url,
  149. contract: contract,
  150. networkName: networkName,
  151. readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID),
  152. chainID: chainID,
  153. msgC: msgC,
  154. setC: setC,
  155. obsvReqC: obsvReqC,
  156. queryReqC: queryReqC,
  157. queryResponseC: queryResponseC,
  158. pending: map[pendingKey]*pendingMessage{},
  159. unsafeDevMode: unsafeDevMode,
  160. ccqConfig: query.GetPerChainConfig(chainID),
  161. ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64),
  162. ccqBackfillCache: ccqBackfillCache,
  163. ccqBackfillChannel: make(chan *ccqBackfillRequest, 50),
  164. }
  165. }
  166. func (w *Watcher) Run(parentCtx context.Context) error {
  167. var err error
  168. logger := supervisor.Logger(parentCtx)
  169. w.ccqLogger = logger.With(zap.String("component", "ccqevm"))
  170. logger.Info("Starting watcher",
  171. zap.String("watcher_name", "evm"),
  172. zap.String("url", w.url),
  173. zap.String("contract", w.contract.String()),
  174. zap.String("networkName", w.networkName),
  175. zap.String("chainID", w.chainID.String()),
  176. zap.Bool("unsafeDevMode", w.unsafeDevMode),
  177. )
  178. // later on we will spawn multiple go-routines through `RunWithScissors`, i.e. catching panics.
  179. // If any of them panic, this function will return, causing this child context to be canceled
  180. // such that the other go-routines can free up resources
  181. ctx, watcherContextCancelFunc := context.WithCancel(parentCtx)
  182. defer watcherContextCancelFunc()
  183. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  184. p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
  185. ContractAddress: w.contract.Hex(),
  186. })
  187. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  188. defer cancel()
  189. finalizedPollingSupported, safePollingSupported, err := w.getFinality(ctx)
  190. if err != nil {
  191. return fmt.Errorf("failed to determine finality: %w", err)
  192. }
  193. if finalizedPollingSupported {
  194. if safePollingSupported {
  195. logger.Info("polling for finalized and safe blocks")
  196. } else {
  197. logger.Info("polling for finalized blocks, will generate safe blocks")
  198. }
  199. baseConnector, err := connectors.NewEthereumBaseConnector(timeout, w.networkName, w.url, w.contract, logger)
  200. if err != nil {
  201. ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
  202. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  203. return fmt.Errorf("dialing eth client failed: %w", err)
  204. }
  205. w.ethConn = connectors.NewBatchPollConnector(ctx, logger, baseConnector, safePollingSupported, 1000*time.Millisecond)
  206. } else if w.chainID == vaa.ChainIDCelo {
  207. // When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum.
  208. // However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum.
  209. w.ethConn, err = connectors.NewCeloConnector(timeout, w.networkName, w.url, w.contract, logger)
  210. if err != nil {
  211. ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
  212. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  213. return fmt.Errorf("dialing eth client failed: %w", err)
  214. }
  215. } else {
  216. // Everything else is instant finality.
  217. logger.Info("assuming instant finality")
  218. baseConnector, err := connectors.NewEthereumBaseConnector(timeout, w.networkName, w.url, w.contract, logger)
  219. if err != nil {
  220. ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
  221. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  222. return fmt.Errorf("dialing eth client failed: %w", err)
  223. }
  224. w.ethConn, err = connectors.NewInstantFinalityConnector(baseConnector, logger)
  225. if err != nil {
  226. ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
  227. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  228. return fmt.Errorf("failed to connect to instant finality chain: %w", err)
  229. }
  230. }
  231. if w.ccqConfig.TimestampCacheSupported {
  232. w.ccqTimestampCache = NewBlocksByTimestamp(BTS_MAX_BLOCKS)
  233. }
  234. errC := make(chan error)
  235. // Subscribe to new message publications. We don't use a timeout here because the LogPollConnector
  236. // will keep running. Other connectors will use a timeout internally if appropriate.
  237. messageC := make(chan *ethabi.AbiLogMessagePublished, 2)
  238. messageSub, err := w.ethConn.WatchLogMessagePublished(ctx, errC, messageC)
  239. if err != nil {
  240. ethConnectionErrors.WithLabelValues(w.networkName, "subscribe_error").Inc()
  241. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  242. return fmt.Errorf("failed to subscribe to message publication events: %w", err)
  243. }
  244. defer messageSub.Unsubscribe()
  245. // Fetch initial guardian set
  246. if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
  247. return fmt.Errorf("failed to request guardian set: %v", err)
  248. }
  249. // Poll for guardian set.
  250. common.RunWithScissors(ctx, errC, "evm_fetch_guardian_set", func(ctx context.Context) error {
  251. t := time.NewTicker(15 * time.Second)
  252. defer t.Stop()
  253. for {
  254. select {
  255. case <-ctx.Done():
  256. return nil
  257. case <-t.C:
  258. if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
  259. errC <- fmt.Errorf("failed to request guardian set: %v", err)
  260. return nil
  261. }
  262. }
  263. }
  264. })
  265. common.RunWithScissors(ctx, errC, "evm_fetch_objs_req", func(ctx context.Context) error {
  266. for {
  267. select {
  268. case <-ctx.Done():
  269. return nil
  270. case r := <-w.obsvReqC:
  271. // This can't happen unless there is a programming error - the caller
  272. // is expected to send us only requests for our chainID.
  273. if vaa.ChainID(r.ChainId) != w.chainID {
  274. panic("invalid chain ID")
  275. }
  276. tx := eth_common.BytesToHash(r.TxHash)
  277. logger.Info("received observation request", zap.String("tx_hash", tx.Hex()))
  278. // SECURITY: Load the block number before requesting the transaction to avoid a
  279. // race condition where requesting the tx succeeds and is then dropped due to a fork,
  280. // but blockNumberU had already advanced beyond the required threshold.
  281. //
  282. // In the primary watcher flow, this is of no concern since we assume the node
  283. // always sends the head before it sends the logs (implicit synchronization
  284. // by relying on the same websocket connection).
  285. blockNumberU := atomic.LoadUint64(&w.latestFinalizedBlockNumber)
  286. safeBlockNumberU := atomic.LoadUint64(&w.latestSafeBlockNumber)
  287. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  288. blockNumber, msgs, err := MessageEventsForTransaction(timeout, w.ethConn, w.contract, w.chainID, tx)
  289. cancel()
  290. if err != nil {
  291. logger.Error("failed to process observation request", zap.String("tx_hash", tx.Hex()), zap.Error(err))
  292. continue
  293. }
  294. for _, msg := range msgs {
  295. msg.IsReobservation = true
  296. if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
  297. logger.Info("re-observed message publication transaction, publishing it immediately",
  298. zap.Stringer("tx", msg.TxHash),
  299. zap.Stringer("emitter_address", msg.EmitterAddress),
  300. zap.Uint64("sequence", msg.Sequence),
  301. zap.Uint64("current_block", blockNumberU),
  302. zap.Uint64("observed_block", blockNumber),
  303. )
  304. w.msgC <- msg
  305. continue
  306. }
  307. if msg.ConsistencyLevel == vaa.ConsistencyLevelSafe {
  308. if safeBlockNumberU == 0 {
  309. logger.Error("no safe block number available, ignoring observation request")
  310. continue
  311. }
  312. if blockNumber <= safeBlockNumberU {
  313. logger.Info("re-observed message publication transaction",
  314. zap.Stringer("tx", msg.TxHash),
  315. zap.Stringer("emitter_address", msg.EmitterAddress),
  316. zap.Uint64("sequence", msg.Sequence),
  317. zap.Uint64("current_safe_block", safeBlockNumberU),
  318. zap.Uint64("observed_block", blockNumber),
  319. )
  320. w.msgC <- msg
  321. } else {
  322. logger.Info("ignoring re-observed message publication transaction",
  323. zap.Stringer("tx", msg.TxHash),
  324. zap.Stringer("emitter_address", msg.EmitterAddress),
  325. zap.Uint64("sequence", msg.Sequence),
  326. zap.Uint64("current_safe_block", safeBlockNumberU),
  327. zap.Uint64("observed_block", blockNumber),
  328. )
  329. }
  330. continue
  331. }
  332. if blockNumberU == 0 {
  333. logger.Error("no block number available, ignoring observation request")
  334. continue
  335. }
  336. // SECURITY: In the recovery flow, we already know which transaction to
  337. // observe, and we can assume that it has reached the expected finality
  338. // level a long time ago. Therefore, the logic is much simpler than the
  339. // primary watcher, which has to wait for finality.
  340. //
  341. // Instead, we can simply check if the transaction's block number is in
  342. // the past by more than the expected confirmation number.
  343. //
  344. // Ensure that the current block number is larger than the message observation's block number.
  345. if blockNumber <= blockNumberU {
  346. logger.Info("re-observed message publication transaction",
  347. zap.Stringer("tx", msg.TxHash),
  348. zap.Stringer("emitter_address", msg.EmitterAddress),
  349. zap.Uint64("sequence", msg.Sequence),
  350. zap.Uint64("current_block", blockNumberU),
  351. zap.Uint64("observed_block", blockNumber),
  352. )
  353. w.msgC <- msg
  354. } else {
  355. logger.Info("ignoring re-observed message publication transaction",
  356. zap.Stringer("tx", msg.TxHash),
  357. zap.Stringer("emitter_address", msg.EmitterAddress),
  358. zap.Uint64("sequence", msg.Sequence),
  359. zap.Uint64("current_block", blockNumberU),
  360. zap.Uint64("observed_block", blockNumber),
  361. )
  362. }
  363. }
  364. }
  365. }
  366. })
  367. if w.ccqConfig.QueriesSupported() {
  368. w.ccqStart(ctx, errC)
  369. }
  370. common.RunWithScissors(ctx, errC, "evm_fetch_messages", func(ctx context.Context) error {
  371. for {
  372. select {
  373. case <-ctx.Done():
  374. return nil
  375. case err := <-messageSub.Err():
  376. ethConnectionErrors.WithLabelValues(w.networkName, "subscription_error").Inc()
  377. errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
  378. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  379. return nil
  380. case ev := <-messageC:
  381. blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash)
  382. if err != nil {
  383. ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
  384. if canRetryGetBlockTime(err) {
  385. go w.waitForBlockTime(ctx, logger, errC, ev)
  386. continue
  387. }
  388. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  389. errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
  390. return nil
  391. }
  392. w.postMessage(logger, ev, blockTime)
  393. }
  394. }
  395. })
  396. // Watch headers
  397. headSink := make(chan *connectors.NewBlock, 100)
  398. headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, errC, headSink)
  399. if err != nil {
  400. ethConnectionErrors.WithLabelValues(w.networkName, "header_subscribe_error").Inc()
  401. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  402. return fmt.Errorf("failed to subscribe to header events: %w", err)
  403. }
  404. defer headerSubscription.Unsubscribe()
  405. common.RunWithScissors(ctx, errC, "evm_fetch_headers", func(ctx context.Context) error {
  406. stats := gossipv1.Heartbeat_Network{ContractAddress: w.contract.Hex()}
  407. for {
  408. select {
  409. case <-ctx.Done():
  410. return nil
  411. case err := <-headerSubscription.Err():
  412. logger.Error("error while processing header subscription", zap.Error(err))
  413. ethConnectionErrors.WithLabelValues(w.networkName, "header_subscription_error").Inc()
  414. errC <- fmt.Errorf("error while processing header subscription: %w", err)
  415. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  416. return nil
  417. case ev := <-headSink:
  418. // These two pointers should have been checked before the event was placed on the channel, but just being safe.
  419. if ev == nil {
  420. logger.Error("new header event is nil")
  421. continue
  422. }
  423. if ev.Number == nil {
  424. logger.Error("new header block number is nil", zap.Stringer("finality", ev.Finality))
  425. continue
  426. }
  427. start := time.Now()
  428. currentHash := ev.Hash
  429. logger.Debug("processing new header",
  430. zap.Stringer("current_block", ev.Number),
  431. zap.Uint64("block_time", ev.Time),
  432. zap.Stringer("current_blockhash", currentHash),
  433. zap.Stringer("finality", ev.Finality),
  434. )
  435. readiness.SetReady(w.readinessSync)
  436. blockNumberU := ev.Number.Uint64()
  437. if ev.Finality == connectors.Latest {
  438. atomic.StoreUint64(&w.latestBlockNumber, blockNumberU)
  439. currentEthHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU))
  440. stats.Height = int64(blockNumberU)
  441. w.updateNetworkStats(&stats)
  442. w.ccqAddLatestBlock(ev)
  443. continue
  444. }
  445. // The only blocks that get here are safe and finalized.
  446. if ev.Finality == connectors.Safe {
  447. atomic.StoreUint64(&w.latestSafeBlockNumber, blockNumberU)
  448. currentEthSafeHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU))
  449. stats.SafeHeight = int64(blockNumberU)
  450. } else {
  451. atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU)
  452. currentEthFinalizedHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU))
  453. stats.FinalizedHeight = int64(blockNumberU)
  454. }
  455. w.updateNetworkStats(&stats)
  456. w.pendingMu.Lock()
  457. for key, pLock := range w.pending {
  458. // If this block is safe, only process messages wanting safe.
  459. // If it's not safe, only process messages wanting finalized.
  460. if (ev.Finality == connectors.Safe) != (pLock.message.ConsistencyLevel == vaa.ConsistencyLevelSafe) {
  461. continue
  462. }
  463. // Transaction was dropped and never picked up again
  464. if pLock.height+MaxWaitConfirmations <= blockNumberU {
  465. logger.Info("observation timed out",
  466. zap.Stringer("tx", pLock.message.TxHash),
  467. zap.Stringer("blockhash", key.BlockHash),
  468. zap.Stringer("emitter_address", key.EmitterAddress),
  469. zap.Uint64("sequence", key.Sequence),
  470. zap.Stringer("current_block", ev.Number),
  471. zap.Stringer("finality", ev.Finality),
  472. zap.Stringer("current_blockhash", currentHash),
  473. )
  474. ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc()
  475. delete(w.pending, key)
  476. continue
  477. }
  478. // Transaction is now ready
  479. if pLock.height <= blockNumberU {
  480. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  481. tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash)
  482. cancel()
  483. // If the node returns an error after waiting expectedConfirmation blocks,
  484. // it means the chain reorged and the transaction was orphaned. The
  485. // TransactionReceipt call is using the same websocket connection than the
  486. // head notifications, so it's guaranteed to be atomic.
  487. //
  488. // Check multiple possible error cases - the node seems to return a
  489. // "not found" error most of the time, but it could conceivably also
  490. // return a nil tx or rpc.ErrNoResult.
  491. if tx == nil || err == rpc.ErrNoResult || (err != nil && err.Error() == "not found") {
  492. logger.Warn("tx was orphaned",
  493. zap.Stringer("tx", pLock.message.TxHash),
  494. zap.Stringer("blockhash", key.BlockHash),
  495. zap.Stringer("emitter_address", key.EmitterAddress),
  496. zap.Uint64("sequence", key.Sequence),
  497. zap.Stringer("current_block", ev.Number),
  498. zap.Stringer("finality", ev.Finality),
  499. zap.Stringer("current_blockhash", currentHash),
  500. zap.Error(err))
  501. delete(w.pending, key)
  502. ethMessagesOrphaned.WithLabelValues(w.networkName, "not_found").Inc()
  503. continue
  504. }
  505. // This should never happen - if we got this far, it means that logs were emitted,
  506. // which is only possible if the transaction succeeded. We check it anyway just
  507. // in case the EVM implementation is buggy.
  508. if tx.Status != 1 {
  509. logger.Error("transaction receipt with non-success status",
  510. zap.Stringer("tx", pLock.message.TxHash),
  511. zap.Stringer("blockhash", key.BlockHash),
  512. zap.Stringer("emitter_address", key.EmitterAddress),
  513. zap.Uint64("sequence", key.Sequence),
  514. zap.Stringer("current_block", ev.Number),
  515. zap.Stringer("finality", ev.Finality),
  516. zap.Stringer("current_blockhash", currentHash),
  517. zap.Error(err))
  518. delete(w.pending, key)
  519. ethMessagesOrphaned.WithLabelValues(w.networkName, "tx_failed").Inc()
  520. continue
  521. }
  522. // Any error other than "not found" is likely transient - we retry next block.
  523. if err != nil {
  524. logger.Warn("transaction could not be fetched",
  525. zap.Stringer("tx", pLock.message.TxHash),
  526. zap.Stringer("blockhash", key.BlockHash),
  527. zap.Stringer("emitter_address", key.EmitterAddress),
  528. zap.Uint64("sequence", key.Sequence),
  529. zap.Stringer("current_block", ev.Number),
  530. zap.Stringer("finality", ev.Finality),
  531. zap.Stringer("current_blockhash", currentHash),
  532. zap.Error(err))
  533. continue
  534. }
  535. // It's possible for a transaction to be orphaned and then included in a different block
  536. // but with the same tx hash. Drop the observation (it will be re-observed and needs to
  537. // wait for the full confirmation time again).
  538. if tx.BlockHash != key.BlockHash {
  539. logger.Info("tx got dropped and mined in a different block; the message should have been reobserved",
  540. zap.Stringer("tx", pLock.message.TxHash),
  541. zap.Stringer("blockhash", key.BlockHash),
  542. zap.Stringer("emitter_address", key.EmitterAddress),
  543. zap.Uint64("sequence", key.Sequence),
  544. zap.Stringer("current_block", ev.Number),
  545. zap.Stringer("finality", ev.Finality),
  546. zap.Stringer("current_blockhash", currentHash),
  547. )
  548. delete(w.pending, key)
  549. ethMessagesOrphaned.WithLabelValues(w.networkName, "blockhash_mismatch").Inc()
  550. continue
  551. }
  552. logger.Info("observation confirmed",
  553. zap.Stringer("tx", pLock.message.TxHash),
  554. zap.Stringer("blockhash", key.BlockHash),
  555. zap.Stringer("emitter_address", key.EmitterAddress),
  556. zap.Uint64("sequence", key.Sequence),
  557. zap.Stringer("current_block", ev.Number),
  558. zap.Stringer("finality", ev.Finality),
  559. zap.Stringer("current_blockhash", currentHash),
  560. )
  561. delete(w.pending, key)
  562. w.msgC <- pLock.message
  563. ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
  564. }
  565. }
  566. w.pendingMu.Unlock()
  567. logger.Debug("processed new header",
  568. zap.Stringer("current_block", ev.Number),
  569. zap.Stringer("finality", ev.Finality),
  570. zap.Stringer("current_blockhash", currentHash),
  571. zap.Duration("took", time.Since(start)),
  572. )
  573. }
  574. }
  575. })
  576. // Now that the init is complete, peg readiness. That will also happen when we process a new head, but chains
  577. // that wait for finality may take a while to receive the first block and we don't want to hold up the init.
  578. readiness.SetReady(w.readinessSync)
  579. select {
  580. case <-ctx.Done():
  581. return ctx.Err()
  582. case err := <-errC:
  583. return err
  584. }
  585. }
  586. func (w *Watcher) fetchAndUpdateGuardianSet(
  587. logger *zap.Logger,
  588. ctx context.Context,
  589. ethConn connectors.Connector,
  590. ) error {
  591. msm := time.Now()
  592. logger.Debug("fetching guardian set")
  593. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  594. defer cancel()
  595. idx, gs, err := fetchCurrentGuardianSet(timeout, ethConn)
  596. if err != nil {
  597. ethConnectionErrors.WithLabelValues(w.networkName, "guardian_set_fetch_error").Inc()
  598. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  599. return err
  600. }
  601. queryLatency.WithLabelValues(w.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
  602. if w.currentGuardianSet != nil && *(w.currentGuardianSet) == idx {
  603. return nil
  604. }
  605. logger.Info("updated guardian set found", zap.Any("value", gs), zap.Uint32("index", idx))
  606. w.currentGuardianSet = &idx
  607. if w.setC != nil {
  608. w.setC <- &common.GuardianSet{
  609. Keys: gs.Keys,
  610. Index: idx,
  611. }
  612. }
  613. return nil
  614. }
  615. // Fetch the current guardian set ID and guardian set from the chain.
  616. func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector) (uint32, *ethabi.StructsGuardianSet, error) {
  617. currentIndex, err := ethConn.GetCurrentGuardianSetIndex(ctx)
  618. if err != nil {
  619. return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
  620. }
  621. gs, err := ethConn.GetGuardianSet(ctx, currentIndex)
  622. if err != nil {
  623. return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
  624. }
  625. return currentIndex, &gs, nil
  626. }
  627. // getFinality determines if the chain supports "finalized" and "safe". This is hard coded so it requires thought to change something. However, it also reads the RPC
  628. // to make sure the node actually supports the expected values, and returns an error if it doesn't. Note that we do not support using safe mode but not finalized mode.
  629. func (w *Watcher) getFinality(ctx context.Context) (bool, bool, error) {
  630. finalized := false
  631. safe := false
  632. if w.unsafeDevMode {
  633. // Devnet supports finalized and safe (although they returns the same value as latest).
  634. finalized = true
  635. safe = true
  636. } else if w.chainID == vaa.ChainIDAcala ||
  637. w.chainID == vaa.ChainIDArbitrum ||
  638. w.chainID == vaa.ChainIDBase ||
  639. w.chainID == vaa.ChainIDBSC ||
  640. w.chainID == vaa.ChainIDEthereum ||
  641. w.chainID == vaa.ChainIDKarura ||
  642. w.chainID == vaa.ChainIDMantle ||
  643. w.chainID == vaa.ChainIDMoonbeam ||
  644. w.chainID == vaa.ChainIDOptimism ||
  645. w.chainID == vaa.ChainIDSepolia ||
  646. w.chainID == vaa.ChainIDHolesky ||
  647. w.chainID == vaa.ChainIDArbitrumSepolia ||
  648. w.chainID == vaa.ChainIDBaseSepolia ||
  649. w.chainID == vaa.ChainIDOptimismSepolia {
  650. finalized = true
  651. safe = true
  652. } else if w.chainID == vaa.ChainIDScroll {
  653. // As of 11/10/2023 Scroll supports polling for finalized but not safe.
  654. finalized = true
  655. } else if w.chainID == vaa.ChainIDPolygon ||
  656. w.chainID == vaa.ChainIDPolygonSepolia {
  657. // Polygon now supports polling for finalized but not safe.
  658. // https://forum.polygon.technology/t/optimizing-decentralized-apps-ux-with-milestones-a-significantly-accelerated-finality-solution/13154
  659. finalized = true
  660. }
  661. // If finalized / safe should be supported, read the RPC to make sure they actually are.
  662. if finalized {
  663. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  664. defer cancel()
  665. c, err := rpc.DialContext(timeout, w.url)
  666. if err != nil {
  667. return false, false, fmt.Errorf("failed to connect to endpoint: %w", err)
  668. }
  669. type Marshaller struct {
  670. Number *eth_hexutil.Big
  671. }
  672. var m Marshaller
  673. err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "finalized", false)
  674. if err != nil || m.Number == nil {
  675. return false, false, fmt.Errorf("finalized not supported by the node when it should be")
  676. }
  677. if safe {
  678. err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "safe", false)
  679. if err != nil || m.Number == nil {
  680. return false, false, fmt.Errorf("safe not supported by the node when it should be")
  681. }
  682. }
  683. }
  684. return finalized, safe, nil
  685. }
  686. // SetL1Finalizer is used to set the layer one finalizer.
  687. func (w *Watcher) SetL1Finalizer(l1Finalizer interfaces.L1Finalizer) {
  688. w.l1Finalizer = l1Finalizer
  689. }
  690. // GetLatestFinalizedBlockNumber() implements the L1Finalizer interface and allows other watchers to
  691. // get the latest finalized block number from this watcher.
  692. func (w *Watcher) GetLatestFinalizedBlockNumber() uint64 {
  693. return atomic.LoadUint64(&w.latestFinalizedBlockNumber)
  694. }
  695. // getLatestSafeBlockNumber() returns the latest safe block seen by this watcher..
  696. func (w *Watcher) getLatestSafeBlockNumber() uint64 {
  697. return atomic.LoadUint64(&w.latestSafeBlockNumber)
  698. }
  699. func (w *Watcher) updateNetworkStats(stats *gossipv1.Heartbeat_Network) {
  700. p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
  701. Height: stats.Height,
  702. SafeHeight: stats.SafeHeight,
  703. FinalizedHeight: stats.FinalizedHeight,
  704. ContractAddress: w.contract.Hex(),
  705. })
  706. }
  707. // getBlockTime reads the time of a block.
  708. func (w *Watcher) getBlockTime(ctx context.Context, blockHash eth_common.Hash) (uint64, error) {
  709. msm := time.Now()
  710. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  711. blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, blockHash)
  712. cancel()
  713. queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
  714. return blockTime, err
  715. }
  716. // postMessage creates a message object from a log event and adds it to the pending list for processing.
  717. func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublished, blockTime uint64) {
  718. message := &common.MessagePublication{
  719. TxHash: ev.Raw.TxHash,
  720. Timestamp: time.Unix(int64(blockTime), 0),
  721. Nonce: ev.Nonce,
  722. Sequence: ev.Sequence,
  723. EmitterChain: w.chainID,
  724. EmitterAddress: PadAddress(ev.Sender),
  725. Payload: ev.Payload,
  726. ConsistencyLevel: ev.ConsistencyLevel,
  727. }
  728. ethMessagesObserved.WithLabelValues(w.networkName).Inc()
  729. if message.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
  730. logger.Info("found new message publication transaction, publishing it immediately",
  731. zap.Stringer("tx", ev.Raw.TxHash),
  732. zap.Uint64("block", ev.Raw.BlockNumber),
  733. zap.Stringer("blockhash", ev.Raw.BlockHash),
  734. zap.Uint64("blockTime", blockTime),
  735. zap.Uint64("Sequence", ev.Sequence),
  736. zap.Uint32("Nonce", ev.Nonce),
  737. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  738. )
  739. w.msgC <- message
  740. ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
  741. return
  742. }
  743. logger.Info("found new message publication transaction",
  744. zap.Stringer("tx", ev.Raw.TxHash),
  745. zap.Uint64("block", ev.Raw.BlockNumber),
  746. zap.Stringer("blockhash", ev.Raw.BlockHash),
  747. zap.Uint64("blockTime", blockTime),
  748. zap.Uint64("Sequence", ev.Sequence),
  749. zap.Uint32("Nonce", ev.Nonce),
  750. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  751. )
  752. key := pendingKey{
  753. TxHash: message.TxHash,
  754. BlockHash: ev.Raw.BlockHash,
  755. EmitterAddress: message.EmitterAddress,
  756. Sequence: message.Sequence,
  757. }
  758. w.pendingMu.Lock()
  759. w.pending[key] = &pendingMessage{
  760. message: message,
  761. height: ev.Raw.BlockNumber,
  762. }
  763. w.pendingMu.Unlock()
  764. }
  765. // canRetryGetBlockTime returns true if the error returned by getBlockTime warrants doing a retry.
  766. func canRetryGetBlockTime(err error) bool {
  767. return err == ethereum.NotFound /* go-ethereum */ || err.Error() == "cannot query unfinalized data" /* avalanche */
  768. }
  769. // waitForBlockTime is a go routine that repeatedly attempts to read the block time for a single log event. It is used when the initial attempt to read
  770. // the block time fails. If it is finally able to read the block time, it posts the event for processing. Otherwise, it will eventually give up.
  771. func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC chan error, ev *ethabi.AbiLogMessagePublished) {
  772. logger.Warn("found new message publication transaction but failed to look up block time, deferring processing",
  773. zap.Stringer("tx", ev.Raw.TxHash),
  774. zap.Uint64("block", ev.Raw.BlockNumber),
  775. zap.Stringer("blockhash", ev.Raw.BlockHash),
  776. zap.Uint64("Sequence", ev.Sequence),
  777. zap.Uint32("Nonce", ev.Nonce),
  778. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  779. )
  780. const RetryInterval = 5 * time.Second
  781. const MaxRetries = 3
  782. start := time.Now()
  783. t := time.NewTimer(RetryInterval)
  784. defer t.Stop()
  785. retries := 1
  786. for {
  787. select {
  788. case <-ctx.Done():
  789. return
  790. case <-t.C:
  791. blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash)
  792. if err == nil {
  793. logger.Info("retry of block time query succeeded, posting transaction",
  794. zap.Stringer("tx", ev.Raw.TxHash),
  795. zap.Uint64("block", ev.Raw.BlockNumber),
  796. zap.Stringer("blockhash", ev.Raw.BlockHash),
  797. zap.Uint64("blockTime", blockTime),
  798. zap.Uint64("Sequence", ev.Sequence),
  799. zap.Uint32("Nonce", ev.Nonce),
  800. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  801. zap.Stringer("startTime", start),
  802. zap.Int("retries", retries),
  803. )
  804. w.postMessage(logger, ev, blockTime)
  805. return
  806. }
  807. ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
  808. if !canRetryGetBlockTime(err) {
  809. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  810. errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
  811. return
  812. }
  813. if retries >= MaxRetries {
  814. logger.Error("repeatedly failed to look up block time, giving up",
  815. zap.Stringer("tx", ev.Raw.TxHash),
  816. zap.Uint64("block", ev.Raw.BlockNumber),
  817. zap.Stringer("blockhash", ev.Raw.BlockHash),
  818. zap.Uint64("Sequence", ev.Sequence),
  819. zap.Uint32("Nonce", ev.Nonce),
  820. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  821. zap.Stringer("startTime", start),
  822. zap.Int("retries", retries),
  823. )
  824. return
  825. }
  826. retries++
  827. t.Reset(RetryInterval)
  828. }
  829. }
  830. }