cleanup.go

//nolint:unparam // this will be refactored in https://github.com/wormhole-foundation/wormhole/pull/1953
package processor

import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"github.com/certusone/wormhole/node/pkg/common"
	"github.com/certusone/wormhole/node/pkg/db"
	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/wormhole-foundation/wormhole/sdk/vaa"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

var (
	aggregationStateEntries = promauto.NewGauge(
		prometheus.GaugeOpts{
			Name: "wormhole_aggregation_state_entries",
			Help: "Current number of aggregation state entries (including unexpired succeeded ones)",
		})
	aggregationStateExpiration = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_expirations_total",
			Help: "Total number of expired submitted aggregation states",
		})
	aggregationStateLate = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_late_total",
			Help: "Total number of late aggregation states (cluster achieved consensus without us)",
		})
	aggregationStateTimeout = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_timeout_total",
			Help: "Total number of aggregation states expired due to timeout after exhausting retries",
		})
	aggregationStateRetries = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_retries_total",
			Help: "Total number of aggregation states queued for resubmission",
		})
	aggregationStateUnobserved = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_unobserved_total",
			Help: "Total number of aggregation states expired due to no matching local message observations",
		})
	aggregationStateFulfillment = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_aggregation_state_settled_signatures_total",
			Help: "Total number of signatures produced by a validator, counted after waiting a fixed amount of time",
		}, []string{"addr", "origin", "status"})
)

const (
	settlementTime = time.Second * 30
	// retryLimitOurs defines how long this Guardian will keep an observation in the local state before discarding it.
	// Observations from other Guardians can take up to 24h to arrive if they are held in their Governor. Therefore, this value should be greater than 24h.
	retryLimitOurs    = time.Hour * 30
	retryLimitNotOurs = time.Hour
)
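
// FirstRetryMinWait is the minimum amount of time an unsubmitted observation waits before its first
// resubmission attempt; subsequent attempts are scheduled from the per-state retry counter via
// nextRetryDuration (not defined in this file).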
var (
	FirstRetryMinWait = time.Minute * 5
)

// handleCleanup handles periodic retransmissions and cleanup of observations.
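// For each aggregation state entry it marks sufficiently old entries as settled (recording
// per-guardian signature fulfillment metrics), expires entries that were already submitted or
// that have exhausted their retry budget, and re-requests or rebroadcasts observations that are
// still waiting for quorum.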
func (p *Processor) handleCleanup(ctx context.Context) {
	p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.signatures)))
	aggregationStateEntries.Set(float64(len(p.state.signatures)))

	for hash, s := range p.state.signatures {
		delta := time.Since(s.firstObserved)

		if !s.submitted && s.ourObservation != nil && delta > settlementTime {
			// Expire pending VAAs post settlement time if we have a stored quorum VAA.
			//
			// This occurs when we observed a message after the cluster has already reached
			// consensus on it, causing us to never achieve quorum.
			if ourVaa, ok := s.ourObservation.(*VAA); ok {
				if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) {
					// This is a rare case: since we already hold a quorum VAA, the state
					// can be safely expired.
					p.logger.Info("Expiring late VAA",
						zap.String("message_id", ourVaa.VAA.MessageID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
					)
					aggregationStateLate.Inc()
					delete(p.state.signatures, hash)
					continue
				}
			}
		}

		switch {
		case !s.settled && delta > settlementTime:
			// After 30 seconds, the observation is considered settled - it's unlikely that more observations will
			// arrive, barring special circumstances. This is a better time to count misses than at submission,
			// because we submit as soon as we reach quorum rather than waiting for all observations to arrive.
			s.settled = true

			// Use either the most recent gs (in case of an observation we haven't seen) or the stored gs, if available.
			var gs *common.GuardianSet
			if s.gs != nil {
				gs = s.gs
			} else {
				gs = p.gs
			}

			hasSigs := len(s.signatures)
			quorum := hasSigs >= gs.Quorum()

			var chain vaa.ChainID
			if s.ourObservation != nil {
				chain = s.ourObservation.GetEmitterChain()
			}

			if p.logger.Level().Enabled(zapcore.DebugLevel) {
				p.logger.Debug("observation considered settled",
					zap.String("message_id", s.LoggingID()),
					zap.String("digest", hash),
					zap.Duration("delta", delta),
					zap.Int("have_sigs", hasSigs),
					zap.Int("required_sigs", gs.Quorum()),
					zap.Bool("quorum", quorum),
					zap.Stringer("emitter_chain", chain),
				)
			}

			for _, k := range gs.Keys {
				if _, ok := s.signatures[k]; ok {
					aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
				} else {
					aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
				}
			}

		case s.submitted && delta.Hours() >= 1:
			// We could delete submitted observations right away, but then we'd lose context about additional (late)
			// observations that come in. Therefore, keep them for a reasonable amount of time.
			// If a very late observation arrives after cleanup, a nil aggregation state will be created
			// and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
			if p.logger.Level().Enabled(zapcore.DebugLevel) {
				p.logger.Debug("expiring submitted observation",
					zap.String("message_id", s.LoggingID()),
					zap.String("digest", hash),
					zap.Duration("delta", delta),
				)
			}
			delete(p.state.signatures, hash)
			aggregationStateExpiration.Inc()

		case !s.submitted && ((s.ourObs != nil && delta > retryLimitOurs) || (s.ourObs == nil && delta > retryLimitNotOurs)):
			// Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
			p.logger.Info("expiring unsubmitted observation after exhausting retries",
				zap.String("message_id", s.LoggingID()),
				zap.String("digest", hash),
				zap.Duration("delta", delta),
				zap.Bool("weObserved", s.ourObs != nil),
			)
			delete(p.state.signatures, hash)
			aggregationStateTimeout.Inc()

		case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0:
			// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
			// If we have previously submitted an observation, and it was reliable, we can make another attempt to get
			// it over the finish line by sending a re-observation request to the network and rebroadcasting our
			// sig. If we do not have an observation, it means we either never observed it, or it got
			// revived by a malfunctioning guardian node, in which case we can't do anything about it
			// and just delete it to keep our state nice and lean.
			if s.ourObs != nil {
				// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes.
				if !s.ourObservation.IsReliable() {
					p.logger.Info("expiring unsubmitted unreliable observation",
						zap.String("message_id", s.LoggingID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
					)
					delete(p.state.signatures, hash)
					aggregationStateTimeout.Inc()
					break
				}

				// Reobservation requests should not be resubmitted, but we will keep waiting for more observations.
				if s.ourObservation.IsReobservation() {
					if p.logger.Level().Enabled(zapcore.DebugLevel) {
						p.logger.Debug("not submitting reobservation request for reobservation",
							zap.String("message_id", s.LoggingID()),
							zap.String("digest", hash),
							zap.Duration("delta", delta),
						)
					}
					break
				}

				// If we have already stored this VAA, there is no reason for us to request reobservation.
				alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s)
				if err != nil {
					p.logger.Error("failed to check if observation is already in DB, requesting reobservation",
						zap.String("message_id", s.LoggingID()),
						zap.String("hash", hash),
						zap.Error(err))
				}

				if alreadyInDB {
					if p.logger.Level().Enabled(zapcore.DebugLevel) {
						p.logger.Debug("observation already in DB, not requesting reobservation",
							zap.String("message_id", s.LoggingID()),
							zap.String("digest", hash),
						)
					}
				} else {
					p.logger.Info("resubmitting observation",
						zap.String("message_id", s.LoggingID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
						zap.String("firstObserved", s.firstObserved.String()),
						zap.Int("numSignatures", len(s.signatures)),
					)
					req := &gossipv1.ObservationRequest{
						ChainId: uint32(s.ourObservation.GetEmitterChain()),
						TxHash:  s.txHash,
					}
					if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
						p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
					}
					if s.ourMsg != nil {
						// This is the case for immediately published messages (as well as anything still pending from before the cutover).
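						// Non-blocking send: if the gossip attestation channel is full, count the
						// overflow and move on rather than blocking the cleanup loop.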
						select {
						case p.gossipAttestationSendC <- s.ourMsg:
						default:
							batchObservationChannelOverflow.WithLabelValues("gossipResend").Inc()
						}
					} else {
						p.postObservationToBatch(s.ourObs)
					}

					s.retryCtr++
					s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
					aggregationStateRetries.Inc()
				}
			} else {
				// For nil state entries, we log the quorum to determine whether the
				// network reached consensus without us. We don't know the correct guardian
				// set, so we simply use the most recent one.
				hasSigs := len(s.signatures)

				if p.logger.Level().Enabled(zapcore.DebugLevel) {
					p.logger.Debug("expiring unsubmitted nil observation",
						zap.String("message_id", s.LoggingID()),
						zap.String("digest", hash),
						zap.Duration("delta", delta),
						zap.Int("have_sigs", hasSigs),
						zap.Int("required_sigs", p.gs.Quorum()),
						zap.Bool("quorum", hasSigs >= p.gs.Quorum()),
					)
				}
				delete(p.state.signatures, hash)
				aggregationStateUnobserved.Inc()
			}
		}
	}

	// Clean up old pythnet VAAs.
	oldestTime := time.Now().Add(-time.Hour)
	for key, pe := range p.pythnetVaas {
		if pe.updateTime.Before(oldestTime) {
			delete(p.pythnetVaas, key)
		}
	}
}

// signedVaaAlreadyInDB checks if the VAA is already in the DB. If it is, it makes sure the hash matches.
func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
	if s.ourObservation == nil {
		p.logger.Debug("unable to check if VAA is already in DB, no observation", zap.String("digest", hash))
		return false, nil
	}

	msgId := s.ourObservation.MessageID()
	vaaID, err := db.VaaIDFromString(msgId)
	if err != nil {
		return false, fmt.Errorf(`failed to generate VAA ID from message id "%s": %w`, s.ourObservation.MessageID(), err)
	}

	// If the VAA is waiting to be written to the DB, use that version. Otherwise, use the DB.
	v := p.getVaaFromUpdateMap(msgId)
	if v == nil {
		vb, err := p.db.GetSignedVAABytes(*vaaID)
		if err != nil {
			if errors.Is(err, db.ErrVAANotFound) {
				if p.logger.Level().Enabled(zapcore.DebugLevel) {
					p.logger.Debug("VAA not in DB",
						zap.String("message_id", s.ourObservation.MessageID()),
						zap.String("digest", hash),
					)
				}
				return false, nil
			}
			return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
		}

		v, err = vaa.Unmarshal(vb)
		if err != nil {
			return false, fmt.Errorf("failed to unmarshal VAA: %w", err)
		}
	}

	oldHash := hex.EncodeToString(v.SigningDigest().Bytes())
	if hash != oldHash {
		if p.logger.Core().Enabled(zapcore.DebugLevel) {
			p.logger.Debug("VAA already in DB but hash is different",
				zap.String("message_id", s.ourObservation.MessageID()),
				zap.String("old_hash", oldHash),
				zap.String("new_hash", hash))
		}
		return false, fmt.Errorf("hash mismatch in_db: %s, new: %s", oldHash, hash)
	}

	return true, nil
}
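
// Usage sketch (illustrative only, not part of this file): handleCleanup is assumed to be driven
// periodically from the processor's run loop, e.g. via a ticker. The interval shown below is an
// assumption for illustration; the actual wiring lives outside this file.
//
//	cleanup := time.NewTicker(30 * time.Second)
//	defer cleanup.Stop()
//	for {
//		select {
//		case <-ctx.Done():
//			return ctx.Err()
//		case <-cleanup.C:
//			p.handleCleanup(ctx)
//		}
//	}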