observation.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package processor
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/certusone/wormhole/bridge/pkg/terra"
  9. "github.com/ethereum/go-ethereum/common"
  10. "github.com/ethereum/go-ethereum/crypto"
  11. "go.uber.org/zap"
  12. "github.com/certusone/wormhole/bridge/pkg/devnet"
  13. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  14. "github.com/certusone/wormhole/bridge/pkg/vaa"
  15. )
  16. // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum,
  17. // and assembles and submits a valid VAA if possible.
  18. func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObservation) {
  19. // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
  20. //
  21. // Note that observations are never tied to the (verified) p2p identity key - the p2p network
  22. // identity is completely decoupled from the guardian identity, p2p is just transport.
  23. p.logger.Info("received lockup observation",
  24. zap.String("digest", hex.EncodeToString(m.Hash)),
  25. zap.String("signature", hex.EncodeToString(m.Signature)),
  26. zap.String("addr", hex.EncodeToString(m.Addr)))
  27. // Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
  28. // the public key that was used to sign the payload.
  29. pk, err := crypto.Ecrecover(m.Hash, m.Signature)
  30. if err != nil {
  31. p.logger.Warn("failed to verify signature on lockup observation",
  32. zap.String("digest", hex.EncodeToString(m.Hash)),
  33. zap.String("signature", hex.EncodeToString(m.Signature)),
  34. zap.String("addr", hex.EncodeToString(m.Addr)),
  35. zap.Error(err))
  36. return
  37. }
  38. // Verify that m.Addr matches the public key that signed m.Hash.
  39. their_addr := common.BytesToAddress(m.Addr)
  40. signer_pk := common.BytesToAddress(crypto.Keccak256(pk[1:])[12:])
  41. if their_addr != signer_pk {
  42. p.logger.Info("invalid lockup observation - address does not match pubkey",
  43. zap.String("digest", hex.EncodeToString(m.Hash)),
  44. zap.String("signature", hex.EncodeToString(m.Signature)),
  45. zap.String("addr", hex.EncodeToString(m.Addr)),
  46. zap.String("pk", signer_pk.Hex()))
  47. return
  48. }
  49. // Verify that m.Addr is included in the current guardian set.
  50. _, ok := p.gs.KeyIndex(their_addr)
  51. if !ok {
  52. p.logger.Warn("received observation by unknown guardian - is our guardian set outdated?",
  53. zap.String("their_addr", their_addr.Hex()),
  54. zap.Any("current_set", p.gs.KeysAsHexStrings()),
  55. )
  56. return
  57. }
  58. // Hooray! Now, we have verified all fields on SignedObservation and know that it includes
  59. // a valid signature by an active guardian. We still don't fully trust them, as they may be
  60. // byzantine, but now we know who we're dealing with.
  61. // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
  62. hash := hex.EncodeToString(m.Hash)
  63. if p.state.vaaSignatures[hash] == nil {
  64. // We haven't yet seen this lockup ourselves, and therefore do not know what the VAA looks like.
  65. // However, we have established that a valid guardian has signed it, and therefore we can
  66. // already start aggregating signatures for it.
  67. //
  68. // A malicious guardian can potentially DoS this by creating fake lockups at a faster rate than they decay,
  69. // leading to a slow out-of-memory crash. We do not attempt to automatically mitigate spam attacks with valid
  70. // signatures - such byzantine behavior would be plainly visible and would be dealt with by kicking them.
  71. p.state.vaaSignatures[hash] = &vaaState{
  72. firstObserved: time.Now(),
  73. signatures: map[common.Address][]byte{},
  74. }
  75. }
  76. p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature
  77. // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
  78. agg := make([]bool, len(p.gs.Keys))
  79. var sigs []*vaa.Signature
  80. for i, a := range p.gs.Keys {
  81. s, ok := p.state.vaaSignatures[hash].signatures[a]
  82. if ok {
  83. var bs [65]byte
  84. if n := copy(bs[:], s); n != 65 {
  85. panic(fmt.Sprintf("invalid sig len: %d", n))
  86. }
  87. sigs = append(sigs, &vaa.Signature{
  88. Index: uint8(i),
  89. Signature: bs,
  90. })
  91. }
  92. agg[i] = ok
  93. }
  94. if p.state.vaaSignatures[hash].ourVAA != nil {
  95. // We have seen it on chain!
  96. // Deep copy the VAA and add signatures
  97. v := p.state.vaaSignatures[hash].ourVAA
  98. signed := &vaa.VAA{
  99. Version: v.Version,
  100. GuardianSetIndex: v.GuardianSetIndex,
  101. Signatures: sigs,
  102. Timestamp: v.Timestamp,
  103. Payload: v.Payload,
  104. }
  105. // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
  106. quorum := CalculateQuorum(len(p.gs.Keys))
  107. p.logger.Info("aggregation state for VAA",
  108. zap.String("digest", hash),
  109. zap.Any("set", p.gs.KeysAsHexStrings()),
  110. zap.Uint32("index", p.gs.Index),
  111. zap.Bools("aggregation", agg),
  112. zap.Int("required_sigs", quorum),
  113. zap.Int("have_sigs", len(sigs)),
  114. )
  115. if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted {
  116. vaaBytes, err := signed.Marshal()
  117. if err != nil {
  118. panic(err)
  119. }
  120. // Submit every VAA to Solana for data availability.
  121. p.logger.Info("submitting signed VAA to Solana",
  122. zap.String("digest", hash),
  123. zap.Any("vaa", signed),
  124. zap.String("bytes", hex.EncodeToString(vaaBytes)))
  125. p.vaaC <- signed
  126. switch t := v.Payload.(type) {
  127. case *vaa.BodyTransfer:
  128. // Depending on the target chain, guardians submit VAAs directly to the chain.
  129. switch t.TargetChain {
  130. case vaa.ChainIDSolana:
  131. // No-op.
  132. case vaa.ChainIDEthereum:
  133. // Ethereum is special because it's expensive, and guardians cannot
  134. // be expected to pay the fees. We only submit to Ethereum in devnet mode.
  135. p.devnetVAASubmission(ctx, signed, hash)
  136. case vaa.ChainIDTerra:
  137. go p.terraVAASubmission(ctx, signed, hash)
  138. default:
  139. p.logger.Error("unknown target chain ID",
  140. zap.String("digest", hash),
  141. zap.Any("vaa", signed),
  142. zap.String("bytes", hex.EncodeToString(vaaBytes)),
  143. zap.Stringer("target_chain", t.TargetChain))
  144. }
  145. case *vaa.BodyGuardianSetUpdate:
  146. // A guardian set update is broadcast to every chain that we talk to.
  147. p.devnetVAASubmission(ctx, signed, hash)
  148. p.terraVAASubmission(ctx, signed, hash)
  149. default:
  150. panic(fmt.Sprintf("unknown VAA payload type: %+v", v))
  151. }
  152. p.state.vaaSignatures[hash].submitted = true
  153. } else {
  154. p.logger.Info("quorum not met or already submitted, doing nothing",
  155. zap.String("digest", hash))
  156. }
  157. } else {
  158. p.logger.Info("we have not yet seen this VAA - temporarily storing signature",
  159. zap.String("digest", hash))
  160. }
  161. }
  162. // devnetVAASubmission submits VAA to a local Ethereum devnet. For production, the bridge won't
  163. // have an Ethereum account and the user retrieves the VAA and submits the transactions themselves.
  164. func (p *Processor) devnetVAASubmission(ctx context.Context, signed *vaa.VAA, hash string) {
  165. if p.devnetMode {
  166. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  167. tx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, signed)
  168. cancel()
  169. if err != nil {
  170. if strings.Contains(err.Error(), "VAA was already executed") {
  171. p.logger.Info("lockup already submitted to Ethereum by another node, ignoring",
  172. zap.Error(err), zap.String("digest", hash))
  173. } else {
  174. p.logger.Error("failed to submit lockup to Ethereum",
  175. zap.Error(err), zap.String("digest", hash))
  176. }
  177. return
  178. }
  179. p.logger.Info("lockup submitted to Ethereum", zap.Any("tx", tx), zap.String("digest", hash))
  180. }
  181. }
  182. // Submit VAA to Terra.
  183. func (p *Processor) terraVAASubmission(ctx context.Context, signed *vaa.VAA, hash string) {
  184. if !p.devnetMode || !p.terraEnabled {
  185. p.logger.Warn("ignoring terra VAA submission",
  186. zap.String("digest", hash))
  187. return
  188. }
  189. tx, err := terra.SubmitVAA(ctx, p.terraLCD, p.terraChaidID, p.terraContract, p.terraFeePayer, signed)
  190. if err != nil {
  191. if strings.Contains(err.Error(), "VaaAlreadyExecuted") {
  192. p.logger.Info("lockup already submitted to Terra by another node, ignoring",
  193. zap.Error(err), zap.String("digest", hash))
  194. } else {
  195. p.logger.Error("failed to submit lockup to Terra",
  196. zap.Error(err), zap.String("digest", hash))
  197. }
  198. return
  199. }
  200. p.logger.Info("lockup submitted to Terra", zap.Any("tx", tx), zap.String("digest", hash))
  201. }