| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package reporter
- import (
- "context"
- "fmt"
- "os"
- "github.com/certusone/wormhole/node/pkg/supervisor"
- "github.com/certusone/wormhole/node/pkg/vaa"
- "go.uber.org/zap"
- "google.golang.org/api/option"
- "cloud.google.com/go/bigtable"
- "cloud.google.com/go/pubsub"
- )
- type BigTableConnectionConfig struct {
- GcpProjectID string
- GcpInstanceName string
- GcpKeyFilePath string
- TableName string
- TopicName string
- }
- type bigTableWriter struct {
- connectionConfig *BigTableConnectionConfig
- events *AttestationEventReporter
- }
- // rowKey returns a string with the input vales delimited by colons.
- func MakeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence uint64) string {
- // left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically
- return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence)
- }
- func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTableConnectionConfig) func(ctx context.Context) error {
- return func(ctx context.Context) error {
- e := &bigTableWriter{events: events, connectionConfig: connectionConfig}
- hostname, err := os.Hostname()
- if err != nil {
- panic(err)
- }
- errC := make(chan error)
- logger := supervisor.Logger(ctx)
- client, err := bigtable.NewClient(ctx,
- e.connectionConfig.GcpProjectID,
- e.connectionConfig.GcpInstanceName,
- option.WithCredentialsFile(e.connectionConfig.GcpKeyFilePath))
- if err != nil {
- return fmt.Errorf("failed to create BigTable client: %w", err)
- }
- tbl := client.Open(e.connectionConfig.TableName)
- pubsubClient, err := pubsub.NewClient(ctx, e.connectionConfig.GcpProjectID)
- if err != nil {
- logger.Error("failed to create GCP PubSub client", zap.Error(err))
- return fmt.Errorf("failed to create GCP PubSub client: %w", err)
- }
- logger.Info("GCP PubSub.NewClient initialized")
- pubsubTopic := pubsubClient.Topic(e.connectionConfig.TopicName)
- logger.Info("GCP PubSub.Topic initialized",
- zap.String("Topic", e.connectionConfig.TopicName))
- // call to subscribe to event channels
- sub := e.events.Subscribe()
- logger.Info("subscribed to AttestationEvents")
- go func() {
- for {
- select {
- case <-ctx.Done():
- return
- case msg := <-sub.Channels.MessagePublicationC:
- colFam := "MessagePublication"
- mutation := bigtable.NewMutation()
- ts := bigtable.Now()
- mutation.Set(colFam, "Version", ts, []byte(fmt.Sprint(msg.VAA.Version)))
- mutation.Set(colFam, "GuardianSetIndex", ts, []byte(fmt.Sprint(msg.VAA.GuardianSetIndex)))
- mutation.Set(colFam, "Timestamp", ts, []byte(ts.Time().String()))
- mutation.Set(colFam, "Nonce", ts, []byte(fmt.Sprint(msg.VAA.Nonce)))
- mutation.Set(colFam, "EmitterChain", ts, []byte(msg.VAA.EmitterChain.String()))
- mutation.Set(colFam, "EmitterAddress", ts, []byte(msg.VAA.EmitterAddress.String()))
- mutation.Set(colFam, "Sequence", ts, []byte(fmt.Sprint(msg.VAA.Sequence)))
- mutation.Set(colFam, "InitiatingTxID", ts, []byte(msg.InitiatingTxID.Hex()))
- mutation.Set(colFam, "Payload", ts, msg.VAA.Payload)
- mutation.Set(colFam, "ReporterHostname", ts, []byte(hostname))
- // filter to see if there is a row with this rowKey, and has a value for EmitterAddress
- filter := bigtable.ChainFilters(
- bigtable.FamilyFilter(colFam),
- bigtable.ColumnFilter("EmitterAddress"))
- conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
- rowKey := MakeRowKey(msg.VAA.EmitterChain, msg.VAA.EmitterAddress, msg.VAA.Sequence)
- err := tbl.Apply(ctx, rowKey, conditionalMutation)
- if err != nil {
- logger.Error("Failed to write message publication to BigTable",
- zap.String("rowKey", rowKey),
- zap.String("columnFamily", colFam),
- zap.Error(err))
- errC <- err
- }
- case msg := <-sub.Channels.VAAQuorumC:
- colFam := "QuorumState"
- mutation := bigtable.NewMutation()
- ts := bigtable.Now()
- b, marshalErr := msg.Marshal()
- if marshalErr != nil {
- logger.Error("failed to marshal VAAQuorum VAA.")
- }
- mutation.Set(colFam, "SignedVAA", ts, b)
- // filter to see if this row already has the signature.
- filter := bigtable.ChainFilters(
- bigtable.FamilyFilter(colFam),
- bigtable.ColumnFilter("SignedVAA"))
- conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
- rowKey := MakeRowKey(msg.EmitterChain, msg.EmitterAddress, msg.Sequence)
- err := tbl.Apply(ctx, rowKey, conditionalMutation)
- if err != nil {
- logger.Error("Failed to write persistence info to BigTable",
- zap.String("rowKey", rowKey),
- zap.String("columnFamily", colFam),
- zap.Error(err))
- errC <- err
- }
- publishResult := pubsubTopic.Publish(ctx, &pubsub.Message{
- Data: []byte(b),
- })
- if _, err = publishResult.Get(ctx); err != nil {
- logger.Error("Failed getting GCP PubSub publish reciept",
- zap.String("rowKey", rowKey),
- zap.Error(err))
- }
- }
- }
- }()
- select {
- case <-ctx.Done():
- e.events.Unsubscribe(sub.ClientId)
- if err = client.Close(); err != nil {
- logger.Error("Could not close BigTable client", zap.Error(err))
- }
- if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
- logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
- }
- return ctx.Err()
- case err := <-errC:
- logger.Error("bigtablewriter encountered an error", zap.Error(err))
- e.events.Unsubscribe(sub.ClientId)
- // try to close the connection before returning
- if closeErr := client.Close(); closeErr != nil {
- logger.Error("Could not close BigTable client", zap.Error(closeErr))
- }
- if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
- logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
- }
- return err
- }
- }
- }
|