observation.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package processor
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. bridge_common "github.com/certusone/wormhole/bridge/pkg/common"
  7. "github.com/prometheus/client_golang/prometheus"
  8. "time"
  9. "github.com/ethereum/go-ethereum/common"
  10. "github.com/ethereum/go-ethereum/crypto"
  11. "go.uber.org/zap"
  12. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  13. "github.com/certusone/wormhole/bridge/pkg/vaa"
  14. )
  15. var (
  16. observationsReceivedTotal = prometheus.NewCounter(
  17. prometheus.CounterOpts{
  18. Name: "wormhole_observations_received_total",
  19. Help: "Total number of raw VAA observations received from gossip",
  20. })
  21. observationsReceivedByGuardianAddressTotal = prometheus.NewCounterVec(
  22. prometheus.CounterOpts{
  23. Name: "wormhole_observations_signed_by_guardian_total",
  24. Help: "Total number of signed and verified VAA observations grouped by guardian address",
  25. }, []string{"addr"})
  26. observationsFailedTotal = prometheus.NewCounterVec(
  27. prometheus.CounterOpts{
  28. Name: "wormhole_observations_verification_failures_total",
  29. Help: "Total number of observations verification failure, grouped by failure reason",
  30. }, []string{"cause"})
  31. observationsUnknownLockupTotal = prometheus.NewCounter(
  32. prometheus.CounterOpts{
  33. Name: "wormhole_observations_unknown_lockup_total",
  34. Help: "Total number of verified VAA observations for a lockup we haven't seen yet",
  35. })
  36. observationsDirectSubmissionsTotal = prometheus.NewCounterVec(
  37. prometheus.CounterOpts{
  38. Name: "wormhole_observations_direct_submissions_queued_total",
  39. Help: "Total number of observations for a specific target chain that were queued for direct submission",
  40. }, []string{"target_chain"})
  41. observationsDirectSubmissionSuccessTotal = prometheus.NewCounterVec(
  42. prometheus.CounterOpts{
  43. Name: "wormhole_observations_direct_submission_success_total",
  44. Help: "Total number of observations for a specific target chain that succeeded",
  45. }, []string{"target_chain"})
  46. )
  47. func init() {
  48. prometheus.MustRegister(observationsReceivedTotal)
  49. prometheus.MustRegister(observationsReceivedByGuardianAddressTotal)
  50. prometheus.MustRegister(observationsFailedTotal)
  51. prometheus.MustRegister(observationsUnknownLockupTotal)
  52. prometheus.MustRegister(observationsDirectSubmissionsTotal)
  53. prometheus.MustRegister(observationsDirectSubmissionSuccessTotal)
  54. }
  55. // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum,
  56. // and assembles and submits a valid VAA if possible.
  57. func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObservation) {
  58. // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
  59. //
  60. // Note that observations are never tied to the (verified) p2p identity key - the p2p network
  61. // identity is completely decoupled from the guardian identity, p2p is just transport.
  62. hash := hex.EncodeToString(m.Hash)
  63. p.logger.Info("received observation",
  64. zap.String("digest", hash),
  65. zap.String("signature", hex.EncodeToString(m.Signature)),
  66. zap.String("addr", hex.EncodeToString(m.Addr)))
  67. observationsReceivedTotal.Inc()
  68. // Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
  69. // the public key that was used to sign the payload.
  70. pk, err := crypto.Ecrecover(m.Hash, m.Signature)
  71. if err != nil {
  72. p.logger.Warn("failed to verify signature on observation",
  73. zap.String("digest", hash),
  74. zap.String("signature", hex.EncodeToString(m.Signature)),
  75. zap.String("addr", hex.EncodeToString(m.Addr)),
  76. zap.Error(err))
  77. observationsFailedTotal.WithLabelValues("invalid_signature").Inc()
  78. return
  79. }
  80. // Verify that m.Addr matches the public key that signed m.Hash.
  81. their_addr := common.BytesToAddress(m.Addr)
  82. signer_pk := common.BytesToAddress(crypto.Keccak256(pk[1:])[12:])
  83. if their_addr != signer_pk {
  84. p.logger.Info("invalid observation - address does not match pubkey",
  85. zap.String("digest", hash),
  86. zap.String("signature", hex.EncodeToString(m.Signature)),
  87. zap.String("addr", hex.EncodeToString(m.Addr)),
  88. zap.String("pk", signer_pk.Hex()))
  89. observationsFailedTotal.WithLabelValues("pubkey_mismatch").Inc()
  90. return
  91. }
  92. // Determine which guardian set to use. The following cases are possible:
  93. //
  94. // - We have already seen the lockup and generated ourVAA. In this case, use the guardian set valid at the time,
  95. // even if the guardian set was updated. Old guardian sets remain valid for longer than aggregation state,
  96. // and the guardians in the old set stay online and observe and sign lockup for the transition period.
  97. //
  98. // - We have not yet seen the lockup. In this case, we assume the latest guardian set because that's what
  99. // we will store once we do see the lockup.
  100. //
  101. // This ensures that during a guardian set update, a node which observed a given lockup with either the old
  102. // or the new guardian set can achieve consensus, since both the old and the new set would achieve consensus,
  103. // assuming that 2/3+ of the old and the new guardian set have seen the lockup and will periodically attempt
  104. // to retransmit their observations such that nodes who initially dropped the signature will get a 2nd chance.
  105. //
  106. // During an update, vaaState.signatures can contain signatures from *both* guardian sets.
  107. //
  108. var gs *bridge_common.GuardianSet
  109. if p.state.vaaSignatures[hash] != nil && p.state.vaaSignatures[hash].gs != nil {
  110. gs = p.state.vaaSignatures[hash].gs
  111. } else {
  112. gs = p.gs
  113. }
  114. // We haven't yet observed the trusted guardian set on Ethereum, and therefore, it's impossible to verify it.
  115. // May as well not have received it/been offline - drop it and wait for the guardian set.
  116. if gs == nil {
  117. p.logger.Warn("dropping observations since we haven't initialized our guardian set yet",
  118. zap.String("digest", their_addr.Hex()),
  119. zap.String("their_addr", their_addr.Hex()),
  120. )
  121. observationsFailedTotal.WithLabelValues("uninitialized_guardian_set").Inc()
  122. return
  123. }
  124. // Verify that m.Addr is included in the guardian set. If it's not, drop the message. In case it's us
  125. // who have the outdated guardian set, we'll just wait for the message to be retransmitted eventually.
  126. _, ok := gs.KeyIndex(their_addr)
  127. if !ok {
  128. p.logger.Warn("received observation by unknown guardian - is our guardian set outdated?",
  129. zap.String("digest", their_addr.Hex()),
  130. zap.String("their_addr", their_addr.Hex()),
  131. zap.Uint32("index", gs.Index),
  132. zap.Any("keys", gs.KeysAsHexStrings()),
  133. )
  134. observationsFailedTotal.WithLabelValues("unknown_guardian").Inc()
  135. return
  136. }
  137. // Hooray! Now, we have verified all fields on SignedObservation and know that it includes
  138. // a valid signature by an active guardian. We still don't fully trust them, as they may be
  139. // byzantine, but now we know who we're dealing with.
  140. // We can now count events by guardian without worry about cardinality explosions:
  141. observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc()
  142. // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
  143. if p.state.vaaSignatures[hash] == nil {
  144. // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like.
  145. // However, we have established that a valid guardian has signed it, and therefore we can
  146. // already start aggregating signatures for it.
  147. //
  148. // A malicious guardian can potentially DoS this by creating fake lockups at a faster rate than they decay,
  149. // leading to a slow out-of-memory crash. We do not attempt to automatically mitigate spam attacks with valid
  150. // signatures - such byzantine behavior would be plainly visible and would be dealt with by kicking them.
  151. observationsUnknownLockupTotal.Inc()
  152. p.state.vaaSignatures[hash] = &vaaState{
  153. firstObserved: time.Now(),
  154. signatures: map[common.Address][]byte{},
  155. source: "unknown",
  156. }
  157. }
  158. p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature
  159. // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
  160. agg := make([]bool, len(gs.Keys))
  161. var sigs []*vaa.Signature
  162. for i, a := range gs.Keys {
  163. s, ok := p.state.vaaSignatures[hash].signatures[a]
  164. if ok {
  165. var bs [65]byte
  166. if n := copy(bs[:], s); n != 65 {
  167. panic(fmt.Sprintf("invalid sig len: %d", n))
  168. }
  169. sigs = append(sigs, &vaa.Signature{
  170. Index: uint8(i),
  171. Signature: bs,
  172. })
  173. }
  174. agg[i] = ok
  175. }
  176. if p.state.vaaSignatures[hash].ourVAA != nil {
  177. // We have seen it on chain!
  178. // Deep copy the VAA and add signatures
  179. v := p.state.vaaSignatures[hash].ourVAA
  180. signed := &vaa.VAA{
  181. Version: v.Version,
  182. GuardianSetIndex: v.GuardianSetIndex,
  183. Signatures: sigs,
  184. Timestamp: v.Timestamp,
  185. Payload: v.Payload,
  186. }
  187. // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
  188. quorum := CalculateQuorum(len(gs.Keys))
  189. p.logger.Info("aggregation state for VAA",
  190. zap.String("digest", hash),
  191. zap.Any("set", gs.KeysAsHexStrings()),
  192. zap.Uint32("index", gs.Index),
  193. zap.Bools("aggregation", agg),
  194. zap.Int("required_sigs", quorum),
  195. zap.Int("have_sigs", len(sigs)),
  196. )
  197. if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted {
  198. vaaBytes, err := signed.Marshal()
  199. if err != nil {
  200. panic(err)
  201. }
  202. // Submit every VAA to Solana for data availability.
  203. p.logger.Info("submitting signed VAA to Solana",
  204. zap.String("digest", hash),
  205. zap.Any("vaa", signed),
  206. zap.String("bytes", hex.EncodeToString(vaaBytes)))
  207. p.vaaC <- signed
  208. p.state.vaaSignatures[hash].submitted = true
  209. } else {
  210. p.logger.Info("quorum not met or already submitted, doing nothing",
  211. zap.String("digest", hash))
  212. }
  213. } else {
  214. p.logger.Info("we have not yet seen this VAA - temporarily storing signature",
  215. zap.String("digest", hash),
  216. zap.Bools("aggregation", agg))
  217. }
  218. }