watcher.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. package ethereum
import (
	"context"
	"errors"
	"fmt"
	"math/big"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/accounts/abi/bind"
	eth_common "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.uber.org/zap"

	"github.com/certusone/wormhole/node/pkg/common"
	"github.com/certusone/wormhole/node/pkg/ethereum/abi"
	"github.com/certusone/wormhole/node/pkg/p2p"
	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
	"github.com/certusone/wormhole/node/pkg/readiness"
	"github.com/certusone/wormhole/node/pkg/supervisor"
	"github.com/certusone/wormhole/node/pkg/vaa"
)
  25. var (
  26. ethConnectionErrors = promauto.NewCounterVec(
  27. prometheus.CounterOpts{
  28. Name: "wormhole_eth_connection_errors_total",
  29. Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
  30. }, []string{"reason"})
  31. ethLockupsFound = promauto.NewCounter(
  32. prometheus.CounterOpts{
  33. Name: "wormhole_eth_lockups_found_total",
  34. Help: "Total number of Eth lockups found (pre-confirmation)",
  35. })
  36. ethLockupsConfirmed = promauto.NewCounter(
  37. prometheus.CounterOpts{
  38. Name: "wormhole_eth_lockups_confirmed_total",
  39. Help: "Total number of Eth lockups verified (post-confirmation)",
  40. })
  41. ethMessagesOrphaned = promauto.NewCounterVec(
  42. prometheus.CounterOpts{
  43. Name: "wormhole_eth_lockups_orphaned_total",
  44. Help: "Total number of Eth lockups dropped (orphaned)",
  45. }, []string{"reason"})
  46. guardianSetChangesConfirmed = promauto.NewCounter(
  47. prometheus.CounterOpts{
  48. Name: "wormhole_eth_guardian_set_changes_confirmed_total",
  49. Help: "Total number of guardian set changes verified (we only see confirmed ones to begin with)",
  50. })
  51. currentEthHeight = promauto.NewGauge(
  52. prometheus.GaugeOpts{
  53. Name: "wormhole_eth_current_height",
  54. Help: "Current Ethereum block height",
  55. })
  56. queryLatency = promauto.NewHistogramVec(
  57. prometheus.HistogramOpts{
  58. Name: "wormhole_eth_query_latency",
  59. Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
  60. }, []string{"operation"})
  61. )
  62. type (
  63. EthBridgeWatcher struct {
  64. url string
  65. bridge eth_common.Address
  66. minLockupConfirmations uint64
  67. lockChan chan *common.ChainLock
  68. setChan chan *common.GuardianSet
  69. obsvReqC chan *gossipv1.ObservationRequest
  70. pending map[pendingKey]*pendingLock
  71. pendingMu sync.Mutex
  72. // 0 is a valid guardian set, so we need a nil value here
  73. currentGuardianSet *uint32
  74. }
  75. pendingKey struct {
  76. TxHash eth_common.Hash
  77. BlockHash eth_common.Hash
  78. }
  79. pendingLock struct {
  80. lock *common.ChainLock
  81. height uint64
  82. }
  83. )
  84. func NewEthBridgeWatcher(url string, bridge eth_common.Address, minConfirmations uint64, lockEvents chan *common.ChainLock, setEvents chan *common.GuardianSet, obsvReqC chan *gossipv1.ObservationRequest) *EthBridgeWatcher {
  85. return &EthBridgeWatcher{url: url, bridge: bridge, minLockupConfirmations: minConfirmations, lockChan: lockEvents, setChan: setEvents, pending: map[pendingKey]*pendingLock{}, obsvReqC: obsvReqC}
  86. }
  87. func (e *EthBridgeWatcher) Run(ctx context.Context) error {
  88. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  89. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDEthereum, &gossipv1.Heartbeat_Network{
  90. ContractAddress: e.bridge.Hex(),
  91. })
  92. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  93. defer cancel()
  94. c, err := ethclient.DialContext(timeout, e.url)
  95. if err != nil {
  96. ethConnectionErrors.WithLabelValues("dial_error").Inc()
  97. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDEthereum, 1)
  98. return fmt.Errorf("dialing eth client failed: %w", err)
  99. }
  100. f, err := abi.NewAbiFilterer(e.bridge, c)
  101. if err != nil {
  102. return fmt.Errorf("could not create wormhole bridge filter: %w", err)
  103. }
  104. caller, err := abi.NewAbiCaller(e.bridge, c)
  105. if err != nil {
  106. panic(err)
  107. }
  108. // Timeout for initializing subscriptions
  109. timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
  110. defer cancel()
  111. // Subscribe to new token lockups
  112. tokensLockedC := make(chan *abi.AbiLogTokensLocked, 2)
  113. tokensLockedSub, err := f.WatchLogTokensLocked(&bind.WatchOpts{Context: timeout}, tokensLockedC, nil, nil)
  114. if err != nil {
  115. ethConnectionErrors.WithLabelValues("subscribe_error").Inc()
  116. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDEthereum, 1)
  117. return fmt.Errorf("failed to subscribe to token lockup events: %w", err)
  118. }
  119. // Subscribe to guardian set changes
  120. guardianSetC := make(chan *abi.AbiLogGuardianSetChanged, 2)
  121. guardianSetEvent, err := f.WatchLogGuardianSetChanged(&bind.WatchOpts{Context: timeout}, guardianSetC)
  122. if err != nil {
  123. ethConnectionErrors.WithLabelValues("subscribe_error").Inc()
  124. return fmt.Errorf("failed to subscribe to guardian set events: %w", err)
  125. }
  126. errC := make(chan error)
  127. logger := supervisor.Logger(ctx)
  128. // Get initial validator set from Ethereum. We could also fetch it from Solana,
  129. // because both sets are synchronized, we simply made an arbitrary decision to use Ethereum.
  130. timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
  131. defer cancel()
  132. idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge)
  133. if err != nil {
  134. ethConnectionErrors.WithLabelValues("guardian_set_fetch_error").Inc()
  135. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDEthereum, 1)
  136. return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
  137. }
  138. logger.Info("initial guardian set fetched", zap.Any("value", gs), zap.Uint32("index", idx))
  139. e.setChan <- &common.GuardianSet{
  140. Keys: gs.Keys,
  141. Index: idx,
  142. }
  143. // Track the current block number so we can compare it to the block number of
  144. // the message publication for observation requests.
  145. var currentBlockNumber uint64
  146. go func() {
  147. for {
  148. select {
  149. case <-ctx.Done():
  150. return
  151. case r := <-e.obsvReqC:
  152. // This can't happen unless there is a programming error - the caller
  153. // is expected to send us only requests for our chainID.
  154. if vaa.ChainID(r.ChainId) != vaa.ChainIDEthereum {
  155. panic("invalid chain ID")
  156. }
  157. tx := eth_common.BytesToHash(r.TxHash)
  158. logger.Info("received observation request",
  159. zap.String("tx_hash", tx.Hex()))
  160. // SECURITY: Load the block number before requesting the transaction to avoid a
  161. // race condition where requesting the tx succeeds and is then dropped due to a fork,
  162. // but blockNumberU had already advanced beyond the required threshold.
  163. //
  164. // In the primary watcher flow, this is of no concern since we assume the node
  165. // always sends the head before it sends the logs (implicit synchronization
  166. // by relying on the same websocket connection).
  167. blockNumberU := atomic.LoadUint64(&currentBlockNumber)
  168. if blockNumberU == 0 {
  169. logger.Error("no block number available, ignoring observation request")
  170. continue
  171. }
  172. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  173. blockNumber, msgs, err := MessageEventsForTransaction(timeout, c, e.bridge, tx)
  174. cancel()
  175. if err != nil {
  176. logger.Error("failed to process observation request")
  177. continue
  178. }
  179. for _, msg := range msgs {
  180. // SECURITY: In the recovery flow, we already know which transaction to
  181. // observe, and we can assume that it has reached the expected finality
  182. // level a long time ago. Therefore, the logic is much simpler than the
  183. // primary watcher, which has to wait for finality.
  184. //
  185. // Instead, we can simply check if the transaction's block number is in
  186. // the past by more than the expected confirmation number.
  187. //
  188. // Ensure that the current block number is at least expectedConfirmations
  189. // larger than the message observation's block number.
  190. if blockNumber+e.minLockupConfirmations <= blockNumberU {
  191. logger.Info("re-observed message publication transaction",
  192. zap.Stringer("tx", msg.TxHash),
  193. zap.Uint64("current_block", blockNumberU),
  194. zap.Uint64("observed_block", blockNumber),
  195. )
  196. e.lockChan <- msg
  197. } else {
  198. logger.Info("ignoring re-observed message publication transaction",
  199. zap.Stringer("tx", msg.TxHash),
  200. zap.Uint64("current_block", blockNumberU),
  201. zap.Uint64("observed_block", blockNumber),
  202. )
  203. }
  204. }
  205. }
  206. }
  207. }()
  208. go func() {
  209. for {
  210. select {
  211. case <-ctx.Done():
  212. return
  213. case e := <-tokensLockedSub.Err():
  214. ethConnectionErrors.WithLabelValues("subscription_error").Inc()
  215. errC <- fmt.Errorf("error while processing token lockup subscription: %w", e)
  216. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDEthereum, 1)
  217. return
  218. case e := <-guardianSetEvent.Err():
  219. ethConnectionErrors.WithLabelValues("subscription_error").Inc()
  220. errC <- fmt.Errorf("error while processing guardian set subscription: %w", e)
  221. return
  222. case ev := <-tokensLockedC:
  223. // Request timestamp for block
  224. msm := time.Now()
  225. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  226. b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber)))
  227. cancel()
  228. queryLatency.WithLabelValues("block_by_number").Observe(time.Since(msm).Seconds())
  229. if err != nil {
  230. ethConnectionErrors.WithLabelValues("block_by_number_error").Inc()
  231. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDEthereum, 1)
  232. errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err)
  233. return
  234. }
  235. lock := &common.ChainLock{
  236. TxHash: ev.Raw.TxHash,
  237. Timestamp: time.Unix(int64(b.Time()), 0),
  238. Nonce: ev.Nonce,
  239. SourceAddress: ev.Sender,
  240. TargetAddress: ev.Recipient,
  241. SourceChain: vaa.ChainIDEthereum,
  242. TargetChain: vaa.ChainID(ev.TargetChain),
  243. TokenChain: vaa.ChainID(ev.TokenChain),
  244. TokenAddress: ev.Token,
  245. TokenDecimals: ev.TokenDecimals,
  246. Amount: ev.Amount,
  247. }
  248. logger.Info("found new lockup transaction", zap.Stringer("tx", ev.Raw.TxHash),
  249. zap.Uint64("block", ev.Raw.BlockNumber))
  250. ethLockupsFound.Inc()
  251. key := pendingKey{
  252. TxHash: ev.Raw.TxHash,
  253. BlockHash: ev.Raw.BlockHash,
  254. }
  255. e.pendingMu.Lock()
  256. e.pending[key] = &pendingLock{
  257. lock: lock,
  258. height: ev.Raw.BlockNumber,
  259. }
  260. e.pendingMu.Unlock()
  261. case ev := <-guardianSetC:
  262. logger.Info("guardian set has changed, fetching new value",
  263. zap.Uint32("new_index", ev.NewGuardianIndex))
  264. guardianSetChangesConfirmed.Inc()
  265. msm := time.Now()
  266. timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
  267. gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.NewGuardianIndex)
  268. cancel()
  269. queryLatency.WithLabelValues("get_guardian_set").Observe(time.Since(msm).Seconds())
  270. if err != nil {
  271. // We failed to process the guardian set update and are now out of sync with the chain.
  272. // Recover by crashing the runnable, which causes the guardian set to be re-fetched.
  273. errC <- fmt.Errorf("error requesting new guardian set value for %d: %w", ev.NewGuardianIndex, err)
  274. return
  275. }
  276. logger.Info("new guardian set fetched", zap.Any("value", gs), zap.Uint32("index", ev.NewGuardianIndex))
  277. e.setChan <- &common.GuardianSet{
  278. Keys: gs.Keys,
  279. Index: ev.NewGuardianIndex,
  280. }
  281. }
  282. }
  283. }()
  284. // Watch headers
  285. headSink := make(chan *types.Header, 2)
  286. headerSubscription, err := c.SubscribeNewHead(ctx, headSink)
  287. if err != nil {
  288. ethConnectionErrors.WithLabelValues("header_subscribe_error").Inc()
  289. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDEthereum, 1)
  290. return fmt.Errorf("failed to subscribe to header events: %w", err)
  291. }
  292. go func() {
  293. for {
  294. select {
  295. case <-ctx.Done():
  296. return
  297. case e := <-headerSubscription.Err():
  298. ethConnectionErrors.WithLabelValues("header_subscription_error").Inc()
  299. errC <- fmt.Errorf("error while processing header subscription: %w", e)
  300. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDEthereum, 1)
  301. return
  302. case ev := <-headSink:
  303. start := time.Now()
  304. currentHash := ev.Hash()
  305. logger.Info("processing new header",
  306. zap.Stringer("current_block", ev.Number),
  307. zap.Stringer("current_blockhash", currentHash))
  308. currentEthHeight.Set(float64(ev.Number.Int64()))
  309. readiness.SetReady(common.ReadinessEthSyncing)
  310. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDEthereum, &gossipv1.Heartbeat_Network{
  311. Height: ev.Number.Int64(),
  312. ContractAddress: e.bridge.Hex(),
  313. })
  314. e.pendingMu.Lock()
  315. blockNumberU := ev.Number.Uint64()
  316. atomic.StoreUint64(&currentBlockNumber, blockNumberU)
  317. for key, pLock := range e.pending {
  318. // Transaction was dropped and never picked up again
  319. if pLock.height+4*e.minLockupConfirmations <= blockNumberU {
  320. logger.Info("lockup timed out",
  321. zap.Stringer("tx", pLock.lock.TxHash),
  322. zap.Stringer("blockhash", key.BlockHash),
  323. zap.Stringer("current_block", ev.Number),
  324. zap.Stringer("current_blockhash", currentHash))
  325. ethMessagesOrphaned.WithLabelValues("timeout").Inc()
  326. delete(e.pending, key)
  327. continue
  328. }
  329. // Transaction is now ready
  330. if pLock.height+e.minLockupConfirmations <= ev.Number.Uint64() {
  331. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  332. tx, err := c.TransactionReceipt(timeout, pLock.lock.TxHash)
  333. cancel()
  334. // If the node returns an error after waiting expectedConfirmation blocks,
  335. // it means the chain reorged and the transaction was orphaned. The
  336. // TransactionReceipt call is using the same websocket connection than the
  337. // head notifications, so it's guaranteed to be atomic.
  338. //
  339. // Check multiple possible error cases - the node seems to return a
  340. // "not found" error most of the time, but it could conceivably also
  341. // return a nil tx or rpc.ErrNoResult.
  342. if tx == nil || err == rpc.ErrNoResult || (err != nil && err.Error() == "not found") {
  343. logger.Warn("tx was orphaned",
  344. zap.Stringer("tx", pLock.lock.TxHash),
  345. zap.Stringer("blockhash", key.BlockHash),
  346. zap.Stringer("current_block", ev.Number),
  347. zap.Stringer("current_blockhash", currentHash),
  348. zap.Error(err))
  349. delete(e.pending, key)
  350. ethMessagesOrphaned.WithLabelValues("not_found").Inc()
  351. continue
  352. }
  353. // Any error other than "not found" is likely transient - we retry next block.
  354. if err != nil {
  355. logger.Warn("transaction could not be fetched",
  356. zap.Stringer("tx", pLock.lock.TxHash),
  357. zap.Stringer("blockhash", key.BlockHash),
  358. zap.Stringer("current_block", ev.Number),
  359. zap.Stringer("current_blockhash", currentHash),
  360. zap.Error(err))
  361. continue
  362. }
  363. // It's possible for a transaction to be orphaned and then included in a different block
  364. // but with the same tx hash. Drop the observation (it will be re-observed and needs to
  365. // wait for the full confirmation time again).
  366. if tx.BlockHash != key.BlockHash {
  367. logger.Info("tx got dropped and mined in a different block; the message should have been reobserved",
  368. zap.Stringer("tx", pLock.lock.TxHash),
  369. zap.Stringer("blockhash", key.BlockHash),
  370. zap.Stringer("current_block", ev.Number),
  371. zap.Stringer("current_blockhash", currentHash))
  372. delete(e.pending, key)
  373. ethMessagesOrphaned.WithLabelValues("blockhash_mismatch").Inc()
  374. continue
  375. }
  376. logger.Info("lockup confirmed",
  377. zap.Stringer("tx", pLock.lock.TxHash),
  378. zap.Stringer("blockhash", key.BlockHash),
  379. zap.Stringer("current_block", ev.Number),
  380. zap.Stringer("current_blockhash", currentHash))
  381. delete(e.pending, key)
  382. e.lockChan <- pLock.lock
  383. ethLockupsConfirmed.Inc()
  384. }
  385. }
  386. e.pendingMu.Unlock()
  387. logger.Info("processed new header",
  388. zap.Stringer("current_block", ev.Number),
  389. zap.Stringer("current_blockhash", currentHash),
  390. zap.Duration("took", time.Since(start)))
  391. }
  392. }
  393. }()
  394. select {
  395. case <-ctx.Done():
  396. return ctx.Err()
  397. case err := <-errC:
  398. return err
  399. }
  400. }
  401. // Fetch the current guardian set ID and guardian set from the chain.
  402. func FetchCurrentGuardianSet(ctx context.Context, rpcURL string, bridgeContract eth_common.Address) (uint32, *abi.WormholeGuardianSet, error) {
  403. c, err := ethclient.DialContext(ctx, rpcURL)
  404. if err != nil {
  405. return 0, nil, fmt.Errorf("dialing eth client failed: %w", err)
  406. }
  407. caller, err := abi.NewAbiCaller(bridgeContract, c)
  408. if err != nil {
  409. panic(err)
  410. }
  411. opts := &bind.CallOpts{Context: ctx}
  412. currentIndex, err := caller.GuardianSetIndex(opts)
  413. if err != nil {
  414. return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
  415. }
  416. gs, err := caller.GetGuardianSet(opts, currentIndex)
  417. if err != nil {
  418. return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
  419. }
  420. return currentIndex, &gs, nil
  421. }