cleanup.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package processor
  import (
  	"context"
  	"encoding/hex"
  	"errors"
  	"time"

  	"github.com/certusone/wormhole/node/pkg/common"
  	"github.com/certusone/wormhole/node/pkg/db"
  	"github.com/certusone/wormhole/node/pkg/vaa"
  	"github.com/prometheus/client_golang/prometheus"
  	"github.com/prometheus/client_golang/prometheus/promauto"
  	"go.uber.org/zap"
  )
  13. var (
  14. aggregationStateEntries = promauto.NewGauge(
  15. prometheus.GaugeOpts{
  16. Name: "wormhole_aggregation_state_entries",
  17. Help: "Current number of aggregation state entries (including unexpired succeed ones)",
  18. })
  19. aggregationStateExpiration = promauto.NewCounter(
  20. prometheus.CounterOpts{
  21. Name: "wormhole_aggregation_state_expirations_total",
  22. Help: "Total number of expired submitted aggregation states",
  23. })
  24. aggregationStateLate = promauto.NewCounter(
  25. prometheus.CounterOpts{
  26. Name: "wormhole_aggregation_state_late_total",
  27. Help: "Total number of late aggregation states (cluster achieved consensus without us)",
  28. })
  29. aggregationStateTimeout = promauto.NewCounter(
  30. prometheus.CounterOpts{
  31. Name: "wormhole_aggregation_state_timeout_total",
  32. Help: "Total number of aggregation states expired due to timeout after exhausting retries",
  33. })
  34. aggregationStateRetries = promauto.NewCounter(
  35. prometheus.CounterOpts{
  36. Name: "wormhole_aggregation_state_retries_total",
  37. Help: "Total number of aggregation states queued for resubmission",
  38. })
  39. aggregationStateUnobserved = promauto.NewCounter(
  40. prometheus.CounterOpts{
  41. Name: "wormhole_aggregation_state_unobserved_total",
  42. Help: "Total number of aggregation states expired due to no matching local message observations",
  43. })
  44. aggregationStateFulfillment = promauto.NewCounterVec(
  45. prometheus.CounterOpts{
  46. Name: "wormhole_aggregation_state_settled_signatures_total",
  47. Help: "Total number of signatures produced by a validator, counted after waiting a fixed amount of time",
  48. }, []string{"addr", "origin", "status"})
  49. )
  // settlementTime is the age (measured from an aggregation state's first
  // observation) after which the state is treated as settled: late observations
  // are no longer expected, so per-guardian signature presence is recorded.
  const settlementTime = 30 * time.Second
  53. // handleCleanup handles periodic retransmissions and cleanup of VAAs
  54. func (p *Processor) handleCleanup(ctx context.Context) {
  55. p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.vaaSignatures)))
  56. aggregationStateEntries.Set(float64(len(p.state.vaaSignatures)))
  57. for hash, s := range p.state.vaaSignatures {
  58. delta := time.Since(s.firstObserved)
  59. switch {
  60. case !s.submitted && s.ourVAA != nil && delta > settlementTime:
  61. // Expire pending VAAs post settlement time if we have a stored quorum VAA.
  62. //
  63. // This occurs when we observed a message after the cluster has already reached
  64. // consensus on it, causing us to never achieve quorum.
  65. if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(s.ourVAA)); err == nil {
  66. // If we have a stored quorum VAA, we can safely expire the state.
  67. //
  68. // This is a rare case, and we can safely expire the state, since we
  69. // have a quorum VAA.
  70. p.logger.Info("Expiring late VAA", zap.String("digest", hash), zap.Duration("delta", delta))
  71. aggregationStateLate.Inc()
  72. delete(p.state.vaaSignatures, hash)
  73. break
  74. } else if err != db.ErrVAANotFound {
  75. p.logger.Error("failed to look up VAA in database",
  76. zap.String("digest", hash),
  77. zap.Error(err),
  78. )
  79. }
  80. fallthrough
  81. case !s.settled && delta > settlementTime:
  82. // After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
  83. // arrive, barring special circumstances. This is a better time to count misses than submission,
  84. // because we submit right when we quorum rather than waiting for all observations to arrive.
  85. s.settled = true
  86. // Use either the most recent (in case of a VAA we haven't seen) or stored gs, if available.
  87. var gs *common.GuardianSet
  88. if s.gs != nil {
  89. gs = s.gs
  90. } else {
  91. gs = p.gs
  92. }
  93. hasSigs := len(s.signatures)
  94. wantSigs := CalculateQuorum(len(gs.Keys))
  95. quorum := hasSigs >= wantSigs
  96. var chain vaa.ChainID
  97. if s.ourVAA != nil {
  98. chain = s.ourVAA.EmitterChain
  99. // If a notifier is configured, send a notification for any missing signatures.
  100. //
  101. // Only send a notification if we have a VAA. Otherwise, bogus observations
  102. // could cause invalid alerts.
  103. if p.notifier != nil && hasSigs < len(gs.Keys) {
  104. p.logger.Info("sending miss notification", zap.String("digest", hash))
  105. // Find names of missing validators
  106. missing := make([]string, 0, len(gs.Keys))
  107. for _, k := range gs.Keys {
  108. if s.signatures[k] == nil {
  109. name := hex.EncodeToString(k.Bytes())
  110. h := p.gst.LastHeartbeat(k)
  111. // Pick first node if there are multiple peers.
  112. for _, hb := range h {
  113. name = hb.NodeName
  114. break
  115. }
  116. missing = append(missing, name)
  117. }
  118. }
  119. // Send notification for individual message when quorum has failed or
  120. // more than one node is missing.
  121. if !quorum || len(missing) > 1 {
  122. go func(v *vaa.VAA, hasSigs, wantSigs int, quorum bool, missing []string) {
  123. if err := p.notifier.MissingSignaturesOnTransaction(v, hasSigs, wantSigs, quorum, missing); err != nil {
  124. p.logger.Error("failed to send notification", zap.Error(err))
  125. }
  126. }(s.ourVAA, hasSigs, wantSigs, quorum, missing)
  127. }
  128. }
  129. }
  130. p.logger.Info("VAA considered settled",
  131. zap.String("digest", hash),
  132. zap.Duration("delta", delta),
  133. zap.Int("have_sigs", hasSigs),
  134. zap.Int("required_sigs", wantSigs),
  135. zap.Bool("quorum", quorum),
  136. zap.Stringer("emitter_chain", chain),
  137. )
  138. for _, k := range gs.Keys {
  139. if _, ok := s.signatures[k]; ok {
  140. aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
  141. } else {
  142. aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
  143. }
  144. }
  145. case s.submitted && delta.Hours() >= 1:
  146. // We could delete submitted VAAs right away, but then we'd lose context about additional (late)
  147. // observation that come in. Therefore, keep it for a reasonable amount of time.
  148. // If a very late observation arrives after cleanup, a nil aggregation state will be created
  149. // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
  150. p.logger.Info("expiring submitted VAA", zap.String("digest", hash), zap.Duration("delta", delta))
  151. delete(p.state.vaaSignatures, hash)
  152. aggregationStateExpiration.Inc()
  153. case !s.submitted && ((s.ourMsg != nil && s.retryCount >= 14400 /* 120 hours */) || (s.ourMsg == nil && s.retryCount >= 10 /* 5 minutes */)):
  154. // Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
  155. p.logger.Info("expiring unsubmitted VAA after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta))
  156. delete(p.state.vaaSignatures, hash)
  157. aggregationStateTimeout.Inc()
  158. case !s.submitted && delta.Minutes() >= 5:
  159. // Poor VAA has been unsubmitted for five minutes - clearly, something went wrong.
  160. // If we have previously submitted an observation, we can make another attempt to get it over
  161. // the finish line by rebroadcasting our sig. If we do not have a VAA, it means we either never observed it,
  162. // or it got revived by a malfunctioning guardian node, in which case, we can't do anything
  163. // about it and just delete it to keep our state nice and lean.
  164. if s.ourMsg != nil {
  165. p.logger.Info("resubmitting VAA observation",
  166. zap.String("digest", hash),
  167. zap.Duration("delta", delta),
  168. zap.Uint("retry", s.retryCount))
  169. p.sendC <- s.ourMsg
  170. s.retryCount += 1
  171. aggregationStateRetries.Inc()
  172. } else {
  173. // For nil state entries, we log the quorum to determine whether the
  174. // network reached consensus without us. We don't know the correct guardian
  175. // set, so we simply use the most recent one.
  176. hasSigs := len(s.signatures)
  177. wantSigs := CalculateQuorum(len(p.gs.Keys))
  178. p.logger.Info("expiring unsubmitted nil VAA",
  179. zap.String("digest", hash),
  180. zap.Duration("delta", delta),
  181. zap.Int("have_sigs", hasSigs),
  182. zap.Int("required_sigs", wantSigs),
  183. zap.Bool("quorum", hasSigs >= wantSigs),
  184. )
  185. delete(p.state.vaaSignatures, hash)
  186. aggregationStateUnobserved.Inc()
  187. }
  188. }
  189. }
  190. }