observation.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package processor
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. bridge_common "github.com/certusone/wormhole/bridge/pkg/common"
  7. "github.com/prometheus/client_golang/prometheus"
  8. "strings"
  9. "time"
  10. "github.com/certusone/wormhole/bridge/pkg/terra"
  11. "github.com/ethereum/go-ethereum/common"
  12. "github.com/ethereum/go-ethereum/crypto"
  13. "go.uber.org/zap"
  14. "github.com/certusone/wormhole/bridge/pkg/devnet"
  15. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  16. "github.com/certusone/wormhole/bridge/pkg/vaa"
  17. )
  18. var (
  19. observationsReceivedTotal = prometheus.NewCounter(
  20. prometheus.CounterOpts{
  21. Name: "wormhole_observations_received_total",
  22. Help: "Total number of raw VAA observations received from gossip",
  23. })
  24. observationsReceivedByGuardianAddressTotal = prometheus.NewCounterVec(
  25. prometheus.CounterOpts{
  26. Name: "wormhole_observations_signed_by_guardian_total",
  27. Help: "Total number of signed and verified VAA observations grouped by guardian address",
  28. }, []string{"addr"})
  29. observationsFailedTotal = prometheus.NewCounterVec(
  30. prometheus.CounterOpts{
  31. Name: "wormhole_observations_verification_failures_total",
  32. Help: "Total number of observations verification failure, grouped by failure reason",
  33. }, []string{"cause"})
  34. observationsUnknownLockupTotal = prometheus.NewCounter(
  35. prometheus.CounterOpts{
  36. Name: "wormhole_observations_unknown_lockup_total",
  37. Help: "Total number of verified VAA observations for a lockup we haven't seen yet",
  38. })
  39. observationsDirectSubmissionsTotal = prometheus.NewCounterVec(
  40. prometheus.CounterOpts{
  41. Name: "wormhole_observations_direct_submissions_queued_total",
  42. Help: "Total number of observations for a specific target chain that were queued for direct submission",
  43. }, []string{"target_chain"})
  44. observationsDirectSubmissionSuccessTotal = prometheus.NewCounterVec(
  45. prometheus.CounterOpts{
  46. Name: "wormhole_observations_direct_submission_success_total",
  47. Help: "Total number of observations for a specific target chain that succeeded",
  48. }, []string{"target_chain"})
  49. )
  50. func init() {
  51. prometheus.MustRegister(observationsReceivedTotal)
  52. prometheus.MustRegister(observationsReceivedByGuardianAddressTotal)
  53. prometheus.MustRegister(observationsFailedTotal)
  54. prometheus.MustRegister(observationsUnknownLockupTotal)
  55. prometheus.MustRegister(observationsDirectSubmissionsTotal)
  56. prometheus.MustRegister(observationsDirectSubmissionSuccessTotal)
  57. }
  58. // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum,
  59. // and assembles and submits a valid VAA if possible.
  60. func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObservation) {
  61. // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
  62. //
  63. // Note that observations are never tied to the (verified) p2p identity key - the p2p network
  64. // identity is completely decoupled from the guardian identity, p2p is just transport.
  65. hash := hex.EncodeToString(m.Hash)
  66. p.logger.Info("received observation",
  67. zap.String("digest", hash),
  68. zap.String("signature", hex.EncodeToString(m.Signature)),
  69. zap.String("addr", hex.EncodeToString(m.Addr)))
  70. observationsReceivedTotal.Inc()
  71. // Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
  72. // the public key that was used to sign the payload.
  73. pk, err := crypto.Ecrecover(m.Hash, m.Signature)
  74. if err != nil {
  75. p.logger.Warn("failed to verify signature on observation",
  76. zap.String("digest", hash),
  77. zap.String("signature", hex.EncodeToString(m.Signature)),
  78. zap.String("addr", hex.EncodeToString(m.Addr)),
  79. zap.Error(err))
  80. observationsFailedTotal.WithLabelValues("invalid_signature").Inc()
  81. return
  82. }
  83. // Verify that m.Addr matches the public key that signed m.Hash.
  84. their_addr := common.BytesToAddress(m.Addr)
  85. signer_pk := common.BytesToAddress(crypto.Keccak256(pk[1:])[12:])
  86. if their_addr != signer_pk {
  87. p.logger.Info("invalid observation - address does not match pubkey",
  88. zap.String("digest", hash),
  89. zap.String("signature", hex.EncodeToString(m.Signature)),
  90. zap.String("addr", hex.EncodeToString(m.Addr)),
  91. zap.String("pk", signer_pk.Hex()))
  92. observationsFailedTotal.WithLabelValues("pubkey_mismatch").Inc()
  93. return
  94. }
  95. // Determine which guardian set to use. The following cases are possible:
  96. //
  97. // - We have already seen the lockup and generated ourVAA. In this case, use the guardian set valid at the time,
  98. // even if the guardian set was updated. Old guardian sets remain valid for longer than aggregation state,
  99. // and the guardians in the old set stay online and observe and sign lockup for the transition period.
  100. //
  101. // - We have not yet seen the lockup. In this case, we assume the latest guardian set because that's what
  102. // we will store once we do see the lockup.
  103. //
  104. // This ensures that during a guardian set update, a node which observed a given lockup with either the old
  105. // or the new guardian set can achieve consensus, since both the old and the new set would achieve consensus,
  106. // assuming that 2/3+ of the old and the new guardian set have seen the lockup and will periodically attempt
  107. // to retransmit their observations such that nodes who initially dropped the signature will get a 2nd chance.
  108. //
  109. // During an update, vaaState.signatures can contain signatures from *both* guardian sets.
  110. //
  111. var gs *bridge_common.GuardianSet
  112. if p.state.vaaSignatures[hash] != nil && p.state.vaaSignatures[hash].gs != nil {
  113. gs = p.state.vaaSignatures[hash].gs
  114. } else {
  115. gs = p.gs
  116. }
  117. // We haven't yet observed the trusted guardian set on Ethereum, and therefore, it's impossible to verify it.
  118. // May as well not have received it/been offline - drop it and wait for the guardian set.
  119. if gs == nil {
  120. p.logger.Warn("dropping observations since we haven't initialized our guardian set yet",
  121. zap.String("digest", their_addr.Hex()),
  122. zap.String("their_addr", their_addr.Hex()),
  123. )
  124. observationsFailedTotal.WithLabelValues("uninitialized_guardian_set").Inc()
  125. return
  126. }
  127. // Verify that m.Addr is included in the guardian set. If it's not, drop the message. In case it's us
  128. // who have the outdated guardian set, we'll just wait for the message to be retransmitted eventually.
  129. _, ok := gs.KeyIndex(their_addr)
  130. if !ok {
  131. p.logger.Warn("received observation by unknown guardian - is our guardian set outdated?",
  132. zap.String("digest", their_addr.Hex()),
  133. zap.String("their_addr", their_addr.Hex()),
  134. zap.Uint32("index", gs.Index),
  135. zap.Any("keys", gs.KeysAsHexStrings()),
  136. )
  137. observationsFailedTotal.WithLabelValues("unknown_guardian").Inc()
  138. return
  139. }
  140. // Hooray! Now, we have verified all fields on SignedObservation and know that it includes
  141. // a valid signature by an active guardian. We still don't fully trust them, as they may be
  142. // byzantine, but now we know who we're dealing with.
  143. // We can now count events by guardian without worry about cardinality explosions:
  144. observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc()
  145. // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
  146. if p.state.vaaSignatures[hash] == nil {
  147. // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like.
  148. // However, we have established that a valid guardian has signed it, and therefore we can
  149. // already start aggregating signatures for it.
  150. //
  151. // A malicious guardian can potentially DoS this by creating fake lockups at a faster rate than they decay,
  152. // leading to a slow out-of-memory crash. We do not attempt to automatically mitigate spam attacks with valid
  153. // signatures - such byzantine behavior would be plainly visible and would be dealt with by kicking them.
  154. observationsUnknownLockupTotal.Inc()
  155. p.state.vaaSignatures[hash] = &vaaState{
  156. firstObserved: time.Now(),
  157. signatures: map[common.Address][]byte{},
  158. source: "unknown",
  159. }
  160. }
  161. p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature
  162. // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
  163. agg := make([]bool, len(gs.Keys))
  164. var sigs []*vaa.Signature
  165. for i, a := range gs.Keys {
  166. s, ok := p.state.vaaSignatures[hash].signatures[a]
  167. if ok {
  168. var bs [65]byte
  169. if n := copy(bs[:], s); n != 65 {
  170. panic(fmt.Sprintf("invalid sig len: %d", n))
  171. }
  172. sigs = append(sigs, &vaa.Signature{
  173. Index: uint8(i),
  174. Signature: bs,
  175. })
  176. }
  177. agg[i] = ok
  178. }
  179. if p.state.vaaSignatures[hash].ourVAA != nil {
  180. // We have seen it on chain!
  181. // Deep copy the VAA and add signatures
  182. v := p.state.vaaSignatures[hash].ourVAA
  183. signed := &vaa.VAA{
  184. Version: v.Version,
  185. GuardianSetIndex: v.GuardianSetIndex,
  186. Signatures: sigs,
  187. Timestamp: v.Timestamp,
  188. Payload: v.Payload,
  189. }
  190. // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
  191. quorum := CalculateQuorum(len(gs.Keys))
  192. p.logger.Info("aggregation state for VAA",
  193. zap.String("digest", hash),
  194. zap.Any("set", gs.KeysAsHexStrings()),
  195. zap.Uint32("index", gs.Index),
  196. zap.Bools("aggregation", agg),
  197. zap.Int("required_sigs", quorum),
  198. zap.Int("have_sigs", len(sigs)),
  199. )
  200. if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted {
  201. vaaBytes, err := signed.Marshal()
  202. if err != nil {
  203. panic(err)
  204. }
  205. // Submit every VAA to Solana for data availability.
  206. p.logger.Info("submitting signed VAA to Solana",
  207. zap.String("digest", hash),
  208. zap.Any("vaa", signed),
  209. zap.String("bytes", hex.EncodeToString(vaaBytes)))
  210. p.vaaC <- signed
  211. switch t := v.Payload.(type) {
  212. case *vaa.BodyTransfer:
  213. p.state.vaaSignatures[hash].source = t.SourceChain.String()
  214. // Depending on the target chain, guardians submit VAAs directly to the chain.
  215. switch t.TargetChain {
  216. case vaa.ChainIDSolana:
  217. // No-op.
  218. case vaa.ChainIDEthereum:
  219. // Ethereum is special because it's expensive, and guardians cannot
  220. // be expected to pay the fees. We only submit to Ethereum in devnet mode.
  221. p.devnetVAASubmission(ctx, signed, hash)
  222. case vaa.ChainIDTerra:
  223. go p.terraVAASubmission(ctx, signed, hash)
  224. default:
  225. p.logger.Error("unknown target chain ID",
  226. zap.String("digest", hash),
  227. zap.Any("vaa", signed),
  228. zap.String("bytes", hex.EncodeToString(vaaBytes)),
  229. zap.Stringer("target_chain", t.TargetChain))
  230. }
  231. case *vaa.BodyGuardianSetUpdate:
  232. p.state.vaaSignatures[hash].source = "guardian_set_upgrade"
  233. // A guardian set update is broadcast to every chain that we talk to.
  234. p.devnetVAASubmission(ctx, signed, hash)
  235. p.terraVAASubmission(ctx, signed, hash)
  236. case *vaa.BodyContractUpgrade:
  237. p.state.vaaSignatures[hash].source = "contract_upgrade"
  238. switch t.ChainID {
  239. case vaa.ChainIDSolana:
  240. // Already submitted to Solana.
  241. default:
  242. p.logger.Error("unsupported target chain for contract upgrade",
  243. zap.String("digest", hash),
  244. zap.Any("vaa", signed),
  245. zap.String("bytes", hex.EncodeToString(vaaBytes)),
  246. zap.Uint8("target_chain", t.ChainID))
  247. }
  248. default:
  249. panic(fmt.Sprintf("unknown VAA payload type: %+v", v))
  250. }
  251. p.state.vaaSignatures[hash].submitted = true
  252. } else {
  253. p.logger.Info("quorum not met or already submitted, doing nothing",
  254. zap.String("digest", hash))
  255. }
  256. } else {
  257. p.logger.Info("we have not yet seen this VAA - temporarily storing signature",
  258. zap.String("digest", hash),
  259. zap.Bools("aggregation", agg))
  260. }
  261. }
  262. // devnetVAASubmission submits VAA to a local Ethereum devnet. For production, the bridge won't
  263. // have an Ethereum account and the user retrieves the VAA and submits the transactions themselves.
  264. func (p *Processor) devnetVAASubmission(ctx context.Context, signed *vaa.VAA, hash string) {
  265. if p.devnetMode {
  266. observationsDirectSubmissionsTotal.WithLabelValues("ethereum").Inc()
  267. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  268. tx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, signed)
  269. cancel()
  270. if err != nil {
  271. if strings.Contains(err.Error(), "VAA was already executed") {
  272. p.logger.Info("VAA already submitted to Ethereum by another node, ignoring",
  273. zap.Error(err), zap.String("digest", hash))
  274. } else {
  275. p.logger.Error("failed to submit VAA to Ethereum",
  276. zap.Error(err), zap.String("digest", hash))
  277. }
  278. return
  279. }
  280. observationsDirectSubmissionSuccessTotal.WithLabelValues("ethereum").Inc()
  281. p.logger.Info("VAA submitted to Ethereum", zap.Any("tx", tx), zap.String("digest", hash))
  282. }
  283. }
  284. // Submit VAA to Terra.
  285. func (p *Processor) terraVAASubmission(ctx context.Context, signed *vaa.VAA, hash string) {
  286. if !p.devnetMode || !p.terraEnabled {
  287. p.logger.Warn("ignoring terra VAA submission",
  288. zap.String("digest", hash))
  289. return
  290. }
  291. observationsDirectSubmissionsTotal.WithLabelValues("terra").Inc()
  292. tx, err := terra.SubmitVAA(ctx, p.terraLCD, p.terraChainID, p.terraContract, p.terraFeePayer, signed)
  293. if err != nil {
  294. if strings.Contains(err.Error(), "VaaAlreadyExecuted") {
  295. p.logger.Info("VAA already submitted to Terra by another node, ignoring",
  296. zap.Error(err), zap.String("digest", hash))
  297. } else {
  298. p.logger.Error("failed to submit VAA to Terra",
  299. zap.Error(err), zap.String("digest", hash))
  300. }
  301. return
  302. }
  303. observationsDirectSubmissionSuccessTotal.WithLabelValues("terra").Inc()
  304. p.logger.Info("VAA submitted to Terra", zap.Any("tx", tx), zap.String("digest", hash))
  305. }