processor.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package processor
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "time"
  7. ethcommon "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/crypto"
  9. "go.uber.org/zap"
  10. "github.com/certusone/wormhole/bridge/pkg/common"
  11. "github.com/certusone/wormhole/bridge/pkg/devnet"
  12. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  13. "github.com/certusone/wormhole/bridge/pkg/supervisor"
  14. "github.com/certusone/wormhole/bridge/pkg/vaa"
  15. )
  16. type (
  17. // vaaState represents the local view of a given VAA
  18. vaaState struct {
  19. // First time this digest was seen (possibly even before we saw its lockup).
  20. firstObserved time.Time
  21. // Copy of the VAA we constructed when we saw the lockup.
  22. ourVAA *vaa.VAA
  23. // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
  24. // to either the old or new guardian set.
  25. signatures map[ethcommon.Address][]byte
  26. // Flag set after reaching quorum and submitting the VAA.
  27. submitted bool
  28. // Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
  29. settled bool
  30. // Human-readable description of the VAA's source, used for metrics.
  31. source string
  32. // Number of times the cleanup service has attempted to retransmit this VAA.
  33. retryCount uint
  34. // Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions.
  35. ourMsg []byte
  36. // Copy of the guardian set valid at lockup/injection time.
  37. gs *common.GuardianSet
  38. }
  39. vaaMap map[string]*vaaState
  40. // aggregationState represents the node's aggregation of guardian signatures.
  41. aggregationState struct {
  42. vaaSignatures vaaMap
  43. }
  44. )
  45. type Processor struct {
  46. // lockC is a channel of observed chain lockups
  47. lockC chan *common.MessagePublication
  48. // setC is a channel of guardian set updates
  49. setC chan *common.GuardianSet
  50. // sendC is a channel of outbound messages to broadcast on p2p
  51. sendC chan []byte
  52. // obsvC is a channel of inbound decoded observations from p2p
  53. obsvC chan *gossipv1.SignedObservation
  54. // vaaC is a channel of VAAs to submit to store on Solana (either as target, or for data availability)
  55. vaaC chan *vaa.VAA
  56. // injectC is a channel of VAAs injected locally.
  57. injectC chan *vaa.VAA
  58. // gk is the node's guardian private key
  59. gk *ecdsa.PrivateKey
  60. // devnetMode specified whether to submit transactions to the hardcoded Ethereum devnet
  61. devnetMode bool
  62. devnetNumGuardians uint
  63. devnetEthRPC string
  64. terraEnabled bool
  65. terraLCD string
  66. terraChainID string
  67. terraContract string
  68. logger *zap.Logger
  69. // Runtime state
  70. // gs is the currently valid guardian set
  71. gs *common.GuardianSet
  72. // state is the current runtime VAA view
  73. state *aggregationState
  74. // gk pk as eth address
  75. ourAddr ethcommon.Address
  76. // cleanup triggers periodic state cleanup
  77. cleanup *time.Ticker
  78. }
  79. func NewProcessor(
  80. ctx context.Context,
  81. lockC chan *common.MessagePublication,
  82. setC chan *common.GuardianSet,
  83. sendC chan []byte,
  84. obsvC chan *gossipv1.SignedObservation,
  85. vaaC chan *vaa.VAA,
  86. injectC chan *vaa.VAA,
  87. gk *ecdsa.PrivateKey,
  88. devnetMode bool,
  89. devnetNumGuardians uint,
  90. devnetEthRPC string,
  91. terraLCD string,
  92. terraChainID string,
  93. terraContract string) *Processor {
  94. return &Processor{
  95. lockC: lockC,
  96. setC: setC,
  97. sendC: sendC,
  98. obsvC: obsvC,
  99. vaaC: vaaC,
  100. injectC: injectC,
  101. gk: gk,
  102. devnetMode: devnetMode,
  103. devnetNumGuardians: devnetNumGuardians,
  104. devnetEthRPC: devnetEthRPC,
  105. terraLCD: terraLCD,
  106. terraChainID: terraChainID,
  107. terraContract: terraContract,
  108. logger: supervisor.Logger(ctx),
  109. state: &aggregationState{vaaMap{}},
  110. ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
  111. }
  112. }
  113. func (p *Processor) Run(ctx context.Context) error {
  114. p.cleanup = time.NewTicker(30 * time.Second)
  115. for {
  116. select {
  117. case <-ctx.Done():
  118. return ctx.Err()
  119. case p.gs = <-p.setC:
  120. p.logger.Info("guardian set updated",
  121. zap.Strings("set", p.gs.KeysAsHexStrings()),
  122. zap.Uint32("index", p.gs.Index))
  123. // Dev mode guardian set update check (no-op in production)
  124. err := p.checkDevModeGuardianSetUpdate(ctx)
  125. if err != nil {
  126. return err
  127. }
  128. case k := <-p.lockC:
  129. p.handleLockup(ctx, k)
  130. case v := <-p.injectC:
  131. p.handleInjection(ctx, v)
  132. case m := <-p.obsvC:
  133. p.handleObservation(ctx, m)
  134. case <-p.cleanup.C:
  135. p.handleCleanup(ctx)
  136. }
  137. }
  138. }
  139. func (p *Processor) checkDevModeGuardianSetUpdate(ctx context.Context) error {
  140. if p.devnetMode {
  141. if uint(len(p.gs.Keys)) != p.devnetNumGuardians {
  142. v := devnet.DevnetGuardianSetVSS(p.devnetNumGuardians)
  143. p.logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA",
  144. len(p.gs.Keys), p.devnetNumGuardians),
  145. zap.Any("v", v))
  146. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  147. defer cancel()
  148. trx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, v)
  149. if err != nil {
  150. // Either Ethereum is not yet up, or another node has already submitted - bail
  151. // and let another node handle it. We only check the guardian set on Ethereum,
  152. // so we use that to sequence devnet creation for Terra and Solana as well.
  153. return fmt.Errorf("failed to submit Eth devnet guardian set change: %v", err)
  154. }
  155. p.logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("trx", trx), zap.Any("vaa", v))
  156. // Submit VAA to Solana as well. This is asynchronous and can fail, leading to inconsistent devnet state.
  157. p.vaaC <- v
  158. }
  159. }
  160. return nil
  161. }