processor.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package processor
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "time"
  7. ethcommon "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/crypto"
  9. "go.uber.org/zap"
  10. "github.com/certusone/wormhole/bridge/pkg/common"
  11. "github.com/certusone/wormhole/bridge/pkg/devnet"
  12. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  13. "github.com/certusone/wormhole/bridge/pkg/supervisor"
  14. "github.com/certusone/wormhole/bridge/pkg/vaa"
  15. )
  16. type (
  17. // vaaState represents the local view of a given VAA
  18. vaaState struct {
  19. firstObserved time.Time
  20. ourVAA *vaa.VAA
  21. signatures map[ethcommon.Address][]byte
  22. submitted bool
  23. retryCount uint
  24. ourMsg []byte
  25. }
  26. vaaMap map[string]*vaaState
  27. // aggregationState represents the node's aggregation of guardian signatures.
  28. aggregationState struct {
  29. vaaSignatures vaaMap
  30. }
  31. )
  32. type Processor struct {
  33. // lockC is a channel of observed chain lockups
  34. lockC chan *common.ChainLock
  35. // setC is a channel of guardian set updates
  36. setC chan *common.GuardianSet
  37. // sendC is a channel of outbound messages to broadcast on p2p
  38. sendC chan []byte
  39. // obsvC is a channel of inbound decoded observations from p2p
  40. obsvC chan *gossipv1.LockupObservation
  41. // vaaC is a channel of VAAs to submit to store on Solana (either as target, or for data availability)
  42. vaaC chan *vaa.VAA
  43. // injectC is a channel of VAAs injected locally.
  44. injectC chan *vaa.VAA
  45. // gk is the node's guardian private key
  46. gk *ecdsa.PrivateKey
  47. // devnetMode specified whether to submit transactions to the hardcoded Ethereum devnet
  48. devnetMode bool
  49. devnetNumGuardians uint
  50. devnetEthRPC string
  51. terraLCD string
  52. terraChaidID string
  53. terraContract string
  54. terraFeePayer string
  55. logger *zap.Logger
  56. // Runtime state
  57. // gs is the currently valid guardian set
  58. gs *common.GuardianSet
  59. // state is the current runtime VAA view
  60. state *aggregationState
  61. // gk pk as eth address
  62. ourAddr ethcommon.Address
  63. // cleanup triggers periodic state cleanup
  64. cleanup *time.Ticker
  65. }
  66. func NewProcessor(
  67. ctx context.Context,
  68. lockC chan *common.ChainLock,
  69. setC chan *common.GuardianSet,
  70. sendC chan []byte,
  71. obsvC chan *gossipv1.LockupObservation,
  72. vaaC chan *vaa.VAA,
  73. injectC chan *vaa.VAA,
  74. gk *ecdsa.PrivateKey,
  75. devnetMode bool,
  76. devnetNumGuardians uint,
  77. devnetEthRPC string,
  78. terraLCD string,
  79. terraChaidID string,
  80. terraContract string,
  81. terraFeePayer string) *Processor {
  82. return &Processor{
  83. lockC: lockC,
  84. setC: setC,
  85. sendC: sendC,
  86. obsvC: obsvC,
  87. vaaC: vaaC,
  88. injectC: injectC,
  89. gk: gk,
  90. devnetMode: devnetMode,
  91. devnetNumGuardians: devnetNumGuardians,
  92. devnetEthRPC: devnetEthRPC,
  93. terraLCD: terraLCD,
  94. terraChaidID: terraChaidID,
  95. terraContract: terraContract,
  96. terraFeePayer: terraFeePayer,
  97. logger: supervisor.Logger(ctx),
  98. state: &aggregationState{vaaMap{}},
  99. ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
  100. }
  101. }
  102. func (p *Processor) Run(ctx context.Context) error {
  103. p.cleanup = time.NewTicker(30 * time.Second)
  104. for {
  105. select {
  106. case <-ctx.Done():
  107. return ctx.Err()
  108. case p.gs = <-p.setC:
  109. p.logger.Info("guardian set updated",
  110. zap.Strings("set", p.gs.KeysAsHexStrings()),
  111. zap.Uint32("index", p.gs.Index))
  112. // Dev mode guardian set update check (no-op in production)
  113. err := p.checkDevModeGuardianSetUpdate(ctx)
  114. if err != nil {
  115. return err
  116. }
  117. case k := <-p.lockC:
  118. p.handleLockup(ctx, k)
  119. case v := <-p.injectC:
  120. p.handleInjection(ctx, v)
  121. case m := <-p.obsvC:
  122. p.handleObservation(ctx, m)
  123. case <-p.cleanup.C:
  124. p.handleCleanup(ctx)
  125. }
  126. }
  127. }
  128. func (p *Processor) checkDevModeGuardianSetUpdate(ctx context.Context) error {
  129. if p.devnetMode {
  130. if uint(len(p.gs.Keys)) != p.devnetNumGuardians {
  131. v := devnet.DevnetGuardianSetVSS(p.devnetNumGuardians)
  132. p.logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA",
  133. len(p.gs.Keys), p.devnetNumGuardians),
  134. zap.Any("v", v))
  135. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  136. defer cancel()
  137. trx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, v)
  138. if err != nil {
  139. return fmt.Errorf("failed to submit devnet guardian set change: %v", err)
  140. }
  141. p.logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("trx", trx), zap.Any("vaa", v))
  142. // Submit VAA to Solana as well. This is asynchronous and can fail, leading to inconsistent devnet state.
  143. p.vaaC <- v
  144. }
  145. }
  146. return nil
  147. }