broadcast.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package processor
  2. import (
  3. "github.com/prometheus/client_golang/prometheus"
  4. "github.com/prometheus/client_golang/prometheus/promauto"
  5. ethCommon "github.com/ethereum/go-ethereum/common"
  6. "google.golang.org/protobuf/proto"
  7. "github.com/certusone/wormhole/node/pkg/common"
  8. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  9. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  10. )
  11. var (
  12. observationsBroadcast = promauto.NewCounter(
  13. prometheus.CounterOpts{
  14. Name: "wormhole_observations_queued_for_broadcast",
  15. Help: "Total number of signed observations queued for broadcast",
  16. })
  17. batchObservationsBroadcast = promauto.NewCounter(
  18. prometheus.CounterOpts{
  19. Name: "wormhole_batch_observations_queued_for_broadcast",
  20. Help: "Total number of signed batched observations queued for broadcast",
  21. })
  22. signedVAAsBroadcast = promauto.NewCounter(
  23. prometheus.CounterOpts{
  24. Name: "wormhole_signed_vaas_queued_for_broadcast",
  25. Help: "Total number of signed vaas queued for broadcast",
  26. })
  27. )
  28. // broadcastSignature broadcasts the observation for something we observed locally.
  29. func (p *Processor) broadcastSignature(
  30. messageID string,
  31. k *common.MessagePublication,
  32. digest ethCommon.Hash,
  33. signature []byte,
  34. shouldPublishImmediately bool,
  35. ) (ourObs *gossipv1.Observation, msg []byte) {
  36. // Create the observation to either be submitted to the batch processor or published immediately.
  37. ourObs = &gossipv1.Observation{
  38. Hash: digest.Bytes(),
  39. Signature: signature,
  40. TxHash: k.TxID,
  41. MessageId: messageID,
  42. }
  43. if shouldPublishImmediately {
  44. msg = p.publishImmediately(ourObs)
  45. observationsBroadcast.Inc()
  46. } else {
  47. p.postObservationToBatch(ourObs)
  48. batchObservationsBroadcast.Inc()
  49. }
  50. if p.alternatePublisher != nil {
  51. p.alternatePublisher.PublishObservation(k.EmitterChain, ourObs)
  52. }
  53. return ourObs, msg
  54. }
  55. // broadcastSignedVAA broadcasts a VAA to the gossip network.
  56. func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
  57. b, err := v.Marshal()
  58. if err != nil {
  59. panic(err)
  60. }
  61. w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedVaaWithQuorum{
  62. SignedVaaWithQuorum: &gossipv1.SignedVAAWithQuorum{Vaa: b},
  63. }}
  64. msg, err := proto.Marshal(&w)
  65. if err != nil {
  66. panic(err)
  67. }
  68. // Broadcast the signed VAA. The channel is buffered. If it overflows, just drop it and rely on a reobservation if necessary.
  69. common.WriteToChannelWithoutBlocking(p.gossipVaaSendC, msg, "vaa_broadcast")
  70. select {
  71. case p.gossipVaaSendC <- msg:
  72. signedVAAsBroadcast.Inc()
  73. default:
  74. vaaPublishChannelOverflow.Inc()
  75. }
  76. if p.gatewayRelayer != nil {
  77. p.gatewayRelayer.SubmitVAA(v)
  78. }
  79. }