query.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. package query
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/certusone/wormhole/node/pkg/common"
  9. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  10. "github.com/certusone/wormhole/node/pkg/supervisor"
  11. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  12. ethCommon "github.com/ethereum/go-ethereum/common"
  13. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  14. "go.uber.org/zap"
  15. )
  // Timing parameters for the query handler loop.
  const (
  	// RequestTimeout is how long a request may remain pending before it is considered to have timed out.
  	RequestTimeout = time.Minute

  	// RetryInterval is how long we wait before retrying a per chain query.
  	RetryInterval = 10 * time.Second

  	// AuditInterval is how often the list of pending queries is audited.
  	AuditInterval = 1 * time.Second
  )

  // Buffer sizes for the channels used by the query handler.
  const (
  	// SignedQueryRequestChannelSize is the buffer size of the incoming query request channel.
  	SignedQueryRequestChannelSize = 500

  	// QueryRequestBufferSize is the buffer size of the per-network query request channel.
  	QueryRequestBufferSize = 250

  	// QueryResponseBufferSize is the buffer size of the single query response channel from the watchers.
  	QueryResponseBufferSize = 500

  	// QueryResponsePublicationChannelSize is the buffer size of the single query response channel back to the P2P publisher.
  	QueryResponsePublicationChannelSize = 500
  )
  32. func NewQueryHandler(
  33. logger *zap.Logger,
  34. env common.Environment,
  35. allowedRequestorsStr string,
  36. signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
  37. chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
  38. queryResponseReadC <-chan *PerChainQueryResponseInternal,
  39. queryResponseWriteC chan<- *QueryResponsePublication,
  40. ) *QueryHandler {
  41. return &QueryHandler{
  42. logger: logger.With(zap.String("component", "ccq")),
  43. env: env,
  44. allowedRequestorsStr: allowedRequestorsStr,
  45. signedQueryReqC: signedQueryReqC,
  46. chainQueryReqC: chainQueryReqC,
  47. queryResponseReadC: queryResponseReadC,
  48. queryResponseWriteC: queryResponseWriteC,
  49. }
  50. }
  51. type (
  52. // Watcher is the interface that any watcher that supports cross chain queries must implement.
  53. Watcher interface {
  54. QueryHandler(ctx context.Context, queryRequest *PerChainQueryInternal)
  55. }
  56. // QueryHandler defines the cross chain query handler.
  57. QueryHandler struct {
  58. logger *zap.Logger
  59. env common.Environment
  60. allowedRequestorsStr string
  61. signedQueryReqC <-chan *gossipv1.SignedQueryRequest
  62. chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal
  63. queryResponseReadC <-chan *PerChainQueryResponseInternal
  64. queryResponseWriteC chan<- *QueryResponsePublication
  65. allowedRequestors map[ethCommon.Address]struct{}
  66. }
  67. // pendingQuery is the cache entry for a given query.
  68. pendingQuery struct {
  69. signedRequest *gossipv1.SignedQueryRequest
  70. request *QueryRequest
  71. requestID string
  72. receiveTime time.Time
  73. queries []*perChainQuery
  74. responses []*PerChainQueryResponseInternal
  75. // respPub is only populated when we need to retry sending the response to p2p.
  76. respPub *QueryResponsePublication
  77. }
  78. // perChainQuery is the data associated with a single per chain query in a query request.
  79. perChainQuery struct {
  80. req *PerChainQueryInternal
  81. channel chan *PerChainQueryInternal
  82. lastUpdateTime time.Time
  83. }
  84. PerChainConfig struct {
  85. TimestampCacheSupported bool
  86. NumWorkers int
  87. }
  88. )
  // perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries.
  // Every chain listed here must have at least one worker specified; the query handler panics at start up if a listed chain
  // that has a watcher channel is configured with zero workers.
  //
  // NOTE(review): Fogo is the only entry besides Solana with 10 workers, yet unlike Solana it has
  // TimestampCacheSupported set to true. Confirm that this is intentional.
  var perChainConfig = map[vaa.ChainID]PerChainConfig{
  	vaa.ChainIDSolana:          {NumWorkers: 10, TimestampCacheSupported: false},
  	vaa.ChainIDEthereum:        {NumWorkers: 5, TimestampCacheSupported: true},
  	vaa.ChainIDBSC:             {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDPolygon:         {NumWorkers: 5, TimestampCacheSupported: true},
  	vaa.ChainIDAvalanche:       {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDFantom:          {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDKlaytn:          {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDCelo:            {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDMoonbeam:        {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDArbitrum:        {NumWorkers: 5, TimestampCacheSupported: true},
  	vaa.ChainIDOptimism:        {NumWorkers: 5, TimestampCacheSupported: true},
  	vaa.ChainIDBase:            {NumWorkers: 5, TimestampCacheSupported: true},
  	vaa.ChainIDScroll:          {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDMantle:          {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDXLayer:          {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDLinea:           {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDBerachain:       {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDUnichain:        {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDWorldchain:      {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDInk:             {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDSepolia:         {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDHolesky:         {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDBaseSepolia:     {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDPolygonSepolia:  {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDHyperEVM:        {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDMonad:           {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDSeiEVM:          {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDMezo:            {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDFogo:            {NumWorkers: 10, TimestampCacheSupported: true},
  	vaa.ChainIDConverge:        {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDPlume:           {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDXRPLEVM:         {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDPlasma:          {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDCreditCoin:      {NumWorkers: 1, TimestampCacheSupported: true},
  	vaa.ChainIDMoca:            {NumWorkers: 1, TimestampCacheSupported: true},
  }
  130. // GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct,
  131. // which is not an error. It just means that queries are not supported for that chain.
  132. func GetPerChainConfig(chainID vaa.ChainID) PerChainConfig {
  133. if pcc, exists := perChainConfig[chainID]; exists {
  134. return pcc
  135. }
  136. return PerChainConfig{}
  137. }
  138. // QueriesSupported can be used by the watcher to determine if queries are supported for the chain.
  139. func (config PerChainConfig) QueriesSupported() bool {
  140. return config.NumWorkers > 0
  141. }
  142. // Start initializes the query handler and starts the runnable.
  143. func (qh *QueryHandler) Start(ctx context.Context) error {
  144. qh.logger.Debug("entering Start", zap.String("enforceFlag", qh.allowedRequestorsStr))
  145. var err error
  146. qh.allowedRequestors, err = parseAllowedRequesters(qh.allowedRequestorsStr)
  147. if err != nil {
  148. return fmt.Errorf("failed to parse allowed requesters: %w", err)
  149. }
  150. if err := supervisor.Run(ctx, "query_handler", common.WrapWithScissors(qh.handleQueryRequests, "query_handler")); err != nil {
  151. return fmt.Errorf("failed to start query handler routine: %w", err)
  152. }
  153. return nil
  154. }
  155. // handleQueryRequests multiplexes observation requests to the appropriate chain
  156. func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error {
  157. return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval, AuditInterval)
  158. }
  159. // handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters.
  160. func handleQueryRequestsImpl(
  161. ctx context.Context,
  162. logger *zap.Logger,
  163. signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
  164. chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
  165. allowedRequestors map[ethCommon.Address]struct{},
  166. queryResponseReadC <-chan *PerChainQueryResponseInternal,
  167. queryResponseWriteC chan<- *QueryResponsePublication,
  168. env common.Environment,
  169. requestTimeoutImpl time.Duration,
  170. retryIntervalImpl time.Duration,
  171. auditIntervalImpl time.Duration,
  172. ) error {
  173. qLogger := logger.With(zap.String("component", "ccqhandler"))
  174. qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env)))
  175. pendingQueries := make(map[string]*pendingQuery) // Key is requestID.
  176. // Create the set of chains for which CCQ is actually enabled. Those are the ones in the config for which we actually have a watcher enabled.
  177. supportedChains := make(map[vaa.ChainID]struct{})
  178. for chainID, config := range perChainConfig {
  179. if _, exists := chainQueryReqC[chainID]; exists {
  180. if config.NumWorkers <= 0 {
  181. panic(fmt.Sprintf(`invalid per chain config entry for "%s", no workers specified`, chainID.String()))
  182. }
  183. logger.Info("queries supported on chain", zap.Stringer("chainID", chainID), zap.Int("numWorkers", config.NumWorkers))
  184. supportedChains[chainID] = struct{}{}
  185. // Make sure we have a metric for every enabled chain, so we can see which ones are actually enabled.
  186. totalRequestsByChain.WithLabelValues(chainID.String()).Add(0)
  187. }
  188. }
  189. ticker := time.NewTicker(auditIntervalImpl)
  190. defer ticker.Stop()
  191. for {
  192. select {
  193. case <-ctx.Done():
  194. return nil
  195. case signedRequest := <-signedQueryReqC: // Inbound query request.
  196. // requestor validation happens here
  197. // request type validation is currently handled by the watcher
  198. // in the future, it may be worthwhile to catch certain types of
  199. // invalid requests here for tracking purposes
  200. // e.g.
  201. // - length check on "signature" 65 bytes
  202. // - length check on "to" address 20 bytes
  203. // - valid "block" strings
  204. allQueryRequestsReceived.Inc()
  205. digest := QueryRequestDigest(env, signedRequest.QueryRequest)
  206. // It's possible that the signature alone is not unique, and the digest alone is not unique, but the combination should be.
  207. requestID := hex.EncodeToString(signedRequest.Signature) + ":" + digest.String()
  208. qLogger.Info("received a query request", zap.String("requestID", requestID))
  209. signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedRequest.Signature)
  210. if err != nil {
  211. qLogger.Error("failed to recover public key", zap.String("requestID", requestID))
  212. invalidQueryRequestReceived.WithLabelValues("failed_to_recover_public_key").Inc()
  213. continue
  214. }
  215. signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
  216. if _, exists := allowedRequestors[signerAddress]; !exists {
  217. qLogger.Debug("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
  218. invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
  219. continue
  220. }
  221. // Make sure this is not a duplicate request. TODO: Should we do something smarter here than just dropping the duplicate?
  222. if oldReq, exists := pendingQueries[requestID]; exists {
  223. qLogger.Warn("dropping duplicate query request", zap.String("requestID", requestID), zap.Stringer("origRecvTime", oldReq.receiveTime))
  224. invalidQueryRequestReceived.WithLabelValues("duplicate_request").Inc()
  225. continue
  226. }
  227. var queryRequest QueryRequest
  228. err = queryRequest.Unmarshal(signedRequest.QueryRequest)
  229. if err != nil {
  230. qLogger.Error("failed to unmarshal query request", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
  231. invalidQueryRequestReceived.WithLabelValues("failed_to_unmarshal_request").Inc()
  232. continue
  233. }
  234. if err := queryRequest.Validate(); err != nil {
  235. qLogger.Error("received invalid message", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
  236. invalidQueryRequestReceived.WithLabelValues("invalid_request").Inc()
  237. continue
  238. }
  239. // Build the set of per chain queries and placeholders for the per chain responses.
  240. errorFound := false
  241. queries := []*perChainQuery{}
  242. responses := make([]*PerChainQueryResponseInternal, len(queryRequest.PerChainQueries))
  243. receiveTime := time.Now()
  244. for requestIdx, pcq := range queryRequest.PerChainQueries {
  245. chainID := vaa.ChainID(pcq.ChainId)
  246. if _, exists := supportedChains[chainID]; !exists {
  247. qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
  248. invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
  249. errorFound = true
  250. break
  251. }
  252. channel, channelExists := chainQueryReqC[chainID]
  253. if !channelExists {
  254. qLogger.Debug("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
  255. invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
  256. errorFound = true
  257. break
  258. }
  259. queries = append(queries, &perChainQuery{
  260. req: &PerChainQueryInternal{
  261. RequestID: requestID,
  262. RequestIdx: requestIdx,
  263. Request: pcq,
  264. },
  265. channel: channel,
  266. })
  267. }
  268. if errorFound {
  269. continue
  270. }
  271. validQueryRequestsReceived.Inc()
  272. // Create the pending query and add it to the cache.
  273. pq := &pendingQuery{
  274. signedRequest: signedRequest,
  275. request: &queryRequest,
  276. requestID: requestID,
  277. receiveTime: receiveTime,
  278. queries: queries,
  279. responses: responses,
  280. }
  281. pendingQueries[requestID] = pq
  282. // Forward the requests to the watchers.
  283. for _, pcq := range pq.queries {
  284. pcq.ccqForwardToWatcher(qLogger, pq.receiveTime)
  285. }
  286. case resp := <-queryResponseReadC: // Response from a watcher.
  287. if resp.Status == QuerySuccess {
  288. successfulQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
  289. if resp.Response == nil {
  290. qLogger.Error("received a successful query response with no results, dropping it!", zap.String("requestID", resp.RequestID))
  291. continue
  292. }
  293. pq, exists := pendingQueries[resp.RequestID]
  294. if !exists {
  295. qLogger.Warn("received a success response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  296. continue
  297. }
  298. if resp.RequestIdx >= len(pq.responses) {
  299. qLogger.Error("received a response with an invalid index", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  300. continue
  301. }
  302. // Store the result, which will mark this per-chain query as completed.
  303. pq.responses[resp.RequestIdx] = resp
  304. // If we still have other outstanding per chain queries for this request, keep waiting.
  305. numStillPending := pq.numPendingRequests()
  306. if numStillPending > 0 {
  307. qLogger.Info("received a per chain query response, still waiting for more", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx), zap.Int("numStillPending", numStillPending))
  308. continue
  309. } else {
  310. qLogger.Info("received final per chain query response, ready to publish", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  311. }
  312. // Build the list of per chain response publications and the overall query response publication.
  313. responses := []*PerChainQueryResponse{}
  314. for _, pqResp := range pq.responses {
  315. if pqResp == nil {
  316. qLogger.Error("unexpected nil response in pending query!", zap.String("requestID", resp.RequestID))
  317. continue
  318. }
  319. responses = append(responses, &PerChainQueryResponse{
  320. ChainId: pqResp.ChainId,
  321. Response: pqResp.Response,
  322. })
  323. }
  324. respPub := &QueryResponsePublication{
  325. Request: pq.signedRequest,
  326. PerChainResponses: responses,
  327. }
  328. // Send the response to be published.
  329. select {
  330. case queryResponseWriteC <- respPub:
  331. qLogger.Info("forwarded query response to p2p", zap.String("requestID", resp.RequestID))
  332. queryResponsesPublished.Inc()
  333. delete(pendingQueries, resp.RequestID)
  334. default:
  335. qLogger.Warn("failed to publish query response to p2p, will retry publishing next interval", zap.String("requestID", resp.RequestID))
  336. pq.respPub = respPub
  337. }
  338. } else if resp.Status == QueryRetryNeeded {
  339. retryNeededQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
  340. if _, exists := pendingQueries[resp.RequestID]; exists {
  341. qLogger.Warn("query failed, will retry next interval", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  342. } else {
  343. qLogger.Warn("received a retry needed response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  344. }
  345. } else if resp.Status == QueryFatalError {
  346. fatalQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
  347. qLogger.Error("received a fatal error response, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  348. delete(pendingQueries, resp.RequestID)
  349. } else {
  350. qLogger.Error("received an unexpected query status, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx), zap.Int("status", int(resp.Status)))
  351. delete(pendingQueries, resp.RequestID)
  352. }
  353. case <-ticker.C: // Retry audit timer.
  354. now := time.Now()
  355. for reqId, pq := range pendingQueries {
  356. timeout := pq.receiveTime.Add(requestTimeoutImpl)
  357. qLogger.Debug("audit", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime), zap.Stringer("timeout", timeout))
  358. if timeout.Before(now) {
  359. qLogger.Debug("query request timed out, dropping it", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime))
  360. queryRequestsTimedOut.Inc()
  361. delete(pendingQueries, reqId)
  362. } else {
  363. if pq.respPub != nil {
  364. // Resend the response to be published.
  365. select {
  366. case queryResponseWriteC <- pq.respPub:
  367. qLogger.Info("resend of query response to p2p succeeded", zap.String("requestID", reqId))
  368. queryResponsesPublished.Inc()
  369. delete(pendingQueries, reqId)
  370. default:
  371. qLogger.Warn("resend of query response to p2p failed again, will keep retrying", zap.String("requestID", reqId))
  372. }
  373. } else {
  374. for requestIdx, pcq := range pq.queries {
  375. if pq.responses[requestIdx] == nil && pcq.lastUpdateTime.Add(retryIntervalImpl).Before(now) {
  376. qLogger.Info("retrying query request",
  377. zap.String("requestId", reqId),
  378. zap.Int("requestIdx", requestIdx),
  379. zap.Stringer("receiveTime", pq.receiveTime),
  380. zap.Stringer("lastUpdateTime", pcq.lastUpdateTime),
  381. zap.String("chainID", pq.queries[requestIdx].req.Request.ChainId.String()),
  382. )
  383. pcq.ccqForwardToWatcher(qLogger, now)
  384. }
  385. }
  386. }
  387. }
  388. }
  389. }
  390. }
  391. }
  392. // parseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups.
  393. func parseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) {
  394. if ccqAllowedRequesters == "" {
  395. return nil, fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified")
  396. }
  397. var nullAddr ethCommon.Address
  398. result := make(map[ethCommon.Address]struct{})
  399. for _, str := range strings.Split(ccqAllowedRequesters, ",") {
  400. addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x")))
  401. if addr == nullAddr {
  402. return nil, fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str)
  403. }
  404. result[addr] = struct{}{}
  405. }
  406. if len(result) <= 0 {
  407. return nil, fmt.Errorf("no allowed requestors specified, ccqAllowedRequesters: `%s`", ccqAllowedRequesters)
  408. }
  409. return result, nil
  410. }
  411. // ccqForwardToWatcher submits a query request to the appropriate watcher. It updates the request object if the write succeeds.
  412. // If the write fails, it does not update the last update time, which will cause a retry next interval (until it times out)
  413. func (pcq *perChainQuery) ccqForwardToWatcher(qLogger *zap.Logger, receiveTime time.Time) {
  414. select {
  415. // TODO: only send the query request itself and reassemble in this module
  416. case pcq.channel <- pcq.req:
  417. qLogger.Debug("forwarded query request to watcher", zap.String("requestID", pcq.req.RequestID), zap.Stringer("chainID", pcq.req.Request.ChainId))
  418. totalRequestsByChain.WithLabelValues(pcq.req.Request.ChainId.String()).Inc()
  419. default:
  420. qLogger.Warn("failed to send query request to watcher, will retry next interval", zap.String("requestID", pcq.req.RequestID), zap.Stringer("chain_id", pcq.req.Request.ChainId))
  421. }
  422. pcq.lastUpdateTime = receiveTime
  423. }
  424. // numPendingRequests returns the number of per chain queries in a request that are still awaiting responses. Zero means the request can now be published.
  425. func (pq *pendingQuery) numPendingRequests() int {
  426. numPending := 0
  427. for _, resp := range pq.responses {
  428. if resp == nil {
  429. numPending += 1
  430. }
  431. }
  432. return numPending
  433. }
  434. // StartWorkers is used by the watchers to start the query handler worker routines.
  435. func StartWorkers(
  436. ctx context.Context,
  437. logger *zap.Logger,
  438. errC chan error,
  439. w Watcher,
  440. queryReqC <-chan *PerChainQueryInternal,
  441. config PerChainConfig,
  442. tag string,
  443. ) {
  444. for count := 0; count < config.NumWorkers; count++ {
  445. workerId := count
  446. common.RunWithScissors(ctx, errC, fmt.Sprintf("%s_fetch_query_req", tag), func(ctx context.Context) error {
  447. logger.Debug("CONCURRENT: starting worker", zap.Int("worker", workerId))
  448. for {
  449. select {
  450. case <-ctx.Done():
  451. return nil
  452. case queryRequest := <-queryReqC:
  453. logger.Debug("CONCURRENT: processing query request", zap.Int("worker", workerId))
  454. w.QueryHandler(ctx, queryRequest)
  455. logger.Debug("CONCURRENT: finished processing query request", zap.Int("worker", workerId))
  456. }
  457. }
  458. })
  459. }
  460. }