watcher.go

package near

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/certusone/wormhole/node/pkg/common"
	"github.com/certusone/wormhole/node/pkg/p2p"
	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
	"github.com/certusone/wormhole/node/pkg/readiness"
	"github.com/certusone/wormhole/node/pkg/supervisor"
	"github.com/certusone/wormhole/node/pkg/watchers/near/nearapi"
	"github.com/ethereum/go-ethereum/common/math"
	"github.com/mr-tron/base58"
	"github.com/wormhole-foundation/wormhole/sdk/vaa"
	"go.uber.org/zap"
)
var (
	// How long to initially wait between observing a transaction and attempting to process it.
	// To successfully process the transaction, all receipts need to be finalized, which typically only occurs two blocks later or so.
	// Transaction processing is retried with exponential backoff, i.e. a transaction may stay in the queue for roughly initialTxProcDelay * 2^(txProcRetry+1).
	initialTxProcDelay = time.Second * 3

	blockPollInterval = time.Millisecond * 200

	// This value should be set to the maximum number of transactions in a block such that they can all be processed in parallel.
	workerCountTxProcessing int = 100

	// This value should be set to be greater than the number of chunks in a NEAR block,
	// such that they can all be fetched in parallel.
	// We're currently seeing ~10 chunks/block, so setting this to 20 conservatively.
	workerChunkFetching int = 20

	queueSize int = 10_000 // size of the queues for chunk processing as well as transaction processing

	// If the watcher falls behind this many blocks, start over. This should be set proportional to `queueSize`
	// such that all transactions from `maxFallBehindBlocks` can easily fit into the queue.
	maxFallBehindBlocks uint = 200

	metricsInterval = time.Second * 10 // how often to report health metrics

	txProcRetry uint = 4 // how often to retry processing a transaction

	// The maximum span of gaps in the NEAR blockchain we want to support.
	// Lower values yield better performance, but can lead to missed observations if NEAR has larger gaps.
	// During testing, gaps on NEAR were at most 1 block long.
	nearBlockchainMaxGaps = 5
)
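// Illustration (derived from the defaults above, not part of the original comments): a job is first
// attempted ~3s after observation and, on failure, retried after 6s, 12s, 24s, and 48s (the delay
// doubles on each of the txProcRetry retries), so a persistently failing transaction stays in the
// queue for roughly 90 seconds in total before it is dropped.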
type (
	transactionProcessingJob struct {
		txHash          string
		senderAccountId string
		creationTime    time.Time
		retryCounter    uint
		delay           time.Duration
		isReobservation bool

		// set during processing
		hasWormholeMsg bool // whether this transaction emitted a Wormhole message
	}

	Watcher struct {
		mainnet         bool
		wormholeAccount string // name of the Wormhole Account on the NEAR blockchain
		nearRPC         string

		// external channels
		msgC          chan<- *common.MessagePublication   // validated (SECURITY: and only validated!) observations go into this channel
		obsvReqC      <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel
		readinessSync readiness.Component

		// internal queues
		transactionProcessingQueueCounter atomic.Int64
		transactionProcessingQueue        chan *transactionProcessingJob
		chunkProcessingQueue              chan nearapi.ChunkHeader

		// events channels
		eventChanTxProcessedDuration chan time.Duration
		eventChan                    chan eventType

		// Error channel
		errC chan error

		// sub-components
		finalizer Finalizer
		nearAPI   nearapi.NearApi
	}
)
// NewWatcher creates a new NEAR watcher.
func NewWatcher(
	nearRPC string,
	wormholeContract string,
	msgC chan<- *common.MessagePublication,
	obsvReqC <-chan *gossipv1.ObservationRequest,
	mainnet bool,
) *Watcher {
	return &Watcher{
		mainnet:                      mainnet,
		wormholeAccount:              wormholeContract,
		nearRPC:                      nearRPC,
		msgC:                         msgC,
		obsvReqC:                     obsvReqC,
		readinessSync:                common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDNear),
		transactionProcessingQueue:   make(chan *transactionProcessingJob, queueSize),
		chunkProcessingQueue:         make(chan nearapi.ChunkHeader, queueSize),
		eventChanTxProcessedDuration: make(chan time.Duration, 10),
		eventChan:                    make(chan eventType, 10),
	}
}
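// Usage sketch (illustration only, not part of the original file): wiring the watcher into a
// supervisor tree from a caller's perspective. The RPC URL and account name below are
// placeholders, and the exact supervisor setup depends on the node's runnable tree.
//
//	msgC := make(chan *common.MessagePublication)
//	obsvReqC := make(chan *gossipv1.ObservationRequest)
//	w := near.NewWatcher("https://rpc.example.near", "wormhole.example.near", msgC, obsvReqC, true)
//	_ = supervisor.Run(ctx, "nearwatch", w.Run)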
func newTransactionProcessingJob(txHash string, senderAccountId string, isReobservation bool) *transactionProcessingJob {
	return &transactionProcessingJob{
		txHash:          txHash,
		senderAccountId: senderAccountId,
		creationTime:    time.Now(),
		retryCounter:    0,
		delay:           initialTxProcDelay,
		isReobservation: isReobservation,
		hasWormholeMsg:  false,
	}
}
func (e *Watcher) runBlockPoll(ctx context.Context) error {
	logger := supervisor.Logger(ctx)

	// As we start, get the height of the latest finalized block. We won't be processing any blocks before that.
	finalBlock, err := e.nearAPI.GetFinalBlock(ctx)
	if err != nil || finalBlock.Header.Height == 0 {
		logger.Error("failed to start NEAR block poll", zap.String("error_type", "startup_fail"), zap.String("log_msg_type", "startup_error"))
		return err
	}

	highestFinalBlockHeightObserved := finalBlock.Header.Height - 1 // minus one because we still want to process this block, just no blocks before it

	timer := time.NewTimer(time.Nanosecond) // this is just for the first iteration.

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-timer.C:
			highestFinalBlockHeightObserved, err = e.ReadFinalChunksSince(logger, ctx, highestFinalBlockHeightObserved, e.chunkProcessingQueue)
			if err != nil {
				logger.Warn("NEAR poll error", zap.String("log_msg_type", "block_poll_error"), zap.String("error", err.Error()))
			}

			if highestFinalBlockHeightObserved > math.MaxInt64 {
				logger.Error("failed to start NEAR block poll", zap.String("error_type", "startup_fail"), zap.String("log_msg_type", "startup_error"))
				return fmt.Errorf("the latest finalized NEAR block height is not a valid int64: %d", highestFinalBlockHeightObserved)
			}

			p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{
				Height:          int64(highestFinalBlockHeightObserved),
				ContractAddress: e.wormholeAccount,
			})

			readiness.SetReady(e.readinessSync)
			timer.Reset(blockPollInterval)
		}
	}
}
func (e *Watcher) runChunkFetcher(ctx context.Context) error {
	logger := supervisor.Logger(ctx)

	for {
		select {
		case <-ctx.Done():
			return nil
		case chunkHeader := <-e.chunkProcessingQueue:
			newJobs, err := e.fetchAndParseChunk(logger, ctx, chunkHeader)
			if err != nil {
				logger.Warn("near.processChunk failed", zap.String("log_msg_type", "chunk_processing_failed"), zap.String("error", err.Error()))
				p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
				continue
			}
			for _, job := range newJobs {
				err := e.schedule(ctx, job, job.delay)
				if err != nil {
					// Debug-level logging here because it could be very noisy (one log entry for *any* transaction on the NEAR blockchain)
					logger.Debug("error scheduling transaction processing job", zap.Error(err))
				}
			}
		}
	}
}
func (e *Watcher) runObsvReqProcessor(ctx context.Context) error {
	logger := supervisor.Logger(ctx)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case r := <-e.obsvReqC:
			// node/pkg/node/reobserve.go already enforces that the chain id is a valid uint16
			// and only writes to the channel for this chain id.
			// If either of the checks below fails, something has gone wrong.
			if r.ChainId > math.MaxUint16 || vaa.ChainID(r.ChainId) != vaa.ChainIDNear {
				panic("invalid chain ID")
			}

			txHash := base58.Encode(r.TxHash)

			logger.Info("Received obsv request", zap.String("log_msg_type", "obsv_req_received"), zap.String("tx_hash", txHash))

			// TODO e.wormholeAccount is not the correct value for senderAccountId. Instead, it should be the account id of the transaction sender.
			// This value is used by NEAR to determine which shard to query. An incorrect value here is not a security risk but could lead to reobservation requests failing.
			// Guardians currently run nodes for all shards and the API seems to return the correct results independent of the set senderAccountId, but this could change in the future.
			// Fixing this would require adding the transaction sender account ID to the observation request.
			job := newTransactionProcessingJob(txHash, e.wormholeAccount, true)
			err := e.schedule(ctx, job, time.Nanosecond)
			if err != nil {
				// Error-level logging here because this follows a re-observation request, which should be infrequent.
				logger.Error("error scheduling transaction processing job", zap.Error(err))
			}
		}
	}
}
func (e *Watcher) runTxProcessor(ctx context.Context) error {
	logger := supervisor.Logger(ctx)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case job := <-e.transactionProcessingQueue:
			err := e.processTx(logger, ctx, job)
			if err != nil {
				// Transaction processing was unsuccessful. Retry if retryCounter is not exceeded.
				if job.retryCounter < txProcRetry {
					// Log and retry with exponential backoff
					logger.Debug(
						"near.processTx",
						zap.String("log_msg_type", "tx_processing_retry"),
						zap.String("tx_hash", job.txHash),
						zap.String("error", err.Error()),
					)
					job.retryCounter++
					job.delay *= 2
					err := e.schedule(ctx, job, job.delay)
					if err != nil {
						// Debug-level logging here because it could be very noisy (one log entry for *any* transaction on the NEAR blockchain)
						logger.Debug("error scheduling transaction processing job", zap.Error(err))
					}
				} else {
					// Warn and do not retry
					logger.Warn(
						"near.processTx",
						zap.String("log_msg_type", "tx_processing_retries_exceeded"),
						zap.String("tx_hash", job.txHash),
						zap.String("error", err.Error()),
					)
					p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
				}
			}

			if job.hasWormholeMsg {
				// report how long it took to process this transaction
				e.eventChanTxProcessedDuration <- time.Since(job.creationTime) //nolint:channelcheck // Only pauses this watcher
			}
		}
	}
}
func (e *Watcher) Run(ctx context.Context) error {
	logger := supervisor.Logger(ctx)

	logger.Info("Starting watcher",
		zap.String("watcher_name", "near"),
		zap.Bool("mainnet", e.mainnet),
		zap.String("wormholeAccount", e.wormholeAccount),
		zap.String("nearRPC", e.nearRPC),
	)

	e.errC = make(chan error)
	e.nearAPI = nearapi.NewNearApiImpl(nearapi.NewHttpNearRpc(e.nearRPC))
	e.finalizer = newFinalizer(e.eventChan, e.nearAPI, e.mainnet)

	p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{
		ContractAddress: e.wormholeAccount,
	})

	// Get the node version for troubleshooting
	e.logVersion(ctx, logger)

	logger.Info("Near watcher connecting to RPC node", zap.String("url", e.nearRPC))

	// start metrics reporter
	common.RunWithScissors(ctx, e.errC, "metrics", e.runMetrics)
	// start one block poller
	common.RunWithScissors(ctx, e.errC, "blockPoll", e.runBlockPoll)
	// start one obsvReqC processor
	common.RunWithScissors(ctx, e.errC, "obsvReqProcessor", e.runObsvReqProcessor)
	// start `workerChunkFetching` many chunkFetcher runners
	for i := 0; i < workerChunkFetching; i++ {
		common.RunWithScissors(ctx, e.errC, fmt.Sprintf("chunk_fetcher_%d", i), e.runChunkFetcher)
	}
	// start `workerCountTxProcessing` many transactionProcessing runners
	for i := 0; i < workerCountTxProcessing; i++ {
		common.RunWithScissors(ctx, e.errC, fmt.Sprintf("txProcessor_%d", i), e.runTxProcessor)
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-e.errC:
		return err
	}
}
// schedule pushes a job to workers after delay. It is context aware and will not execute the job if the context
// is cancelled before the delay has passed and the job is picked up by a worker.
func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) error {
	if int(e.transactionProcessingQueueCounter.Load())+len(e.transactionProcessingQueue) > queueSize {
		p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
		return fmt.Errorf("NEAR transactionProcessingQueue exceeds max queue size, skipping transaction")
	}

	common.RunWithScissors(ctx, e.errC, "scheduledThread",
		func(ctx context.Context) error {
			timer := time.NewTimer(delay)
			defer timer.Stop()

			e.transactionProcessingQueueCounter.Add(1)
			defer e.transactionProcessingQueueCounter.Add(-1)

			select {
			case <-ctx.Done():
				return nil
			case <-timer.C:
				// Don't block on processing if the context is cancelled
				select {
				case <-ctx.Done():
					return nil
				case e.transactionProcessingQueue <- job: //nolint:channelcheck // Only blocking this go routine.
				}
			}
			return nil
		})
	return nil
}
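// Note (illustration, based on the code above): schedule returns immediately after spawning the
// delayed goroutine, so a non-nil error only indicates that the queue-size guard tripped and the
// job was dropped, not that a later enqueue failed. Callers such as runChunkFetcher and
// runTxProcessor therefore treat the error as best-effort and only log it.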
// logVersion retrieves the NEAR node version and logs it
func (e *Watcher) logVersion(ctx context.Context, logger *zap.Logger) {
	// From: https://www.quicknode.com/docs/near/status
	networkName := "near"

	version, err := e.nearAPI.GetVersion(ctx)
	if err != nil {
		logger.Error("problem retrieving node version",
			zap.Error(err),
			zap.String("network", networkName),
		)
		return
	}

	logger.Info("node version",
		zap.String("version", version),
		zap.String("network", networkName),
	)
}