attestation_events.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package reporter
  2. import (
  3. "math/rand"
  4. "sync"
  5. "go.uber.org/zap"
  6. "github.com/certusone/wormhole/node/pkg/vaa"
  7. "github.com/ethereum/go-ethereum/common"
  8. )
  9. type (
  10. // MessagePublication is a VAA along with a transaction identifer from the EmiterChain
  11. MessagePublication struct {
  12. VAA vaa.VAA
  13. // The native transaction identifier from the EmitterAddress interaction.
  14. InitiatingTxID common.Hash
  15. }
  16. // VerifiedPeerSignature is a message observation from a Guardian that has been verified
  17. // to be authentic and authorized to contribute to VAA quorum (ie. within the Guardian set).
  18. VerifiedPeerSignature struct {
  19. // The chain the transaction took place on
  20. EmitterChain vaa.ChainID
  21. // EmitterAddress of the contract that emitted the Message
  22. EmitterAddress vaa.Address
  23. // Sequence of the VAA
  24. Sequence uint64
  25. // The address of the Guardian that observed and signed the message
  26. GuardianAddress common.Address
  27. // Transaction Identifier of the initiating event
  28. Signature []byte
  29. }
  30. )
  31. type lifecycleEventChannels struct {
  32. // channel for each event
  33. MessagePublicationC chan *MessagePublication
  34. VAAStateUpdateC chan *vaa.VAA
  35. VerifiedSignatureC chan *VerifiedPeerSignature
  36. VAAQuorumC chan *vaa.VAA
  37. }
  38. type AttestationEventReporter struct {
  39. mu sync.RWMutex
  40. logger *zap.Logger
  41. subs map[int]*lifecycleEventChannels
  42. }
  43. type activeSubscription struct {
  44. ClientId int
  45. Channels *lifecycleEventChannels
  46. }
  47. func EventListener(logger *zap.Logger) *AttestationEventReporter {
  48. events := &AttestationEventReporter{
  49. logger: logger.Named("eventlistener"),
  50. subs: map[int]*lifecycleEventChannels{},
  51. }
  52. return events
  53. }
  54. // getUniqueClientId loops to generate & test integers for existence as key of map. returns an int that is not a key in map.
  55. func (re *AttestationEventReporter) getUniqueClientId() int {
  56. clientId := rand.Intn(1e6)
  57. found := false
  58. for found {
  59. clientId = rand.Intn(1e6)
  60. _, found = re.subs[clientId]
  61. }
  62. return clientId
  63. }
  64. func (re *AttestationEventReporter) Subscribe() *activeSubscription {
  65. re.mu.Lock()
  66. defer re.mu.Unlock()
  67. clientId := re.getUniqueClientId()
  68. re.logger.Debug("Subscribe for client", zap.Int("clientId", clientId))
  69. channels := &lifecycleEventChannels{
  70. MessagePublicationC: make(chan *MessagePublication, 50),
  71. VAAStateUpdateC: make(chan *vaa.VAA, 50),
  72. VerifiedSignatureC: make(chan *VerifiedPeerSignature, 50),
  73. VAAQuorumC: make(chan *vaa.VAA, 50),
  74. }
  75. re.subs[clientId] = channels
  76. sub := &activeSubscription{ClientId: clientId, Channels: channels}
  77. return sub
  78. }
  79. func (re *AttestationEventReporter) Unsubscribe(clientId int) {
  80. re.mu.Lock()
  81. defer re.mu.Unlock()
  82. re.logger.Debug("Unsubscribe for client", zap.Int("clientId", clientId))
  83. delete(re.subs, clientId)
  84. }
  85. // ReportMessagePublication is invoked when an on-chain message is observed.
  86. func (re *AttestationEventReporter) ReportMessagePublication(msg *MessagePublication) {
  87. re.mu.RLock()
  88. defer re.mu.RUnlock()
  89. for client, sub := range re.subs {
  90. select {
  91. case sub.MessagePublicationC <- msg:
  92. re.logger.Debug("published MessagePublication to client", zap.Int("client", client))
  93. default:
  94. re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
  95. }
  96. }
  97. }
  98. // ReportVerifiedPeerSignature is invoked after a SignedObservation is verified.
  99. func (re *AttestationEventReporter) ReportVerifiedPeerSignature(msg *VerifiedPeerSignature) {
  100. re.mu.RLock()
  101. defer re.mu.RUnlock()
  102. for client, sub := range re.subs {
  103. select {
  104. case sub.VerifiedSignatureC <- msg:
  105. re.logger.Debug("published VerifiedPeerSignature to client", zap.Int("client", client))
  106. default:
  107. re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
  108. }
  109. }
  110. }
  111. // ReportVAAStateUpdate is invoked each time the local VAAState is updated.
  112. func (re *AttestationEventReporter) ReportVAAStateUpdate(msg *vaa.VAA) {
  113. re.mu.RLock()
  114. defer re.mu.RUnlock()
  115. for client, sub := range re.subs {
  116. select {
  117. case sub.VAAStateUpdateC <- msg:
  118. re.logger.Debug("published VAAStateUpdate to client", zap.Int("client", client))
  119. default:
  120. re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
  121. }
  122. }
  123. }
  124. // ReportVAAQuorum is invoked when quorum is reached.
  125. func (re *AttestationEventReporter) ReportVAAQuorum(msg *vaa.VAA) {
  126. re.mu.RLock()
  127. defer re.mu.RUnlock()
  128. for client, sub := range re.subs {
  129. select {
  130. case sub.VAAQuorumC <- msg:
  131. re.logger.Debug("published VAAQuorum to client", zap.Int("client", client))
  132. default:
  133. re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
  134. }
  135. }
  136. }