message.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package processor
  import (
  	"context"
  	"encoding/hex"
  	"errors"

  	"github.com/certusone/wormhole/node/pkg/common"
  	"github.com/certusone/wormhole/node/pkg/db"
  	"github.com/certusone/wormhole/node/pkg/reporter"
  	"github.com/certusone/wormhole/node/pkg/supervisor"
  	"github.com/certusone/wormhole/node/pkg/vaa"
  	"github.com/ethereum/go-ethereum/crypto"
  	"github.com/mr-tron/base58"
  	"github.com/prometheus/client_golang/prometheus"
  	"github.com/prometheus/client_golang/prometheus/promauto"
  	"go.uber.org/zap"
  )
  16. var (
  17. // SECURITY: source_chain/target_chain are untrusted uint8 values. An attacker could cause a maximum of 255**2 label
  18. // pairs to be created, which is acceptable.
  19. messagesObservedTotal = promauto.NewCounterVec(
  20. prometheus.CounterOpts{
  21. Name: "wormhole_message_observations_total",
  22. Help: "Total number of messages observed",
  23. },
  24. []string{"emitter_chain"})
  25. messagesSignedTotal = promauto.NewCounterVec(
  26. prometheus.CounterOpts{
  27. Name: "wormhole_message_observations_signed_total",
  28. Help: "Total number of message observations that were successfully signed",
  29. },
  30. []string{"emitter_chain"})
  31. )
  32. // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
  33. // event may be received multiple times and must be handled in an idempotent fashion.
  34. func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublication) {
  35. if p.gs == nil {
  36. p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
  37. zap.Stringer("emitter_chain", k.EmitterChain),
  38. zap.Stringer("emitter_address", k.EmitterAddress),
  39. zap.Uint32("nonce", k.Nonce),
  40. zap.Stringer("txhash", k.TxHash),
  41. zap.Time("timestamp", k.Timestamp),
  42. )
  43. return
  44. }
  45. supervisor.Logger(ctx).Info("message publication confirmed",
  46. zap.Stringer("emitter_chain", k.EmitterChain),
  47. zap.Stringer("emitter_address", k.EmitterAddress),
  48. zap.Uint32("nonce", k.Nonce),
  49. zap.Stringer("txhash", k.TxHash),
  50. zap.Time("timestamp", k.Timestamp),
  51. )
  52. messagesObservedTotal.With(prometheus.Labels{
  53. "emitter_chain": k.EmitterChain.String(),
  54. }).Add(1)
  55. // All nodes will create the exact same VAA and sign its digest.
  56. // Consensus is established on this digest.
  57. v := &vaa.VAA{
  58. Version: vaa.SupportedVAAVersion,
  59. GuardianSetIndex: p.gs.Index,
  60. Signatures: nil,
  61. Timestamp: k.Timestamp,
  62. Nonce: k.Nonce,
  63. EmitterChain: k.EmitterChain,
  64. EmitterAddress: k.EmitterAddress,
  65. Payload: k.Payload,
  66. Sequence: k.Sequence,
  67. ConsistencyLevel: k.ConsistencyLevel,
  68. }
  69. // Ignore incoming observations when our database already has a quorum VAA for it.
  70. // This can occur when we're receiving late observations due to node catchup, and
  71. // processing those won't do us any good.
  72. //
  73. // Exception: if an observation is made within the settlement time (30s), we'll
  74. // process it so other nodes won't consider it a miss.
  75. if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v)); err == nil {
  76. // unmarshal vaa
  77. var existing *vaa.VAA
  78. if existing, err = vaa.Unmarshal(vb); err != nil {
  79. panic("failed to unmarshal VAA from db")
  80. }
  81. if k.Timestamp.Sub(existing.Timestamp) > settlementTime {
  82. p.logger.Info("ignoring observation since we already have a quorum VAA for it",
  83. zap.Stringer("emitter_chain", k.EmitterChain),
  84. zap.Stringer("emitter_address", k.EmitterAddress),
  85. zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())),
  86. zap.Uint32("nonce", k.Nonce),
  87. zap.Stringer("txhash", k.TxHash),
  88. zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
  89. zap.Time("timestamp", k.Timestamp),
  90. zap.String("message_id", v.MessageID()),
  91. zap.Duration("settlement_time", settlementTime),
  92. )
  93. return
  94. }
  95. } else if err != db.ErrVAANotFound {
  96. p.logger.Error("failed to get VAA from db",
  97. zap.Stringer("emitter_chain", k.EmitterChain),
  98. zap.Stringer("emitter_address", k.EmitterAddress),
  99. zap.Uint32("nonce", k.Nonce),
  100. zap.Stringer("txhash", k.TxHash),
  101. zap.Time("timestamp", k.Timestamp),
  102. zap.Error(err),
  103. )
  104. }
  105. // Generate digest of the unsigned VAA.
  106. digest := v.SigningMsg()
  107. // Sign the digest using our node's guardian key.
  108. s, err := crypto.Sign(digest.Bytes(), p.gk)
  109. if err != nil {
  110. panic(err)
  111. }
  112. p.logger.Info("observed and signed confirmed message publication",
  113. zap.Stringer("source_chain", k.EmitterChain),
  114. zap.Stringer("txhash", k.TxHash),
  115. zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
  116. zap.String("digest", hex.EncodeToString(digest.Bytes())),
  117. zap.Uint32("nonce", k.Nonce),
  118. zap.Uint64("sequence", k.Sequence),
  119. zap.Stringer("emitter_chain", k.EmitterChain),
  120. zap.Stringer("emitter_address", k.EmitterAddress),
  121. zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())),
  122. zap.Uint8("consistency_level", k.ConsistencyLevel),
  123. zap.String("message_id", v.MessageID()),
  124. zap.String("signature", hex.EncodeToString(s)))
  125. messagesSignedTotal.With(prometheus.Labels{
  126. "emitter_chain": k.EmitterChain.String()}).Add(1)
  127. p.attestationEvents.ReportMessagePublication(&reporter.MessagePublication{VAA: *v, InitiatingTxID: k.TxHash})
  128. p.broadcastSignature(v, s, k.TxHash.Bytes())
  129. }