broadcast.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package processor
  2. import (
  3. "encoding/hex"
  4. "github.com/prometheus/client_golang/prometheus"
  5. "github.com/prometheus/client_golang/prometheus/promauto"
  6. "time"
  7. ethcommon "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/crypto"
  9. "google.golang.org/protobuf/proto"
  10. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  11. "github.com/certusone/wormhole/node/pkg/vaa"
  12. )
  13. var (
  14. observationsBroadcastTotal = promauto.NewCounter(
  15. prometheus.CounterOpts{
  16. Name: "wormhole_observations_broadcast_total",
  17. Help: "Total number of signed observations queued for broadcast",
  18. })
  19. )
  20. func (p *Processor) broadcastSignature(v *vaa.VAA, signature []byte, txhash []byte) {
  21. digest := v.SigningMsg()
  22. obsv := gossipv1.SignedObservation{
  23. Addr: crypto.PubkeyToAddress(p.gk.PublicKey).Bytes(),
  24. Hash: digest.Bytes(),
  25. Signature: signature,
  26. TxHash: txhash,
  27. MessageId: v.MessageID(),
  28. }
  29. w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}}
  30. msg, err := proto.Marshal(&w)
  31. if err != nil {
  32. panic(err)
  33. }
  34. p.sendC <- msg
  35. // Store our VAA in case we're going to submit it to Solana
  36. hash := hex.EncodeToString(digest.Bytes())
  37. if p.state.vaaSignatures[hash] == nil {
  38. p.state.vaaSignatures[hash] = &vaaState{
  39. firstObserved: time.Now(),
  40. signatures: map[ethcommon.Address][]byte{},
  41. source: "loopback",
  42. }
  43. }
  44. p.state.vaaSignatures[hash].ourVAA = v
  45. p.state.vaaSignatures[hash].ourMsg = msg
  46. p.state.vaaSignatures[hash].source = v.EmitterChain.String()
  47. p.state.vaaSignatures[hash].gs = p.gs // guaranteed to match ourVAA - there's no concurrent access to p.gs
  48. // Fast path for our own signature
  49. go func() { p.obsvC <- &obsv }()
  50. observationsBroadcastTotal.Inc()
  51. }
  52. func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
  53. b, err := v.Marshal()
  54. if err != nil {
  55. panic(err)
  56. }
  57. w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedVaaWithQuorum{
  58. SignedVaaWithQuorum: &gossipv1.SignedVAAWithQuorum{Vaa: b},
  59. }}
  60. msg, err := proto.Marshal(&w)
  61. if err != nil {
  62. panic(err)
  63. }
  64. p.sendC <- msg
  65. }