| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558 |
- package processor
- import (
- "context"
- "encoding/hex"
- "fmt"
- "sync"
- "time"
- "github.com/certusone/wormhole/node/pkg/altpub"
- guardianDB "github.com/certusone/wormhole/node/pkg/db"
- "github.com/certusone/wormhole/node/pkg/governor"
- "github.com/certusone/wormhole/node/pkg/guardiansigner"
- guardianNotary "github.com/certusone/wormhole/node/pkg/notary"
- "github.com/certusone/wormhole/node/pkg/p2p"
- ethcommon "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/crypto"
- "go.uber.org/zap"
- "github.com/certusone/wormhole/node/pkg/accountant"
- "github.com/certusone/wormhole/node/pkg/common"
- "github.com/certusone/wormhole/node/pkg/gwrelayer"
- gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
- "github.com/certusone/wormhole/node/pkg/supervisor"
- "github.com/wormhole-foundation/wormhole/sdk/vaa"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
- )
var (
	// PollInterval is the period of the poll timer in Processor.Run, which checks for
	// delayed messages that can be released.
	PollInterval = time.Minute

	// CleanupInterval is the period of the ticker in Processor.Run that triggers handleCleanup.
	CleanupInterval = 30 * time.Second
)
- type (
- // Observation defines the interface for any events observed by the guardian.
- Observation interface {
- // GetEmitterChain returns the id of the chain where this event was observed.
- GetEmitterChain() vaa.ChainID
- // MessageID returns a human-readable emitter_chain/emitter_address/sequence tuple.
- MessageID() string
- // SigningDigest returns the hash of the hash signing body of the observation. This is used
- // for signature generation and verification.
- SigningDigest() ethcommon.Hash
- // IsReliable returns whether this message is considered reliable meaning it can be reobserved.
- IsReliable() bool
- // IsReobservation returns whether this message is the result of a reobservation request.
- IsReobservation() bool
- // HandleQuorum finishes processing the observation once a quorum of signatures have
- // been received for it.
- HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
- }
- // state represents the local view of a given observation
- state struct {
- // First time this digest was seen (possibly even before we observed it ourselves).
- firstObserved time.Time
- // A re-observation request shall not be sent before this time.
- nextRetry time.Time
- // Number of times we sent a re-observation request
- retryCtr uint
- // Copy of our observation.
- ourObservation Observation
- // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
- // to either the old or new guardian set.
- signatures map[ethcommon.Address][]byte
- // Flag set after reaching quorum and submitting the VAA.
- submitted bool
- // Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
- settled bool
- // Human-readable description of the VAA's source, used for metrics.
- source string
- // Our observation in case we need to resubmit it to the batch publisher.
- ourObs *gossipv1.Observation
- // Copy of the bytes we submitted (ourObservation, but signed and serialized). Used for retransmissions.
- ourMsg []byte
- // The hash of the transaction in which the observation was made. Used for re-observation requests.
- txHash []byte
- // Copy of the guardian set valid at observation/injection time.
- gs *common.GuardianSet
- }
- observationMap map[string]*state
- // aggregationState represents the node's aggregation of guardian signatures.
- aggregationState struct {
- signatures observationMap
- }
- )
- // LoggingID can be used to identify a state object in a log message. Note that it should not
- // be used to uniquely identify an observation. It is only meant for logging purposes.
- func (s *state) LoggingID() string {
- if s.ourObservation != nil {
- return s.ourObservation.MessageID()
- }
- return hex.EncodeToString(s.txHash)
- }
- type PythNetVaaEntry struct {
- v *vaa.VAA
- updateTime time.Time // Used for determining when to delete entries
- }
- type Processor struct {
- // msgC is a channel of observed emitted messages
- msgC <-chan *common.MessagePublication
- // setC is a channel of guardian set updates
- setC <-chan *common.GuardianSet
- // gossipAttestationSendC is a channel of outbound observation messages to broadcast on p2p
- gossipAttestationSendC chan<- []byte
- // gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p
- gossipVaaSendC chan<- []byte
- // batchObsvC is a channel of inbound decoded batches of observations from p2p
- batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
- // obsvReqSendC is a send-only channel of outbound re-observation requests to broadcast on p2p
- obsvReqSendC chan<- *gossipv1.ObservationRequest
- // signedInC is a channel of inbound signed VAA observations from p2p
- signedInC <-chan *gossipv1.SignedVAAWithQuorum
- // guardianSigner is the guardian node's signer
- guardianSigner guardiansigner.GuardianSigner
- logger *zap.Logger
- db *guardianDB.Database
- alternatePublisher *altpub.AlternatePublisher
- // Runtime state
- // gs is the currently valid guardian set
- gs *common.GuardianSet
- // gst is managed by the processor and allows concurrent access to the
- // guardian set by other components.
- gst *common.GuardianSetState
- // state is the current runtime VAA view
- state *aggregationState
- // gk pk as eth address
- ourAddr ethcommon.Address
- governor *governor.ChainGovernor
- acct *accountant.Accountant
- acctReadC <-chan *common.MessagePublication
- notary *guardianNotary.Notary
- pythnetVaas map[string]PythNetVaaEntry
- gatewayRelayer *gwrelayer.GatewayRelayer
- updateVAALock sync.Mutex
- updatedVAAs map[string]*updateVaaEntry
- networkID string
- // batchObsvPubC is the internal channel used to publish observations to the batch processor for publishing.
- batchObsvPubC chan *gossipv1.Observation
- }
- // updateVaaEntry is used to queue up a VAA to be written to the database.
- type updateVaaEntry struct {
- v *vaa.VAA
- dirty bool
- }
- var (
- batchObservationChanDelay = promauto.NewHistogram(
- prometheus.HistogramOpts{
- Name: "wormhole_batch_observation_channel_delay_us",
- Help: "Latency histogram for delay of batched observations in channel",
- Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
- })
- batchObservationTotalDelay = promauto.NewHistogram(
- prometheus.HistogramOpts{
- Name: "wormhole_batch_observation_total_delay_us",
- Help: "Latency histogram for total time to process batched observations",
- Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
- })
- batchObservationChannelOverflow = promauto.NewCounterVec(
- prometheus.CounterOpts{
- Name: "wormhole_batch_observation_channel_overflow",
- Help: "Total number of times a write to the batch observation publish channel failed",
- }, []string{"channel"})
- vaaPublishChannelOverflow = promauto.NewCounter(
- prometheus.CounterOpts{
- Name: "wormhole_vaa_publish_channel_overflow",
- Help: "Total number of times a write to the vaa publish channel failed",
- })
- timeToHandleObservation = promauto.NewHistogram(
- prometheus.HistogramOpts{
- Name: "wormhole_time_to_handle_observation_us",
- Help: "Latency histogram for total time to handle observation on an observation",
- Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
- })
- timeToHandleQuorum = promauto.NewHistogram(
- prometheus.HistogramOpts{
- Name: "wormhole_time_to_handle_quorum_us",
- Help: "Latency histogram for total time to handle quorum on an observation",
- Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
- })
- )
- // batchObsvPubChanSize specifies the size of the channel used to publish observation batches. Allow five seconds worth.
- const batchObsvPubChanSize = p2p.MaxObservationBatchSize * 5
- func NewProcessor(
- ctx context.Context,
- db *guardianDB.Database,
- msgC <-chan *common.MessagePublication,
- setC <-chan *common.GuardianSet,
- gossipAttestationSendC chan<- []byte,
- gossipVaaSendC chan<- []byte,
- batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
- obsvReqSendC chan<- *gossipv1.ObservationRequest,
- signedInC <-chan *gossipv1.SignedVAAWithQuorum,
- guardianSigner guardiansigner.GuardianSigner,
- gst *common.GuardianSetState,
- g *governor.ChainGovernor,
- acct *accountant.Accountant,
- acctReadC <-chan *common.MessagePublication,
- notary *guardianNotary.Notary,
- gatewayRelayer *gwrelayer.GatewayRelayer,
- networkID string,
- alternatePublisher *altpub.AlternatePublisher,
- ) *Processor {
- return &Processor{
- msgC: msgC,
- setC: setC,
- gossipAttestationSendC: gossipAttestationSendC,
- gossipVaaSendC: gossipVaaSendC,
- batchObsvC: batchObsvC,
- obsvReqSendC: obsvReqSendC,
- signedInC: signedInC,
- guardianSigner: guardianSigner,
- gst: gst,
- db: db,
- alternatePublisher: alternatePublisher,
- logger: supervisor.Logger(ctx),
- state: &aggregationState{observationMap{}},
- ourAddr: crypto.PubkeyToAddress(guardianSigner.PublicKey(ctx)),
- governor: g,
- acct: acct,
- acctReadC: acctReadC,
- notary: notary,
- pythnetVaas: make(map[string]PythNetVaaEntry),
- gatewayRelayer: gatewayRelayer,
- batchObsvPubC: make(chan *gossipv1.Observation, batchObsvPubChanSize),
- updatedVAAs: make(map[string]*updateVaaEntry),
- networkID: networkID,
- }
- }
- func (p *Processor) Run(ctx context.Context) error {
- if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil {
- return fmt.Errorf("failed to start vaa writer: %w", err)
- }
- if err := supervisor.Run(ctx, "batchProcessor", common.WrapWithScissors(p.batchProcessor, "batchProcessor")); err != nil {
- return fmt.Errorf("failed to start batch processor: %w", err)
- }
- cleanup := time.NewTicker(CleanupInterval)
- // Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that.
- pollTimer := time.NewTimer(PollInterval)
- for {
- select {
- case <-ctx.Done():
- if p.acct != nil {
- p.acct.Close()
- }
- return ctx.Err()
- case p.gs = <-p.setC:
- p.logger.Info("guardian set updated",
- zap.Strings("set", p.gs.KeysAsHexStrings()),
- zap.Uint32("index", p.gs.Index),
- zap.Int("quorum", p.gs.Quorum()),
- )
- p.gst.Set(p.gs)
- case k := <-p.msgC:
- // This is the main message processing loop. It is responsible for handling messages that are
- // received on the message channel. Depending on the configuration, a message may be processed
- // by the Notary, the Governor, and/or the Accountant.
- // This loop effectively causes each of these components to process messages in a modular
- // manner. The Notary, Governor, and Accountant can be enabled or disabled independently.
- // As a consequence of this loop, each of these components updates its internal state, tracking
- // whether a message is ready to be processed from its perspective. This state is used by the
- // processor to determine whether a message should be processed or not. This occurs elsewhere
- // in the processor code.
- p.logger.Debug("processor: received new message publication on message channel", k.ZapFields()...)
- // Notary: check whether a message is well-formed.
- // Send messages to the Notary first. If messages are not approved, they should not continue
- // to the Governor or the Accountant.
- if p.notary != nil {
- p.logger.Debug("processor: sending message to notary for evaluation", k.ZapFields()...)
- // NOTE: Always returns Approve for messages that are not token transfers.
- verdict, err := p.notary.ProcessMsg(k)
- if err != nil {
- // TODO: The error is deliberately ignored so that the processor does not panic and restart.
- // In contrast, the Accountant does not ignore the error and restarts the processor if it fails.
- // The error-handling strategy can be revisited once the Notary is considered stable.
- p.logger.Error("notary failed to process message", zap.Error(err), zap.String("messageID", k.MessageIDString()))
- continue
- }
- // Based on the verdict, we can decide what to do with the message.
- switch verdict {
- case guardianNotary.Blackhole, guardianNotary.Delay:
- p.logger.Error("notary evaluated message as threatening", k.ZapFields(zap.String("verdict", verdict.String()))...)
- if verdict == guardianNotary.Blackhole {
- // Black-holed messages should not be processed.
- p.logger.Error("message will not be processed", k.ZapFields(zap.String("verdict", verdict.String()))...)
- } else {
- // Delayed messages are added to a separate queue and processed elsewhere.
- p.logger.Error("message will be delayed", k.ZapFields(zap.String("verdict", verdict.String()))...)
- }
- // We're done processing the message.
- continue
- case guardianNotary.Unknown:
- p.logger.Error("notary returned Unknown verdict", k.ZapFields(zap.String("verdict", verdict.String()))...)
- case guardianNotary.Approve:
- // no-op: process normally
- p.logger.Debug("notary evaluated message as approved", k.ZapFields(zap.String("verdict", verdict.String()))...)
- default:
- p.logger.Error("notary returned unrecognized verdict", k.ZapFields(zap.String("verdict", verdict.String()))...)
- }
- }
- // Governor: check if a message is ready to be published.
- if p.governor != nil {
- if !p.governor.ProcessMsg(k) {
- // We're done processing the message.
- continue
- }
- }
- // Accountant: check if a message is ready to be published (i.e. if it has enough observations).
- if p.acct != nil {
- shouldPub, err := p.acct.SubmitObservation(k)
- if err != nil {
- return fmt.Errorf("accountant: failed to process message `%s`: %w", k.MessageIDString(), err)
- }
- if !shouldPub {
- // We're done processing the message.
- continue
- }
- }
- p.handleMessage(ctx, k)
- case k := <-p.acctReadC:
- if p.acct == nil {
- return fmt.Errorf("received an accountant event when accountant is not configured")
- }
- // SECURITY defense-in-depth: Make sure the accountant did not generate an unexpected message.
- if !p.acct.IsMessageCoveredByAccountant(k) {
- return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
- }
- p.handleMessage(ctx, k)
- case m := <-p.batchObsvC:
- batchObservationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
- p.handleBatchObservation(m)
- case m := <-p.signedInC:
- p.handleInboundSignedVAAWithQuorum(m)
- case <-cleanup.C:
- p.handleCleanup(ctx)
- case <-pollTimer.C:
- // Poll the pending lists for messages that can be released. Both the Notary and the Governor
- // can delay messages.
- // As each of the Notary, Governor, and Accountant can be enabled separately, each must
- // be processed in a modular way.
- // When more than one of these features are enabled, messages should be processed
- // serially in the order: Notary -> Governor -> Accountant.
- // NOTE: The Accountant can signal to a channel that it is ready to publish a message via
- // writing to acctReadC so it is not handled here.
- if p.notary != nil {
- readyMsgs := p.notary.ReleaseReadyMessages()
- // Iterate over all ready messages. Hand-off to the Governor or the Accountant
- // if they're enabled. If not, publish.
- for _, msg := range readyMsgs {
- // TODO: Much of this is duplicated from the msgC branch. It might be a good
- // idea to refactor how we handle combinations of Notary, Governor, and Accountant being
- // enabled.
- // Hand-off to governor
- if p.governor != nil {
- if !p.governor.ProcessMsg(msg) {
- continue
- }
- }
- // Hand-off to accountant. If we get here, both the Notary and the Governor
- // have signalled that the message is OK to publish.
- if p.acct != nil {
- shouldPub, err := p.acct.SubmitObservation(msg)
- if err != nil {
- return fmt.Errorf("accountant: failed to process message `%s`: %w", msg.MessageIDString(), err)
- }
- if !shouldPub {
- continue
- }
- }
- // Notary, Governor, and Accountant have all approved.
- p.handleMessage(ctx, msg)
- }
- }
- if p.governor != nil {
- toBePublished, err := p.governor.CheckPending()
- if err != nil {
- return err
- }
- if len(toBePublished) != 0 {
- for _, k := range toBePublished {
- // SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message.
- if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil {
- return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err)
- } else if !msgIsGoverned {
- return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString())
- }
- if p.acct != nil {
- shouldPub, err := p.acct.SubmitObservation(k)
- if err != nil {
- return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err)
- }
- if !shouldPub {
- continue
- }
- }
- p.handleMessage(ctx, k)
- }
- }
- }
- if (p.notary != nil) || (p.governor != nil) || (p.acct != nil) {
- pollTimer.Reset(PollInterval)
- }
- }
- }
- }
- // storeSignedVAA schedules a database update for a VAA.
- func (p *Processor) storeSignedVAA(v *vaa.VAA) {
- if v.EmitterChain == vaa.ChainIDPythNet {
- key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
- p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
- return
- }
- key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence)
- p.updateVAALock.Lock()
- p.updatedVAAs[key] = &updateVaaEntry{v: v, dirty: true}
- p.updateVAALock.Unlock()
- }
- // haveSignedVAA returns true if we already have a VAA for the given VAAID
- func (p *Processor) haveSignedVAA(id guardianDB.VAAID) bool {
- if id.EmitterChain == vaa.ChainIDPythNet {
- if p.pythnetVaas == nil {
- return false
- }
- key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence)
- _, exists := p.pythnetVaas[key]
- return exists
- }
- key := fmt.Sprintf("%d/%v/%v", id.EmitterChain, id.EmitterAddress, id.Sequence)
- if p.getVaaFromUpdateMap(key) != nil {
- return true
- }
- if p.db == nil {
- return false
- }
- ok, err := p.db.HasVAA(id)
- if err != nil {
- p.logger.Error("failed to look up VAA in database",
- zap.String("vaaID", string(id.Bytes())),
- zap.Error(err),
- )
- return false
- }
- return ok
- }
- // getVaaFromUpdateMap gets the VAA from the local map. If it's not there, it returns nil.
- func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA {
- p.updateVAALock.Lock()
- entry, exists := p.updatedVAAs[key]
- p.updateVAALock.Unlock()
- if !exists {
- return nil
- }
- return entry.v
- }
- // vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map
- // being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA
- // gets updated again while we are in the process of writing it to the database.
- func (p *Processor) vaaWriter(ctx context.Context) error {
- ticker := time.NewTicker(time.Second)
- for {
- select {
- case <-ctx.Done():
- return nil
- case <-ticker.C:
- var updatedVAAs map[string]*updateVaaEntry
- p.updateVAALock.Lock()
- if len(p.updatedVAAs) != 0 {
- // There's something to write. Create a local copy of the map so we can release the lock.
- updatedVAAs = make(map[string]*updateVaaEntry)
- for key, entry := range p.updatedVAAs {
- updatedVAAs[key] = entry
- entry.dirty = false
- }
- }
- p.updateVAALock.Unlock()
- if updatedVAAs != nil {
- // If there's anything to write, do that.
- vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs))
- for _, entry := range updatedVAAs {
- vaaBatch = append(vaaBatch, entry.v)
- }
- if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil {
- p.logger.Error("failed to write VAAs to database", zap.Int("numVAAs", len(vaaBatch)), zap.Error(err))
- }
- // Go through the map and delete anything we have written that hasn't been updated again.
- // If something has been updated again, it will get written next interval.
- p.updateVAALock.Lock()
- for key, entry := range p.updatedVAAs {
- if !entry.dirty {
- delete(p.updatedVAAs, key)
- }
- }
- p.updateVAALock.Unlock()
- }
- }
- }
- }
|