processor.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package processor
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "github.com/certusone/wormhole/node/pkg/notify/discord"
  6. "time"
  7. "github.com/certusone/wormhole/node/pkg/db"
  8. ethcommon "github.com/ethereum/go-ethereum/common"
  9. "github.com/ethereum/go-ethereum/crypto"
  10. "go.uber.org/zap"
  11. "github.com/certusone/wormhole/node/pkg/common"
  12. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  13. "github.com/certusone/wormhole/node/pkg/reporter"
  14. "github.com/certusone/wormhole/node/pkg/supervisor"
  15. "github.com/certusone/wormhole/node/pkg/vaa"
  16. )
  17. type (
  18. // vaaState represents the local view of a given VAA
  19. vaaState struct {
  20. // First time this digest was seen (possibly even before we observed it ourselves).
  21. firstObserved time.Time
  22. // Copy of the VAA we constructed when we made our own observation.
  23. ourVAA *vaa.VAA
  24. // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
  25. // to either the old or new guardian set.
  26. signatures map[ethcommon.Address][]byte
  27. // Flag set after reaching quorum and submitting the VAA.
  28. submitted bool
  29. // Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
  30. settled bool
  31. // Human-readable description of the VAA's source, used for metrics.
  32. source string
  33. // Number of times the cleanup service has attempted to retransmit this VAA.
  34. retryCount uint
  35. // Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions.
  36. ourMsg []byte
  37. // Copy of the guardian set valid at observation/injection time.
  38. gs *common.GuardianSet
  39. }
  40. vaaMap map[string]*vaaState
  41. // aggregationState represents the node's aggregation of guardian signatures.
  42. aggregationState struct {
  43. vaaSignatures vaaMap
  44. }
  45. )
  46. type Processor struct {
  47. // lockC is a channel of observed emitted messages
  48. lockC chan *common.MessagePublication
  49. // setC is a channel of guardian set updates
  50. setC chan *common.GuardianSet
  51. // sendC is a channel of outbound messages to broadcast on p2p
  52. sendC chan []byte
  53. // obsvC is a channel of inbound decoded observations from p2p
  54. obsvC chan *gossipv1.SignedObservation
  55. // signedInC is a channel of inbound signed VAA observations from p2p
  56. signedInC chan *gossipv1.SignedVAAWithQuorum
  57. // injectC is a channel of VAAs injected locally.
  58. injectC chan *vaa.VAA
  59. // gk is the node's guardian private key
  60. gk *ecdsa.PrivateKey
  61. // devnetMode specified whether to submit transactions to the hardcoded Ethereum devnet
  62. devnetMode bool
  63. devnetNumGuardians uint
  64. devnetEthRPC string
  65. terraLCD string
  66. terraContract string
  67. attestationEvents *reporter.AttestationEventReporter
  68. logger *zap.Logger
  69. db *db.Database
  70. // Runtime state
  71. // gs is the currently valid guardian set
  72. gs *common.GuardianSet
  73. // gst is managed by the processor and allows concurrent access to the
  74. // guardian set by other components.
  75. gst *common.GuardianSetState
  76. // state is the current runtime VAA view
  77. state *aggregationState
  78. // gk pk as eth address
  79. ourAddr ethcommon.Address
  80. // cleanup triggers periodic state cleanup
  81. cleanup *time.Ticker
  82. notifier *discord.DiscordNotifier
  83. }
  84. func NewProcessor(
  85. ctx context.Context,
  86. db *db.Database,
  87. lockC chan *common.MessagePublication,
  88. setC chan *common.GuardianSet,
  89. sendC chan []byte,
  90. obsvC chan *gossipv1.SignedObservation,
  91. injectC chan *vaa.VAA,
  92. signedInC chan *gossipv1.SignedVAAWithQuorum,
  93. gk *ecdsa.PrivateKey,
  94. gst *common.GuardianSetState,
  95. devnetMode bool,
  96. devnetNumGuardians uint,
  97. devnetEthRPC string,
  98. terraLCD string,
  99. terraContract string,
  100. attestationEvents *reporter.AttestationEventReporter,
  101. notifier *discord.DiscordNotifier,
  102. ) *Processor {
  103. return &Processor{
  104. lockC: lockC,
  105. setC: setC,
  106. sendC: sendC,
  107. obsvC: obsvC,
  108. signedInC: signedInC,
  109. injectC: injectC,
  110. gk: gk,
  111. gst: gst,
  112. devnetMode: devnetMode,
  113. devnetNumGuardians: devnetNumGuardians,
  114. devnetEthRPC: devnetEthRPC,
  115. db: db,
  116. terraLCD: terraLCD,
  117. terraContract: terraContract,
  118. attestationEvents: attestationEvents,
  119. notifier: notifier,
  120. logger: supervisor.Logger(ctx),
  121. state: &aggregationState{vaaMap{}},
  122. ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
  123. }
  124. }
  125. func (p *Processor) Run(ctx context.Context) error {
  126. p.cleanup = time.NewTicker(30 * time.Second)
  127. for {
  128. select {
  129. case <-ctx.Done():
  130. return ctx.Err()
  131. case p.gs = <-p.setC:
  132. p.logger.Info("guardian set updated",
  133. zap.Strings("set", p.gs.KeysAsHexStrings()),
  134. zap.Uint32("index", p.gs.Index))
  135. p.gst.Set(p.gs)
  136. case k := <-p.lockC:
  137. p.handleMessage(ctx, k)
  138. case v := <-p.injectC:
  139. p.handleInjection(ctx, v)
  140. case m := <-p.obsvC:
  141. p.handleObservation(ctx, m)
  142. case m := <-p.signedInC:
  143. p.handleInboundSignedVAAWithQuorum(ctx, m)
  144. case <-p.cleanup.C:
  145. p.handleCleanup(ctx)
  146. }
  147. }
  148. }