bigtablewriter.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package reporter
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "github.com/certusone/wormhole/node/pkg/supervisor"
  7. "github.com/certusone/wormhole/node/pkg/vaa"
  8. "go.uber.org/zap"
  9. "google.golang.org/api/option"
  10. "cloud.google.com/go/bigtable"
  11. "cloud.google.com/go/pubsub"
  12. )
  13. type BigTableConnectionConfig struct {
  14. GcpProjectID string
  15. GcpInstanceName string
  16. GcpKeyFilePath string
  17. TableName string
  18. TopicName string
  19. }
  20. type bigTableWriter struct {
  21. connectionConfig *BigTableConnectionConfig
  22. events *AttestationEventReporter
  23. }
  24. // rowKey returns a string with the input vales delimited by colons.
  25. func MakeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence uint64) string {
  26. // left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically
  27. return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence)
  28. }
  29. func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTableConnectionConfig) func(ctx context.Context) error {
  30. return func(ctx context.Context) error {
  31. e := &bigTableWriter{events: events, connectionConfig: connectionConfig}
  32. hostname, err := os.Hostname()
  33. if err != nil {
  34. panic(err)
  35. }
  36. errC := make(chan error)
  37. logger := supervisor.Logger(ctx)
  38. client, err := bigtable.NewClient(ctx,
  39. e.connectionConfig.GcpProjectID,
  40. e.connectionConfig.GcpInstanceName,
  41. option.WithCredentialsFile(e.connectionConfig.GcpKeyFilePath))
  42. if err != nil {
  43. return fmt.Errorf("failed to create BigTable client: %w", err)
  44. }
  45. tbl := client.Open(e.connectionConfig.TableName)
  46. pubsubClient, err := pubsub.NewClient(ctx,
  47. e.connectionConfig.GcpProjectID,
  48. option.WithCredentialsFile(e.connectionConfig.GcpKeyFilePath))
  49. if err != nil {
  50. logger.Error("failed to create GCP PubSub client", zap.Error(err))
  51. return fmt.Errorf("failed to create GCP PubSub client: %w", err)
  52. }
  53. logger.Info("GCP PubSub.NewClient initialized")
  54. pubsubTopic := pubsubClient.Topic(e.connectionConfig.TopicName)
  55. logger.Info("GCP PubSub.Topic initialized",
  56. zap.String("Topic", e.connectionConfig.TopicName))
  57. // call to subscribe to event channels
  58. sub := e.events.Subscribe()
  59. logger.Info("subscribed to AttestationEvents")
  60. go func() {
  61. for {
  62. select {
  63. case <-ctx.Done():
  64. return
  65. case msg := <-sub.Channels.MessagePublicationC:
  66. colFam := "MessagePublication"
  67. mutation := bigtable.NewMutation()
  68. ts := bigtable.Now()
  69. mutation.Set(colFam, "Version", ts, []byte(fmt.Sprint(msg.VAA.Version)))
  70. mutation.Set(colFam, "GuardianSetIndex", ts, []byte(fmt.Sprint(msg.VAA.GuardianSetIndex)))
  71. mutation.Set(colFam, "Timestamp", ts, []byte(ts.Time().String()))
  72. mutation.Set(colFam, "Nonce", ts, []byte(fmt.Sprint(msg.VAA.Nonce)))
  73. mutation.Set(colFam, "EmitterChain", ts, []byte(msg.VAA.EmitterChain.String()))
  74. mutation.Set(colFam, "EmitterAddress", ts, []byte(msg.VAA.EmitterAddress.String()))
  75. mutation.Set(colFam, "Sequence", ts, []byte(fmt.Sprint(msg.VAA.Sequence)))
  76. mutation.Set(colFam, "InitiatingTxID", ts, []byte(msg.InitiatingTxID.Hex()))
  77. mutation.Set(colFam, "Payload", ts, msg.VAA.Payload)
  78. mutation.Set(colFam, "ReporterHostname", ts, []byte(hostname))
  79. // filter to see if there is a row with this rowKey, and has a value for EmitterAddress
  80. filter := bigtable.ChainFilters(
  81. bigtable.FamilyFilter(colFam),
  82. bigtable.ColumnFilter("EmitterAddress"))
  83. conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
  84. rowKey := MakeRowKey(msg.VAA.EmitterChain, msg.VAA.EmitterAddress, msg.VAA.Sequence)
  85. err := tbl.Apply(ctx, rowKey, conditionalMutation)
  86. if err != nil {
  87. logger.Error("Failed to write message publication to BigTable",
  88. zap.String("rowKey", rowKey),
  89. zap.String("columnFamily", colFam),
  90. zap.Error(err))
  91. errC <- err
  92. }
  93. case msg := <-sub.Channels.VAAQuorumC:
  94. colFam := "QuorumState"
  95. mutation := bigtable.NewMutation()
  96. ts := bigtable.Now()
  97. b, marshalErr := msg.Marshal()
  98. if marshalErr != nil {
  99. logger.Error("failed to marshal VAAQuorum VAA.")
  100. }
  101. mutation.Set(colFam, "SignedVAA", ts, b)
  102. // filter to see if this row already has the signature.
  103. filter := bigtable.ChainFilters(
  104. bigtable.FamilyFilter(colFam),
  105. bigtable.ColumnFilter("SignedVAA"))
  106. conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
  107. rowKey := MakeRowKey(msg.EmitterChain, msg.EmitterAddress, msg.Sequence)
  108. err := tbl.Apply(ctx, rowKey, conditionalMutation)
  109. if err != nil {
  110. logger.Error("Failed to write persistence info to BigTable",
  111. zap.String("rowKey", rowKey),
  112. zap.String("columnFamily", colFam),
  113. zap.Error(err))
  114. errC <- err
  115. }
  116. publishResult := pubsubTopic.Publish(ctx, &pubsub.Message{
  117. Data: []byte(b),
  118. })
  119. if _, err = publishResult.Get(ctx); err != nil {
  120. logger.Error("Failed getting GCP PubSub publish reciept",
  121. zap.String("rowKey", rowKey),
  122. zap.Error(err))
  123. }
  124. }
  125. }
  126. }()
  127. select {
  128. case <-ctx.Done():
  129. e.events.Unsubscribe(sub.ClientId)
  130. if err = client.Close(); err != nil {
  131. logger.Error("Could not close BigTable client", zap.Error(err))
  132. }
  133. if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
  134. logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
  135. }
  136. return ctx.Err()
  137. case err := <-errC:
  138. logger.Error("bigtablewriter encountered an error", zap.Error(err))
  139. e.events.Unsubscribe(sub.ClientId)
  140. // try to close the connection before returning
  141. if closeErr := client.Close(); closeErr != nil {
  142. logger.Error("Could not close BigTable client", zap.Error(closeErr))
  143. }
  144. if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
  145. logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
  146. }
  147. return err
  148. }
  149. }
  150. }