watcher.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070
  1. package evm
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "errors"
  6. "fmt"
  7. "math"
  8. "math/big"
  9. "slices"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/certusone/wormhole/node/pkg/watchers"
  14. "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
  15. "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
  16. "github.com/certusone/wormhole/node/pkg/p2p"
  17. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  18. "github.com/ethereum/go-ethereum/rpc"
  19. "github.com/prometheus/client_golang/prometheus/promauto"
  20. "github.com/prometheus/client_golang/prometheus"
  21. eth_common "github.com/ethereum/go-ethereum/common"
  22. eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil"
  23. gethTypes "github.com/ethereum/go-ethereum/core/types"
  24. "go.uber.org/zap"
  25. "github.com/certusone/wormhole/node/pkg/common"
  26. "github.com/certusone/wormhole/node/pkg/query"
  27. "github.com/certusone/wormhole/node/pkg/readiness"
  28. "github.com/certusone/wormhole/node/pkg/supervisor"
  29. "github.com/certusone/wormhole/node/pkg/txverifier"
  30. "github.com/wormhole-foundation/wormhole/sdk"
  31. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  32. )
  33. var (
  34. ethConnectionErrors = promauto.NewCounterVec(
  35. prometheus.CounterOpts{
  36. Name: "wormhole_eth_connection_errors_total",
  37. Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
  38. }, []string{"eth_network", "reason"})
  39. ethMessagesObserved = promauto.NewCounterVec(
  40. prometheus.CounterOpts{
  41. Name: "wormhole_eth_messages_observed_total",
  42. Help: "Total number of Eth messages observed (pre-confirmation)",
  43. }, []string{"eth_network"})
  44. ethMessagesOrphaned = promauto.NewCounterVec(
  45. prometheus.CounterOpts{
  46. Name: "wormhole_eth_messages_orphaned_total",
  47. Help: "Total number of Eth messages dropped (orphaned)",
  48. }, []string{"eth_network", "reason"})
  49. ethMessagesConfirmed = promauto.NewCounterVec(
  50. prometheus.CounterOpts{
  51. Name: "wormhole_eth_messages_confirmed_total",
  52. Help: "Total number of Eth messages verified (post-confirmation)",
  53. }, []string{"eth_network"})
  54. currentEthHeight = promauto.NewGaugeVec(
  55. prometheus.GaugeOpts{
  56. Name: "wormhole_eth_current_height",
  57. Help: "Current Ethereum block height",
  58. }, []string{"eth_network"})
  59. currentEthSafeHeight = promauto.NewGaugeVec(
  60. prometheus.GaugeOpts{
  61. Name: "wormhole_eth_current_safe_height",
  62. Help: "Current Ethereum safe block height",
  63. }, []string{"eth_network"})
  64. currentEthFinalizedHeight = promauto.NewGaugeVec(
  65. prometheus.GaugeOpts{
  66. Name: "wormhole_eth_current_finalized_height",
  67. Help: "Current Ethereum finalized block height",
  68. }, []string{"eth_network"})
  69. queryLatency = promauto.NewHistogramVec(
  70. prometheus.HistogramOpts{
  71. Name: "wormhole_eth_query_latency",
  72. Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
  73. }, []string{"eth_network", "operation"})
  74. )
  75. type (
  76. Watcher struct {
  77. // EVM RPC url.
  78. url string
  79. // Address of the EVM contract
  80. contract eth_common.Address
  81. // Human-readable name of the EVM network, for logging and monitoring.
  82. networkName string
  83. // Readiness component
  84. readinessSync readiness.Component
  85. // VAA ChainID of the network monitored by this watcher.
  86. chainID vaa.ChainID
  87. // Channel for sending new MesssagePublications. Messages should not be sent
  88. // to this channel directly. Instead, they should be wrapped by
  89. // a call to `verifyAndPublish()`.
  90. msgC chan<- *common.MessagePublication
  91. // Channel to send guardian set changes to.
  92. // setC can be set to nil if no guardian set changes are needed.
  93. //
  94. // We currently only fetch the guardian set from one primary chain, which should
  95. // have this flag set to true, and false on all others.
  96. //
  97. // The current primary chain is Ethereum (a mostly arbitrary decision because it
  98. // has the best API - we might want to switch the primary chain to Solana once
  99. // the governance mechanism lives there),
  100. setC chan<- *common.GuardianSet
  101. // Incoming re-observation requests from the network. Pre-filtered to only
  102. // include requests for our chainID.
  103. obsvReqC <-chan *gossipv1.ObservationRequest
  104. // Incoming query requests from the network. Pre-filtered to only
  105. // include requests for our chainID.
  106. queryReqC <-chan *query.PerChainQueryInternal
  107. // Outbound query responses to query requests
  108. queryResponseC chan<- *query.PerChainQueryResponseInternal
  109. pending map[pendingKey]*pendingMessage
  110. pendingMu sync.Mutex
  111. // 0 is a valid guardian set, so we need a nil value here
  112. currentGuardianSet *uint32
  113. // Interface to the chain specific ethereum library.
  114. ethConn connectors.Connector
  115. env common.Environment
  116. logger *zap.Logger
  117. latestBlockNumber uint64
  118. latestSafeBlockNumber uint64
  119. latestFinalizedBlockNumber uint64
  120. ccqConfig query.PerChainConfig
  121. ccqMaxBlockNumber *big.Int
  122. ccqTimestampCache *BlocksByTimestamp
  123. ccqBackfillChannel chan *ccqBackfillRequest
  124. ccqBatchSize int64
  125. ccqBackfillCache bool
  126. ccqLogger *zap.Logger
  127. // Whether the Transfer Verifier should be initialized for this watcher.
  128. txVerifierEnabled bool
  129. // Transfer Verifier instance. If nil, transfer verification is disabled.
  130. txVerifier txverifier.TransferVerifierInterface
  131. cclEnabled bool
  132. cclLogger *zap.Logger
  133. cclAddr eth_common.Address
  134. cclCache CCLCache
  135. cclCacheLock sync.Mutex
  136. }
  137. pendingKey struct {
  138. TxHash eth_common.Hash
  139. BlockHash eth_common.Hash
  140. EmitterAddress vaa.Address
  141. Sequence uint64
  142. }
  143. pendingMessage struct {
  144. message *common.MessagePublication
  145. height uint64
  146. additionalBlocks uint64
  147. }
  148. )
  149. const (
  150. // MaxWaitConfirmations is the maximum number of confirmations to wait before declaring a transaction abandoned.
  151. MaxWaitConfirmations = 60
  152. // pruneHeightDelta is the block height difference between the latest block and the oldest block to keep in memory.
  153. // It is used as a parameter for the Transfer Verifier.
  154. // Value is arbitrary and can be adjusted if it helps performance.
  155. PruneHeightDelta = uint64(20)
  156. )
  157. func NewEthWatcher(
  158. url string,
  159. contract eth_common.Address,
  160. networkName string,
  161. chainID vaa.ChainID,
  162. msgC chan<- *common.MessagePublication,
  163. setC chan<- *common.GuardianSet,
  164. obsvReqC <-chan *gossipv1.ObservationRequest,
  165. queryReqC <-chan *query.PerChainQueryInternal,
  166. queryResponseC chan<- *query.PerChainQueryResponseInternal,
  167. env common.Environment,
  168. ccqBackfillCache bool,
  169. txVerifierEnabled bool,
  170. ) *Watcher {
  171. // Note: the watcher's txVerifier field is not set here because it requires a Connector as an argument.
  172. // Instead, it will be populated in `Run()`.
  173. return &Watcher{
  174. url: url,
  175. contract: contract,
  176. networkName: networkName,
  177. readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID),
  178. chainID: chainID,
  179. msgC: msgC,
  180. setC: setC,
  181. obsvReqC: obsvReqC,
  182. queryReqC: queryReqC,
  183. queryResponseC: queryResponseC,
  184. pending: map[pendingKey]*pendingMessage{},
  185. env: env,
  186. ccqConfig: query.GetPerChainConfig(chainID),
  187. ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64),
  188. ccqBackfillCache: ccqBackfillCache,
  189. ccqBackfillChannel: make(chan *ccqBackfillRequest, 50),
  190. // Signals that a transfer Verifier should be instantiated in Run()
  191. txVerifierEnabled: txVerifierEnabled,
  192. }
  193. }
  194. func (w *Watcher) tokenBridge() eth_common.Address {
  195. var tb []byte
  196. switch w.env {
  197. case common.UnsafeDevNet, common.AccountantMock, common.GoTest:
  198. tb = sdk.KnownDevnetTokenbridgeEmitters[w.chainID]
  199. case common.TestNet:
  200. tb = sdk.KnownTestnetTokenbridgeEmitters[w.chainID]
  201. case common.MainNet:
  202. tb = sdk.KnownTokenbridgeEmitters[w.chainID]
  203. }
  204. return eth_common.BytesToAddress(tb)
  205. }
  206. func (w *Watcher) wrappedNative() eth_common.Address {
  207. var wnative string
  208. switch w.env {
  209. case common.UnsafeDevNet, common.AccountantMock, common.GoTest:
  210. wnative = sdk.KnownDevnetWrappedNativeAddresses[w.chainID]
  211. case common.TestNet:
  212. wnative = sdk.KnownTestnetWrappedNativeAddresses[w.chainID]
  213. case common.MainNet:
  214. wnative = sdk.KnownWrappedNativeAddress[w.chainID]
  215. }
  216. return eth_common.HexToAddress(wnative)
  217. }
  218. func (w *Watcher) Run(parentCtx context.Context) error {
  219. var err error
  220. logger := supervisor.Logger(parentCtx)
  221. w.logger = logger
  222. w.ccqLogger = logger.With(zap.String("component", "ccqevm"))
  223. w.cclLogger = logger.With(zap.String("component", "cclevm"))
  224. logger.Info("Starting watcher",
  225. zap.String("watcher_name", "evm"),
  226. zap.String("url", w.url),
  227. zap.String("contract", w.contract.String()),
  228. zap.String("networkName", w.networkName),
  229. zap.String("chainID", w.chainID.String()),
  230. zap.String("env", string(w.env)),
  231. zap.Bool("txVerifier", w.txVerifierEnabled),
  232. )
  233. // later on we will spawn multiple go-routines through `RunWithScissors`, i.e. catching panics.
  234. // If any of them panic, this function will return, causing this child context to be canceled
  235. // such that the other go-routines can free up resources
  236. ctx, watcherContextCancelFunc := context.WithCancel(parentCtx)
  237. defer watcherContextCancelFunc()
  238. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  239. p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
  240. ContractAddress: w.contract.Hex(),
  241. })
  242. // Verify that we are connecting to the correct chain.
  243. if err := w.verifyEvmChainID(ctx, logger, w.url); err != nil {
  244. return fmt.Errorf("failed to verify evm chain id: %w", err)
  245. }
  246. // Connect to the node using the appropriate type of connector.
  247. {
  248. var finalizedPollingSupported, safePollingSupported bool
  249. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  250. w.ethConn, finalizedPollingSupported, safePollingSupported, err = w.createConnector(timeout, w.url)
  251. cancel()
  252. if err != nil {
  253. ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
  254. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  255. return fmt.Errorf(`failed to create connection to url "%s": %w`, w.url, err)
  256. }
  257. // Log the connector details for troubleshooting purposes.
  258. if finalizedPollingSupported {
  259. if safePollingSupported {
  260. w.logger.Info("polling for finalized and safe blocks")
  261. } else {
  262. w.logger.Info("polling for finalized blocks, will generate safe blocks")
  263. }
  264. } else {
  265. w.logger.Info("assuming instant finality")
  266. }
  267. // Initialize a Transfer Verifier
  268. if w.txVerifierEnabled {
  269. // This shouldn't happen as Transfer Verification can
  270. // only be enabled by passing at least one chainID as a
  271. // CLI flag to guardiand, but this prevents the code
  272. // from erroneously setting up a Transfer Verifier or
  273. // else continuing in state where txVerifierEnabled is
  274. // true but the actual Transfer Verifier is nil.
  275. if !slices.Contains(txverifier.SupportedChains(), w.chainID) {
  276. return errors.New("watcher attempted to create Transfer Verifier but this chainId is not supported")
  277. }
  278. var tvErr error
  279. w.txVerifier, tvErr = txverifier.NewTransferVerifier(
  280. ctx,
  281. w.ethConn,
  282. &txverifier.TVAddresses{
  283. CoreBridgeAddr: w.contract,
  284. TokenBridgeAddr: w.tokenBridge(),
  285. WrappedNativeAddr: w.wrappedNative(),
  286. },
  287. PruneHeightDelta,
  288. logger,
  289. )
  290. if tvErr != nil {
  291. return fmt.Errorf("failed to create Transfer Verifier instance: %w", tvErr)
  292. }
  293. logger.Info("initialized Transfer Verifier",
  294. zap.String("watcher_name", "evm"),
  295. zap.String("url", w.url),
  296. zap.String("contract", w.contract.String()),
  297. )
  298. }
  299. }
  300. if w.ccqConfig.TimestampCacheSupported {
  301. w.ccqTimestampCache = NewBlocksByTimestamp(BTS_MAX_BLOCKS, (w.env == common.UnsafeDevNet))
  302. }
  303. // Get the node version for troubleshooting
  304. w.logVersion(ctx, logger)
  305. errC := make(chan error)
  306. // Subscribe to new message publications. We don't use a timeout here because the LogPollConnector
  307. // will keep running. Other connectors will use a timeout internally if appropriate.
  308. messageC := make(chan *ethabi.AbiLogMessagePublished, 2)
  309. messageSub, err := w.ethConn.WatchLogMessagePublished(ctx, errC, messageC)
  310. if err != nil {
  311. ethConnectionErrors.WithLabelValues(w.networkName, "subscribe_error").Inc()
  312. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  313. return fmt.Errorf("failed to subscribe to message publication events: %w", err)
  314. }
  315. defer messageSub.Unsubscribe()
  316. // Fetch initial guardian set
  317. if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
  318. return fmt.Errorf("failed to request guardian set: %v", err)
  319. }
  320. // Poll for guardian set.
  321. common.RunWithScissors(ctx, errC, "evm_fetch_guardian_set", func(ctx context.Context) error {
  322. t := time.NewTicker(15 * time.Second)
  323. defer t.Stop()
  324. for {
  325. select {
  326. case <-ctx.Done():
  327. return nil
  328. case <-t.C:
  329. if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
  330. errC <- fmt.Errorf("failed to request guardian set: %v", err) //nolint:channelcheck // The watcher will exit anyway
  331. return nil
  332. }
  333. }
  334. }
  335. })
  336. common.RunWithScissors(ctx, errC, "evm_fetch_objs_req", func(ctx context.Context) error {
  337. for {
  338. select {
  339. case <-ctx.Done():
  340. return nil
  341. case r := <-w.obsvReqC:
  342. if r.ChainId > math.MaxUint16 {
  343. logger.Error("chain id for observation request is not a valid uint16",
  344. zap.Uint32("chainID", r.ChainId),
  345. zap.String("txID", hex.EncodeToString(r.TxHash)),
  346. )
  347. continue
  348. }
  349. numObservations, err := w.handleReobservationRequest(
  350. ctx,
  351. vaa.ChainID(r.ChainId),
  352. r.TxHash,
  353. w.ethConn,
  354. atomic.LoadUint64(&w.latestFinalizedBlockNumber),
  355. atomic.LoadUint64(&w.latestSafeBlockNumber),
  356. )
  357. if err != nil {
  358. logger.Error("failed to process observation request",
  359. zap.Uint32("chainID", r.ChainId),
  360. zap.String("txID", hex.EncodeToString(r.TxHash)),
  361. zap.Error(err),
  362. )
  363. }
  364. logger.Info("reobserved transactions",
  365. zap.Uint32("chainID", r.ChainId),
  366. zap.String("txID", hex.EncodeToString(r.TxHash)),
  367. zap.Uint32("numObservations", numObservations),
  368. )
  369. }
  370. }
  371. })
  372. if w.ccqConfig.QueriesSupported() {
  373. w.ccqStart(ctx, errC)
  374. }
  375. if err := w.cclEnable(ctx); err != nil {
  376. return fmt.Errorf("failed to enable custom consistency level: %w", err)
  377. }
  378. common.RunWithScissors(ctx, errC, "evm_fetch_messages", func(ctx context.Context) error {
  379. for {
  380. select {
  381. case <-ctx.Done():
  382. return nil
  383. case err := <-messageSub.Err():
  384. ethConnectionErrors.WithLabelValues(w.networkName, "subscription_error").Inc()
  385. errC <- fmt.Errorf("error while processing message publication subscription: %w", err) //nolint:channelcheck // The watcher will exit anyway
  386. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  387. return nil
  388. case ev := <-messageC:
  389. blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash)
  390. if err != nil {
  391. ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
  392. if canRetryGetBlockTime(err) {
  393. go w.waitForBlockTime(ctx, logger, errC, ev)
  394. continue
  395. }
  396. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  397. errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) //nolint:channelcheck // The watcher will exit anyway
  398. return nil
  399. }
  400. w.postMessage(ctx, ev, blockTime)
  401. }
  402. }
  403. })
  404. // Watch headers
  405. headSink := make(chan *connectors.NewBlock, 100)
  406. headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, errC, headSink)
  407. if err != nil {
  408. ethConnectionErrors.WithLabelValues(w.networkName, "header_subscribe_error").Inc()
  409. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  410. return fmt.Errorf("failed to subscribe to header events: %w", err)
  411. }
  412. defer headerSubscription.Unsubscribe()
  413. common.RunWithScissors(ctx, errC, "evm_fetch_headers", func(ctx context.Context) error {
  414. stats := gossipv1.Heartbeat_Network{ContractAddress: w.contract.Hex()}
  415. for {
  416. select {
  417. case <-ctx.Done():
  418. return nil
  419. case err := <-headerSubscription.Err():
  420. logger.Error("error while processing header subscription", zap.Error(err))
  421. ethConnectionErrors.WithLabelValues(w.networkName, "header_subscription_error").Inc()
  422. errC <- fmt.Errorf("error while processing header subscription: %w", err) //nolint:channelcheck // The watcher will exit anyway
  423. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  424. return nil
  425. case ev := <-headSink:
  426. // These two pointers should have been checked before the event was placed on the channel, but just being safe.
  427. if ev == nil {
  428. logger.Error("new header event is nil")
  429. continue
  430. }
  431. if ev.Number == nil {
  432. logger.Error("new header block number is nil", zap.Stringer("finality", ev.Finality))
  433. continue
  434. }
  435. start := time.Now()
  436. currentHash := ev.Hash
  437. logger.Debug("processing new header",
  438. zap.Stringer("current_block", ev.Number),
  439. zap.Uint64("block_time", ev.Time),
  440. zap.Stringer("current_blockhash", currentHash),
  441. zap.Stringer("finality", ev.Finality),
  442. )
  443. readiness.SetReady(w.readinessSync)
  444. blockNumberU := ev.Number.Uint64()
  445. var thisConsistencyLevel uint8
  446. switch ev.Finality {
  447. case connectors.Latest:
  448. thisConsistencyLevel = vaa.ConsistencyLevelPublishImmediately
  449. atomic.StoreUint64(&w.latestBlockNumber, blockNumberU)
  450. currentEthHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU))
  451. stats.Height = int64(blockNumberU) // #nosec G115 -- This conversion is safe indefinitely
  452. w.ccqAddLatestBlock(ev)
  453. case connectors.Safe:
  454. thisConsistencyLevel = vaa.ConsistencyLevelSafe
  455. atomic.StoreUint64(&w.latestSafeBlockNumber, blockNumberU)
  456. currentEthSafeHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU))
  457. stats.SafeHeight = int64(blockNumberU) // #nosec G115 -- This conversion is safe indefinitely
  458. case connectors.Finalized:
  459. thisConsistencyLevel = vaa.ConsistencyLevelFinalized
  460. atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU)
  461. currentEthFinalizedHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU))
  462. stats.FinalizedHeight = int64(blockNumberU) // #nosec G115 -- This conversion is safe indefinitely
  463. default:
  464. logger.Error("unexpected finality in block", zap.Stringer("finality", ev.Finality), zap.Any("event", ev))
  465. errC <- fmt.Errorf("unexpected finality in block: %v", ev.Finality)
  466. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  467. return nil
  468. }
  469. w.updateNetworkStats(&stats)
  470. w.pendingMu.Lock()
  471. for key, pLock := range w.pending {
  472. // Don't process the observation if it is waiting on a different consistency level.
  473. if !consistencyLevelMatches(thisConsistencyLevel, pLock.message.ConsistencyLevel) {
  474. continue
  475. }
  476. // Don't process the observation if we haven't reached the desired block height yet.
  477. if pLock.height+pLock.additionalBlocks > blockNumberU {
  478. continue
  479. }
  480. // Transaction is now ready
  481. msm := time.Now()
  482. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  483. tx, err := w.ethConn.TransactionReceipt(timeout, eth_common.BytesToHash(pLock.message.TxID))
  484. queryLatency.WithLabelValues(w.networkName, "transaction_receipt").Observe(time.Since(msm).Seconds())
  485. cancel()
  486. // If the node returns an error after waiting expectedConfirmation blocks,
  487. // it means the chain reorged and the transaction was orphaned. The
  488. // TransactionReceipt call is using the same websocket connection than the
  489. // head notifications, so it's guaranteed to be atomic.
  490. //
  491. // Check multiple possible error cases - the node seems to return a
  492. // "not found" error most of the time, but it could conceivably also
  493. // return a nil tx or rpc.ErrNoResult.
  494. if tx == nil || errors.Is(err, rpc.ErrNoResult) || (err != nil && err.Error() == "not found") {
  495. logger.Warn("tx was orphaned",
  496. zap.String("msgId", pLock.message.MessageIDString()),
  497. zap.String("txHash", pLock.message.TxIDString()),
  498. zap.Stringer("blockHash", key.BlockHash),
  499. zap.Uint64("observedHeight", pLock.height),
  500. zap.Uint64("additionalBlocks", pLock.additionalBlocks),
  501. zap.Stringer("current_blockNum", ev.Number),
  502. zap.Stringer("finality", ev.Finality),
  503. zap.Stringer("current_blockHash", currentHash),
  504. zap.Error(err))
  505. delete(w.pending, key)
  506. ethMessagesOrphaned.WithLabelValues(w.networkName, "not_found").Inc()
  507. continue
  508. }
  509. // This should never happen - if we got this far, it means that logs were emitted,
  510. // which is only possible if the transaction succeeded. We check it anyway just
  511. // in case the EVM implementation is buggy.
  512. if tx.Status != 1 {
  513. logger.Error("transaction receipt with non-success status",
  514. zap.String("msgId", pLock.message.MessageIDString()),
  515. zap.String("txHash", pLock.message.TxIDString()),
  516. zap.Stringer("blockHash", key.BlockHash),
  517. zap.Uint64("observedHeight", pLock.height),
  518. zap.Uint64("additionalBlocks", pLock.additionalBlocks),
  519. zap.Stringer("current_blockNum", ev.Number),
  520. zap.Stringer("finality", ev.Finality),
  521. zap.Stringer("current_blockHash", currentHash),
  522. zap.Error(err))
  523. delete(w.pending, key)
  524. ethMessagesOrphaned.WithLabelValues(w.networkName, "tx_failed").Inc()
  525. continue
  526. }
  527. // Any error other than "not found" is likely transient - we retry next block.
  528. if err != nil {
  529. if pLock.height+MaxWaitConfirmations <= blockNumberU {
  530. // An error from this "transient" case has persisted for more than MaxWaitConfirmations.
  531. logger.Info("observation timed out",
  532. zap.String("msgId", pLock.message.MessageIDString()),
  533. zap.String("txHash", pLock.message.TxIDString()),
  534. zap.Stringer("blockHash", key.BlockHash),
  535. zap.Uint64("observedHeight", pLock.height),
  536. zap.Uint64("additionalBlocks", pLock.additionalBlocks),
  537. zap.Stringer("current_blockNum", ev.Number),
  538. zap.Stringer("finality", ev.Finality),
  539. zap.Stringer("current_blockHash", currentHash),
  540. )
  541. ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc()
  542. delete(w.pending, key)
  543. } else {
  544. logger.Warn("transaction could not be fetched",
  545. zap.String("msgId", pLock.message.MessageIDString()),
  546. zap.String("txHash", pLock.message.TxIDString()),
  547. zap.Stringer("blockHash", key.BlockHash),
  548. zap.Uint64("observedHeight", pLock.height),
  549. zap.Uint64("additionalBlocks", pLock.additionalBlocks),
  550. zap.Stringer("current_blockNum", ev.Number),
  551. zap.Stringer("finality", ev.Finality),
  552. zap.Stringer("current_blockHash", currentHash),
  553. zap.Error(err))
  554. }
  555. continue
  556. }
  557. // It's possible for a transaction to be orphaned and then included in a different block
  558. // but with the same tx hash. Drop the observation (it will be re-observed and needs to
  559. // wait for the full confirmation time again).
  560. if tx.BlockHash != key.BlockHash {
  561. logger.Info("tx got dropped and mined in a different block; the message should have been reobserved",
  562. zap.String("msgId", pLock.message.MessageIDString()),
  563. zap.String("txHash", pLock.message.TxIDString()),
  564. zap.Stringer("blockHash", key.BlockHash),
  565. zap.Uint64("observedHeight", pLock.height),
  566. zap.Uint64("additionalBlocks", pLock.additionalBlocks),
  567. zap.Stringer("current_blockNum", ev.Number),
  568. zap.Stringer("finality", ev.Finality),
  569. zap.Stringer("current_blockHash", currentHash),
  570. )
  571. delete(w.pending, key)
  572. ethMessagesOrphaned.WithLabelValues(w.networkName, "blockhash_mismatch").Inc()
  573. continue
  574. }
  575. logger.Info("observation confirmed",
  576. zap.String("msgId", pLock.message.MessageIDString()),
  577. zap.String("txHash", pLock.message.TxIDString()),
  578. zap.Stringer("blockHash", key.BlockHash),
  579. zap.Uint64("observedHeight", pLock.height),
  580. zap.Uint64("additionalBlocks", pLock.additionalBlocks),
  581. zap.Stringer("current_blockNum", ev.Number),
  582. zap.Stringer("finality", ev.Finality),
  583. zap.Stringer("current_blockHash", currentHash),
  584. )
  585. delete(w.pending, key)
  586. // Note that `tx` here is actually a receipt
  587. txHash := eth_common.Hash(pLock.message.TxID)
  588. pubErr := w.verifyAndPublish(pLock.message, ctx, txHash, tx)
  589. if pubErr != nil {
  590. logger.Error("could not publish message",
  591. zap.String("msgId", pLock.message.MessageIDString()),
  592. zap.String("txHash", txHash.String()),
  593. zap.Error(pubErr),
  594. )
  595. }
  596. }
  597. w.pendingMu.Unlock()
  598. logger.Debug("processed new header",
  599. zap.Stringer("current_block", ev.Number),
  600. zap.Stringer("finality", ev.Finality),
  601. zap.Stringer("current_blockhash", currentHash),
  602. zap.Duration("took", time.Since(start)),
  603. )
  604. }
  605. }
  606. })
  607. // Now that the init is complete, peg readiness. That will also happen when we process a new head, but chains
  608. // that wait for finality may take a while to receive the first block and we don't want to hold up the init.
  609. readiness.SetReady(w.readinessSync)
  610. select {
  611. case <-ctx.Done():
  612. return ctx.Err()
  613. case err := <-errC:
  614. return err
  615. }
  616. }
  617. func (w *Watcher) fetchAndUpdateGuardianSet(
  618. logger *zap.Logger,
  619. ctx context.Context,
  620. ethConn connectors.Connector,
  621. ) error {
  622. msm := time.Now()
  623. logger.Debug("fetching guardian set")
  624. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  625. defer cancel()
  626. idx, gs, err := fetchCurrentGuardianSet(timeout, ethConn)
  627. if err != nil {
  628. ethConnectionErrors.WithLabelValues(w.networkName, "guardian_set_fetch_error").Inc()
  629. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  630. return err
  631. }
  632. queryLatency.WithLabelValues(w.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
  633. if w.currentGuardianSet != nil && *(w.currentGuardianSet) == idx {
  634. return nil
  635. }
  636. logger.Info("updated guardian set found", zap.Any("value", gs), zap.Uint32("index", idx))
  637. w.currentGuardianSet = &idx
  638. if w.setC != nil {
  639. w.setC <- common.NewGuardianSet(gs.Keys, idx) //nolint:channelcheck // Will only block the guardian set update routine
  640. }
  641. return nil
  642. }
  643. // Fetch the current guardian set ID and guardian set from the chain.
  644. func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector) (uint32, *ethabi.StructsGuardianSet, error) {
  645. currentIndex, err := ethConn.GetCurrentGuardianSetIndex(ctx)
  646. if err != nil {
  647. return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
  648. }
  649. gs, err := ethConn.GetGuardianSet(ctx, currentIndex)
  650. if err != nil {
  651. return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
  652. }
  653. return currentIndex, &gs, nil
  654. }
  655. // getFinality determines if the chain supports "finalized" and "safe". This is hard coded so it requires thought to change something. However, it also reads the RPC
  656. // to make sure the node actually supports the expected values, and returns an error if it doesn't. Note that we do not support using safe mode but not finalized mode.
  657. func (w *Watcher) getFinality(ctx context.Context) (bool, bool, error) {
  658. finalized, safe, err := GetFinality(w.env, w.chainID)
  659. if err != nil {
  660. return false, false, fmt.Errorf("failed to get finality for %s chain %v: %v", w.env, w.chainID, err)
  661. }
  662. // If finalized / safe should be supported, read the RPC to make sure they actually are.
  663. if finalized {
  664. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  665. defer cancel()
  666. c, err := rpc.DialContext(timeout, w.url)
  667. if err != nil {
  668. return false, false, fmt.Errorf("failed to connect to endpoint: %w", err)
  669. }
  670. type Marshaller struct {
  671. Number *eth_hexutil.Big
  672. }
  673. var m Marshaller
  674. err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "finalized", false)
  675. if err != nil || m.Number == nil {
  676. return false, false, fmt.Errorf("finalized not supported by the node when it should be")
  677. }
  678. if safe {
  679. err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "safe", false)
  680. if err != nil || m.Number == nil {
  681. return false, false, fmt.Errorf("safe not supported by the node when it should be")
  682. }
  683. }
  684. }
  685. return finalized, safe, nil
  686. }
  687. // getLatestFinalizedBlockNumber() returns the latest finalized block seen by this watcher.
  688. func (w *Watcher) getLatestFinalizedBlockNumber() uint64 {
  689. return atomic.LoadUint64(&w.latestFinalizedBlockNumber)
  690. }
  691. // getLatestSafeBlockNumber() returns the latest safe block seen by this watcher.
  692. func (w *Watcher) getLatestSafeBlockNumber() uint64 {
  693. return atomic.LoadUint64(&w.latestSafeBlockNumber)
  694. }
  695. func (w *Watcher) updateNetworkStats(stats *gossipv1.Heartbeat_Network) {
  696. p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
  697. Height: stats.Height,
  698. SafeHeight: stats.SafeHeight,
  699. FinalizedHeight: stats.FinalizedHeight,
  700. ContractAddress: w.contract.Hex(),
  701. })
  702. }
  703. // getBlockTime reads the time of a block.
  704. func (w *Watcher) getBlockTime(ctx context.Context, blockHash eth_common.Hash) (uint64, error) {
  705. msm := time.Now()
  706. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  707. blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, blockHash)
  708. cancel()
  709. queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
  710. return blockTime, err
  711. }
  712. // postMessage creates a message object from a log event and adds it to the pending list for processing.
  713. func (w *Watcher) postMessage(
  714. parentCtx context.Context,
  715. ev *ethabi.AbiLogMessagePublished,
  716. blockTime uint64,
  717. ) {
  718. msg := &common.MessagePublication{
  719. TxID: ev.Raw.TxHash.Bytes(),
  720. Timestamp: time.Unix(int64(blockTime), 0), // #nosec G115 -- This conversion is safe indefinitely
  721. Nonce: ev.Nonce,
  722. Sequence: ev.Sequence,
  723. EmitterChain: w.chainID,
  724. EmitterAddress: PadAddress(ev.Sender),
  725. Payload: ev.Payload,
  726. ConsistencyLevel: ev.ConsistencyLevel,
  727. }
  728. ethMessagesObserved.WithLabelValues(w.networkName).Inc()
  729. if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
  730. w.logger.Info("found new message publication transaction, publishing it immediately",
  731. zap.String("msgId", msg.MessageIDString()),
  732. zap.String("txHash", msg.TxIDString()),
  733. zap.Uint64("blockNum", ev.Raw.BlockNumber),
  734. zap.Uint64("latestFinalizedBlock", atomic.LoadUint64(&w.latestFinalizedBlockNumber)),
  735. zap.Stringer("blockHash", ev.Raw.BlockHash),
  736. zap.Uint64("blockTime", blockTime),
  737. zap.Uint32("Nonce", ev.Nonce),
  738. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  739. )
  740. verifyCtx, cancel := context.WithCancel(parentCtx)
  741. defer cancel()
  742. pubErr := w.verifyAndPublish(msg, verifyCtx, ev.Raw.TxHash, nil)
  743. if pubErr != nil {
  744. w.logger.Error("could not publish message: transfer verification failed",
  745. zap.String("msgId", msg.MessageIDString()),
  746. zap.String("txHash", msg.TxIDString()),
  747. zap.Error(pubErr),
  748. )
  749. }
  750. return
  751. }
  752. pendingEntry := &pendingMessage{
  753. message: msg,
  754. height: ev.Raw.BlockNumber,
  755. }
  756. if msg.ConsistencyLevel == vaa.ConsistencyLevelCustom {
  757. // Note: This function may modify the contents of pendingEntry.
  758. w.cclHandleMessage(parentCtx, pendingEntry, ev.Sender)
  759. }
  760. w.logger.Info("found new message publication transaction",
  761. zap.String("msgId", msg.MessageIDString()),
  762. zap.String("txHash", msg.TxIDString()),
  763. zap.Uint64("reportedBlockNum", ev.Raw.BlockNumber),
  764. zap.Uint64("latestBlock", atomic.LoadUint64(&w.latestBlockNumber)),
  765. zap.Uint64("latestFinalizedBlock", atomic.LoadUint64(&w.latestFinalizedBlockNumber)),
  766. zap.Uint64("latestSafeBlock", atomic.LoadUint64(&w.latestSafeBlockNumber)),
  767. zap.Stringer("blockHash", ev.Raw.BlockHash),
  768. zap.Uint64("blockTime", blockTime),
  769. zap.Uint32("Nonce", ev.Nonce),
  770. zap.Uint8("OrigConsistencyLevel", ev.ConsistencyLevel),
  771. zap.Uint8("ConsistencyLevel", pendingEntry.message.ConsistencyLevel),
  772. zap.Uint64("AdditionalBlocks", pendingEntry.additionalBlocks),
  773. )
  774. key := pendingKey{
  775. TxHash: eth_common.BytesToHash(msg.TxID),
  776. BlockHash: ev.Raw.BlockHash,
  777. EmitterAddress: msg.EmitterAddress,
  778. Sequence: msg.Sequence,
  779. }
  780. w.pendingMu.Lock()
  781. w.pending[key] = pendingEntry
  782. w.pendingMu.Unlock()
  783. }
  784. // blockNotFoundErrors is used by `canRetryGetBlockTime`. It is a map of the error returns from `getBlockTime` that can trigger a retry.
  785. var blockNotFoundErrors = map[string]struct{}{
  786. "not found": {},
  787. "Unknown block": {},
  788. "cannot query unfinalized data": {}, // Seen on Avalanche
  789. }
  790. // canRetryGetBlockTime returns true if the error returned by getBlockTime warrants doing a retry.
  791. func canRetryGetBlockTime(err error) bool {
  792. _, exists := blockNotFoundErrors[err.Error()]
  793. return exists
  794. }
  795. // verifyAndPublish validates a MessagePublication to ensure that it's safe. If so, it broadcasts the message. This function
  796. // should be the only location where the watcher's msgC channel is written to.
  797. // Modifies the verificationState field of the message as a side-effect.
  798. // Even if an invalid Transfer is detected, the message will still be published. It is the responsibility of the calling code to handle
  799. // a status of Rejected.
  800. // Note that the result of verification is not returned by this function, but can be accessed directly via the reference to message.
  801. func (w *Watcher) verifyAndPublish(
  802. // Must be non-nil and have verificationState equal to NotVerified.
  803. msg *common.MessagePublication,
  804. ctx context.Context,
  805. // TODO: in practice it might be possible to read the txHash from the MessagePublication and so this argument might be redundant
  806. txHash eth_common.Hash,
  807. // This argument is only used when Transfer Verifier is enabled. If nil, transfer verifier will fetch the receipt.
  808. // Otherwise, the receipt in the calling context can be passed here to save on RPC requests and parsing.
  809. receipt *gethTypes.Receipt,
  810. ) error {
  811. if msg == nil {
  812. return errors.New("verifyAndPublish: message publication cannot be nil")
  813. }
  814. if w.txVerifier != nil {
  815. verifiedMsg, err := verify(ctx, msg, txHash, receipt, w.txVerifier)
  816. if err != nil {
  817. return err
  818. }
  819. msg = &verifiedMsg
  820. w.logger.Debug(
  821. "verified transfer",
  822. msg.ZapFields()...,
  823. )
  824. }
  825. w.logger.Debug(
  826. "publishing new message publication",
  827. msg.ZapFields()...,
  828. )
  829. w.msgC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
  830. ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
  831. if msg.IsReobservation {
  832. watchers.ReobservationsByChain.WithLabelValues(w.chainID.String(), "std").Inc()
  833. }
  834. return nil
  835. }
  836. // waitForBlockTime is a go routine that repeatedly attempts to read the block time for a single log event. It is used when the initial attempt to read
  837. // the block time fails. If it is finally able to read the block time, it posts the event for processing. Otherwise, it will eventually give up.
  838. func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC chan error, ev *ethabi.AbiLogMessagePublished) {
  839. logger.Warn("found new message publication transaction but failed to look up block time, deferring processing",
  840. zap.String("msgId", msgIdFromLogEvent(w.chainID, ev)),
  841. zap.Stringer("txHash", ev.Raw.TxHash),
  842. zap.Uint64("blockNum", ev.Raw.BlockNumber),
  843. zap.Uint64("latestFinalizedBlock", atomic.LoadUint64(&w.latestFinalizedBlockNumber)),
  844. zap.Stringer("blockHash", ev.Raw.BlockHash),
  845. zap.Uint32("Nonce", ev.Nonce),
  846. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  847. )
  848. const RetryInterval = 5 * time.Second
  849. const MaxRetries = 3
  850. start := time.Now()
  851. t := time.NewTimer(RetryInterval)
  852. defer t.Stop()
  853. retries := 1
  854. for {
  855. select {
  856. case <-ctx.Done():
  857. return
  858. case <-t.C:
  859. blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash)
  860. if err == nil {
  861. logger.Info("retry of block time query succeeded, posting transaction",
  862. zap.String("msgId", msgIdFromLogEvent(w.chainID, ev)),
  863. zap.Stringer("txHash", ev.Raw.TxHash),
  864. zap.Uint64("blockNum", ev.Raw.BlockNumber),
  865. zap.Stringer("blocHash", ev.Raw.BlockHash),
  866. zap.Uint64("blockTime", blockTime),
  867. zap.Uint32("Nonce", ev.Nonce),
  868. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  869. zap.Stringer("startTime", start),
  870. zap.Int("retries", retries),
  871. )
  872. w.postMessage(ctx, ev, blockTime)
  873. return
  874. }
  875. ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
  876. if !canRetryGetBlockTime(err) {
  877. p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
  878. errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) //nolint:channelcheck // The watcher will exit anyway
  879. return
  880. }
  881. if retries >= MaxRetries {
  882. logger.Error("repeatedly failed to look up block time, giving up",
  883. zap.String("msgId", msgIdFromLogEvent(w.chainID, ev)),
  884. zap.Stringer("txHash", ev.Raw.TxHash),
  885. zap.Uint64("blockNum", ev.Raw.BlockNumber),
  886. zap.Stringer("blockHash", ev.Raw.BlockHash),
  887. zap.Uint32("Nonce", ev.Nonce),
  888. zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
  889. zap.Stringer("startTime", start),
  890. zap.Int("retries", retries),
  891. )
  892. return
  893. }
  894. retries++
  895. t.Reset(RetryInterval)
  896. }
  897. }
  898. }
  899. // logVersion runs the web3_clientVersion rpc and logs the node version
  900. func (w *Watcher) logVersion(ctx context.Context, logger *zap.Logger) {
  901. // From: https://ethereum.org/en/developers/docs/apis/json-rpc/#web3_clientversion
  902. var version string
  903. if err := w.ethConn.RawCallContext(ctx, &version, "web3_clientVersion"); err != nil {
  904. logger.Error("problem retrieving node version",
  905. zap.Error(err),
  906. zap.String("network", w.networkName),
  907. )
  908. return
  909. }
  910. logger.Info("node version",
  911. zap.String("version", version),
  912. zap.String("network", w.networkName),
  913. )
  914. }
  915. // msgIdFromLogEvent formats the message ID (chain/emitterAddress/seqNo) from a log event.
  916. func msgIdFromLogEvent(chainID vaa.ChainID, ev *ethabi.AbiLogMessagePublished) string {
  917. return fmt.Sprintf("%v/%v/%v", uint16(chainID), PadAddress(ev.Sender), ev.Sequence)
  918. }
  919. // createConnector determines the type of connector needed for a chain and creates the appropriate one.
  920. func (w *Watcher) createConnector(ctx context.Context, url string) (ethConn connectors.Connector, finalizedPollingSupported, safePollingSupported bool, err error) {
  921. finalizedPollingSupported, safePollingSupported, err = w.getFinality(ctx)
  922. if err != nil {
  923. err = fmt.Errorf("failed to determine finality: %w", err)
  924. return
  925. }
  926. baseConnector, err := connectors.NewEthereumBaseConnector(ctx, w.networkName, url, w.contract, w.logger)
  927. if err != nil {
  928. err = fmt.Errorf("dialing eth client failed: %w", err)
  929. return
  930. }
  931. // We support two types of pollers, the batch poller and the instant finality poller. Instantiate the right one.
  932. if finalizedPollingSupported {
  933. ethConn = connectors.NewBatchPollConnector(ctx, w.logger, baseConnector, safePollingSupported, 1000*time.Millisecond)
  934. } else {
  935. ethConn = connectors.NewInstantFinalityConnector(baseConnector, w.logger)
  936. }
  937. return
  938. }
  939. // consistencyLevelMatches returns true if the consistency level of this block "matches" the requested consistency level of an observation.
  940. // It matches if either the actual values match, or if this block is finalized and the requested value is not immediate (latest) or safe.
  941. // This extra check is necessary because the requested consistency level is assumed to be finalized unless they specifically ask for immediate or safe.
  942. func consistencyLevelMatches(thisConsistencyLevel uint8, requestedConsistencyLevel uint8) bool {
  943. if thisConsistencyLevel == requestedConsistencyLevel {
  944. return true
  945. }
  946. if thisConsistencyLevel != vaa.ConsistencyLevelFinalized {
  947. return false
  948. }
  949. return requestedConsistencyLevel != vaa.ConsistencyLevelPublishImmediately && requestedConsistencyLevel != vaa.ConsistencyLevelSafe
  950. }