message.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package processor
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "time"
  6. "github.com/mr-tron/base58"
  7. "github.com/prometheus/client_golang/prometheus"
  8. "github.com/prometheus/client_golang/prometheus/promauto"
  9. ethCommon "github.com/ethereum/go-ethereum/common"
  10. "go.uber.org/zap"
  11. "go.uber.org/zap/zapcore"
  12. "github.com/certusone/wormhole/node/pkg/common"
  13. "github.com/certusone/wormhole/node/pkg/p2p"
  14. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  15. )
  16. var (
  17. // SECURITY: source_chain/target_chain are untrusted uint8 values. An attacker could cause a maximum of 255**2 label
  18. // pairs to be created, which is acceptable.
  19. messagesObservedTotal = promauto.NewCounterVec(
  20. prometheus.CounterOpts{
  21. Name: "wormhole_message_observations_total",
  22. Help: "Total number of messages observed",
  23. },
  24. []string{"emitter_chain"})
  25. )
  26. // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
  27. // event may be received multiple times and must be handled in an idempotent fashion.
  28. func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublication) {
  29. if p.gs == nil {
  30. p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
  31. zap.String("message_id", k.MessageIDString()),
  32. zap.Uint32("nonce", k.Nonce),
  33. zap.String("txID", k.TxIDString()),
  34. zap.Time("timestamp", k.Timestamp),
  35. )
  36. return
  37. }
  38. messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc()
  39. // All nodes will create the exact same VAA and sign its digest.
  40. // Consensus is established on this digest.
  41. v := &VAA{
  42. VAA: vaa.VAA{
  43. Version: vaa.SupportedVAAVersion,
  44. GuardianSetIndex: p.gs.Index,
  45. Signatures: nil,
  46. Timestamp: k.Timestamp,
  47. Nonce: k.Nonce,
  48. EmitterChain: k.EmitterChain,
  49. EmitterAddress: k.EmitterAddress,
  50. Payload: k.Payload,
  51. Sequence: k.Sequence,
  52. ConsistencyLevel: k.ConsistencyLevel,
  53. },
  54. Unreliable: k.Unreliable,
  55. Reobservation: k.IsReobservation,
  56. }
  57. // Generate digest of the unsigned VAA.
  58. digest := v.SigningDigest()
  59. hash := hex.EncodeToString(digest.Bytes())
  60. // Sign the digest using the node's GuardianSigner
  61. signature, err := p.guardianSigner.Sign(ctx, digest.Bytes())
  62. if err != nil {
  63. panic(err)
  64. }
  65. shouldPublishImmediately := p.shouldPublishImmediately(&v.VAA)
  66. if p.logger.Core().Enabled(zapcore.DebugLevel) {
  67. p.logger.Debug("observed and signed confirmed message publication",
  68. zap.String("message_id", k.MessageIDString()),
  69. zap.String("txID", k.TxIDString()),
  70. zap.String("txID_b58", base58.Encode(k.TxID)),
  71. zap.String("hash", hash),
  72. zap.Uint32("nonce", k.Nonce),
  73. zap.Time("timestamp", k.Timestamp),
  74. zap.Uint8("consistency_level", k.ConsistencyLevel),
  75. zap.String("signature", hex.EncodeToString(signature)),
  76. zap.Bool("shouldPublishImmediately", shouldPublishImmediately),
  77. zap.Bool("isReobservation", k.IsReobservation),
  78. )
  79. }
  80. // Broadcast the signature.
  81. ourObs, msg := p.broadcastSignature(v.MessageID(), k, digest, signature, shouldPublishImmediately)
  82. // Indicate that we observed this one.
  83. observationsReceivedTotal.Inc()
  84. observationsReceivedByGuardianAddressTotal.WithLabelValues(p.ourAddr.Hex()).Inc()
  85. // Update the last VAA timestamp for this chain in the registry.
  86. p.pegLastObservationSignedAtTime(k.EmitterChain)
  87. // Get / create our state entry.
  88. s := p.state.signatures[hash]
  89. if s == nil {
  90. s = &state{
  91. firstObserved: time.Now(),
  92. nextRetry: time.Now().Add(nextRetryDuration(0)),
  93. signatures: map[ethCommon.Address][]byte{},
  94. source: "loopback",
  95. }
  96. p.state.signatures[hash] = s
  97. }
  98. // Update our state.
  99. s.ourObservation = v
  100. s.txHash = k.TxID
  101. s.source = v.GetEmitterChain().String()
  102. s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
  103. s.signatures[p.ourAddr] = signature
  104. s.ourObs = ourObs
  105. s.ourMsg = msg
  106. // Fast path for our own signature.
  107. if !s.submitted {
  108. start := time.Now()
  109. p.checkForQuorum(ourObs, s, s.gs, hash)
  110. timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
  111. }
  112. }
  113. // pegLastObservationSignedAtTime updates the registry with the current wall clock timestamp of the last signed observation for a chain.
  114. func (p *Processor) pegLastObservationSignedAtTime(chain vaa.ChainID) {
  115. // Get current time as Unix nanoseconds.
  116. timestamp := time.Now()
  117. p2p.DefaultRegistry.SetLastObservationSignedAtTimestamp(chain, timestamp.UnixNano())
  118. }