processor.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package processor
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "time"
  7. ethcommon "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/crypto"
  9. "go.uber.org/zap"
  10. "github.com/certusone/wormhole/bridge/pkg/common"
  11. "github.com/certusone/wormhole/bridge/pkg/devnet"
  12. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  13. "github.com/certusone/wormhole/bridge/pkg/supervisor"
  14. "github.com/certusone/wormhole/bridge/pkg/terra"
  15. "github.com/certusone/wormhole/bridge/pkg/vaa"
  16. )
  17. type (
  18. // vaaState represents the local view of a given VAA
  19. vaaState struct {
  20. firstObserved time.Time
  21. ourVAA *vaa.VAA
  22. signatures map[ethcommon.Address][]byte
  23. submitted bool
  24. retryCount uint
  25. ourMsg []byte
  26. }
  27. vaaMap map[string]*vaaState
  28. // aggregationState represents the node's aggregation of guardian signatures.
  29. aggregationState struct {
  30. vaaSignatures vaaMap
  31. }
  32. )
  33. type Processor struct {
  34. // lockC is a channel of observed chain lockups
  35. lockC chan *common.ChainLock
  36. // setC is a channel of guardian set updates
  37. setC chan *common.GuardianSet
  38. // sendC is a channel of outbound messages to broadcast on p2p
  39. sendC chan []byte
  40. // obsvC is a channel of inbound decoded observations from p2p
  41. obsvC chan *gossipv1.LockupObservation
  42. // vaaC is a channel of VAAs to submit to store on Solana (either as target, or for data availability)
  43. vaaC chan *vaa.VAA
  44. // injectC is a channel of VAAs injected locally.
  45. injectC chan *vaa.VAA
  46. // gk is the node's guardian private key
  47. gk *ecdsa.PrivateKey
  48. // devnetMode specified whether to submit transactions to the hardcoded Ethereum devnet
  49. devnetMode bool
  50. devnetNumGuardians uint
  51. devnetEthRPC string
  52. terraLCD string
  53. terraChaidID string
  54. terraContract string
  55. terraFeePayer string
  56. logger *zap.Logger
  57. // Runtime state
  58. // gs is the currently valid guardian set
  59. gs *common.GuardianSet
  60. // state is the current runtime VAA view
  61. state *aggregationState
  62. // gk pk as eth address
  63. ourAddr ethcommon.Address
  64. // cleanup triggers periodic state cleanup
  65. cleanup *time.Ticker
  66. }
  67. func NewProcessor(
  68. ctx context.Context,
  69. lockC chan *common.ChainLock,
  70. setC chan *common.GuardianSet,
  71. sendC chan []byte,
  72. obsvC chan *gossipv1.LockupObservation,
  73. vaaC chan *vaa.VAA,
  74. injectC chan *vaa.VAA,
  75. gk *ecdsa.PrivateKey,
  76. devnetMode bool,
  77. devnetNumGuardians uint,
  78. devnetEthRPC string,
  79. terraLCD string,
  80. terraChaidID string,
  81. terraContract string,
  82. terraFeePayer string) *Processor {
  83. return &Processor{
  84. lockC: lockC,
  85. setC: setC,
  86. sendC: sendC,
  87. obsvC: obsvC,
  88. vaaC: vaaC,
  89. injectC: injectC,
  90. gk: gk,
  91. devnetMode: devnetMode,
  92. devnetNumGuardians: devnetNumGuardians,
  93. devnetEthRPC: devnetEthRPC,
  94. terraLCD: terraLCD,
  95. terraChaidID: terraChaidID,
  96. terraContract: terraContract,
  97. terraFeePayer: terraFeePayer,
  98. logger: supervisor.Logger(ctx),
  99. state: &aggregationState{vaaMap{}},
  100. ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
  101. }
  102. }
  103. func (p *Processor) Run(ctx context.Context) error {
  104. p.cleanup = time.NewTicker(30 * time.Second)
  105. for {
  106. select {
  107. case <-ctx.Done():
  108. return ctx.Err()
  109. case p.gs = <-p.setC:
  110. p.logger.Info("guardian set updated",
  111. zap.Strings("set", p.gs.KeysAsHexStrings()),
  112. zap.Uint32("index", p.gs.Index))
  113. // Dev mode guardian set update check (no-op in production)
  114. err := p.checkDevModeGuardianSetUpdate(ctx)
  115. if err != nil {
  116. return err
  117. }
  118. case k := <-p.lockC:
  119. p.handleLockup(ctx, k)
  120. case v := <-p.injectC:
  121. p.handleInjection(ctx, v)
  122. case m := <-p.obsvC:
  123. p.handleObservation(ctx, m)
  124. case <-p.cleanup.C:
  125. p.handleCleanup(ctx)
  126. }
  127. }
  128. }
  129. func (p *Processor) checkDevModeGuardianSetUpdate(ctx context.Context) error {
  130. if p.devnetMode {
  131. if uint(len(p.gs.Keys)) != p.devnetNumGuardians {
  132. v := devnet.DevnetGuardianSetVSS(p.devnetNumGuardians)
  133. p.logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA",
  134. len(p.gs.Keys), p.devnetNumGuardians),
  135. zap.Any("v", v))
  136. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  137. defer cancel()
  138. trx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, v)
  139. if err != nil {
  140. return fmt.Errorf("failed to submit devnet guardian set change: %v", err)
  141. }
  142. p.logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("trx", trx), zap.Any("vaa", v))
  143. if p.terraChaidID != "" {
  144. // Submit to Terra
  145. trxResponse, err := terra.SubmitVAA(timeout, p.terraLCD, p.terraChaidID, p.terraContract, p.terraFeePayer, v)
  146. if err != nil {
  147. return fmt.Errorf("failed to submit devnet guardian set change: %v", err)
  148. }
  149. p.logger.Info("devnet guardian set change submitted to Terra", zap.Any("trxResponse", trxResponse), zap.Any("vaa", v))
  150. }
  151. // Submit VAA to Solana as well. This is asynchronous and can fail, leading to inconsistent devnet state.
  152. p.vaaC <- v
  153. }
  154. }
  155. return nil
  156. }