watcher.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package ethereum
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/certusone/wormhole/node/pkg/p2p"
  6. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  7. "github.com/prometheus/client_golang/prometheus/promauto"
  8. "math/big"
  9. "sync"
  10. "time"
  11. "github.com/prometheus/client_golang/prometheus"
  12. "github.com/ethereum/go-ethereum/accounts/abi/bind"
  13. eth_common "github.com/ethereum/go-ethereum/common"
  14. "github.com/ethereum/go-ethereum/core/types"
  15. "github.com/ethereum/go-ethereum/ethclient"
  16. "go.uber.org/zap"
  17. "github.com/certusone/wormhole/node/pkg/common"
  18. "github.com/certusone/wormhole/node/pkg/ethereum/abi"
  19. "github.com/certusone/wormhole/node/pkg/readiness"
  20. "github.com/certusone/wormhole/node/pkg/supervisor"
  21. "github.com/certusone/wormhole/node/pkg/vaa"
  22. )
  23. var (
  24. ethConnectionErrors = promauto.NewCounterVec(
  25. prometheus.CounterOpts{
  26. Name: "wormhole_eth_connection_errors_total",
  27. Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
  28. }, []string{"eth_network", "reason"})
  29. ethMessagesObserved = promauto.NewCounterVec(
  30. prometheus.CounterOpts{
  31. Name: "wormhole_eth_messages_observed_total",
  32. Help: "Total number of Eth messages observed (pre-confirmation)",
  33. }, []string{"eth_network"})
  34. ethMessagesConfirmed = promauto.NewCounterVec(
  35. prometheus.CounterOpts{
  36. Name: "wormhole_eth_messages_confirmed_total",
  37. Help: "Total number of Eth messages verified (post-confirmation)",
  38. }, []string{"eth_network"})
  39. currentEthHeight = promauto.NewGaugeVec(
  40. prometheus.GaugeOpts{
  41. Name: "wormhole_eth_current_height",
  42. Help: "Current Ethereum block height",
  43. }, []string{"eth_network"})
  44. queryLatency = promauto.NewHistogramVec(
  45. prometheus.HistogramOpts{
  46. Name: "wormhole_eth_query_latency",
  47. Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
  48. }, []string{"eth_network", "operation"})
  49. )
  50. type (
  51. EthBridgeWatcher struct {
  52. // Ethereum RPC url
  53. url string
  54. // Address of the Eth bridge contract
  55. bridge eth_common.Address
  56. // Human-readable name of the Eth network, for logging and monitoring.
  57. networkName string
  58. // Readiness component
  59. readiness readiness.Component
  60. // VAA ChainID of the network we're connecting to.
  61. chainID vaa.ChainID
  62. // Channel to send new messages to.
  63. msgChan chan *common.MessagePublication
  64. // Channel to send guardian set changes to.
  65. // setChan can be set to nil if no guardian set changes are needed.
  66. //
  67. // We currently only fetch the guardian set from one primary chain, which should
  68. // have this flag set to true, and false on all others.
  69. //
  70. // The current primary chain is Ethereum (a mostly arbitrary decision because it
  71. // has the best API - we might want to switch the primary chain to Solana once
  72. // the governance mechanism lives there),
  73. setChan chan *common.GuardianSet
  74. pending map[eth_common.Hash]*pendingMessage
  75. pendingMu sync.Mutex
  76. // 0 is a valid guardian set, so we need a nil value here
  77. currentGuardianSet *uint32
  78. }
  79. pendingMessage struct {
  80. message *common.MessagePublication
  81. height uint64
  82. }
  83. )
  84. func NewEthBridgeWatcher(
  85. url string,
  86. bridge eth_common.Address,
  87. networkName string,
  88. readiness readiness.Component,
  89. chainID vaa.ChainID,
  90. messageEvents chan *common.MessagePublication,
  91. setEvents chan *common.GuardianSet) *EthBridgeWatcher {
  92. return &EthBridgeWatcher{
  93. url: url,
  94. bridge: bridge,
  95. networkName: networkName,
  96. readiness: readiness,
  97. chainID: chainID,
  98. msgChan: messageEvents,
  99. setChan: setEvents,
  100. pending: map[eth_common.Hash]*pendingMessage{}}
  101. }
  102. func (e *EthBridgeWatcher) Run(ctx context.Context) error {
  103. logger := supervisor.Logger(ctx)
  104. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  105. p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
  106. BridgeAddress: e.bridge.Hex(),
  107. })
  108. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  109. defer cancel()
  110. c, err := ethclient.DialContext(timeout, e.url)
  111. if err != nil {
  112. ethConnectionErrors.WithLabelValues(e.networkName, "dial_error").Inc()
  113. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  114. return fmt.Errorf("dialing eth client failed: %w", err)
  115. }
  116. f, err := abi.NewAbiFilterer(e.bridge, c)
  117. if err != nil {
  118. return fmt.Errorf("could not create wormhole bridge filter: %w", err)
  119. }
  120. caller, err := abi.NewAbiCaller(e.bridge, c)
  121. if err != nil {
  122. panic(err)
  123. }
  124. // Timeout for initializing subscriptions
  125. timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
  126. defer cancel()
  127. // Subscribe to new message publications
  128. messageC := make(chan *abi.AbiLogMessagePublished, 2)
  129. messageSub, err := f.WatchLogMessagePublished(&bind.WatchOpts{Context: timeout}, messageC, nil)
  130. if err != nil {
  131. ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
  132. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  133. return fmt.Errorf("failed to subscribe to message publication events: %w", err)
  134. }
  135. // Fetch initial guardian set
  136. if err := e.fetchAndUpdateGuardianSet(logger, ctx, caller); err != nil {
  137. return fmt.Errorf("failed to request guardian set: %v", err)
  138. }
  139. // Poll for guardian set.
  140. go func() {
  141. t := time.NewTicker(15 * time.Second)
  142. defer t.Stop()
  143. for {
  144. select {
  145. case <-ctx.Done():
  146. return
  147. case <-t.C:
  148. if err := e.fetchAndUpdateGuardianSet(logger, ctx, caller); err != nil {
  149. logger.Error("failed updating guardian set",
  150. zap.Error(err), zap.String("eth_network", e.networkName))
  151. }
  152. }
  153. }
  154. }()
  155. errC := make(chan error)
  156. go func() {
  157. for {
  158. select {
  159. case <-ctx.Done():
  160. return
  161. case err := <-messageSub.Err():
  162. ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
  163. errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
  164. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  165. return
  166. case ev := <-messageC:
  167. // Request timestamp for block
  168. msm := time.Now()
  169. timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
  170. b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber)))
  171. cancel()
  172. queryLatency.WithLabelValues(e.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
  173. if err != nil {
  174. ethConnectionErrors.WithLabelValues(e.networkName, "block_by_number_error").Inc()
  175. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  176. errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err)
  177. return
  178. }
  179. messsage := &common.MessagePublication{
  180. TxHash: ev.Raw.TxHash,
  181. Timestamp: time.Unix(int64(b.Time()), 0),
  182. Nonce: ev.Nonce,
  183. Sequence: ev.Sequence,
  184. EmitterChain: e.chainID,
  185. EmitterAddress: PadAddress(ev.Sender),
  186. Payload: ev.Payload,
  187. ConsistencyLevel: ev.ConsistencyLevel,
  188. }
  189. logger.Info("found new message publication transaction", zap.Stringer("tx", ev.Raw.TxHash),
  190. zap.Uint64("block", ev.Raw.BlockNumber), zap.String("eth_network", e.networkName))
  191. ethMessagesObserved.WithLabelValues(e.networkName).Inc()
  192. e.pendingMu.Lock()
  193. e.pending[ev.Raw.TxHash] = &pendingMessage{
  194. message: messsage,
  195. height: ev.Raw.BlockNumber,
  196. }
  197. e.pendingMu.Unlock()
  198. }
  199. }
  200. }()
  201. // Watch headers
  202. headSink := make(chan *types.Header, 2)
  203. headerSubscription, err := c.SubscribeNewHead(ctx, headSink)
  204. if err != nil {
  205. ethConnectionErrors.WithLabelValues(e.networkName, "header_subscribe_error").Inc()
  206. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  207. return fmt.Errorf("failed to subscribe to header events: %w", err)
  208. }
  209. go func() {
  210. for {
  211. select {
  212. case <-ctx.Done():
  213. return
  214. case err := <-headerSubscription.Err():
  215. ethConnectionErrors.WithLabelValues(e.networkName, "header_subscription_error").Inc()
  216. errC <- fmt.Errorf("error while processing header subscription: %w", err)
  217. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  218. return
  219. case ev := <-headSink:
  220. start := time.Now()
  221. logger.Info("processing new header", zap.Stringer("block", ev.Number),
  222. zap.String("eth_network", e.networkName))
  223. currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64()))
  224. readiness.SetReady(e.readiness)
  225. p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
  226. Height: ev.Number.Int64(),
  227. BridgeAddress: e.bridge.Hex(),
  228. })
  229. e.pendingMu.Lock()
  230. blockNumberU := ev.Number.Uint64()
  231. for hash, pLock := range e.pending {
  232. // Transaction was dropped and never picked up again
  233. if pLock.height+4*uint64(pLock.message.ConsistencyLevel) <= blockNumberU {
  234. logger.Debug("observation timed out", zap.Stringer("tx", pLock.message.TxHash),
  235. zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
  236. delete(e.pending, hash)
  237. continue
  238. }
  239. // Transaction is now ready
  240. if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() {
  241. logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash),
  242. zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
  243. delete(e.pending, hash)
  244. e.msgChan <- pLock.message
  245. ethMessagesConfirmed.WithLabelValues(e.networkName).Inc()
  246. }
  247. }
  248. e.pendingMu.Unlock()
  249. logger.Info("processed new header", zap.Stringer("block", ev.Number),
  250. zap.Duration("took", time.Since(start)), zap.String("eth_network", e.networkName))
  251. }
  252. }
  253. }()
  254. select {
  255. case <-ctx.Done():
  256. return ctx.Err()
  257. case err := <-errC:
  258. return err
  259. }
  260. }
  261. func (e *EthBridgeWatcher) fetchAndUpdateGuardianSet(
  262. logger *zap.Logger,
  263. ctx context.Context,
  264. caller *abi.AbiCaller,
  265. ) error {
  266. msm := time.Now()
  267. logger.Info("fetching guardian set")
  268. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  269. defer cancel()
  270. idx, gs, err := fetchCurrentGuardianSet(timeout, caller)
  271. if err != nil {
  272. ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
  273. p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
  274. return err
  275. }
  276. queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
  277. if e.currentGuardianSet != nil && *(e.currentGuardianSet) == idx {
  278. return nil
  279. }
  280. logger.Info("updated guardian set found",
  281. zap.Any("value", gs), zap.Uint32("index", idx),
  282. zap.String("eth_network", e.networkName))
  283. e.currentGuardianSet = &idx
  284. if e.setChan != nil {
  285. e.setChan <- &common.GuardianSet{
  286. Keys: gs.Keys,
  287. Index: idx,
  288. }
  289. }
  290. return nil
  291. }
  292. // Fetch the current guardian set ID and guardian set from the chain.
  293. func fetchCurrentGuardianSet(ctx context.Context, caller *abi.AbiCaller) (uint32, *abi.StructsGuardianSet, error) {
  294. opts := &bind.CallOpts{Context: ctx}
  295. currentIndex, err := caller.GetCurrentGuardianSetIndex(opts)
  296. if err != nil {
  297. return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
  298. }
  299. gs, err := caller.GetGuardianSet(opts, currentIndex)
  300. if err != nil {
  301. return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
  302. }
  303. return currentIndex, &gs, nil
  304. }