ソースを参照

BigTable: PubSub trigger after decoding payload

Change-Id: Ie46903be7d41c27778babbeb864f3e4adf001825

commit-id:cee21010
justinschuldt 4 年 前
コミット
8931153f7e

+ 6 - 2
devnet/bigtable.yaml

@@ -158,10 +158,14 @@ spec:
               value: local-dev
             - name: BIGTABLE_INSTANCE
               value: wormhole
-            - name: PUBSUB_TOPIC
+            - name: PUBSUB_NEW_VAA_TOPIC
               value: new-vaa-devnet
-            - name: PUBSUB_SUBSCRIPTION
+            - name: PUBSUB_NEW_VAA_SUBSCRIPTION
               value: extract-payload-devnet
+            - name: PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC
+              value: create-token-transfer-details-devnet
+            - name: PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION
+              value: calculate-transfer-data-devnet
           ports:
             - containerPort: 8080
               name: functions

+ 8 - 3
event_database/cloud_functions/.vscode/launch.json

@@ -11,9 +11,14 @@
             "mode": "auto",
             "program": "${workspaceFolder}/cmd/main.go",
             "env": {
-                "GCP_PROJECT": "wormhole-315720",
-                "BIGTABLE_INSTANCE": "wormhole-mainnet",
-                "GOOGLE_APPLICATION_CREDENTIALS": "./bigtable-admin.json"
+                "GCP_PROJECT": "local-dev",
+                "BIGTABLE_INSTANCE": "wormhole",
+                "BIGTABLE_EMULATOR_HOST": "localhost:8086",
+                "PUBSUB_EMULATOR_HOST": "localhost:8085",
+                "PUBSUB_NEW_VAA_TOPIC": "new-vaa-devnet",
+                "PUBSUB_NEW_VAA_SUBSCRIPTION": "extract-payload-devnet",
+                "PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC": "create-token-transfer-details-devnet",
+                "PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION": "calculate-transfer-data-devnet",
             },
         },
     ]

+ 52 - 36
event_database/cloud_functions/cmd/main.go

@@ -12,6 +12,39 @@ import (
 	p "github.com/certusone/wormhole/event_database/cloud_functions"
 )
 
+func createAndSubscribe(client *pubsub.Client, topicName, subscriptionName string, handler func(ctx context.Context, m p.PubSubMessage) error) {
+	var topic *pubsub.Topic
+	var topicErr error
+	ctx := context.Background()
+	topic, topicErr = client.CreateTopic(ctx, topicName)
+	if topicErr != nil {
+		log.Printf("pubsub.CreateTopic err: %v", topicErr)
+		// already exists
+		topic = client.Topic(topicName)
+	} else {
+		log.Println("created topic:", topicName)
+	}
+
+	subConf := pubsub.SubscriptionConfig{Topic: topic}
+	_, subErr := client.CreateSubscription(ctx, subscriptionName, subConf)
+	if subErr != nil {
+		log.Printf("pubsub.CreateSubscription err: %v", subErr)
+	} else {
+		log.Println("created subscription:", subscriptionName)
+	}
+
+	sub := client.Subscription(subscriptionName)
+
+	err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
+		msg.Ack()
+		handler(ctx, p.PubSubMessage{Data: msg.Data})
+
+	})
+	if err != nil {
+		fmt.Println(fmt.Errorf("receive err: %v", err))
+	}
+}
+
 func main() {
 	var wg sync.WaitGroup
 
@@ -34,51 +67,34 @@ func main() {
 	}()
 
 	// pubsub functions
+	pubsubCtx := context.Background()
+	gcpProject := os.Getenv("GCP_PROJECT")
+
+	pubsubClient, err := pubsub.NewClient(pubsubCtx, gcpProject)
+	if err != nil {
+		fmt.Println(fmt.Errorf("pubsub.NewClient err: %v", err))
+	}
+
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		pubsubCtx := context.Background()
-		gcpProject := os.Getenv("GCP_PROJECT")
 
-		client, err := pubsub.NewClient(pubsubCtx, gcpProject)
-		if err != nil {
-			fmt.Println(fmt.Errorf("pubsub.NewClient err: %v", err))
-		}
-		defer client.Close()
-
-		pubsubTopic := os.Getenv("PUBSUB_TOPIC")
-		pubsubSubscription := os.Getenv("PUBSUB_SUBSCRIPTION")
-		var topic *pubsub.Topic
-		var topicErr error
-
-		topic, topicErr = client.CreateTopic(pubsubCtx, pubsubTopic)
-		if topicErr != nil {
-			log.Printf("pubsub.CreateTopic err: %v", topicErr)
-			// already exists
-			topic = client.Topic(pubsubTopic)
-		} else {
-			log.Println("created topic:", pubsubTopic)
-		}
+		pubsubTopic := os.Getenv("PUBSUB_NEW_VAA_TOPIC")
+		pubsubSubscription := os.Getenv("PUBSUB_NEW_VAA_SUBSCRIPTION")
 
-		subConf := pubsub.SubscriptionConfig{Topic: topic}
-		_, subErr := client.CreateSubscription(pubsubCtx, pubsubSubscription, subConf)
-		if subErr != nil {
-			log.Printf("pubsub.CreateSubscription err: %v", subErr)
-		} else {
-			log.Println("created subscription:", pubsubSubscription)
-		}
+		createAndSubscribe(pubsubClient, pubsubTopic, pubsubSubscription, p.ProcessVAA)
+	}()
 
