watcher.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. package ethereum
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/certusone/wormhole/node/pkg/p2p"
  6. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  7. "github.com/ethereum/go-ethereum/rpc"
  8. "github.com/prometheus/client_golang/prometheus/promauto"
  9. "math/big"
  10. "sync"
  11. "time"
  12. "github.com/prometheus/client_golang/prometheus"
  13. "github.com/ethereum/go-ethereum/accounts/abi/bind"
  14. eth_common "github.com/ethereum/go-ethereum/common"
  15. "github.com/ethereum/go-ethereum/core/types"
  16. "github.com/ethereum/go-ethereum/ethclient"
  17. "go.uber.org/zap"
  18. "github.com/certusone/wormhole/node/pkg/common"
  19. "github.com/certusone/wormhole/node/pkg/ethereum/abi"
  20. "github.com/certusone/wormhole/node/pkg/readiness"
  21. "github.com/certusone/wormhole/node/pkg/supervisor"
  22. "github.com/certusone/wormhole/node/pkg/vaa"
  23. )
  24. var (
  25. ethConnectionErrors = promauto.NewCounterVec(
  26. prometheus.CounterOpts{
  27. Name: "wormhole_eth_connection_errors_total",
  28. Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
  29. }, []string{"eth_network", "reason"})
  30. ethMessagesObserved = promauto.NewCounterVec(
  31. prometheus.CounterOpts{
  32. Name: "wormhole_eth_messages_observed_total",
  33. Help: "Total number of Eth messages observed (pre-confirmation)",
  34. }, []string{"eth_network"})
  35. ethMessagesOrphaned = promauto.NewCounterVec(
  36. prometheus.CounterOpts{
  37. Name: "wormhole_eth_messages_orphaned_total",
  38. Help: "Total number of Eth messages dropped (orphaned)",
  39. }, []string{"eth_network", "reason"})
  40. ethMessagesConfirmed = promauto.NewCounterVec(
  41. prometheus.CounterOpts{
  42. Name: "wormhole_eth_messages_confirmed_total",
  43. Help: "Total number of Eth messages verified (post-confirmation)",
  44. }, []string{"eth_network"})
  45. currentEthHeight = promauto.NewGaugeVec(
  46. prometheus.GaugeOpts{
  47. Name: "wormhole_eth_current_height",
  48. Help: "Current Ethereum block height",
  49. }, []string{"eth_network"})
  50. queryLatency = promauto.NewHistogramVec(
  51. prometheus.HistogramOpts{
  52. Name: "wormhole_eth_query_latency",
  53. Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
  54. }, []string{"eth_network", "operation"})
  55. )
  56. type (
  57. Watcher struct {
  58. // Ethereum RPC url
  59. url string
  60. // Address of the Eth contract contract
  61. contract eth_common.Address
  62. // Human-readable name of the Eth network, for logging and monitoring.
  63. networkName string
  64. // Readiness component
  65. readiness readiness.Component
  66. // VAA ChainID of the network we're connecting to.
  67. chainID vaa.ChainID
  68. // Channel to send new messages to.
  69. msgChan chan *common.MessagePublication
  70. // Channel to send guardian set changes to.
  71. // setChan can be set to nil if no guardian set changes are needed.
  72. //
  73. // We currently only fetch the guardian set from one primary chain, which should
  74. // have this flag set to true, and false on all others.
  75. //
  76. // The current primary chain is Ethereum (a mostly arbitrary decision because it
  77. // has the best API - we might want to switch the primary chain to Solana once
  78. // the governance mechanism lives there),
  79. setChan chan *common.GuardianSet
  80. pending map[pendingKey]*pendingMessage
  81. pendingMu sync.Mutex
  82. // 0 is a valid guardian set, so we need a nil value here
  83. currentGuardianSet *uint32
  84. // Minimum number of confirmations to accept, regardless of what the contract specifies.
  85. minConfirmations uint64
  86. }
  87. pendingKey struct {
  88. TxHash eth_common.Hash
  89. BlockHash eth_common.Hash
  90. EmitterAddress vaa.Address
  91. Sequence uint64
  92. }
  93. pendingMessage struct {
  94. message *common.MessagePublication
  95. height uint64
  96. }
  97. )
  98. func NewEthWatcher(
  99. url string,
  100. contract eth_common.Address,
  101. networkName string,
  102. readiness readiness.Component,
  103. chainID vaa.ChainID,
  104. messageEvents chan *common.MessagePublication,
  105. setEvents chan *common.GuardianSet,
  106. minConfirmations uint64) *Watcher {
  107. return &Watcher{
  108. url: url,
  109. contract: contract,
  110. networkName: networkName,
  111. readiness: readiness,
  112. minConfirmations: minConfirmations,
  113. chainID: chainID,
  114. msgChan: messageEvents,
  115. setChan: setEvents,
  116. pending: map[pendingKey]*pendingMessage{}}
  117. }
  118. func (e *Watcher) Run(ctx context.Context) error {
  119. logger := supervisor.Logger(ctx)
  120. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  121. p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
  122. ContractAddress: e.contract.Hex(),
  123. })
  124. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  125. defer cancel()
  126. c, err := ethclient.DialContext(timeout, e.url)
  127. if err != nil {
  128. ethConnectionErrors.WithLabelValues(e.networkName, "dial_error").Inc()
  129. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  130. return fmt.Errorf("dialing eth client failed: %w", err)
  131. }
  132. f, err := abi.NewAbiFilterer(e.contract, c)
  133. if err != nil {
  134. return fmt.Errorf("could not create wormhole contract filter: %w", err)
  135. }
  136. caller, err := abi.NewAbiCaller(e.contract, c)
  137. if err != nil {
  138. panic(err)
  139. }
  140. // Timeout for initializing subscriptions
  141. timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
  142. defer cancel()
  143. // Subscribe to new message publications
  144. messageC := make(chan *abi.AbiLogMessagePublished, 2)
  145. messageSub, err := f.WatchLogMessagePublished(&bind.WatchOpts{Context: timeout}, messageC, nil)
  146. if err != nil {
  147. ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
  148. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  149. return fmt.Errorf("failed to subscribe to message publication events: %w", err)
  150. }
  151. // Fetch initial guardian set
  152. if err := e.fetchAndUpdateGuardianSet(logger, ctx, caller); err != nil {
  153. return fmt.Errorf("failed to request guardian set: %v", err)
  154. }
  155. // Poll for guardian set.
  156. go func() {
  157. t := time.NewTicker(15 * time.Second)
  158. defer t.Stop()
  159. for {
  160. select {
  161. case <-ctx.Done():
  162. return
  163. case <-t.C:
  164. if err := e.fetchAndUpdateGuardianSet(logger, ctx, caller); err != nil {
  165. logger.Error("failed updating guardian set",
  166. zap.Error(err), zap.String("eth_network", e.networkName))
  167. }
  168. }
  169. }
  170. }()
  171. errC := make(chan error)
  172. go func() {
  173. for {
  174. select {
  175. case <-ctx.Done():
  176. return
  177. case err := <-messageSub.Err():
  178. ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
  179. errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
  180. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  181. return
  182. case ev := <-messageC:
  183. // Request timestamp for block
  184. msm := time.Now()
  185. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  186. b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber)))
  187. cancel()
  188. queryLatency.WithLabelValues(e.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
  189. if err != nil {
  190. ethConnectionErrors.WithLabelValues(e.networkName, "block_by_number_error").Inc()
  191. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  192. errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err)
  193. return
  194. }
  195. message := &common.MessagePublication{
  196. TxHash: ev.Raw.TxHash,
  197. Timestamp: time.Unix(int64(b.Time()), 0),
  198. Nonce: ev.Nonce,
  199. Sequence: ev.Sequence,
  200. EmitterChain: e.chainID,
  201. EmitterAddress: PadAddress(ev.Sender),
  202. Payload: ev.Payload,
  203. ConsistencyLevel: ev.ConsistencyLevel,
  204. }
  205. logger.Info("found new message publication transaction", zap.Stringer("tx", ev.Raw.TxHash),
  206. zap.Uint64("block", ev.Raw.BlockNumber), zap.String("eth_network", e.networkName))
  207. ethMessagesObserved.WithLabelValues(e.networkName).Inc()
  208. key := pendingKey{
  209. TxHash: message.TxHash,
  210. BlockHash: ev.Raw.BlockHash,
  211. EmitterAddress: message.EmitterAddress,
  212. Sequence: message.Sequence,
  213. }
  214. e.pendingMu.Lock()
  215. e.pending[key] = &pendingMessage{
  216. message: message,
  217. height: ev.Raw.BlockNumber,
  218. }
  219. e.pendingMu.Unlock()
  220. }
  221. }
  222. }()
  223. // Watch headers
  224. headSink := make(chan *types.Header, 2)
  225. headerSubscription, err := c.SubscribeNewHead(ctx, headSink)
  226. if err != nil {
  227. ethConnectionErrors.WithLabelValues(e.networkName, "header_subscribe_error").Inc()
  228. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  229. return fmt.Errorf("failed to subscribe to header events: %w", err)
  230. }
  231. go func() {
  232. for {
  233. select {
  234. case <-ctx.Done():
  235. return
  236. case err := <-headerSubscription.Err():
  237. ethConnectionErrors.WithLabelValues(e.networkName, "header_subscription_error").Inc()
  238. errC <- fmt.Errorf("error while processing header subscription: %w", err)
  239. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  240. return
  241. case ev := <-headSink:
  242. start := time.Now()
  243. currentHash := ev.Hash()
  244. logger.Info("processing new header",
  245. zap.Stringer("current_block", ev.Number),
  246. zap.Stringer("current_blockhash", currentHash),
  247. zap.String("eth_network", e.networkName))
  248. currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64()))
  249. readiness.SetReady(e.readiness)
  250. p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
  251. Height: ev.Number.Int64(),
  252. ContractAddress: e.contract.Hex(),
  253. })
  254. e.pendingMu.Lock()
  255. blockNumberU := ev.Number.Uint64()
  256. for key, pLock := range e.pending {
  257. expectedConfirmations := uint64(pLock.message.ConsistencyLevel)
  258. if expectedConfirmations < e.minConfirmations {
  259. expectedConfirmations = e.minConfirmations
  260. }
  261. // Transaction was dropped and never picked up again
  262. if pLock.height+4*uint64(expectedConfirmations) <= blockNumberU {
  263. logger.Info("observation timed out",
  264. zap.Stringer("tx", pLock.message.TxHash),
  265. zap.Stringer("blockhash", key.BlockHash),
  266. zap.Stringer("emitter_address", key.EmitterAddress),
  267. zap.Uint64("sequence", key.Sequence),
  268. zap.Stringer("current_block", ev.Number),
  269. zap.Stringer("current_blockhash", currentHash),
  270. zap.String("eth_network", e.networkName),
  271. )
  272. ethMessagesOrphaned.WithLabelValues(e.networkName, "timeout").Inc()
  273. delete(e.pending, key)
  274. continue
  275. }
  276. // Transaction is now ready
  277. if pLock.height+uint64(expectedConfirmations) <= blockNumberU {
  278. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  279. tx, err := c.TransactionReceipt(timeout, pLock.message.TxHash)
  280. cancel()
  281. // If the node returns an error after waiting expectedConfirmation blocks,
  282. // it means the chain reorged and the transaction was orphaned. The
  283. // TransactionReceipt call is using the same websocket connection than the
  284. // head notifications, so it's guaranteed to be atomic.
  285. //
  286. // Check multiple possible error cases - the node seems to return a
  287. // "not found" error most of the time, but it could conceivably also
  288. // return a nil tx or rpc.ErrNoResult.
  289. if tx == nil || err == rpc.ErrNoResult || (err != nil && err.Error() == "not found") {
  290. logger.Warn("tx was orphaned",
  291. zap.Stringer("tx", pLock.message.TxHash),
  292. zap.Stringer("blockhash", key.BlockHash),
  293. zap.Stringer("emitter_address", key.EmitterAddress),
  294. zap.Uint64("sequence", key.Sequence),
  295. zap.Stringer("current_block", ev.Number),
  296. zap.Stringer("current_blockhash", currentHash),
  297. zap.String("eth_network", e.networkName),
  298. zap.Error(err))
  299. delete(e.pending, key)
  300. ethMessagesOrphaned.WithLabelValues(e.networkName, "not_found").Inc()
  301. continue
  302. }
  303. // Any error other than "not found" is likely transient - we retry next block.
  304. if err != nil {
  305. logger.Warn("transaction could not be fetched",
  306. zap.Stringer("tx", pLock.message.TxHash),
  307. zap.Stringer("blockhash", key.BlockHash),
  308. zap.Stringer("emitter_address", key.EmitterAddress),
  309. zap.Uint64("sequence", key.Sequence),
  310. zap.Stringer("current_block", ev.Number),
  311. zap.Stringer("current_blockhash", currentHash),
  312. zap.String("eth_network", e.networkName),
  313. zap.Error(err))
  314. continue
  315. }
  316. // It's possible for a transaction to be orphaned and then included in a different block
  317. // but with the same tx hash. Drop the observation (it will be re-observed and needs to
  318. // wait for the full confirmation time again).
  319. if tx.BlockHash != key.BlockHash {
  320. logger.Info("tx got dropped and mined in a different block; the message should have been reobserved",
  321. zap.Stringer("tx", pLock.message.TxHash),
  322. zap.Stringer("blockhash", key.BlockHash),
  323. zap.Stringer("emitter_address", key.EmitterAddress),
  324. zap.Uint64("sequence", key.Sequence),
  325. zap.Stringer("current_block", ev.Number),
  326. zap.Stringer("current_blockhash", currentHash),
  327. zap.String("eth_network", e.networkName))
  328. delete(e.pending, key)
  329. ethMessagesOrphaned.WithLabelValues(e.networkName, "blockhash_mismatch").Inc()
  330. continue
  331. }
  332. logger.Info("observation confirmed",
  333. zap.Stringer("tx", pLock.message.TxHash),
  334. zap.Stringer("blockhash", key.BlockHash),
  335. zap.Stringer("emitter_address", key.EmitterAddress),
  336. zap.Uint64("sequence", key.Sequence),
  337. zap.Stringer("current_block", ev.Number),
  338. zap.Stringer("current_blockhash", currentHash),
  339. zap.String("eth_network", e.networkName))
  340. delete(e.pending, key)
  341. e.msgChan <- pLock.message
  342. ethMessagesConfirmed.WithLabelValues(e.networkName).Inc()
  343. }
  344. }
  345. e.pendingMu.Unlock()
  346. logger.Info("processed new header",
  347. zap.Stringer("current_block", ev.Number),
  348. zap.Stringer("current_blockhash", currentHash),
  349. zap.Duration("took", time.Since(start)),
  350. zap.String("eth_network", e.networkName))
  351. }
  352. }
  353. }()
  354. select {
  355. case <-ctx.Done():
  356. return ctx.Err()
  357. case err := <-errC:
  358. return err
  359. }
  360. }
  361. func (e *Watcher) fetchAndUpdateGuardianSet(
  362. logger *zap.Logger,
  363. ctx context.Context,
  364. caller *abi.AbiCaller,
  365. ) error {
  366. msm := time.Now()
  367. logger.Info("fetching guardian set")
  368. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  369. defer cancel()
  370. idx, gs, err := fetchCurrentGuardianSet(timeout, caller)
  371. if err != nil {
  372. ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
  373. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  374. return err
  375. }
  376. queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
  377. if e.currentGuardianSet != nil && *(e.currentGuardianSet) == idx {
  378. return nil
  379. }
  380. logger.Info("updated guardian set found",
  381. zap.Any("value", gs), zap.Uint32("index", idx),
  382. zap.String("eth_network", e.networkName))
  383. e.currentGuardianSet = &idx
  384. if e.setChan != nil {
  385. e.setChan <- &common.GuardianSet{
  386. Keys: gs.Keys,
  387. Index: idx,
  388. }
  389. }
  390. return nil
  391. }
  392. // Fetch the current guardian set ID and guardian set from the chain.
  393. func fetchCurrentGuardianSet(ctx context.Context, caller *abi.AbiCaller) (uint32, *abi.StructsGuardianSet, error) {
  394. opts := &bind.CallOpts{Context: ctx}
  395. currentIndex, err := caller.GetCurrentGuardianSetIndex(opts)
  396. if err != nil {
  397. return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
  398. }
  399. gs, err := caller.GetGuardianSet(opts, currentIndex)
  400. if err != nil {
  401. return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
  402. }
  403. return currentIndex, &gs, nil
  404. }