query.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. package query
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/certusone/wormhole/node/pkg/common"
  9. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  10. "github.com/certusone/wormhole/node/pkg/supervisor"
  11. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  12. ethCommon "github.com/ethereum/go-ethereum/common"
  13. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  14. "go.uber.org/zap"
  15. )
  16. const (
  17. // RequestTimeout indicates how long before a request is considered to have timed out.
  18. RequestTimeout = 1 * time.Minute
  19. // RetryInterval specifies how long we will wait between retry intervals.
  20. RetryInterval = 10 * time.Second
  21. // AuditInterval specifies how often to audit the list of pending queries.
  22. AuditInterval = time.Second
  23. // SignedQueryRequestChannelSize is the buffer size of the incoming query request channel.
  24. SignedQueryRequestChannelSize = 500
  25. // QueryRequestBufferSize is the buffer size of the per-network query request channel.
  26. QueryRequestBufferSize = 250
  27. // QueryResponseBufferSize is the buffer size of the single query response channel from the watchers.
  28. QueryResponseBufferSize = 500
  29. // QueryResponsePublicationChannelSize is the buffer size of the single query response channel back to the P2P publisher.
  30. QueryResponsePublicationChannelSize = 500
  31. )
  32. func NewQueryHandler(
  33. logger *zap.Logger,
  34. env common.Environment,
  35. allowedRequestorsStr string,
  36. signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
  37. chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
  38. queryResponseReadC <-chan *PerChainQueryResponseInternal,
  39. queryResponseWriteC chan<- *QueryResponsePublication,
  40. ) *QueryHandler {
  41. return &QueryHandler{
  42. logger: logger.With(zap.String("component", "ccq")),
  43. env: env,
  44. allowedRequestorsStr: allowedRequestorsStr,
  45. signedQueryReqC: signedQueryReqC,
  46. chainQueryReqC: chainQueryReqC,
  47. queryResponseReadC: queryResponseReadC,
  48. queryResponseWriteC: queryResponseWriteC,
  49. }
  50. }
  51. type (
  52. // Watcher is the interface that any watcher that supports cross chain queries must implement.
  53. Watcher interface {
  54. QueryHandler(ctx context.Context, queryRequest *PerChainQueryInternal)
  55. }
  56. // QueryHandler defines the cross chain query handler.
  57. QueryHandler struct {
  58. logger *zap.Logger
  59. env common.Environment
  60. allowedRequestorsStr string
  61. signedQueryReqC <-chan *gossipv1.SignedQueryRequest
  62. chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal
  63. queryResponseReadC <-chan *PerChainQueryResponseInternal
  64. queryResponseWriteC chan<- *QueryResponsePublication
  65. allowedRequestors map[ethCommon.Address]struct{}
  66. }
  67. // pendingQuery is the cache entry for a given query.
  68. pendingQuery struct {
  69. signedRequest *gossipv1.SignedQueryRequest
  70. request *QueryRequest
  71. requestID string
  72. receiveTime time.Time
  73. queries []*perChainQuery
  74. responses []*PerChainQueryResponseInternal
  75. // respPub is only populated when we need to retry sending the response to p2p.
  76. respPub *QueryResponsePublication
  77. }
  78. // perChainQuery is the data associated with a single per chain query in a query request.
  79. perChainQuery struct {
  80. req *PerChainQueryInternal
  81. channel chan *PerChainQueryInternal
  82. lastUpdateTime time.Time
  83. }
  84. PerChainConfig struct {
  85. TimestampCacheSupported bool
  86. NumWorkers int
  87. }
  88. )
  89. // perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries.
  90. // Every chain listed here must have at least one worker specified.
  91. var perChainConfig = map[vaa.ChainID]PerChainConfig{
  92. vaa.ChainIDSolana: {NumWorkers: 10, TimestampCacheSupported: false},
  93. vaa.ChainIDEthereum: {NumWorkers: 5, TimestampCacheSupported: true},
  94. vaa.ChainIDBSC: {NumWorkers: 1, TimestampCacheSupported: true},
  95. vaa.ChainIDPolygon: {NumWorkers: 5, TimestampCacheSupported: true},
  96. vaa.ChainIDAvalanche: {NumWorkers: 1, TimestampCacheSupported: true},
  97. vaa.ChainIDOasis: {NumWorkers: 1, TimestampCacheSupported: true},
  98. vaa.ChainIDAurora: {NumWorkers: 1, TimestampCacheSupported: true},
  99. vaa.ChainIDFantom: {NumWorkers: 1, TimestampCacheSupported: true},
  100. vaa.ChainIDKarura: {NumWorkers: 1, TimestampCacheSupported: true},
  101. vaa.ChainIDAcala: {NumWorkers: 1, TimestampCacheSupported: true},
  102. vaa.ChainIDKlaytn: {NumWorkers: 1, TimestampCacheSupported: true},
  103. vaa.ChainIDCelo: {NumWorkers: 1, TimestampCacheSupported: true},
  104. vaa.ChainIDMoonbeam: {NumWorkers: 1, TimestampCacheSupported: true},
  105. vaa.ChainIDArbitrum: {NumWorkers: 5, TimestampCacheSupported: true},
  106. vaa.ChainIDOptimism: {NumWorkers: 5, TimestampCacheSupported: true},
  107. vaa.ChainIDBase: {NumWorkers: 5, TimestampCacheSupported: true},
  108. vaa.ChainIDScroll: {NumWorkers: 1, TimestampCacheSupported: true},
  109. vaa.ChainIDMantle: {NumWorkers: 1, TimestampCacheSupported: true},
  110. vaa.ChainIDBlast: {NumWorkers: 1, TimestampCacheSupported: true},
  111. vaa.ChainIDXLayer: {NumWorkers: 1, TimestampCacheSupported: true},
  112. vaa.ChainIDLinea: {NumWorkers: 1, TimestampCacheSupported: true},
  113. vaa.ChainIDBerachain: {NumWorkers: 1, TimestampCacheSupported: true},
  114. vaa.ChainIDSnaxchain: {NumWorkers: 1, TimestampCacheSupported: true},
  115. vaa.ChainIDUnichain: {NumWorkers: 1, TimestampCacheSupported: true},
  116. vaa.ChainIDWorldchain: {NumWorkers: 1, TimestampCacheSupported: true},
  117. vaa.ChainIDInk: {NumWorkers: 1, TimestampCacheSupported: true},
  118. vaa.ChainIDSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
  119. vaa.ChainIDHolesky: {NumWorkers: 1, TimestampCacheSupported: true},
  120. vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
  121. vaa.ChainIDBaseSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
  122. vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
  123. vaa.ChainIDPolygonSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
  124. vaa.ChainIDHyperEVM: {NumWorkers: 1, TimestampCacheSupported: true},
  125. vaa.ChainIDMonad: {NumWorkers: 1, TimestampCacheSupported: true},
  126. vaa.ChainIDSeiEVM: {NumWorkers: 1, TimestampCacheSupported: true},
  127. vaa.ChainIDMezo: {NumWorkers: 1, TimestampCacheSupported: true},
  128. vaa.ChainIDFogo: {NumWorkers: 10, TimestampCacheSupported: true},
  129. vaa.ChainIDConverge: {NumWorkers: 1, TimestampCacheSupported: true},
  130. vaa.ChainIDPlume: {NumWorkers: 1, TimestampCacheSupported: true},
  131. }
  132. // GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct,
  133. // which is not an error. It just means that queries are not supported for that chain.
  134. func GetPerChainConfig(chainID vaa.ChainID) PerChainConfig {
  135. if pcc, exists := perChainConfig[chainID]; exists {
  136. return pcc
  137. }
  138. return PerChainConfig{}
  139. }
  140. // QueriesSupported can be used by the watcher to determine if queries are supported for the chain.
  141. func (config PerChainConfig) QueriesSupported() bool {
  142. return config.NumWorkers > 0
  143. }
  144. // Start initializes the query handler and starts the runnable.
  145. func (qh *QueryHandler) Start(ctx context.Context) error {
  146. qh.logger.Debug("entering Start", zap.String("enforceFlag", qh.allowedRequestorsStr))
  147. var err error
  148. qh.allowedRequestors, err = parseAllowedRequesters(qh.allowedRequestorsStr)
  149. if err != nil {
  150. return fmt.Errorf("failed to parse allowed requesters: %w", err)
  151. }
  152. if err := supervisor.Run(ctx, "query_handler", common.WrapWithScissors(qh.handleQueryRequests, "query_handler")); err != nil {
  153. return fmt.Errorf("failed to start query handler routine: %w", err)
  154. }
  155. return nil
  156. }
  157. // handleQueryRequests multiplexes observation requests to the appropriate chain
  158. func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error {
  159. return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval, AuditInterval)
  160. }
  161. // handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters.
  162. func handleQueryRequestsImpl(
  163. ctx context.Context,
  164. logger *zap.Logger,
  165. signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
  166. chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
  167. allowedRequestors map[ethCommon.Address]struct{},
  168. queryResponseReadC <-chan *PerChainQueryResponseInternal,
  169. queryResponseWriteC chan<- *QueryResponsePublication,
  170. env common.Environment,
  171. requestTimeoutImpl time.Duration,
  172. retryIntervalImpl time.Duration,
  173. auditIntervalImpl time.Duration,
  174. ) error {
  175. qLogger := logger.With(zap.String("component", "ccqhandler"))
  176. qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env)))
  177. pendingQueries := make(map[string]*pendingQuery) // Key is requestID.
  178. // Create the set of chains for which CCQ is actually enabled. Those are the ones in the config for which we actually have a watcher enabled.
  179. supportedChains := make(map[vaa.ChainID]struct{})
  180. for chainID, config := range perChainConfig {
  181. if _, exists := chainQueryReqC[chainID]; exists {
  182. if config.NumWorkers <= 0 {
  183. panic(fmt.Sprintf(`invalid per chain config entry for "%s", no workers specified`, chainID.String()))
  184. }
  185. logger.Info("queries supported on chain", zap.Stringer("chainID", chainID), zap.Int("numWorkers", config.NumWorkers))
  186. supportedChains[chainID] = struct{}{}
  187. // Make sure we have a metric for every enabled chain, so we can see which ones are actually enabled.
  188. totalRequestsByChain.WithLabelValues(chainID.String()).Add(0)
  189. }
  190. }
  191. ticker := time.NewTicker(auditIntervalImpl)
  192. defer ticker.Stop()
  193. for {
  194. select {
  195. case <-ctx.Done():
  196. return nil
  197. case signedRequest := <-signedQueryReqC: // Inbound query request.
  198. // requestor validation happens here
  199. // request type validation is currently handled by the watcher
  200. // in the future, it may be worthwhile to catch certain types of
  201. // invalid requests here for tracking purposes
  202. // e.g.
  203. // - length check on "signature" 65 bytes
  204. // - length check on "to" address 20 bytes
  205. // - valid "block" strings
  206. allQueryRequestsReceived.Inc()
  207. digest := QueryRequestDigest(env, signedRequest.QueryRequest)
  208. // It's possible that the signature alone is not unique, and the digest alone is not unique, but the combination should be.
  209. requestID := hex.EncodeToString(signedRequest.Signature) + ":" + digest.String()
  210. qLogger.Info("received a query request", zap.String("requestID", requestID))
  211. signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), signedRequest.Signature)
  212. if err != nil {
  213. qLogger.Error("failed to recover public key", zap.String("requestID", requestID))
  214. invalidQueryRequestReceived.WithLabelValues("failed_to_recover_public_key").Inc()
  215. continue
  216. }
  217. signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
  218. if _, exists := allowedRequestors[signerAddress]; !exists {
  219. qLogger.Debug("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
  220. invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
  221. continue
  222. }
  223. // Make sure this is not a duplicate request. TODO: Should we do something smarter here than just dropping the duplicate?
  224. if oldReq, exists := pendingQueries[requestID]; exists {
  225. qLogger.Warn("dropping duplicate query request", zap.String("requestID", requestID), zap.Stringer("origRecvTime", oldReq.receiveTime))
  226. invalidQueryRequestReceived.WithLabelValues("duplicate_request").Inc()
  227. continue
  228. }
  229. var queryRequest QueryRequest
  230. err = queryRequest.Unmarshal(signedRequest.QueryRequest)
  231. if err != nil {
  232. qLogger.Error("failed to unmarshal query request", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
  233. invalidQueryRequestReceived.WithLabelValues("failed_to_unmarshal_request").Inc()
  234. continue
  235. }
  236. if err := queryRequest.Validate(); err != nil {
  237. qLogger.Error("received invalid message", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
  238. invalidQueryRequestReceived.WithLabelValues("invalid_request").Inc()
  239. continue
  240. }
  241. // Build the set of per chain queries and placeholders for the per chain responses.
  242. errorFound := false
  243. queries := []*perChainQuery{}
  244. responses := make([]*PerChainQueryResponseInternal, len(queryRequest.PerChainQueries))
  245. receiveTime := time.Now()
  246. for requestIdx, pcq := range queryRequest.PerChainQueries {
  247. chainID := vaa.ChainID(pcq.ChainId)
  248. if _, exists := supportedChains[chainID]; !exists {
  249. qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
  250. invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
  251. errorFound = true
  252. break
  253. }
  254. channel, channelExists := chainQueryReqC[chainID]
  255. if !channelExists {
  256. qLogger.Debug("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
  257. invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
  258. errorFound = true
  259. break
  260. }
  261. queries = append(queries, &perChainQuery{
  262. req: &PerChainQueryInternal{
  263. RequestID: requestID,
  264. RequestIdx: requestIdx,
  265. Request: pcq,
  266. },
  267. channel: channel,
  268. })
  269. }
  270. if errorFound {
  271. continue
  272. }
  273. validQueryRequestsReceived.Inc()
  274. // Create the pending query and add it to the cache.
  275. pq := &pendingQuery{
  276. signedRequest: signedRequest,
  277. request: &queryRequest,
  278. requestID: requestID,
  279. receiveTime: receiveTime,
  280. queries: queries,
  281. responses: responses,
  282. }
  283. pendingQueries[requestID] = pq
  284. // Forward the requests to the watchers.
  285. for _, pcq := range pq.queries {
  286. pcq.ccqForwardToWatcher(qLogger, pq.receiveTime)
  287. }
  288. case resp := <-queryResponseReadC: // Response from a watcher.
  289. if resp.Status == QuerySuccess {
  290. successfulQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
  291. if resp.Response == nil {
  292. qLogger.Error("received a successful query response with no results, dropping it!", zap.String("requestID", resp.RequestID))
  293. continue
  294. }
  295. pq, exists := pendingQueries[resp.RequestID]
  296. if !exists {
  297. qLogger.Warn("received a success response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  298. continue
  299. }
  300. if resp.RequestIdx >= len(pq.responses) {
  301. qLogger.Error("received a response with an invalid index", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  302. continue
  303. }
  304. // Store the result, which will mark this per-chain query as completed.
  305. pq.responses[resp.RequestIdx] = resp
  306. // If we still have other outstanding per chain queries for this request, keep waiting.
  307. numStillPending := pq.numPendingRequests()
  308. if numStillPending > 0 {
  309. qLogger.Info("received a per chain query response, still waiting for more", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx), zap.Int("numStillPending", numStillPending))
  310. continue
  311. } else {
  312. qLogger.Info("received final per chain query response, ready to publish", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  313. }
  314. // Build the list of per chain response publications and the overall query response publication.
  315. responses := []*PerChainQueryResponse{}
  316. for _, resp := range pq.responses {
  317. if resp == nil {
  318. qLogger.Error("unexpected null response in pending query!", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  319. continue
  320. }
  321. responses = append(responses, &PerChainQueryResponse{
  322. ChainId: resp.ChainId,
  323. Response: resp.Response,
  324. })
  325. }
  326. respPub := &QueryResponsePublication{
  327. Request: pq.signedRequest,
  328. PerChainResponses: responses,
  329. }
  330. // Send the response to be published.
  331. select {
  332. case queryResponseWriteC <- respPub:
  333. qLogger.Info("forwarded query response to p2p", zap.String("requestID", resp.RequestID))
  334. queryResponsesPublished.Inc()
  335. delete(pendingQueries, resp.RequestID)
  336. default:
  337. qLogger.Warn("failed to publish query response to p2p, will retry publishing next interval", zap.String("requestID", resp.RequestID))
  338. pq.respPub = respPub
  339. }
  340. } else if resp.Status == QueryRetryNeeded {
  341. retryNeededQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
  342. if _, exists := pendingQueries[resp.RequestID]; exists {
  343. qLogger.Warn("query failed, will retry next interval", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  344. } else {
  345. qLogger.Warn("received a retry needed response with no outstanding query, dropping it", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  346. }
  347. } else if resp.Status == QueryFatalError {
  348. fatalQueryResponsesReceivedByChain.WithLabelValues(resp.ChainId.String()).Inc()
  349. qLogger.Error("received a fatal error response, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx))
  350. delete(pendingQueries, resp.RequestID)
  351. } else {
  352. qLogger.Error("received an unexpected query status, dropping the whole request", zap.String("requestID", resp.RequestID), zap.Int("requestIdx", resp.RequestIdx), zap.Int("status", int(resp.Status)))
  353. delete(pendingQueries, resp.RequestID)
  354. }
  355. case <-ticker.C: // Retry audit timer.
  356. now := time.Now()
  357. for reqId, pq := range pendingQueries {
  358. timeout := pq.receiveTime.Add(requestTimeoutImpl)
  359. qLogger.Debug("audit", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime), zap.Stringer("timeout", timeout))
  360. if timeout.Before(now) {
  361. qLogger.Debug("query request timed out, dropping it", zap.String("requestId", reqId), zap.Stringer("receiveTime", pq.receiveTime))
  362. queryRequestsTimedOut.Inc()
  363. delete(pendingQueries, reqId)
  364. } else {
  365. if pq.respPub != nil {
  366. // Resend the response to be published.
  367. select {
  368. case queryResponseWriteC <- pq.respPub:
  369. qLogger.Info("resend of query response to p2p succeeded", zap.String("requestID", reqId))
  370. queryResponsesPublished.Inc()
  371. delete(pendingQueries, reqId)
  372. default:
  373. qLogger.Warn("resend of query response to p2p failed again, will keep retrying", zap.String("requestID", reqId))
  374. }
  375. } else {
  376. for requestIdx, pcq := range pq.queries {
  377. if pq.responses[requestIdx] == nil && pcq.lastUpdateTime.Add(retryIntervalImpl).Before(now) {
  378. qLogger.Info("retrying query request",
  379. zap.String("requestId", reqId),
  380. zap.Int("requestIdx", requestIdx),
  381. zap.Stringer("receiveTime", pq.receiveTime),
  382. zap.Stringer("lastUpdateTime", pcq.lastUpdateTime),
  383. zap.String("chainID", pq.queries[requestIdx].req.Request.ChainId.String()),
  384. )
  385. pcq.ccqForwardToWatcher(qLogger, now)
  386. }
  387. }
  388. }
  389. }
  390. }
  391. }
  392. }
  393. }
  394. // parseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups.
  395. func parseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) {
  396. if ccqAllowedRequesters == "" {
  397. return nil, fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified")
  398. }
  399. var nullAddr ethCommon.Address
  400. result := make(map[ethCommon.Address]struct{})
  401. for _, str := range strings.Split(ccqAllowedRequesters, ",") {
  402. addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x")))
  403. if addr == nullAddr {
  404. return nil, fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str)
  405. }
  406. result[addr] = struct{}{}
  407. }
  408. if len(result) <= 0 {
  409. return nil, fmt.Errorf("no allowed requestors specified, ccqAllowedRequesters: `%s`", ccqAllowedRequesters)
  410. }
  411. return result, nil
  412. }
  413. // ccqForwardToWatcher submits a query request to the appropriate watcher. It updates the request object if the write succeeds.
  414. // If the write fails, it does not update the last update time, which will cause a retry next interval (until it times out)
  415. func (pcq *perChainQuery) ccqForwardToWatcher(qLogger *zap.Logger, receiveTime time.Time) {
  416. select {
  417. // TODO: only send the query request itself and reassemble in this module
  418. case pcq.channel <- pcq.req:
  419. qLogger.Debug("forwarded query request to watcher", zap.String("requestID", pcq.req.RequestID), zap.Stringer("chainID", pcq.req.Request.ChainId))
  420. totalRequestsByChain.WithLabelValues(pcq.req.Request.ChainId.String()).Inc()
  421. default:
  422. qLogger.Warn("failed to send query request to watcher, will retry next interval", zap.String("requestID", pcq.req.RequestID), zap.Stringer("chain_id", pcq.req.Request.ChainId))
  423. }
  424. pcq.lastUpdateTime = receiveTime
  425. }
  426. // numPendingRequests returns the number of per chain queries in a request that are still awaiting responses. Zero means the request can now be published.
  427. func (pq *pendingQuery) numPendingRequests() int {
  428. numPending := 0
  429. for _, resp := range pq.responses {
  430. if resp == nil {
  431. numPending += 1
  432. }
  433. }
  434. return numPending
  435. }
  436. // StartWorkers is used by the watchers to start the query handler worker routines.
  437. func StartWorkers(
  438. ctx context.Context,
  439. logger *zap.Logger,
  440. errC chan error,
  441. w Watcher,
  442. queryReqC <-chan *PerChainQueryInternal,
  443. config PerChainConfig,
  444. tag string,
  445. ) {
  446. for count := 0; count < config.NumWorkers; count++ {
  447. workerId := count
  448. common.RunWithScissors(ctx, errC, fmt.Sprintf("%s_fetch_query_req", tag), func(ctx context.Context) error {
  449. logger.Debug("CONCURRENT: starting worker", zap.Int("worker", workerId))
  450. for {
  451. select {
  452. case <-ctx.Done():
  453. return nil
  454. case queryRequest := <-queryReqC:
  455. logger.Debug("CONCURRENT: processing query request", zap.Int("worker", workerId))
  456. w.QueryHandler(ctx, queryRequest)
  457. logger.Debug("CONCURRENT: finished processing query request", zap.Int("worker", workerId))
  458. }
  459. }
  460. })
  461. }
  462. }