processor.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package processor
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "time"
  7. ethcommon "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/crypto"
  9. "go.uber.org/zap"
  10. "github.com/certusone/wormhole/node/pkg/common"
  11. "github.com/certusone/wormhole/node/pkg/devnet"
  12. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  13. "github.com/certusone/wormhole/node/pkg/supervisor"
  14. "github.com/certusone/wormhole/node/pkg/vaa"
  15. )
  16. type (
  17. // vaaState represents the local view of a given VAA
  18. vaaState struct {
  19. // First time this digest was seen (possibly even before we saw its lockup).
  20. firstObserved time.Time
  21. // Copy of the VAA we constructed when we saw the lockup.
  22. ourVAA *vaa.VAA
  23. // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
  24. // to either the old or new guardian set.
  25. signatures map[ethcommon.Address][]byte
  26. // Flag set after reaching quorum and submitting the VAA.
  27. submitted bool
  28. // Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
  29. settled bool
  30. // Human-readable description of the VAA's source, used for metrics.
  31. source string
  32. // Number of times the cleanup service has attempted to retransmit this VAA.
  33. retryCount uint
  34. // Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions.
  35. ourMsg []byte
  36. // Copy of the guardian set valid at lockup/injection time.
  37. gs *common.GuardianSet
  38. }
  39. vaaMap map[string]*vaaState
  40. // aggregationState represents the node's aggregation of guardian signatures.
  41. aggregationState struct {
  42. vaaSignatures vaaMap
  43. }
  44. )
  45. type Processor struct {
  46. // lockC is a channel of observed chain lockups
  47. lockC chan *common.ChainLock
  48. // setC is a channel of guardian set updates
  49. setC chan *common.GuardianSet
  50. // sendC is a channel of outbound messages to broadcast on p2p
  51. sendC chan []byte
  52. // obsvC is a channel of inbound decoded observations from p2p
  53. obsvC chan *gossipv1.SignedObservation
  54. // vaaC is a channel of VAAs to submit to store on Solana (either as target, or for data availability)
  55. vaaC chan *vaa.VAA
  56. // injectC is a channel of VAAs injected locally.
  57. injectC chan *vaa.VAA
  58. // gk is the node's guardian private key
  59. gk *ecdsa.PrivateKey
  60. // devnetMode specified whether to submit transactions to the hardcoded Ethereum devnet
  61. devnetMode bool
  62. devnetNumGuardians uint
  63. devnetEthRPC string
  64. logger *zap.Logger
  65. // Runtime state
  66. // gs is the currently valid guardian set
  67. gs *common.GuardianSet
  68. // gst is managed by the processor and allows concurrent access to the
  69. // guardian set by other components.
  70. gst *common.GuardianSetState
  71. // state is the current runtime VAA view
  72. state *aggregationState
  73. // gk pk as eth address
  74. ourAddr ethcommon.Address
  75. // cleanup triggers periodic state cleanup
  76. cleanup *time.Ticker
  77. }
  78. func NewProcessor(
  79. ctx context.Context,
  80. lockC chan *common.ChainLock,
  81. setC chan *common.GuardianSet,
  82. sendC chan []byte,
  83. obsvC chan *gossipv1.SignedObservation,
  84. vaaC chan *vaa.VAA,
  85. injectC chan *vaa.VAA,
  86. gk *ecdsa.PrivateKey,
  87. gst *common.GuardianSetState,
  88. devnetMode bool,
  89. devnetNumGuardians uint,
  90. devnetEthRPC string) *Processor {
  91. return &Processor{
  92. lockC: lockC,
  93. setC: setC,
  94. sendC: sendC,
  95. obsvC: obsvC,
  96. vaaC: vaaC,
  97. injectC: injectC,
  98. gk: gk,
  99. gst: gst,
  100. devnetMode: devnetMode,
  101. devnetNumGuardians: devnetNumGuardians,
  102. devnetEthRPC: devnetEthRPC,
  103. logger: supervisor.Logger(ctx),
  104. state: &aggregationState{vaaMap{}},
  105. ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
  106. }
  107. }
  108. func (p *Processor) Run(ctx context.Context) error {
  109. p.cleanup = time.NewTicker(30 * time.Second)
  110. for {
  111. select {
  112. case <-ctx.Done():
  113. return ctx.Err()
  114. case p.gs = <-p.setC:
  115. p.logger.Info("guardian set updated",
  116. zap.Strings("set", p.gs.KeysAsHexStrings()),
  117. zap.Uint32("index", p.gs.Index))
  118. p.gst.Set(p.gs)
  119. // Dev mode guardian set update check (no-op in production)
  120. err := p.checkDevModeGuardianSetUpdate(ctx)
  121. if err != nil {
  122. return err
  123. }
  124. case k := <-p.lockC:
  125. p.handleLockup(ctx, k)
  126. case v := <-p.injectC:
  127. p.handleInjection(ctx, v)
  128. case m := <-p.obsvC:
  129. p.handleObservation(ctx, m)
  130. case <-p.cleanup.C:
  131. p.handleCleanup(ctx)
  132. }
  133. }
  134. }
  135. func (p *Processor) checkDevModeGuardianSetUpdate(ctx context.Context) error {
  136. if p.devnetMode {
  137. if uint(len(p.gs.Keys)) != p.devnetNumGuardians {
  138. v := devnet.DevnetGuardianSetVSS(p.devnetNumGuardians)
  139. p.logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA",
  140. len(p.gs.Keys), p.devnetNumGuardians),
  141. zap.Any("v", v))
  142. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  143. defer cancel()
  144. trx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, v)
  145. if err != nil {
  146. // Either Ethereum is not yet up, or another node has already submitted - bail
  147. // and let another node handle it. We only check the guardian set on Ethereum,
  148. // so we use that to sequence devnet creation Solana as well.
  149. return fmt.Errorf("failed to submit Eth devnet guardian set change: %v", err)
  150. }
  151. p.logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("trx", trx), zap.Any("vaa", v))
  152. // Submit VAA to Solana as well. This is asynchronous and can fail, leading to inconsistent devnet state.
  153. p.vaaC <- v
  154. }
  155. }
  156. return nil
  157. }