| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package ccq
- import (
- "encoding/hex"
- "sync"
- gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
- "github.com/certusone/wormhole/node/pkg/query"
- "github.com/wormhole-foundation/wormhole/sdk/vaa"
- "go.uber.org/zap"
- )
- type PendingResponse struct {
- req *gossipv1.SignedQueryRequest
- userName string
- queryRequest *query.QueryRequest
- ch chan *SignedResponse
- errCh chan *ErrorEntry
- // statsLock protects the data items below.
- statsLock sync.RWMutex
- maxMatchingResponses int
- outstandingResponses int
- quorum int
- }
// ErrorEntry pairs an error with an integer status code. It is the element
// type carried on PendingResponse.errCh.
type ErrorEntry struct {
	// err is the underlying error.
	err error
	// status is the numeric status reported alongside err.
	// NOTE(review): presumably an HTTP status code; confirm against callers.
	status int
}
- func NewPendingResponse(req *gossipv1.SignedQueryRequest, userName string, queryRequest *query.QueryRequest) *PendingResponse {
- return &PendingResponse{
- req: req,
- userName: userName,
- queryRequest: queryRequest,
- ch: make(chan *SignedResponse),
- errCh: make(chan *ErrorEntry),
- }
- }
- type PendingResponses struct {
- pendingResponses map[string]*PendingResponse
- mu sync.RWMutex
- logger *zap.Logger
- }
- func NewPendingResponses(logger *zap.Logger) *PendingResponses {
- return &PendingResponses{
- // Make this channel bigger than the number of responses we ever expect to get for a query.
- pendingResponses: make(map[string]*PendingResponse, 100),
- logger: logger,
- }
- }
- func (p *PendingResponses) Add(r *PendingResponse) bool {
- signature := hex.EncodeToString(r.req.Signature)
- p.mu.Lock()
- defer p.mu.Unlock()
- if _, ok := p.pendingResponses[signature]; ok {
- // the request w/ this signature is already being handled
- // don't overwrite
- return false
- }
- p.pendingResponses[signature] = r
- p.updateMetricsAlreadyLocked(nil)
- return true
- }
- func (p *PendingResponses) Get(signature string) *PendingResponse {
- p.mu.RLock()
- defer p.mu.RUnlock()
- if r, ok := p.pendingResponses[signature]; ok {
- return r
- }
- return nil
- }
- func (p *PendingResponses) Remove(r *PendingResponse) {
- signature := hex.EncodeToString(r.req.Signature)
- p.mu.Lock()
- defer p.mu.Unlock()
- delete(p.pendingResponses, signature)
- p.updateMetricsAlreadyLocked(r)
- }
- func (p *PendingResponses) NumPending() int {
- p.mu.Lock()
- defer p.mu.Unlock()
- return len(p.pendingResponses)
- }
- func (p *PendingResponses) updateMetricsAlreadyLocked(reqRemoved *PendingResponse) {
- counts := make(map[vaa.ChainID]float64)
- if reqRemoved != nil {
- // We may have removed the last request for a chain. Make sure we always update that chain.
- for _, pcr := range reqRemoved.queryRequest.PerChainQueries {
- counts[pcr.ChainId] = 0
- }
- }
- for _, pr := range p.pendingResponses {
- for _, pcr := range pr.queryRequest.PerChainQueries {
- counts[pcr.ChainId] = counts[pcr.ChainId] + 1
- }
- }
- for chainId, count := range counts {
- currentNumConcurrentQueriesByChain.WithLabelValues(chainId.String()).Set(count)
- currVal, err := getGaugeValue(maxConcurrentQueriesByChain.WithLabelValues(chainId.String()))
- if err != nil {
- p.logger.Error("failed to read current value of max concurrent queries metric", zap.String("chainId", chainId.String()), zap.Error(err))
- continue
- }
- if count > currVal {
- p.logger.Info("updating max concurrent queries metric", zap.String("chain", chainId.String()), zap.Float64("oldMax", currVal), zap.Float64("newMax", count))
- maxConcurrentQueriesByChain.WithLabelValues(chainId.String()).Set(count)
- }
- }
- }
- func (p *PendingResponse) updateStats(maxMatchingResponses int, outstandingResponses int, quorum int) {
- p.statsLock.Lock()
- defer p.statsLock.Unlock()
- p.maxMatchingResponses = maxMatchingResponses
- p.outstandingResponses = outstandingResponses
- p.quorum = quorum
- }
- func (p *PendingResponse) getStats() (int, int, int) {
- p.statsLock.Lock()
- defer p.statsLock.Unlock()
- return p.maxMatchingResponses, p.outstandingResponses, p.quorum
- }
|