package processor

import (
	"context"
	"encoding/hex"
	"fmt"
	"sync"
	"time"

	"github.com/certusone/wormhole/node/pkg/accountant"
	"github.com/certusone/wormhole/node/pkg/altpub"
	"github.com/certusone/wormhole/node/pkg/common"
	guardianDB "github.com/certusone/wormhole/node/pkg/db"
	"github.com/certusone/wormhole/node/pkg/governor"
	"github.com/certusone/wormhole/node/pkg/guardiansigner"
	"github.com/certusone/wormhole/node/pkg/gwrelayer"
	guardianNotary "github.com/certusone/wormhole/node/pkg/notary"
	"github.com/certusone/wormhole/node/pkg/p2p"
	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
	"github.com/certusone/wormhole/node/pkg/supervisor"
	ethcommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/wormhole-foundation/wormhole/sdk/vaa"
	"go.uber.org/zap"
)
// PollInterval is how often the Notary and Governor pending lists are polled for
// messages that are ready to be released.
var PollInterval = time.Minute

// CleanupInterval is how often the observation aggregation state is swept.
var CleanupInterval = time.Second * 30
type (
	// Observation defines the interface for any events observed by the guardian.
	Observation interface {
		// GetEmitterChain returns the id of the chain where this event was observed.
		GetEmitterChain() vaa.ChainID
		// MessageID returns a human-readable emitter_chain/emitter_address/sequence tuple.
		MessageID() string
		// SigningDigest returns the hash of the signing body of the observation. This is used
		// for signature generation and verification.
		SigningDigest() ethcommon.Hash
		// IsReliable returns whether this message is considered reliable, meaning it can be re-observed.
		IsReliable() bool
		// IsReobservation returns whether this message is the result of a reobservation request.
		IsReobservation() bool
		// HandleQuorum finishes processing the observation once a quorum of signatures
		// has been received for it.
		HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
	}
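	// As an illustration, MessageID renders as "<emitter_chain>/<emitter_address>/<sequence>",
	// e.g. "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/3"
	// (hypothetical values); storeSignedVAA below keys the database with the same shape.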
	// state represents the local view of a given observation
	state struct {
		// First time this digest was seen (possibly even before we observed it ourselves).
		firstObserved time.Time
		// A re-observation request shall not be sent before this time.
		nextRetry time.Time
		// Number of times we sent a re-observation request
		retryCtr uint
		// Copy of our observation.
		ourObservation Observation
		// Map of signatures seen by the guardian. During guardian set updates, this may contain
		// signatures belonging to either the old or new guardian set.
		signatures map[ethcommon.Address][]byte
		// Flag set after reaching quorum and submitting the VAA.
		submitted bool
		// Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
		settled bool
		// Human-readable description of the VAA's source, used for metrics.
		source string
		// Our observation in case we need to resubmit it to the batch publisher.
		ourObs *gossipv1.Observation
		// Copy of the bytes we submitted (ourObservation, but signed and serialized). Used for retransmissions.
		ourMsg []byte
		// The hash of the transaction in which the observation was made. Used for re-observation requests.
		txHash []byte
		// Copy of the guardian set valid at observation/injection time.
		gs *common.GuardianSet
	}

	observationMap map[string]*state

	// aggregationState represents the node's aggregation of guardian signatures.
	aggregationState struct {
		signatures observationMap
	}
)
// LoggingID can be used to identify a state object in a log message. Note that it should not
// be used to uniquely identify an observation. It is only meant for logging purposes.
func (s *state) LoggingID() string {
	if s.ourObservation != nil {
		return s.ourObservation.MessageID()
	}
	return hex.EncodeToString(s.txHash)
}
type PythNetVaaEntry struct {
	v          *vaa.VAA
	updateTime time.Time // Used for determining when to delete entries
}
type Processor struct {
	// msgC is a channel of observed emitted messages
	msgC <-chan *common.MessagePublication
	// setC is a channel of guardian set updates
	setC <-chan *common.GuardianSet
	// gossipAttestationSendC is a channel of outbound observation messages to broadcast on p2p
	gossipAttestationSendC chan<- []byte
	// gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p
	gossipVaaSendC chan<- []byte
	// batchObsvC is a channel of inbound decoded batches of observations from p2p
	batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
	// obsvReqSendC is a send-only channel of outbound re-observation requests to broadcast on p2p
	obsvReqSendC chan<- *gossipv1.ObservationRequest
	// signedInC is a channel of inbound signed VAA observations from p2p
	signedInC <-chan *gossipv1.SignedVAAWithQuorum

	// guardianSigner is the guardian node's signer
	guardianSigner guardiansigner.GuardianSigner

	logger *zap.Logger

	db *guardianDB.Database

	alternatePublisher *altpub.AlternatePublisher

	// Runtime state

	// gs is the currently valid guardian set
	gs *common.GuardianSet
	// gst is managed by the processor and allows concurrent access to the
	// guardian set by other components.
	gst *common.GuardianSetState
	// state is the current runtime VAA view
	state *aggregationState
	// ourAddr is the guardian key's public key as an Ethereum address.
	ourAddr ethcommon.Address

	governor       *governor.ChainGovernor
	acct           *accountant.Accountant
	acctReadC      <-chan *common.MessagePublication
	notary         *guardianNotary.Notary
	pythnetVaas    map[string]PythNetVaaEntry
	gatewayRelayer *gwrelayer.GatewayRelayer
	updateVAALock  sync.Mutex
	updatedVAAs    map[string]*updateVaaEntry
	networkID      string

	// batchObsvPubC is the internal channel used to publish observations to the batch processor for publishing.
	batchObsvPubC chan *gossipv1.Observation
}
// updateVaaEntry is used to queue up a VAA to be written to the database.
type updateVaaEntry struct {
	v     *vaa.VAA
	dirty bool
}
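// Write-coalescing lifecycle (a summary of the code below, not additional behavior):
// storeSignedVAA inserts an entry with dirty=true under updateVAALock; vaaWriter
// snapshots the map, clears each dirty flag, persists the batch, then deletes entries
// that are still clean. If a VAA is stored again mid-write, dirty flips back to true
// and the entry survives for the next pass.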
var (
	batchObservationChanDelay = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "wormhole_batch_observation_channel_delay_us",
			Help:    "Latency histogram for delay of batched observations in channel",
			Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
		})
	batchObservationTotalDelay = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "wormhole_batch_observation_total_delay_us",
			Help:    "Latency histogram for total time to process batched observations",
			Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
		})
	batchObservationChannelOverflow = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_batch_observation_channel_overflow",
			Help: "Total number of times a write to the batch observation publish channel failed",
		}, []string{"channel"})
	vaaPublishChannelOverflow = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_vaa_publish_channel_overflow",
			Help: "Total number of times a write to the vaa publish channel failed",
		})
	timeToHandleObservation = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "wormhole_time_to_handle_observation_us",
			Help:    "Latency histogram for total time to handle an observation",
			Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
		})
	timeToHandleQuorum = promauto.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "wormhole_time_to_handle_quorum_us",
			Help:    "Latency histogram for total time to handle quorum on an observation",
			Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
		})
)
// batchObsvPubChanSize specifies the size of the channel used to publish observation batches.
// Allow five seconds' worth.
const batchObsvPubChanSize = p2p.MaxObservationBatchSize * 5
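// Sizing note (illustrative, not derived from the p2p package): the five-second figure
// assumes the batch processor drains up to one full batch of p2p.MaxObservationBatchSize
// observations per second, so the buffer can absorb roughly five seconds of backlog.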
func NewProcessor(
	ctx context.Context,
	db *guardianDB.Database,
	msgC <-chan *common.MessagePublication,
	setC <-chan *common.GuardianSet,
	gossipAttestationSendC chan<- []byte,
	gossipVaaSendC chan<- []byte,
	batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
	obsvReqSendC chan<- *gossipv1.ObservationRequest,
	signedInC <-chan *gossipv1.SignedVAAWithQuorum,
	guardianSigner guardiansigner.GuardianSigner,
	gst *common.GuardianSetState,
	g *governor.ChainGovernor,
	acct *accountant.Accountant,
	acctReadC <-chan *common.MessagePublication,
	notary *guardianNotary.Notary,
	gatewayRelayer *gwrelayer.GatewayRelayer,
	networkID string,
	alternatePublisher *altpub.AlternatePublisher,
) *Processor {
	return &Processor{
		msgC:                   msgC,
		setC:                   setC,
		gossipAttestationSendC: gossipAttestationSendC,
		gossipVaaSendC:         gossipVaaSendC,
		batchObsvC:             batchObsvC,
		obsvReqSendC:           obsvReqSendC,
		signedInC:              signedInC,
		guardianSigner:         guardianSigner,
		gst:                    gst,
		db:                     db,
		alternatePublisher:     alternatePublisher,
		logger:                 supervisor.Logger(ctx),
		state:                  &aggregationState{observationMap{}},
		ourAddr:                crypto.PubkeyToAddress(guardianSigner.PublicKey(ctx)),
		governor:               g,
		acct:                   acct,
		acctReadC:              acctReadC,
		notary:                 notary,
		pythnetVaas:            make(map[string]PythNetVaaEntry),
		gatewayRelayer:         gatewayRelayer,
		batchObsvPubC:          make(chan *gossipv1.Observation, batchObsvPubChanSize),
		updatedVAAs:            make(map[string]*updateVaaEntry),
		networkID:              networkID,
	}
}
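// A minimal wiring sketch (hypothetical; the real node wires these channels from p2p
// and the chain watchers, and the optional components may be nil as shown here):
//
//	p := NewProcessor(ctx, db, msgC, setC, attestationSendC, vaaSendC, batchObsvC,
//		obsvReqSendC, signedInC, signer, gst, nil, nil, nil, nil, nil, "mainnet", nil)
//	if err := supervisor.Run(ctx, "processor", p.Run); err != nil {
//		// handle startup error
//	}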
func (p *Processor) Run(ctx context.Context) error {
	if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil {
		return fmt.Errorf("failed to start vaa writer: %w", err)
	}

	if err := supervisor.Run(ctx, "batchProcessor", common.WrapWithScissors(p.batchProcessor, "batchProcessor")); err != nil {
		return fmt.Errorf("failed to start batch processor: %w", err)
	}

	cleanup := time.NewTicker(CleanupInterval)

	// Always initialize the timer so the select below never reads from a nil channel.
	// It is only re-armed if the Notary, Governor, or Accountant is enabled.
	pollTimer := time.NewTimer(PollInterval)
	for {
		select {
		case <-ctx.Done():
			if p.acct != nil {
				p.acct.Close()
			}
			return ctx.Err()
		case p.gs = <-p.setC:
			p.logger.Info("guardian set updated",
				zap.Strings("set", p.gs.KeysAsHexStrings()),
				zap.Uint32("index", p.gs.Index),
				zap.Int("quorum", p.gs.Quorum()),
			)
			p.gst.Set(p.gs)
		case k := <-p.msgC:
			// This is the main message processing path. Depending on the configuration, a message
			// may be processed by the Notary, the Governor, and/or the Accountant; each component
			// can be enabled or disabled independently. As a message passes through, each enabled
			// component updates its internal state to track whether the message is ready to be
			// processed from its perspective. That state is consulted elsewhere in the processor
			// to decide whether the message should be published.
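			// Pipeline sketch (each stage only runs if configured):
			//
			//	msgC -> Notary -> Governor -> Accountant -> handleMessage
			//
			// A stage that withholds a message ends processing here; delayed messages re-enter
			// via the pollTimer case (Notary and Governor) or via acctReadC (Accountant).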
			p.logger.Debug("processor: received new message publication on message channel", k.ZapFields()...)

			// Notary: check whether a message is well-formed.
			// Send messages to the Notary first. If messages are not approved, they should not
			// continue to the Governor or the Accountant.
			if p.notary != nil {
				p.logger.Debug("processor: sending message to notary for evaluation", k.ZapFields()...)

				// NOTE: Always returns Approve for messages that are not token transfers.
				verdict, err := p.notary.ProcessMsg(k)
				if err != nil {
					// TODO: The error is deliberately ignored (logged only) so that the processor
					// does not panic and restart. In contrast, the Accountant does not ignore the
					// error and restarts the processor if it fails. The error-handling strategy
					// can be revisited once the Notary is considered stable.
					p.logger.Error("notary failed to process message", zap.Error(err), zap.String("messageID", k.MessageIDString()))
					continue
				}

				// Based on the verdict, decide what to do with the message.
				switch verdict {
				case guardianNotary.Blackhole, guardianNotary.Delay:
					p.logger.Error("notary evaluated message as threatening", k.ZapFields(zap.String("verdict", verdict.String()))...)
					if verdict == guardianNotary.Blackhole {
						// Black-holed messages should not be processed.
						p.logger.Error("message will not be processed", k.ZapFields(zap.String("verdict", verdict.String()))...)
					} else {
						// Delayed messages are added to a separate queue and processed elsewhere.
						p.logger.Error("message will be delayed", k.ZapFields(zap.String("verdict", verdict.String()))...)
					}
					// We're done processing the message.
					continue
				case guardianNotary.Unknown:
					p.logger.Error("notary returned Unknown verdict", k.ZapFields(zap.String("verdict", verdict.String()))...)
				case guardianNotary.Approve:
					// no-op: process normally
					p.logger.Debug("notary evaluated message as approved", k.ZapFields(zap.String("verdict", verdict.String()))...)
				default:
					p.logger.Error("notary returned unrecognized verdict", k.ZapFields(zap.String("verdict", verdict.String()))...)
				}
			}
			// Governor: check if a message is ready to be published.
			if p.governor != nil {
				if !p.governor.ProcessMsg(k) {
					// We're done processing the message.
					continue
				}
			}

			// Accountant: check if a message is ready to be published (i.e. if it has enough observations).
			if p.acct != nil {
				shouldPub, err := p.acct.SubmitObservation(k)
				if err != nil {
					return fmt.Errorf("accountant: failed to process message `%s`: %w", k.MessageIDString(), err)
				}
				if !shouldPub {
					// We're done processing the message.
					continue
				}
			}

			p.handleMessage(ctx, k)
		case k := <-p.acctReadC:
			if p.acct == nil {
				return fmt.Errorf("received an accountant event when accountant is not configured")
			}
			// SECURITY defense-in-depth: Make sure the accountant did not generate an unexpected message.
			if !p.acct.IsMessageCoveredByAccountant(k) {
				return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
			}
			p.handleMessage(ctx, k)
		case m := <-p.batchObsvC:
			batchObservationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
			p.handleBatchObservation(m)
		case m := <-p.signedInC:
			p.handleInboundSignedVAAWithQuorum(m)
		case <-cleanup.C:
			p.handleCleanup(ctx)
		case <-pollTimer.C:
			// Poll the pending lists for messages that can be released. Both the Notary and the
			// Governor can delay messages. Because the Notary, Governor, and Accountant can each
			// be enabled separately, each must be processed in a modular way. When more than one
			// of these features is enabled, messages are processed serially in the order:
			// Notary -> Governor -> Accountant.
			// NOTE: The Accountant signals that it is ready to publish a message by writing to
			// acctReadC, so it is not handled here.
			if p.notary != nil {
				readyMsgs := p.notary.ReleaseReadyMessages()
				// Iterate over all ready messages. Hand off to the Governor or the Accountant
				// if they're enabled. If not, publish.
				for _, msg := range readyMsgs {
					// TODO: Much of this is duplicated from the msgC branch. It might be a good
					// idea to refactor how we handle combinations of Notary, Governor, and
					// Accountant being enabled.

					// Hand off to the Governor.
					if p.governor != nil {
						if !p.governor.ProcessMsg(msg) {
							continue
						}
					}

					// Hand off to the Accountant. If we get here, both the Notary and the Governor
					// have signalled that the message is OK to publish.
					if p.acct != nil {
						shouldPub, err := p.acct.SubmitObservation(msg)
						if err != nil {
							return fmt.Errorf("accountant: failed to process message `%s`: %w", msg.MessageIDString(), err)
						}
						if !shouldPub {
							continue
						}
					}

					// Notary, Governor, and Accountant have all approved.
					p.handleMessage(ctx, msg)
				}
			}
			if p.governor != nil {
				toBePublished, err := p.governor.CheckPending()
				if err != nil {
					return err
				}
				if len(toBePublished) != 0 {
					for _, k := range toBePublished {
						// SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message.
						if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil {
							return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err)
						} else if !msgIsGoverned {
							return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString())
						}
						if p.acct != nil {
							shouldPub, err := p.acct.SubmitObservation(k)
							if err != nil {
								return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err)
							}
							if !shouldPub {
								continue
							}
						}
						p.handleMessage(ctx, k)
					}
				}
			}

			if (p.notary != nil) || (p.governor != nil) || (p.acct != nil) {
				pollTimer.Reset(PollInterval)
			}
		}
	}
}
// storeSignedVAA schedules a database update for a VAA.
func (p *Processor) storeSignedVAA(v *vaa.VAA) {
	if v.EmitterChain == vaa.ChainIDPythNet {
		key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
		p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
		return
	}
	key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence)
	p.updateVAALock.Lock()
	p.updatedVAAs[key] = &updateVaaEntry{v: v, dirty: true}
	p.updateVAALock.Unlock()
}
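// Key shapes, for reference (addresses and sequences are illustrative): PythNet VAAs are
// kept in memory under "<emitter_address>/<sequence>", while all other VAAs are queued
// for the database under the full "<chain_id>/<emitter_address>/<sequence>" message ID.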
// haveSignedVAA returns true if we already have a VAA for the given VAAID.
func (p *Processor) haveSignedVAA(id guardianDB.VAAID) bool {
	if id.EmitterChain == vaa.ChainIDPythNet {
		if p.pythnetVaas == nil {
			return false
		}
		key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence)
		_, exists := p.pythnetVaas[key]
		return exists
	}

	key := fmt.Sprintf("%d/%v/%v", id.EmitterChain, id.EmitterAddress, id.Sequence)
	if p.getVaaFromUpdateMap(key) != nil {
		return true
	}

	if p.db == nil {
		return false
	}

	ok, err := p.db.HasVAA(id)
	if err != nil {
		p.logger.Error("failed to look up VAA in database",
			zap.String("vaaID", string(id.Bytes())),
			zap.Error(err),
		)
		return false
	}
	return ok
}
// getVaaFromUpdateMap gets the VAA from the local map. If it's not there, it returns nil.
func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA {
	p.updateVAALock.Lock()
	entry, exists := p.updatedVAAs[key]
	p.updateVAALock.Unlock()
	if !exists {
		return nil
	}
	return entry.v
}
// vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map
// being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA
// gets updated again while we are in the process of writing it to the database.
func (p *Processor) vaaWriter(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			var updatedVAAs map[string]*updateVaaEntry

			p.updateVAALock.Lock()
			if len(p.updatedVAAs) != 0 {
				// There's something to write. Create a local copy of the map so we can release the lock.
				updatedVAAs = make(map[string]*updateVaaEntry)
				for key, entry := range p.updatedVAAs {
					updatedVAAs[key] = entry
					entry.dirty = false
				}
			}
			p.updateVAALock.Unlock()

			if updatedVAAs != nil {
				// If there's anything to write, do that.
				vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs))
				for _, entry := range updatedVAAs {
					vaaBatch = append(vaaBatch, entry.v)
				}

				if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil {
					p.logger.Error("failed to write VAAs to database", zap.Int("numVAAs", len(vaaBatch)), zap.Error(err))
				}

				// Go through the map and delete anything we have written that hasn't been updated again.
				// If something has been updated again, it will get written next interval.
				p.updateVAALock.Lock()
				for key, entry := range p.updatedVAAs {
					if !entry.dirty {
						delete(p.updatedVAAs, key)
					}
				}
				p.updateVAALock.Unlock()
			}
		}
	}
}