// processor.go (4.5 KB)

  1. package processor
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "time"
  6. "github.com/certusone/wormhole/node/pkg/db"
  7. ethcommon "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/crypto"
  9. "go.uber.org/zap"
  10. "github.com/certusone/wormhole/node/pkg/common"
  11. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  12. "github.com/certusone/wormhole/node/pkg/reporter"
  13. "github.com/certusone/wormhole/node/pkg/supervisor"
  14. "github.com/certusone/wormhole/node/pkg/vaa"
  15. )
  16. type (
  17. // vaaState represents the local view of a given VAA
  18. vaaState struct {
  19. // First time this digest was seen (possibly even before we observed it ourselves).
  20. firstObserved time.Time
  21. // Copy of the VAA we constructed when we made our own observation.
  22. ourVAA *vaa.VAA
  23. // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
  24. // to either the old or new guardian set.
  25. signatures map[ethcommon.Address][]byte
  26. // Flag set after reaching quorum and submitting the VAA.
  27. submitted bool
  28. // Flag set by the cleanup service after the settlement timeout has expired and misses were counted.
  29. settled bool
  30. // Human-readable description of the VAA's source, used for metrics.
  31. source string
  32. // Number of times the cleanup service has attempted to retransmit this VAA.
  33. retryCount uint
  34. // Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions.
  35. ourMsg []byte
  36. // Copy of the guardian set valid at observation/injection time.
  37. gs *common.GuardianSet
  38. }
  39. vaaMap map[string]*vaaState
  40. // aggregationState represents the node's aggregation of guardian signatures.
  41. aggregationState struct {
  42. vaaSignatures vaaMap
  43. }
  44. )
  45. type Processor struct {
  46. // lockC is a channel of observed emitted messages
  47. lockC chan *common.MessagePublication
  48. // setC is a channel of guardian set updates
  49. setC chan *common.GuardianSet
  50. // sendC is a channel of outbound messages to broadcast on p2p
  51. sendC chan []byte
  52. // obsvC is a channel of inbound decoded observations from p2p
  53. obsvC chan *gossipv1.SignedObservation
  54. // injectC is a channel of VAAs injected locally.
  55. injectC chan *vaa.VAA
  56. // gk is the node's guardian private key
  57. gk *ecdsa.PrivateKey
  58. // devnetMode specified whether to submit transactions to the hardcoded Ethereum devnet
  59. devnetMode bool
  60. devnetNumGuardians uint
  61. devnetEthRPC string
  62. terraLCD string
  63. terraChainID string
  64. terraContract string
  65. attestationEvents *reporter.AttestationEventReporter
  66. logger *zap.Logger
  67. db *db.Database
  68. // Runtime state
  69. // gs is the currently valid guardian set
  70. gs *common.GuardianSet
  71. // gst is managed by the processor and allows concurrent access to the
  72. // guardian set by other components.
  73. gst *common.GuardianSetState
  74. // state is the current runtime VAA view
  75. state *aggregationState
  76. // gk pk as eth address
  77. ourAddr ethcommon.Address
  78. // cleanup triggers periodic state cleanup
  79. cleanup *time.Ticker
  80. }
  81. func NewProcessor(
  82. ctx context.Context,
  83. db *db.Database,
  84. lockC chan *common.MessagePublication,
  85. setC chan *common.GuardianSet,
  86. sendC chan []byte,
  87. obsvC chan *gossipv1.SignedObservation,
  88. injectC chan *vaa.VAA,
  89. gk *ecdsa.PrivateKey,
  90. gst *common.GuardianSetState,
  91. devnetMode bool,
  92. devnetNumGuardians uint,
  93. devnetEthRPC string,
  94. terraLCD string,
  95. terraChainID string,
  96. terraContract string,
  97. attestationEvents *reporter.AttestationEventReporter) *Processor {
  98. return &Processor{
  99. lockC: lockC,
  100. setC: setC,
  101. sendC: sendC,
  102. obsvC: obsvC,
  103. injectC: injectC,
  104. gk: gk,
  105. gst: gst,
  106. devnetMode: devnetMode,
  107. devnetNumGuardians: devnetNumGuardians,
  108. devnetEthRPC: devnetEthRPC,
  109. db: db,
  110. terraLCD: terraLCD,
  111. terraChainID: terraChainID,
  112. terraContract: terraContract,
  113. attestationEvents: attestationEvents,
  114. logger: supervisor.Logger(ctx),
  115. state: &aggregationState{vaaMap{}},
  116. ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
  117. }
  118. }
  119. func (p *Processor) Run(ctx context.Context) error {
  120. p.cleanup = time.NewTicker(30 * time.Second)
  121. for {
  122. select {
  123. case <-ctx.Done():
  124. return ctx.Err()
  125. case p.gs = <-p.setC:
  126. p.logger.Info("guardian set updated",
  127. zap.Strings("set", p.gs.KeysAsHexStrings()),
  128. zap.Uint32("index", p.gs.Index))
  129. p.gst.Set(p.gs)
  130. case k := <-p.lockC:
  131. p.handleMessage(ctx, k)
  132. case v := <-p.injectC:
  133. p.handleInjection(ctx, v)
  134. case m := <-p.obsvC:
  135. p.handleObservation(ctx, m)
  136. case <-p.cleanup.C:
  137. p.handleCleanup(ctx)
  138. }
  139. }
  140. }