cleanup.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package processor
  2. import (
  3. "context"
  4. "github.com/certusone/wormhole/node/pkg/common"
  5. "github.com/prometheus/client_golang/prometheus"
  6. "github.com/prometheus/client_golang/prometheus/promauto"
  7. "time"
  8. "go.uber.org/zap"
  9. )
  10. var (
  11. aggregationStateEntries = promauto.NewGauge(
  12. prometheus.GaugeOpts{
  13. Name: "wormhole_aggregation_state_entries",
  14. Help: "Current number of aggregation state entries (including unexpired succeed ones)",
  15. })
  16. aggregationStateExpiration = promauto.NewCounter(
  17. prometheus.CounterOpts{
  18. Name: "wormhole_aggregation_state_expirations_total",
  19. Help: "Total number of expired submitted aggregation states",
  20. })
  21. aggregationStateTimeout = promauto.NewCounter(
  22. prometheus.CounterOpts{
  23. Name: "wormhole_aggregation_state_timeout_total",
  24. Help: "Total number of aggregation states expired due to timeout after exhausting retries",
  25. })
  26. aggregationStateRetries = promauto.NewCounter(
  27. prometheus.CounterOpts{
  28. Name: "wormhole_aggregation_state_retries_total",
  29. Help: "Total number of aggregation states queued for resubmission",
  30. })
  31. aggregationStateUnobserved = promauto.NewCounter(
  32. prometheus.CounterOpts{
  33. Name: "wormhole_aggregation_state_unobserved_total",
  34. Help: "Total number of aggregation states expired due to no matching local message observations",
  35. })
  36. aggregationStateFulfillment = promauto.NewCounterVec(
  37. prometheus.CounterOpts{
  38. Name: "wormhole_aggregation_state_settled_signatures_total",
  39. Help: "Total number of signatures produced by a validator, counted after waiting a fixed amount of time",
  40. }, []string{"addr", "origin", "status"})
  41. )
  42. // handleCleanup handles periodic retransmissions and cleanup of VAAs
  43. func (p *Processor) handleCleanup(ctx context.Context) {
  44. p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.vaaSignatures)))
  45. aggregationStateEntries.Set(float64(len(p.state.vaaSignatures)))
  46. for hash, s := range p.state.vaaSignatures {
  47. delta := time.Now().Sub(s.firstObserved)
  48. switch {
  49. case !s.settled && delta.Seconds() >= 30:
  50. // After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
  51. // arrive, barring special circumstances. This is a better time to count misses than submission,
  52. // because we submit right when we quorum rather than waiting for all observations to arrive.
  53. s.settled = true
  54. p.logger.Info("VAA considered settled", zap.String("digest", hash))
  55. // Use either the most recent (in case of a VAA we haven't seen) or stored gs, if available.
  56. var gs *common.GuardianSet
  57. if s.gs != nil {
  58. gs = s.gs
  59. } else {
  60. gs = p.gs
  61. }
  62. for _, k := range gs.Keys {
  63. if _, ok := s.signatures[k]; ok {
  64. aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
  65. } else {
  66. aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
  67. }
  68. }
  69. case s.submitted && delta.Hours() >= 1:
  70. // We could delete submitted VAAs right away, but then we'd lose context about additional (late)
  71. // observation that come in. Therefore, keep it for a reasonable amount of time.
  72. // If a very late observation arrives after cleanup, a nil aggregation state will be created
  73. // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
  74. p.logger.Info("expiring submitted VAA", zap.String("digest", hash), zap.Duration("delta", delta))
  75. delete(p.state.vaaSignatures, hash)
  76. aggregationStateExpiration.Inc()
  77. case !s.submitted && s.retryCount >= 10:
  78. // Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
  79. p.logger.Info("expiring unsubmitted VAA after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta))
  80. delete(p.state.vaaSignatures, hash)
  81. aggregationStateTimeout.Inc()
  82. case !s.submitted && delta.Minutes() >= 5:
  83. // Poor VAA has been unsubmitted for five minutes - clearly, something went wrong.
  84. // If we have previously submitted an observation, we can make another attempt to get it over
  85. // the finish line by rebroadcasting our sig. If we do not have a VAA, it means we either never observed it,
  86. // or it got revived by a malfunctioning guardian node, in which case, we can't do anything
  87. // about it and just delete it to keep our state nice and lean.
  88. if s.ourMsg != nil {
  89. p.logger.Info("resubmitting VAA observation",
  90. zap.String("digest", hash),
  91. zap.Duration("delta", delta),
  92. zap.Int("retry", 1))
  93. p.sendC <- s.ourMsg
  94. s.retryCount += 1
  95. aggregationStateRetries.Inc()
  96. } else {
  97. p.logger.Info("expiring unsubmitted nil VAA", zap.String("digest", hash), zap.Duration("delta", delta))
  98. delete(p.state.vaaSignatures, hash)
  99. aggregationStateUnobserved.Inc()
  100. }
  101. }
  102. }
  103. }