processor.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package processor
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "time"
  7. ethcommon "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/crypto"
  9. "go.uber.org/zap"
  10. "github.com/certusone/wormhole/node/pkg/common"
  11. "github.com/certusone/wormhole/node/pkg/devnet"
  12. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  13. "github.com/certusone/wormhole/node/pkg/supervisor"
  14. "github.com/certusone/wormhole/node/pkg/terra"
  15. "github.com/certusone/wormhole/node/pkg/vaa"
  16. )
  17. type (
  18. // vaaState represents the local view of a given VAA
  19. vaaState struct {
  20. // First time this digest was seen (possibly even before we saw its lockup).
  21. firstObserved time.Time
  22. // Copy of the VAA we constructed when we saw the lockup.
  23. ourVAA *vaa.VAA
  24. // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
  25. // to either the old or new guardian set.
  26. signatures map[ethcommon.Address][]byte
  27. // Flag set after reaching quorum and submitting the VAA.
  28. submitted bool
  29. // Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
  30. settled bool
  31. // Human-readable description of the VAA's source, used for metrics.
  32. source string
  33. // Number of times the cleanup service has attempted to retransmit this VAA.
  34. retryCount uint
  35. // Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions.
  36. ourMsg []byte
  37. // Copy of the guardian set valid at lockup/injection time.
  38. gs *common.GuardianSet
  39. }
  40. vaaMap map[string]*vaaState
  41. // aggregationState represents the node's aggregation of guardian signatures.
  42. aggregationState struct {
  43. vaaSignatures vaaMap
  44. }
  45. )
  46. type Processor struct {
  47. // lockC is a channel of observed chain lockups
  48. lockC chan *common.ChainLock
  49. // setC is a channel of guardian set updates
  50. setC chan *common.GuardianSet
  51. // sendC is a channel of outbound messages to broadcast on p2p
  52. sendC chan []byte
  53. // obsvC is a channel of inbound decoded observations from p2p
  54. obsvC chan *gossipv1.SignedObservation
  55. // vaaC is a channel of VAAs to submit to store on Solana (either as target, or for data availability)
  56. vaaC chan *vaa.VAA
  57. // injectC is a channel of VAAs injected locally.
  58. injectC chan *vaa.VAA
  59. // gk is the node's guardian private key
  60. gk *ecdsa.PrivateKey
  61. // devnetMode specified whether to submit transactions to the hardcoded Ethereum devnet
  62. devnetMode bool
  63. devnetNumGuardians uint
  64. devnetEthRPC string
  65. terraEnabled bool
  66. terraLCD string
  67. terraChainID string
  68. terraContract string
  69. terraFeePayer string
  70. logger *zap.Logger
  71. // Runtime state
  72. // gs is the currently valid guardian set
  73. gs *common.GuardianSet
  74. // state is the current runtime VAA view
  75. state *aggregationState
  76. // gk pk as eth address
  77. ourAddr ethcommon.Address
  78. // cleanup triggers periodic state cleanup
  79. cleanup *time.Ticker
  80. }
  81. func NewProcessor(
  82. ctx context.Context,
  83. lockC chan *common.ChainLock,
  84. setC chan *common.GuardianSet,
  85. sendC chan []byte,
  86. obsvC chan *gossipv1.SignedObservation,
  87. vaaC chan *vaa.VAA,
  88. injectC chan *vaa.VAA,
  89. gk *ecdsa.PrivateKey,
  90. devnetMode bool,
  91. devnetNumGuardians uint,
  92. devnetEthRPC string,
  93. terraEnabled bool,
  94. terraLCD string,
  95. terraChainID string,
  96. terraContract string,
  97. terraFeePayer string) *Processor {
  98. return &Processor{
  99. lockC: lockC,
  100. setC: setC,
  101. sendC: sendC,
  102. obsvC: obsvC,
  103. vaaC: vaaC,
  104. injectC: injectC,
  105. gk: gk,
  106. devnetMode: devnetMode,
  107. devnetNumGuardians: devnetNumGuardians,
  108. devnetEthRPC: devnetEthRPC,
  109. terraEnabled: terraEnabled,
  110. terraLCD: terraLCD,
  111. terraChainID: terraChainID,
  112. terraContract: terraContract,
  113. terraFeePayer: terraFeePayer,
  114. logger: supervisor.Logger(ctx),
  115. state: &aggregationState{vaaMap{}},
  116. ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
  117. }
  118. }
  119. func (p *Processor) Run(ctx context.Context) error {
  120. p.cleanup = time.NewTicker(30 * time.Second)
  121. for {
  122. select {
  123. case <-ctx.Done():
  124. return ctx.Err()
  125. case p.gs = <-p.setC:
  126. p.logger.Info("guardian set updated",
  127. zap.Strings("set", p.gs.KeysAsHexStrings()),
  128. zap.Uint32("index", p.gs.Index))
  129. // Dev mode guardian set update check (no-op in production)
  130. err := p.checkDevModeGuardianSetUpdate(ctx)
  131. if err != nil {
  132. return err
  133. }
  134. case k := <-p.lockC:
  135. p.handleLockup(ctx, k)
  136. case v := <-p.injectC:
  137. p.handleInjection(ctx, v)
  138. case m := <-p.obsvC:
  139. p.handleObservation(ctx, m)
  140. case <-p.cleanup.C:
  141. p.handleCleanup(ctx)
  142. }
  143. }
  144. }
  145. func (p *Processor) checkDevModeGuardianSetUpdate(ctx context.Context) error {
  146. if p.devnetMode {
  147. if uint(len(p.gs.Keys)) != p.devnetNumGuardians {
  148. v := devnet.DevnetGuardianSetVSS(p.devnetNumGuardians)
  149. p.logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA",
  150. len(p.gs.Keys), p.devnetNumGuardians),
  151. zap.Any("v", v))
  152. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  153. defer cancel()
  154. trx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, v)
  155. if err != nil {
  156. // Either Ethereum is not yet up, or another node has already submitted - bail
  157. // and let another node handle it. We only check the guardian set on Ethereum,
  158. // so we use that to sequence devnet creation for Terra and Solana as well.
  159. return fmt.Errorf("failed to submit Eth devnet guardian set change: %v", err)
  160. }
  161. p.logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("trx", trx), zap.Any("vaa", v))
  162. if p.terraEnabled {
  163. // Submit to Terra
  164. go func() {
  165. for {
  166. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  167. trxResponse, err := terra.SubmitVAA(timeout, p.terraLCD, p.terraChainID, p.terraContract, p.terraFeePayer, v)
  168. if err != nil {
  169. cancel()
  170. p.logger.Error("failed to submit Terra devnet guardian set change, retrying", zap.Error(err))
  171. time.Sleep(1 * time.Second)
  172. continue
  173. }
  174. cancel()
  175. p.logger.Info("devnet guardian set change submitted to Terra", zap.Any("trxResponse", trxResponse), zap.Any("vaa", v))
  176. break
  177. }
  178. }()
  179. }
  180. // Submit VAA to Solana as well. This is asynchronous and can fail, leading to inconsistent devnet state.
  181. p.vaaC <- v
  182. }
  183. }
  184. return nil
  185. }