reobserve.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package evm
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
  7. eth_common "github.com/ethereum/go-ethereum/common"
  8. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  9. "go.uber.org/zap"
  10. )
  11. // handleReobservationRequest performs a reobservation request and publishes any observed transactions.
  12. func (w *Watcher) handleReobservationRequest(ctx context.Context, chainId vaa.ChainID, txID []byte, ethConn connectors.Connector, finalizedBlockNum, safeBlockNum uint64) (numObservations uint32, err error) {
  13. // This can't happen unless there is a programming error - the caller
  14. // is expected to send us only requests for our chainID.
  15. if chainId != w.chainID {
  16. return 0, fmt.Errorf("unexpected chain id: %v", chainId)
  17. }
  18. tx := eth_common.BytesToHash(txID)
  19. w.logger.Info("received observation request", zap.String("tx_hash", tx.Hex()))
  20. // SECURITY: We loaded the block number before requesting the transaction to avoid a
  21. // race condition where requesting the tx succeeds and is then dropped due to a fork,
  22. // but finalizedBlock had already advanced beyond the required threshold.
  23. //
  24. // In the primary watcher flow, this is of no concern since we assume the node
  25. // always sends the head before it sends the logs (implicit synchronization
  26. // by relying on the same websocket connection).
  27. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  28. receipt, blockNumber, msgs, err := MessageEventsForTransaction(timeout, ethConn, w.contract, w.chainID, tx)
  29. cancel()
  30. if err != nil {
  31. return 0, fmt.Errorf("failed to process observation request: %v", err)
  32. }
  33. for _, msg := range msgs {
  34. msg.IsReobservation = true
  35. if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
  36. w.logger.Info("re-observed message publication transaction, publishing it immediately",
  37. zap.String("msgId", msg.MessageIDString()),
  38. zap.String("txHash", msg.TxIDString()),
  39. zap.Uint64("current_block", finalizedBlockNum),
  40. zap.Uint64("observed_block", blockNumber),
  41. )
  42. pubErr := w.verifyAndPublish(msg, ctx, eth_common.BytesToHash(msg.TxID), receipt)
  43. if pubErr != nil {
  44. w.logger.Error("Error when publishing message", zap.Error(pubErr))
  45. } else {
  46. numObservations++
  47. }
  48. continue
  49. }
  50. if msg.ConsistencyLevel == vaa.ConsistencyLevelSafe {
  51. if safeBlockNum == 0 {
  52. w.logger.Error("no safe block number available, ignoring observation request",
  53. zap.String("msgId", msg.MessageIDString()),
  54. zap.String("txHash", msg.TxIDString()),
  55. )
  56. continue
  57. }
  58. if blockNumber <= safeBlockNum {
  59. w.logger.Info("re-observed message publication transaction",
  60. zap.String("msgId", msg.MessageIDString()),
  61. zap.String("txHash", msg.TxIDString()),
  62. zap.Uint64("current_safe_block", safeBlockNum),
  63. zap.Uint64("observed_block", blockNumber),
  64. )
  65. pubErr := w.verifyAndPublish(msg, ctx, eth_common.BytesToHash(msg.TxID), receipt)
  66. if pubErr != nil {
  67. w.logger.Error("Error when publishing message", zap.Error(pubErr))
  68. // Avoid increasing the observations metrics for messages that weren't published.
  69. continue
  70. }
  71. numObservations++
  72. } else {
  73. w.logger.Info("ignoring re-observed message publication transaction",
  74. zap.String("msgId", msg.MessageIDString()),
  75. zap.String("txHash", msg.TxIDString()),
  76. zap.Uint64("current_safe_block", safeBlockNum),
  77. zap.Uint64("observed_block", blockNumber),
  78. )
  79. }
  80. continue
  81. }
  82. if finalizedBlockNum == 0 {
  83. w.logger.Error("no block number available, ignoring observation request",
  84. zap.String("msgId", msg.MessageIDString()),
  85. zap.String("txHash", msg.TxIDString()),
  86. )
  87. continue
  88. }
  89. // SECURITY: In the recovery flow, we already know which transaction to
  90. // observe, and we can assume that it has reached the expected finality
  91. // level a long time ago. Therefore, the logic is much simpler than the
  92. // primary watcher, which has to wait for finality.
  93. //
  94. // Instead, we can simply check if the transaction's block number is in
  95. // the past by more than the expected confirmation number.
  96. //
  97. // Ensure that the current block number is larger than the message observation's block number.
  98. if blockNumber <= finalizedBlockNum {
  99. w.logger.Info("re-observed message publication transaction",
  100. zap.String("msgId", msg.MessageIDString()),
  101. zap.String("txHash", msg.TxIDString()),
  102. zap.Uint64("current_block", finalizedBlockNum),
  103. zap.Uint64("observed_block", blockNumber),
  104. )
  105. pubErr := w.verifyAndPublish(msg, ctx, eth_common.BytesToHash(msg.TxID), receipt)
  106. if pubErr != nil {
  107. w.logger.Error("Error when publishing message", zap.Error(pubErr))
  108. } else {
  109. numObservations++
  110. }
  111. } else {
  112. w.logger.Info("ignoring re-observed message publication transaction",
  113. zap.String("msgId", msg.MessageIDString()),
  114. zap.String("txHash", msg.TxIDString()),
  115. zap.Uint64("current_block", finalizedBlockNum),
  116. zap.Uint64("observed_block", blockNumber),
  117. )
  118. }
  119. }
  120. return
  121. }
  122. // Reobserve is the interface for reobserving using a custom URL. It opens a connection to that URL and does the reobservation on it.
  123. func (w *Watcher) Reobserve(ctx context.Context, chainID vaa.ChainID, txID []byte, customEndpoint string) (uint32, error) {
  124. w.logger.Info("received a request to reobserve using a custom endpoint", zap.Stringer("chainID", chainID), zap.Any("txID", txID), zap.String("url", customEndpoint))
  125. // Verify that this endpoint is for the correct chain.
  126. if err := w.verifyEvmChainID(ctx, w.logger, customEndpoint); err != nil {
  127. return 0, fmt.Errorf("failed to verify evm chain id: %w", err)
  128. }
  129. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  130. defer cancel()
  131. // Connect to the node using the appropriate type of connector and the custom endpoint.
  132. ethConn, _, _, err := w.createConnector(timeout, customEndpoint)
  133. if err != nil {
  134. return 0, fmt.Errorf(`failed to connect to endpoint "%v": %w`, customEndpoint, err)
  135. }
  136. // Get the current finalized and safe blocks.
  137. _, finalized, safe, err := ethConn.GetLatest(timeout)
  138. if err != nil {
  139. return 0, fmt.Errorf(`failed to get latest blocks: %w`, err)
  140. }
  141. // Finally, do the reobservation and return the number of messages observed.
  142. return w.handleReobservationRequest(ctx, chainID, txID, ethConn, finalized, safe)
  143. }