pending_request.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package ccq
  2. import (
  3. "encoding/hex"
  4. "sync"
  5. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  6. "github.com/certusone/wormhole/node/pkg/query"
  7. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  8. "go.uber.org/zap"
  9. )
  10. type PendingResponse struct {
  11. req *gossipv1.SignedQueryRequest
  12. userName string
  13. queryRequest *query.QueryRequest
  14. ch chan *SignedResponse
  15. errCh chan *ErrorEntry
  16. // statsLock protects the data items below.
  17. statsLock sync.RWMutex
  18. maxMatchingResponses int
  19. outstandingResponses int
  20. quorum int
  21. }
  22. type ErrorEntry struct {
  23. err error
  24. status int
  25. }
  26. func NewPendingResponse(req *gossipv1.SignedQueryRequest, userName string, queryRequest *query.QueryRequest) *PendingResponse {
  27. return &PendingResponse{
  28. req: req,
  29. userName: userName,
  30. queryRequest: queryRequest,
  31. ch: make(chan *SignedResponse),
  32. errCh: make(chan *ErrorEntry),
  33. }
  34. }
  35. type PendingResponses struct {
  36. pendingResponses map[string]*PendingResponse
  37. mu sync.RWMutex
  38. logger *zap.Logger
  39. }
  40. func NewPendingResponses(logger *zap.Logger) *PendingResponses {
  41. return &PendingResponses{
  42. // Make this channel bigger than the number of responses we ever expect to get for a query.
  43. pendingResponses: make(map[string]*PendingResponse, 100),
  44. logger: logger,
  45. }
  46. }
  47. func (p *PendingResponses) Add(r *PendingResponse) bool {
  48. signature := hex.EncodeToString(r.req.Signature)
  49. p.mu.Lock()
  50. defer p.mu.Unlock()
  51. if _, ok := p.pendingResponses[signature]; ok {
  52. // the request w/ this signature is already being handled
  53. // don't overwrite
  54. return false
  55. }
  56. p.pendingResponses[signature] = r
  57. p.updateMetricsAlreadyLocked(nil)
  58. return true
  59. }
  60. func (p *PendingResponses) Get(signature string) *PendingResponse {
  61. p.mu.RLock()
  62. defer p.mu.RUnlock()
  63. if r, ok := p.pendingResponses[signature]; ok {
  64. return r
  65. }
  66. return nil
  67. }
  68. func (p *PendingResponses) Remove(r *PendingResponse) {
  69. signature := hex.EncodeToString(r.req.Signature)
  70. p.mu.Lock()
  71. defer p.mu.Unlock()
  72. delete(p.pendingResponses, signature)
  73. p.updateMetricsAlreadyLocked(r)
  74. }
  75. func (p *PendingResponses) NumPending() int {
  76. p.mu.Lock()
  77. defer p.mu.Unlock()
  78. return len(p.pendingResponses)
  79. }
  80. func (p *PendingResponses) updateMetricsAlreadyLocked(reqRemoved *PendingResponse) {
  81. counts := make(map[vaa.ChainID]float64)
  82. if reqRemoved != nil {
  83. // We may have removed the last request for a chain. Make sure we always update that chain.
  84. for _, pcr := range reqRemoved.queryRequest.PerChainQueries {
  85. counts[pcr.ChainId] = 0
  86. }
  87. }
  88. for _, pr := range p.pendingResponses {
  89. for _, pcr := range pr.queryRequest.PerChainQueries {
  90. counts[pcr.ChainId] = counts[pcr.ChainId] + 1
  91. }
  92. }
  93. for chainId, count := range counts {
  94. currentNumConcurrentQueriesByChain.WithLabelValues(chainId.String()).Set(count)
  95. currVal, err := getGaugeValue(maxConcurrentQueriesByChain.WithLabelValues(chainId.String()))
  96. if err != nil {
  97. p.logger.Error("failed to read current value of max concurrent queries metric", zap.String("chainId", chainId.String()), zap.Error(err))
  98. continue
  99. }
  100. if count > currVal {
  101. p.logger.Info("updating max concurrent queries metric", zap.String("chain", chainId.String()), zap.Float64("oldMax", currVal), zap.Float64("newMax", count))
  102. maxConcurrentQueriesByChain.WithLabelValues(chainId.String()).Set(count)
  103. }
  104. }
  105. }
  106. func (p *PendingResponse) updateStats(maxMatchingResponses int, outstandingResponses int, quorum int) {
  107. p.statsLock.Lock()
  108. defer p.statsLock.Unlock()
  109. p.maxMatchingResponses = maxMatchingResponses
  110. p.outstandingResponses = outstandingResponses
  111. p.quorum = quorum
  112. }
  113. func (p *PendingResponse) getStats() (int, int, int) {
  114. p.statsLock.Lock()
  115. defer p.statsLock.Unlock()
  116. return p.maxMatchingResponses, p.outstandingResponses, p.quorum
  117. }