processor.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. package processor
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/certusone/wormhole/node/pkg/altpub"
  9. guardianDB "github.com/certusone/wormhole/node/pkg/db"
  10. "github.com/certusone/wormhole/node/pkg/governor"
  11. "github.com/certusone/wormhole/node/pkg/guardiansigner"
  12. "github.com/certusone/wormhole/node/pkg/p2p"
  13. ethcommon "github.com/ethereum/go-ethereum/common"
  14. "github.com/ethereum/go-ethereum/crypto"
  15. "go.uber.org/zap"
  16. "github.com/certusone/wormhole/node/pkg/accountant"
  17. "github.com/certusone/wormhole/node/pkg/common"
  18. "github.com/certusone/wormhole/node/pkg/gwrelayer"
  19. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  20. "github.com/certusone/wormhole/node/pkg/supervisor"
  21. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  22. "github.com/prometheus/client_golang/prometheus"
  23. "github.com/prometheus/client_golang/prometheus/promauto"
  24. )
  25. var GovInterval = time.Minute
  26. var CleanupInterval = time.Second * 30
  27. type (
  28. // Observation defines the interface for any events observed by the guardian.
  29. Observation interface {
  30. // GetEmitterChain returns the id of the chain where this event was observed.
  31. GetEmitterChain() vaa.ChainID
  32. // MessageID returns a human-readable emitter_chain/emitter_address/sequence tuple.
  33. MessageID() string
  34. // SigningDigest returns the hash of the hash signing body of the observation. This is used
  35. // for signature generation and verification.
  36. SigningDigest() ethcommon.Hash
  37. // IsReliable returns whether this message is considered reliable meaning it can be reobserved.
  38. IsReliable() bool
  39. // IsReobservation returns whether this message is the result of a reobservation request.
  40. IsReobservation() bool
  41. // HandleQuorum finishes processing the observation once a quorum of signatures have
  42. // been received for it.
  43. HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
  44. }
  45. // state represents the local view of a given observation
  46. state struct {
  47. // First time this digest was seen (possibly even before we observed it ourselves).
  48. firstObserved time.Time
  49. // A re-observation request shall not be sent before this time.
  50. nextRetry time.Time
  51. // Number of times we sent a re-observation request
  52. retryCtr uint
  53. // Copy of our observation.
  54. ourObservation Observation
  55. // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
  56. // to either the old or new guardian set.
  57. signatures map[ethcommon.Address][]byte
  58. // Flag set after reaching quorum and submitting the VAA.
  59. submitted bool
  60. // Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
  61. settled bool
  62. // Human-readable description of the VAA's source, used for metrics.
  63. source string
  64. // Our observation in case we need to resubmit it to the batch publisher.
  65. ourObs *gossipv1.Observation
  66. // Copy of the bytes we submitted (ourObservation, but signed and serialized). Used for retransmissions.
  67. ourMsg []byte
  68. // The hash of the transaction in which the observation was made. Used for re-observation requests.
  69. txHash []byte
  70. // Copy of the guardian set valid at observation/injection time.
  71. gs *common.GuardianSet
  72. }
  73. observationMap map[string]*state
  74. // aggregationState represents the node's aggregation of guardian signatures.
  75. aggregationState struct {
  76. signatures observationMap
  77. }
  78. )
  79. // LoggingID can be used to identify a state object in a log message. Note that it should not
  80. // be used to uniquely identify an observation. It is only meant for logging purposes.
  81. func (s *state) LoggingID() string {
  82. if s.ourObservation != nil {
  83. return s.ourObservation.MessageID()
  84. }
  85. return hex.EncodeToString(s.txHash)
  86. }
  87. type PythNetVaaEntry struct {
  88. v *vaa.VAA
  89. updateTime time.Time // Used for determining when to delete entries
  90. }
  91. type Processor struct {
  92. // msgC is a channel of observed emitted messages
  93. msgC <-chan *common.MessagePublication
  94. // setC is a channel of guardian set updates
  95. setC <-chan *common.GuardianSet
  96. // gossipAttestationSendC is a channel of outbound observation messages to broadcast on p2p
  97. gossipAttestationSendC chan<- []byte
  98. // gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p
  99. gossipVaaSendC chan<- []byte
  100. // batchObsvC is a channel of inbound decoded batches of observations from p2p
  101. batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
  102. // obsvReqSendC is a send-only channel of outbound re-observation requests to broadcast on p2p
  103. obsvReqSendC chan<- *gossipv1.ObservationRequest
  104. // signedInC is a channel of inbound signed VAA observations from p2p
  105. signedInC <-chan *gossipv1.SignedVAAWithQuorum
  106. // guardianSigner is the guardian node's signer
  107. guardianSigner guardiansigner.GuardianSigner
  108. logger *zap.Logger
  109. db *guardianDB.Database
  110. alternatePublisher *altpub.AlternatePublisher
  111. // Runtime state
  112. // gs is the currently valid guardian set
  113. gs *common.GuardianSet
  114. // gst is managed by the processor and allows concurrent access to the
  115. // guardian set by other components.
  116. gst *common.GuardianSetState
  117. // state is the current runtime VAA view
  118. state *aggregationState
  119. // gk pk as eth address
  120. ourAddr ethcommon.Address
  121. governor *governor.ChainGovernor
  122. acct *accountant.Accountant
  123. acctReadC <-chan *common.MessagePublication
  124. pythnetVaas map[string]PythNetVaaEntry
  125. gatewayRelayer *gwrelayer.GatewayRelayer
  126. updateVAALock sync.Mutex
  127. updatedVAAs map[string]*updateVaaEntry
  128. networkID string
  129. // batchObsvPubC is the internal channel used to publish observations to the batch processor for publishing.
  130. batchObsvPubC chan *gossipv1.Observation
  131. }
  132. // updateVaaEntry is used to queue up a VAA to be written to the database.
  133. type updateVaaEntry struct {
  134. v *vaa.VAA
  135. dirty bool
  136. }
  137. var (
  138. batchObservationChanDelay = promauto.NewHistogram(
  139. prometheus.HistogramOpts{
  140. Name: "wormhole_batch_observation_channel_delay_us",
  141. Help: "Latency histogram for delay of batched observations in channel",
  142. Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
  143. })
  144. batchObservationTotalDelay = promauto.NewHistogram(
  145. prometheus.HistogramOpts{
  146. Name: "wormhole_batch_observation_total_delay_us",
  147. Help: "Latency histogram for total time to process batched observations",
  148. Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
  149. })
  150. batchObservationChannelOverflow = promauto.NewCounterVec(
  151. prometheus.CounterOpts{
  152. Name: "wormhole_batch_observation_channel_overflow",
  153. Help: "Total number of times a write to the batch observation publish channel failed",
  154. }, []string{"channel"})
  155. vaaPublishChannelOverflow = promauto.NewCounter(
  156. prometheus.CounterOpts{
  157. Name: "wormhole_vaa_publish_channel_overflow",
  158. Help: "Total number of times a write to the vaa publish channel failed",
  159. })
  160. timeToHandleObservation = promauto.NewHistogram(
  161. prometheus.HistogramOpts{
  162. Name: "wormhole_time_to_handle_observation_us",
  163. Help: "Latency histogram for total time to handle observation on an observation",
  164. Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
  165. })
  166. timeToHandleQuorum = promauto.NewHistogram(
  167. prometheus.HistogramOpts{
  168. Name: "wormhole_time_to_handle_quorum_us",
  169. Help: "Latency histogram for total time to handle quorum on an observation",
  170. Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
  171. })
  172. )
  173. // batchObsvPubChanSize specifies the size of the channel used to publish observation batches. Allow five seconds worth.
  174. const batchObsvPubChanSize = p2p.MaxObservationBatchSize * 5
  175. func NewProcessor(
  176. ctx context.Context,
  177. db *guardianDB.Database,
  178. msgC <-chan *common.MessagePublication,
  179. setC <-chan *common.GuardianSet,
  180. gossipAttestationSendC chan<- []byte,
  181. gossipVaaSendC chan<- []byte,
  182. batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
  183. obsvReqSendC chan<- *gossipv1.ObservationRequest,
  184. signedInC <-chan *gossipv1.SignedVAAWithQuorum,
  185. guardianSigner guardiansigner.GuardianSigner,
  186. gst *common.GuardianSetState,
  187. g *governor.ChainGovernor,
  188. acct *accountant.Accountant,
  189. acctReadC <-chan *common.MessagePublication,
  190. gatewayRelayer *gwrelayer.GatewayRelayer,
  191. networkID string,
  192. alternatePublisher *altpub.AlternatePublisher,
  193. ) *Processor {
  194. return &Processor{
  195. msgC: msgC,
  196. setC: setC,
  197. gossipAttestationSendC: gossipAttestationSendC,
  198. gossipVaaSendC: gossipVaaSendC,
  199. batchObsvC: batchObsvC,
  200. obsvReqSendC: obsvReqSendC,
  201. signedInC: signedInC,
  202. guardianSigner: guardianSigner,
  203. gst: gst,
  204. db: db,
  205. alternatePublisher: alternatePublisher,
  206. logger: supervisor.Logger(ctx),
  207. state: &aggregationState{observationMap{}},
  208. ourAddr: crypto.PubkeyToAddress(guardianSigner.PublicKey(ctx)),
  209. governor: g,
  210. acct: acct,
  211. acctReadC: acctReadC,
  212. pythnetVaas: make(map[string]PythNetVaaEntry),
  213. gatewayRelayer: gatewayRelayer,
  214. batchObsvPubC: make(chan *gossipv1.Observation, batchObsvPubChanSize),
  215. updatedVAAs: make(map[string]*updateVaaEntry),
  216. networkID: networkID,
  217. }
  218. }
  219. func (p *Processor) Run(ctx context.Context) error {
  220. if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil {
  221. return fmt.Errorf("failed to start vaa writer: %w", err)
  222. }
  223. if err := supervisor.Run(ctx, "batchProcessor", common.WrapWithScissors(p.batchProcessor, "batchProcessor")); err != nil {
  224. return fmt.Errorf("failed to start batch processor: %w", err)
  225. }
  226. cleanup := time.NewTicker(CleanupInterval)
  227. // Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that.
  228. govTimer := time.NewTimer(GovInterval)
  229. for {
  230. select {
  231. case <-ctx.Done():
  232. if p.acct != nil {
  233. p.acct.Close()
  234. }
  235. return ctx.Err()
  236. case p.gs = <-p.setC:
  237. p.logger.Info("guardian set updated",
  238. zap.Strings("set", p.gs.KeysAsHexStrings()),
  239. zap.Uint32("index", p.gs.Index),
  240. zap.Int("quorum", p.gs.Quorum()),
  241. )
  242. p.gst.Set(p.gs)
  243. case k := <-p.msgC:
  244. if p.governor != nil {
  245. if !p.governor.ProcessMsg(k) {
  246. continue
  247. }
  248. }
  249. if p.acct != nil {
  250. shouldPub, err := p.acct.SubmitObservation(k)
  251. if err != nil {
  252. return fmt.Errorf("failed to process message `%s`: %w", k.MessageIDString(), err)
  253. }
  254. if !shouldPub {
  255. continue
  256. }
  257. }
  258. p.handleMessage(ctx, k)
  259. case k := <-p.acctReadC:
  260. if p.acct == nil {
  261. return fmt.Errorf("received an accountant event when accountant is not configured")
  262. }
  263. // SECURITY defense-in-depth: Make sure the accountant did not generate an unexpected message.
  264. if !p.acct.IsMessageCoveredByAccountant(k) {
  265. return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
  266. }
  267. p.handleMessage(ctx, k)
  268. case m := <-p.batchObsvC:
  269. batchObservationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
  270. p.handleBatchObservation(m)
  271. case m := <-p.signedInC:
  272. p.handleInboundSignedVAAWithQuorum(m)
  273. case <-cleanup.C:
  274. p.handleCleanup(ctx)
  275. case <-govTimer.C:
  276. if p.governor != nil {
  277. toBePublished, err := p.governor.CheckPending()
  278. if err != nil {
  279. return err
  280. }
  281. if len(toBePublished) != 0 {
  282. for _, k := range toBePublished {
  283. // SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message.
  284. if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil {
  285. return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err)
  286. } else if !msgIsGoverned {
  287. return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString())
  288. }
  289. if p.acct != nil {
  290. shouldPub, err := p.acct.SubmitObservation(k)
  291. if err != nil {
  292. return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err)
  293. }
  294. if !shouldPub {
  295. continue
  296. }
  297. }
  298. p.handleMessage(ctx, k)
  299. }
  300. }
  301. }
  302. if (p.governor != nil) || (p.acct != nil) {
  303. govTimer.Reset(GovInterval)
  304. }
  305. }
  306. }
  307. }
  308. // storeSignedVAA schedules a database update for a VAA.
  309. func (p *Processor) storeSignedVAA(v *vaa.VAA) {
  310. if v.EmitterChain == vaa.ChainIDPythNet {
  311. key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
  312. p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
  313. return
  314. }
  315. key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence)
  316. p.updateVAALock.Lock()
  317. p.updatedVAAs[key] = &updateVaaEntry{v: v, dirty: true}
  318. p.updateVAALock.Unlock()
  319. }
  320. // haveSignedVAA returns true if we already have a VAA for the given VAAID
  321. func (p *Processor) haveSignedVAA(id guardianDB.VAAID) bool {
  322. if id.EmitterChain == vaa.ChainIDPythNet {
  323. if p.pythnetVaas == nil {
  324. return false
  325. }
  326. key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence)
  327. _, exists := p.pythnetVaas[key]
  328. return exists
  329. }
  330. key := fmt.Sprintf("%d/%v/%v", id.EmitterChain, id.EmitterAddress, id.Sequence)
  331. if p.getVaaFromUpdateMap(key) != nil {
  332. return true
  333. }
  334. if p.db == nil {
  335. return false
  336. }
  337. ok, err := p.db.HasVAA(id)
  338. if err != nil {
  339. p.logger.Error("failed to look up VAA in database",
  340. zap.String("vaaID", string(id.Bytes())),
  341. zap.Error(err),
  342. )
  343. return false
  344. }
  345. return ok
  346. }
  347. // getVaaFromUpdateMap gets the VAA from the local map. If it's not there, it returns nil.
  348. func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA {
  349. p.updateVAALock.Lock()
  350. entry, exists := p.updatedVAAs[key]
  351. p.updateVAALock.Unlock()
  352. if !exists {
  353. return nil
  354. }
  355. return entry.v
  356. }
  357. // vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map
  358. // being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA
  359. // gets updated again while we are in the process of writing it to the database.
  360. func (p *Processor) vaaWriter(ctx context.Context) error {
  361. ticker := time.NewTicker(time.Second)
  362. for {
  363. select {
  364. case <-ctx.Done():
  365. return nil
  366. case <-ticker.C:
  367. var updatedVAAs map[string]*updateVaaEntry
  368. p.updateVAALock.Lock()
  369. if len(p.updatedVAAs) != 0 {
  370. // There's something to write. Create a local copy of the map so we can release the lock.
  371. updatedVAAs = make(map[string]*updateVaaEntry)
  372. for key, entry := range p.updatedVAAs {
  373. updatedVAAs[key] = entry
  374. entry.dirty = false
  375. }
  376. }
  377. p.updateVAALock.Unlock()
  378. if updatedVAAs != nil {
  379. // If there's anything to write, do that.
  380. vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs))
  381. for _, entry := range updatedVAAs {
  382. vaaBatch = append(vaaBatch, entry.v)
  383. }
  384. if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil {
  385. p.logger.Error("failed to write VAAs to database", zap.Int("numVAAs", len(vaaBatch)), zap.Error(err))
  386. }
  387. // Go through the map and delete anything we have written that hasn't been updated again.
  388. // If something has been updated again, it will get written next interval.
  389. p.updateVAALock.Lock()
  390. for key, entry := range p.updatedVAAs {
  391. if !entry.dirty {
  392. delete(p.updatedVAAs, key)
  393. }
  394. }
  395. p.updateVAALock.Unlock()
  396. }
  397. }
  398. }
  399. }