bigtablewriter.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package reporter
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "github.com/certusone/wormhole/node/pkg/supervisor"
  7. "github.com/certusone/wormhole/node/pkg/vaa"
  8. "go.uber.org/zap"
  9. "google.golang.org/api/option"
  10. "cloud.google.com/go/bigtable"
  11. "cloud.google.com/go/pubsub"
  12. )
  13. type BigTableConnectionConfig struct {
  14. GcpProjectID string
  15. GcpInstanceName string
  16. GcpKeyFilePath string
  17. TableName string
  18. TopicName string
  19. }
  20. type bigTableWriter struct {
  21. connectionConfig *BigTableConnectionConfig
  22. events *AttestationEventReporter
  23. }
  24. // rowKey returns a string with the input vales delimited by colons.
  25. func MakeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence uint64) string {
  26. // left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically
  27. return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence)
  28. }
  29. func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTableConnectionConfig) func(ctx context.Context) error {
  30. return func(ctx context.Context) error {
  31. e := &bigTableWriter{events: events, connectionConfig: connectionConfig}
  32. hostname, err := os.Hostname()
  33. if err != nil {
  34. panic(err)
  35. }
  36. errC := make(chan error)
  37. logger := supervisor.Logger(ctx)
  38. client, err := bigtable.NewClient(ctx,
  39. e.connectionConfig.GcpProjectID,
  40. e.connectionConfig.GcpInstanceName,
  41. option.WithCredentialsFile(e.connectionConfig.GcpKeyFilePath))
  42. if err != nil {
  43. return fmt.Errorf("failed to create BigTable client: %w", err)
  44. }
  45. tbl := client.Open(e.connectionConfig.TableName)
  46. pubsubClient, err := pubsub.NewClient(ctx, e.connectionConfig.GcpProjectID)
  47. if err != nil {
  48. logger.Error("failed to create GCP PubSub client", zap.Error(err))
  49. return fmt.Errorf("failed to create GCP PubSub client: %w", err)
  50. }
  51. logger.Info("GCP PubSub.NewClient initialized")
  52. pubsubTopic := pubsubClient.Topic(e.connectionConfig.TopicName)
  53. logger.Info("GCP PubSub.Topic initialized",
  54. zap.String("Topic", e.connectionConfig.TopicName))
  55. // call to subscribe to event channels
  56. sub := e.events.Subscribe()
  57. logger.Info("subscribed to AttestationEvents")
  58. go func() {
  59. for {
  60. select {
  61. case <-ctx.Done():
  62. return
  63. case msg := <-sub.Channels.MessagePublicationC:
  64. colFam := "MessagePublication"
  65. mutation := bigtable.NewMutation()
  66. ts := bigtable.Now()
  67. mutation.Set(colFam, "Version", ts, []byte(fmt.Sprint(msg.VAA.Version)))
  68. mutation.Set(colFam, "GuardianSetIndex", ts, []byte(fmt.Sprint(msg.VAA.GuardianSetIndex)))
  69. mutation.Set(colFam, "Timestamp", ts, []byte(ts.Time().String()))
  70. mutation.Set(colFam, "Nonce", ts, []byte(fmt.Sprint(msg.VAA.Nonce)))
  71. mutation.Set(colFam, "EmitterChain", ts, []byte(msg.VAA.EmitterChain.String()))
  72. mutation.Set(colFam, "EmitterAddress", ts, []byte(msg.VAA.EmitterAddress.String()))
  73. mutation.Set(colFam, "Sequence", ts, []byte(fmt.Sprint(msg.VAA.Sequence)))
  74. mutation.Set(colFam, "InitiatingTxID", ts, []byte(msg.InitiatingTxID.Hex()))
  75. mutation.Set(colFam, "Payload", ts, msg.VAA.Payload)
  76. mutation.Set(colFam, "ReporterHostname", ts, []byte(hostname))
  77. // filter to see if there is a row with this rowKey, and has a value for EmitterAddress
  78. filter := bigtable.ChainFilters(
  79. bigtable.FamilyFilter(colFam),
  80. bigtable.ColumnFilter("EmitterAddress"))
  81. conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
  82. rowKey := MakeRowKey(msg.VAA.EmitterChain, msg.VAA.EmitterAddress, msg.VAA.Sequence)
  83. err := tbl.Apply(ctx, rowKey, conditionalMutation)
  84. if err != nil {
  85. logger.Error("Failed to write message publication to BigTable",
  86. zap.String("rowKey", rowKey),
  87. zap.String("columnFamily", colFam),
  88. zap.Error(err))
  89. errC <- err
  90. }
  91. case msg := <-sub.Channels.VAAQuorumC:
  92. colFam := "QuorumState"
  93. mutation := bigtable.NewMutation()
  94. ts := bigtable.Now()
  95. b, marshalErr := msg.Marshal()
  96. if marshalErr != nil {
  97. logger.Error("failed to marshal VAAQuorum VAA.")
  98. }
  99. mutation.Set(colFam, "SignedVAA", ts, b)
  100. // filter to see if this row already has the signature.
  101. filter := bigtable.ChainFilters(
  102. bigtable.FamilyFilter(colFam),
  103. bigtable.ColumnFilter("SignedVAA"))
  104. conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
  105. rowKey := MakeRowKey(msg.EmitterChain, msg.EmitterAddress, msg.Sequence)
  106. err := tbl.Apply(ctx, rowKey, conditionalMutation)
  107. if err != nil {
  108. logger.Error("Failed to write persistence info to BigTable",
  109. zap.String("rowKey", rowKey),
  110. zap.String("columnFamily", colFam),
  111. zap.Error(err))
  112. errC <- err
  113. }
  114. publishResult := pubsubTopic.Publish(ctx, &pubsub.Message{
  115. Data: []byte(b),
  116. })
  117. if _, err = publishResult.Get(ctx); err != nil {
  118. logger.Error("Failed getting GCP PubSub publish reciept",
  119. zap.String("rowKey", rowKey),
  120. zap.Error(err))
  121. }
  122. }
  123. }
  124. }()
  125. select {
  126. case <-ctx.Done():
  127. e.events.Unsubscribe(sub.ClientId)
  128. if err = client.Close(); err != nil {
  129. logger.Error("Could not close BigTable client", zap.Error(err))
  130. }
  131. if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
  132. logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
  133. }
  134. return ctx.Err()
  135. case err := <-errC:
  136. logger.Error("bigtablewriter encountered an error", zap.Error(err))
  137. e.events.Unsubscribe(sub.ClientId)
  138. // try to close the connection before returning
  139. if closeErr := client.Close(); closeErr != nil {
  140. logger.Error("Could not close BigTable client", zap.Error(closeErr))
  141. }
  142. if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
  143. logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
  144. }
  145. return err
  146. }
  147. }
  148. }