-		sub := client.Subscription(pubsubSubscription)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
 
-		err = sub.Receive(pubsubCtx, func(ctx context.Context, msg *pubsub.Message) {
-			msg.Ack()
-			p.ProcessVAA(ctx, p.PubSubMessage{Data: msg.Data})
+		pubsubTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC")
+		pubsubSubscription := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION")
 
-		})
-		if err != nil {
-			fmt.Println(fmt.Errorf("receive err: %v", err))
-		}
+		createAndSubscribe(pubsubClient, pubsubTopic, pubsubSubscription, p.ProcessTransfer)
 	}()
 
 	wg.Wait()
+	pubsubClient.Close()
 }

+ 29 - 0
event_database/cloud_functions/process-transfer.go

@@ -0,0 +1,29 @@
+package p
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+
+	"github.com/certusone/wormhole/node/pkg/vaa"
+)
+
+// ProcessTransfer is triggered by a PubSub message, once a TokenTransferPayload is written to a row.
+func ProcessTransfer(ctx context.Context, m PubSubMessage) error {
+	data := string(m.Data)
+	if data == "" {
+		return fmt.Errorf("no data to process in message")
+	}
+
+	log.Printf("ProcessTransfer got message!")
+	signedVaa, err := vaa.Unmarshal(m.Data)
+	if err != nil {
+		log.Println("failed Unmarshaling VAA")
+		return err
+	}
+	jsonVaa, _ := json.MarshalIndent(signedVaa, "", "  ")
+	log.Printf("ProcessTransfer Unmarshaled VAA: %q\n", string(jsonVaa))
+
+	return nil
+}

+ 9 - 2
event_database/cloud_functions/process-vaa.go

@@ -9,6 +9,7 @@ import (
 	"log"
 
 	"cloud.google.com/go/bigtable"
+	"cloud.google.com/go/pubsub"
 	"github.com/certusone/wormhole/node/pkg/vaa"
 	"github.com/holiman/uint256"
 )
@@ -232,7 +233,7 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
 				log.Println("failed decoding payload for row ", rowKey)
 				return decodeErr
 			}
-			// save payload to bigtable
+			// save payload to bigtable, then publish a new PubSub message for further processing
 			colFam := columnFamilies[2]
 			mutation := bigtable.NewMutation()
 			ts := bigtable.Now()
@@ -245,7 +246,13 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
 			mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
 
 			writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
-			return writeErr
+			if writeErr != nil {
+				return writeErr
+			}
+
+			// now that the payload is saved to BigTable,
+			// pass along the message to the topic that will calculate TokenTransferDetails
+			pubSubTokenTransferDetailsTopic.Publish(ctx, &pubsub.Message{Data: m.Data})
 		} else if payloadId == 2 {
 			// asset meta
 			payload, decodeErr := DecodeAssetMeta(signedVaa.Payload)

+ 15 - 0
event_database/cloud_functions/shared.go

@@ -9,6 +9,7 @@ import (
 	"sync"
 
 	"cloud.google.com/go/bigtable"
+	"cloud.google.com/go/pubsub"
 	"github.com/certusone/wormhole/node/pkg/vaa"
 )
 
@@ -20,6 +21,9 @@ var client *bigtable.Client
 var clientOnce sync.Once
 var tbl *bigtable.Table
 
+var pubsubClient *pubsub.Client
+var pubSubTokenTransferDetailsTopic *pubsub.Topic
+
 // init runs during cloud function initialization. So, this will only run during an
 // an instance's cold start.
 // https://cloud.google.com/functions/docs/bestpractices/networking#accessing_google_apis
@@ -36,8 +40,19 @@ func init() {
 
 			return
 		}
+
+		var pubsubErr error
+		pubsubClient, pubsubErr = pubsub.NewClient(context.Background(), project)
+		if pubsubErr != nil {
+			log.Printf("pubsub.NewClient error: %v", pubsubErr)
+			return
+		}
 	})
 	tbl = client.Open("v2Events")
+
+	// create the topic that will be published to after decoding token transfer payloads
+	tokenTransferDetailsTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC")
+	pubSubTokenTransferDetailsTopic = pubsubClient.Topic(tokenTransferDetailsTopic)
 }
 
 var columnFamilies = []string{