observation.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package processor
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. node_common "github.com/certusone/wormhole/node/pkg/common"
  7. "github.com/certusone/wormhole/node/pkg/db"
  8. "github.com/mr-tron/base58"
  9. "github.com/prometheus/client_golang/prometheus"
  10. "github.com/prometheus/client_golang/prometheus/promauto"
  11. "time"
  12. "github.com/ethereum/go-ethereum/common"
  13. "github.com/ethereum/go-ethereum/crypto"
  14. "go.uber.org/zap"
  15. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  16. "github.com/certusone/wormhole/node/pkg/vaa"
  17. )
  18. var (
  19. observationsReceivedTotal = promauto.NewCounter(
  20. prometheus.CounterOpts{
  21. Name: "wormhole_observations_received_total",
  22. Help: "Total number of raw VAA observations received from gossip",
  23. })
  24. observationsReceivedByGuardianAddressTotal = promauto.NewCounterVec(
  25. prometheus.CounterOpts{
  26. Name: "wormhole_observations_signed_by_guardian_total",
  27. Help: "Total number of signed and verified VAA observations grouped by guardian address",
  28. }, []string{"addr"})
  29. observationsFailedTotal = promauto.NewCounterVec(
  30. prometheus.CounterOpts{
  31. Name: "wormhole_observations_verification_failures_total",
  32. Help: "Total number of observations verification failure, grouped by failure reason",
  33. }, []string{"cause"})
  34. observationsUnknownTotal = promauto.NewCounter(
  35. prometheus.CounterOpts{
  36. Name: "wormhole_observations_unknown_total",
  37. Help: "Total number of verified observations we haven't seen ourselves",
  38. })
  39. )
  40. // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum,
  41. // and assembles and submits a valid VAA if possible.
  42. func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObservation) {
  43. // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
  44. //
  45. // Note that observations are never tied to the (verified) p2p identity key - the p2p network
  46. // identity is completely decoupled from the guardian identity, p2p is just transport.
  47. hash := hex.EncodeToString(m.Hash)
  48. p.logger.Info("received observation",
  49. zap.String("digest", hash),
  50. zap.String("signature", hex.EncodeToString(m.Signature)),
  51. zap.String("addr", hex.EncodeToString(m.Addr)),
  52. zap.String("txhash", hex.EncodeToString(m.TxHash)),
  53. zap.String("txhash_b58", base58.Encode(m.TxHash)),
  54. zap.String("message_id", m.MessageId),
  55. )
  56. observationsReceivedTotal.Inc()
  57. // Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
  58. // the public key that was used to sign the payload.
  59. pk, err := crypto.Ecrecover(m.Hash, m.Signature)
  60. if err != nil {
  61. p.logger.Warn("failed to verify signature on observation",
  62. zap.String("digest", hash),
  63. zap.String("signature", hex.EncodeToString(m.Signature)),
  64. zap.String("addr", hex.EncodeToString(m.Addr)),
  65. zap.Error(err))
  66. observationsFailedTotal.WithLabelValues("invalid_signature").Inc()
  67. return
  68. }
  69. // Verify that m.Addr matches the public key that signed m.Hash.
  70. their_addr := common.BytesToAddress(m.Addr)
  71. signer_pk := common.BytesToAddress(crypto.Keccak256(pk[1:])[12:])
  72. if their_addr != signer_pk {
  73. p.logger.Info("invalid observation - address does not match pubkey",
  74. zap.String("digest", hash),
  75. zap.String("signature", hex.EncodeToString(m.Signature)),
  76. zap.String("addr", hex.EncodeToString(m.Addr)),
  77. zap.String("pk", signer_pk.Hex()))
  78. observationsFailedTotal.WithLabelValues("pubkey_mismatch").Inc()
  79. return
  80. }
  81. // Determine which guardian set to use. The following cases are possible:
  82. //
  83. // - We have already seen the message and generated ourVAA. In this case, use the guardian set valid at the time,
  84. // even if the guardian set was updated. Old guardian sets remain valid for longer than aggregation state,
  85. // and the guardians in the old set stay online and observe and sign messages for the transition period.
  86. //
  87. // - We have not yet seen the message. In this case, we assume the latest guardian set because that's what
  88. // we will store once we do see the message.
  89. //
  90. // This ensures that during a guardian set update, a node which observed a given message with either the old
  91. // or the new guardian set can achieve consensus, since both the old and the new set would achieve consensus,
  92. // assuming that 2/3+ of the old and the new guardian set have seen the message and will periodically attempt
  93. // to retransmit their observations such that nodes who initially dropped the signature will get a 2nd chance.
  94. //
  95. // During an update, vaaState.signatures can contain signatures from *both* guardian sets.
  96. //
  97. var gs *node_common.GuardianSet
  98. if p.state.vaaSignatures[hash] != nil && p.state.vaaSignatures[hash].gs != nil {
  99. gs = p.state.vaaSignatures[hash].gs
  100. } else {
  101. gs = p.gs
  102. }
  103. // We haven't yet observed the trusted guardian set on Ethereum, and therefore, it's impossible to verify it.
  104. // May as well not have received it/been offline - drop it and wait for the guardian set.
  105. if gs == nil {
  106. p.logger.Warn("dropping observations since we haven't initialized our guardian set yet",
  107. zap.String("digest", hash),
  108. zap.String("their_addr", their_addr.Hex()),
  109. )
  110. observationsFailedTotal.WithLabelValues("uninitialized_guardian_set").Inc()
  111. return
  112. }
  113. // Verify that m.Addr is included in the guardian set. If it's not, drop the message. In case it's us
  114. // who have the outdated guardian set, we'll just wait for the message to be retransmitted eventually.
  115. _, ok := gs.KeyIndex(their_addr)
  116. if !ok {
  117. p.logger.Debug("received observation by unknown guardian - is our guardian set outdated?",
  118. zap.String("digest", hash),
  119. zap.String("their_addr", their_addr.Hex()),
  120. zap.Uint32("index", gs.Index),
  121. zap.Any("keys", gs.KeysAsHexStrings()),
  122. )
  123. observationsFailedTotal.WithLabelValues("unknown_guardian").Inc()
  124. return
  125. }
  126. // Hooray! Now, we have verified all fields on SignedObservation and know that it includes
  127. // a valid signature by an active guardian. We still don't fully trust them, as they may be
  128. // byzantine, but now we know who we're dealing with.
  129. // We can now count events by guardian without worry about cardinality explosions:
  130. observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc()
  131. // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
  132. if p.state.vaaSignatures[hash] == nil {
  133. // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like.
  134. // However, we have established that a valid guardian has signed it, and therefore we can
  135. // already start aggregating signatures for it.
  136. //
  137. // A malicious guardian can potentially DoS this by creating fake observations at a faster rate than they decay,
  138. // leading to a slow out-of-memory crash. We do not attempt to automatically mitigate spam attacks with valid
  139. // signatures - such byzantine behavior would be plainly visible and would be dealt with by kicking them.
  140. observationsUnknownTotal.Inc()
  141. p.state.vaaSignatures[hash] = &vaaState{
  142. firstObserved: time.Now(),
  143. signatures: map[common.Address][]byte{},
  144. source: "unknown",
  145. }
  146. }
  147. p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature
  148. // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
  149. agg := make([]bool, len(gs.Keys))
  150. var sigs []*vaa.Signature
  151. for i, a := range gs.Keys {
  152. s, ok := p.state.vaaSignatures[hash].signatures[a]
  153. if ok {
  154. var bs [65]byte
  155. if n := copy(bs[:], s); n != 65 {
  156. panic(fmt.Sprintf("invalid sig len: %d", n))
  157. }
  158. sigs = append(sigs, &vaa.Signature{
  159. Index: uint8(i),
  160. Signature: bs,
  161. })
  162. }
  163. agg[i] = ok
  164. }
  165. if p.state.vaaSignatures[hash].ourVAA != nil {
  166. // We have seen it on chain!
  167. // Deep copy the VAA and add signatures
  168. v := p.state.vaaSignatures[hash].ourVAA
  169. signed := &vaa.VAA{
  170. Version: v.Version,
  171. GuardianSetIndex: v.GuardianSetIndex,
  172. Signatures: sigs,
  173. Timestamp: v.Timestamp,
  174. Nonce: v.Nonce,
  175. Sequence: v.Sequence,
  176. EmitterChain: v.EmitterChain,
  177. EmitterAddress: v.EmitterAddress,
  178. Payload: v.Payload,
  179. ConsistencyLevel: v.ConsistencyLevel,
  180. }
  181. // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
  182. quorum := CalculateQuorum(len(gs.Keys))
  183. p.logger.Info("aggregation state for VAA",
  184. zap.String("digest", hash),
  185. zap.Any("set", gs.KeysAsHexStrings()),
  186. zap.Uint32("index", gs.Index),
  187. zap.Bools("aggregation", agg),
  188. zap.Int("required_sigs", quorum),
  189. zap.Int("have_sigs", len(sigs)),
  190. zap.Bool("quorum", len(sigs) >= quorum),
  191. )
  192. if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted {
  193. vaaBytes, err := signed.Marshal()
  194. if err != nil {
  195. panic(err)
  196. }
  197. // Store signed VAA in database.
  198. p.logger.Info("signed VAA with quorum",
  199. zap.String("digest", hash),
  200. zap.Any("vaa", signed),
  201. zap.String("bytes", hex.EncodeToString(vaaBytes)),
  202. zap.String("message_id", signed.MessageID()))
  203. if err := p.db.StoreSignedVAA(signed); err != nil {
  204. p.logger.Error("failed to store signed VAA", zap.Error(err))
  205. }
  206. p.broadcastSignedVAA(signed)
  207. p.attestationEvents.ReportVAAQuorum(signed)
  208. p.state.vaaSignatures[hash].submitted = true
  209. } else {
  210. p.logger.Info("quorum not met or already submitted, doing nothing",
  211. zap.String("digest", hash))
  212. }
  213. } else {
  214. p.logger.Info("we have not yet seen this VAA - temporarily storing signature",
  215. zap.String("digest", hash),
  216. zap.Bools("aggregation", agg))
  217. }
  218. }
  219. func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) {
  220. v, err := vaa.Unmarshal(m.Vaa)
  221. if err != nil {
  222. p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message",
  223. zap.Error(err), zap.Any("message", m))
  224. return
  225. }
  226. // Calculate digest for logging
  227. digest := v.SigningMsg()
  228. if err != nil {
  229. panic(err)
  230. }
  231. hash := hex.EncodeToString(digest.Bytes())
  232. if p.gs == nil {
  233. p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet",
  234. zap.String("digest", hash),
  235. zap.Any("message", m),
  236. )
  237. return
  238. }
  239. // Verify VAA signature to prevent a DoS attack on our local store.
  240. if !v.VerifySignatures(p.gs.Keys) {
  241. p.logger.Warn("received SignedVAAWithQuorum message with invalid VAA signatures",
  242. zap.String("digest", hash),
  243. zap.Any("message", m),
  244. zap.Any("vaa", v),
  245. )
  246. return
  247. }
  248. quorum := CalculateQuorum(len(p.gs.Keys))
  249. if len(v.Signatures) < quorum {
  250. p.logger.Warn("received SignedVAAWithQuorum message without quorum",
  251. zap.String("digest", hash),
  252. zap.Any("message", m),
  253. zap.Any("vaa", v),
  254. zap.Int("wanted_sigs", quorum),
  255. zap.Int("got_sigs", len(v.Signatures)),
  256. )
  257. return
  258. }
  259. // We now established that:
  260. // - all signatures on the VAA are valid
  261. // - the signature's addresses match the node's current guardian set
  262. // - enough signatures are present for the VAA to reach quorum
  263. // Check if we already store this VAA
  264. _, err = p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v))
  265. if err == nil {
  266. p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already store",
  267. zap.String("digest", hash),
  268. )
  269. return
  270. } else if err != db.ErrVAANotFound {
  271. p.logger.Error("failed to look up VAA in database",
  272. zap.String("digest", hash),
  273. zap.Error(err),
  274. )
  275. return
  276. }
  277. // Store signed VAA in database.
  278. p.logger.Info("storing inbound signed VAA with quorum",
  279. zap.String("digest", hash),
  280. zap.Any("vaa", v),
  281. zap.String("bytes", hex.EncodeToString(m.Vaa)),
  282. zap.String("message_id", v.MessageID()))
  283. if err := p.db.StoreSignedVAA(v); err != nil {
  284. p.logger.Error("failed to store signed VAA", zap.Error(err))
  285. return
  286. }
  287. p.attestationEvents.ReportVAAQuorum(v)
  288. }