Bladeren bron

bigtable: save decoded payload to row

Change-Id: I94938a501060e865b6f12ac74e0c20f8c0bca441

commit-id:ed8d235f
justinschuldt 4 jaren geleden
bovenliggende
commit
4b397e79b4

+ 2 - 1
event_database/cloud_functions/go.mod

@@ -12,5 +12,6 @@ require (
 require (
 	cloud.google.com/go/pubsub v1.3.1 // indirect
 	github.com/GoogleCloudPlatform/functions-framework-go v1.3.0
-	github.com/certusone/wormhole/node v0.0.0-20211102011245-d412cb8a936a
+	github.com/certusone/wormhole/node v0.0.0-20211109211005-7ad15fbfc427
+	github.com/holiman/uint256 v1.2.0 // indirect
 )

+ 5 - 0
event_database/cloud_functions/go.sum

@@ -165,13 +165,17 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7
 github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
 github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
 github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
+github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
 github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
+github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
 github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/certusone/wormhole/node v0.0.0-20211027001206-19628733285e h1:BQJb2Taq7MMbh+o4AVOiNmFxgNtfMbx8nGJGA3tjiek=
 github.com/certusone/wormhole/node v0.0.0-20211027001206-19628733285e/go.mod h1:YncgdSOYam7ELyXFo7PFCj6tUo0pe6cjlj+O3Vt28mo=
 github.com/certusone/wormhole/node v0.0.0-20211102011245-d412cb8a936a h1:XR4jqFpH5MhKlYjWPHnd+agQaxKs7kynj5vKc09A18Y=
 github.com/certusone/wormhole/node v0.0.0-20211102011245-d412cb8a936a/go.mod h1:YncgdSOYam7ELyXFo7PFCj6tUo0pe6cjlj+O3Vt28mo=
+github.com/certusone/wormhole/node v0.0.0-20211109211005-7ad15fbfc427 h1:3h2ilCx/YiHFSVL5J/p0hpQGcxA0A27bRlqtN6gXrgQ=
+github.com/certusone/wormhole/node v0.0.0-20211109211005-7ad15fbfc427/go.mod h1:YncgdSOYam7ELyXFo7PFCj6tUo0pe6cjlj+O3Vt28mo=
 github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -476,6 +480,7 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m
 github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
 github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
 github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
+github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM=
 github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=

+ 293 - 14
event_database/cloud_functions/process-vaa.go

@@ -1,37 +1,316 @@
 package p
 
 import (
+	"bytes"
 	"context"
-	"encoding/json"
+	"encoding/binary"
+	"encoding/hex"
 	"fmt"
 	"log"
 
+	"cloud.google.com/go/bigtable"
 	"github.com/certusone/wormhole/node/pkg/vaa"
+	"github.com/holiman/uint256"
 )
 
 type PubSubMessage struct {
 	Data []byte `json:"data"`
 }
 
-// ProcessVAA is triggered by PubSub messages
+// The keys are emitterAddress hex values, so that we can quickly check a message against the index to see if it
+// meets the criteria for saving payload info: if it is a token transfer, or an NFT transfer.
+var NFTEmitters = map[string]string{
+	// mainnet
+	"0def15a24423e1edd1a5ab16f557b9060303ddbab8c803d2ee48f4b78a1cfd6b": "WnFt12ZrnzZrFZkt2xsNsaNWoQribnuQ5B5FrDbwDhD", // solana
+	"0000000000000000000000006ffd7ede62328b3af38fcd61461bbfc52f5651fe": "0x6FFd7EdE62328b3Af38FCD61461Bbfc52F5651fE",  // ethereum
+	"0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde": "0x5a58505a96D1dbf8dF91cB21B54419FC36e93fdE",  // bsc
+	"00000000000000000000000090bbd86a6fe93d3bc3ed6335935447e75fab7fcf": "0x90bbd86a6fe93d3bc3ed6335935447e75fab7fcf",  // polygon
+
+	// devnet
+	"96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": "NFTWqJR8YnRVqPDvTJrYuLrQDitTG5AScqbeghi4zSA", // solana
+	"00000000000000000000000026b4afb60d6c903165150c6f0aa14f8016be4aec": "0x26b4afb60d6c903165150c6f0aa14f8016be4aec",  // ethereum
+}
+var TokenTransferEmitters = map[string]string{
+	// mainnet
+	"ec7372995d5cc8732397fb0ad35c0121e0eaa90d26f828a534cab54391b3a4f5": "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb",  // solana
+	"0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585": "0x3ee18B2214AFF97000D974cf647E7C347E8fa585",   // ethereum
+	"0000000000000000000000007cf7b764e38a0a5e967972c1df77d432510564e2": "terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf", // terra
+	"000000000000000000000000b6f6d86a8f9879a9c87f643768d9efc38c1da6e7": "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7",   // bsc
+	"0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde": "0x5a58505a96d1dbf8df91cb21b54419fc36e93fde",   // polygon
+
+	// devnet
+	"c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": "B6RHG3mfcckmrYN1UhmJzyS1XX3fZKbkeUcpJe9Sy3FE", // solana
+	"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": "0x0290fb167208af455bb137780163b7b7a9a10c16",   // EVM chains
+	"000000000000000000000000784999135aaa8a3ca5914468852fdddbddd8789d": "terra10pyejy66429refv3g35g2t7am0was7ya7kz2a4", // terra
+}
+
+type (
+	TokenTransfer struct {
+		PayloadId     uint8
+		Amount        uint256.Int
+		OriginAddress [32]byte
+		OriginChain   uint16
+		TargetAddress [32]byte
+		TargetChain   uint16
+	}
+	NFTTransfer struct {
+		PayloadId     uint8
+		OriginAddress [32]byte
+		OriginChain   uint16
+		Symbol        [32]byte
+		Name          [32]byte
+		TokenId       uint256.Int
+		URI           []byte
+		TargetAddress [32]byte
+		TargetChain   uint16
+	}
+	AssetMeta struct {
+		PayloadId    uint8
+		TokenAddress [32]byte
+		TokenChain   uint16
+		Decimals     uint8
+		Symbol       [32]byte
+		Name         [32]byte
+	}
+)
+
+func DecodeTokenTransfer(data []byte) (*TokenTransfer, error) {
+	tt := &TokenTransfer{}
+	tt.PayloadId = data[0]
+
+	reader := bytes.NewReader(data[1:])
+
+	if err := binary.Read(reader, binary.BigEndian, &tt.Amount); err != nil {
+		return nil, fmt.Errorf("failed to read Amount: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &tt.OriginAddress); err != nil {
+		return nil, fmt.Errorf("failed to read OriginAddress: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &tt.OriginChain); err != nil {
+		return nil, fmt.Errorf("failed to read OriginChain: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &tt.TargetAddress); err != nil {
+		return nil, fmt.Errorf("failed to read TargetAddress: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &tt.TargetChain); err != nil {
+		return nil, fmt.Errorf("failed to read TargetChain: %w", err)
+	}
+
+	return tt, nil
+}
+func DecodeNFTTransfer(data []byte) (*NFTTransfer, error) {
+	nt := &NFTTransfer{}
+	nt.PayloadId = data[0]
+
+	reader := bytes.NewReader(data[1:])
+
+	if err := binary.Read(reader, binary.BigEndian, &nt.OriginAddress); err != nil {
+		return nil, fmt.Errorf("failed to read OriginAddress: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &nt.OriginChain); err != nil {
+		return nil, fmt.Errorf("failed to read OriginChain: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &nt.Symbol); err != nil {
+		return nil, fmt.Errorf("failed to read Symbol: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &nt.Name); err != nil {
+		return nil, fmt.Errorf("failed to read Name: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &nt.TokenId); err != nil {
+		return nil, fmt.Errorf("failed to read TokenId: %w", err)
+	}
+
+	// uri len
+	uriLen, er := reader.ReadByte()
+	if er != nil {
+		return nil, fmt.Errorf("failed to read URI length")
+	}
+
+	// uri
+	uri := make([]byte, int(uriLen))
+	n, err := reader.Read(uri)
+	if err != nil || n == 0 {
+		return nil, fmt.Errorf("failed to read uri [%d]: %w", n, err)
+	}
+	nt.URI = uri[:n]
+
+	if err := binary.Read(reader, binary.BigEndian, &nt.TargetAddress); err != nil {
+		return nil, fmt.Errorf("failed to read : %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &nt.TargetChain); err != nil {
+		return nil, fmt.Errorf("failed to read : %w", err)
+	}
+
+	return nt, nil
+}
+
+func DecodeAssetMeta(data []byte) (*AssetMeta, error) {
+	am := &AssetMeta{}
+	am.PayloadId = data[0]
+
+	reader := bytes.NewReader(data[1:])
+
+	tokenAddress := [32]byte{}
+	if n, err := reader.Read(tokenAddress[:]); err != nil || n != 32 {
+		return nil, fmt.Errorf("failed to read TokenAddress [%d]: %w", n, err)
+	}
+	am.TokenAddress = tokenAddress
+
+	if err := binary.Read(reader, binary.BigEndian, &am.TokenChain); err != nil {
+		return nil, fmt.Errorf("failed to read TokenChain: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &am.Decimals); err != nil {
+		return nil, fmt.Errorf("failed to read Decimals: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &am.Symbol); err != nil {
+		return nil, fmt.Errorf("failed to read Symbol: %w", err)
+	}
+
+	if err := binary.Read(reader, binary.BigEndian, &am.Name); err != nil {
+		return nil, fmt.Errorf("failed to read Name: %w", err)
+	}
+
+	return am, nil
+}
+
+// TEMP: until this https://forge.certus.one/c/wormhole/+/1850 lands
+func makeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence uint64) string {
+	// left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically
+	return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence)
+}
+func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, mutation *bigtable.Mutation) error {
+
+	filter := bigtable.ChainFilters(
+		bigtable.FamilyFilter(colFam),
+		bigtable.ColumnFilter("PayloadId"))
+	conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
+
+	err := tbl.Apply(ctx, rowKey, conditionalMutation)
+	if err != nil {
+		log.Printf("Failed to write payload for %v to BigTable. err: %v", rowKey, err)
+		return err
+	}
+	return nil
+}
+
+// ProcessVAA is triggered by a PubSub message, emitted after row is saved to BigTable by guardiand
 func ProcessVAA(ctx context.Context, m PubSubMessage) error {
 	data := string(m.Data)
 	if data == "" {
-		log.Print("no data in message.")
+		return fmt.Errorf("no data to process in message")
+	}
 
-	} else {
-		log.Printf("ProcessVAA got message!")
-		signedVaa, err := vaa.Unmarshal(m.Data)
-		if err != nil {
-			fmt.Println("failed Unmarshaling VAA")
+	signedVaa, err := vaa.Unmarshal(m.Data)
+	if err != nil {
+		log.Println("failed Unmarshaling VAA")
+		return err
+	}
+
+	// create the bigtable identifier from the VAA data
+	rowKey := makeRowKey(signedVaa.EmitterChain, signedVaa.EmitterAddress, signedVaa.Sequence)
+	emitterHex := signedVaa.EmitterAddress.String()
+	payloadId := int(signedVaa.Payload[0])
+
+	if _, ok := TokenTransferEmitters[emitterHex]; ok {
+		// figure out if it's a transfer or asset metadata
+
+		if payloadId == 1 {
+			// token transfer
+			payload, decodeErr := DecodeTokenTransfer(signedVaa.Payload)
+			if decodeErr != nil {
+				log.Println("failed decoding payload for row ", rowKey)
+				return decodeErr
+			}
+			// save payload to bigtable
+			colFam := columnFamilies[2]
+			mutation := bigtable.NewMutation()
+			ts := bigtable.Now()
+			mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
+			// TODO: find a better way of representing amount as a string
+			mutation.Set(colFam, "Amount", ts, []byte(fmt.Sprint(payload.Amount[3])))
+			mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
+			mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
+			mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
+			mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
+
+			writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
+			return writeErr
+		} else if payloadId == 2 {
+			// asset meta
+			payload, decodeErr := DecodeAssetMeta(signedVaa.Payload)
+			if decodeErr != nil {
+				log.Println("failed decoding payload for row ", rowKey)
+				return decodeErr
+			}
+
+			// save payload to bigtable
+			colFam := columnFamilies[3]
+			mutation := bigtable.NewMutation()
+			ts := bigtable.Now()
+
+			mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
+			mutation.Set(colFam, "TokenAddress", ts, []byte(hex.EncodeToString(payload.TokenAddress[:])))
+			mutation.Set(colFam, "TokenChain", ts, []byte(fmt.Sprint(payload.TokenChain)))
+			mutation.Set(colFam, "Decimals", ts, []byte(fmt.Sprint(payload.Decimals)))
+			mutation.Set(colFam, "Name", ts, []byte(payload.Name[:]))
+			mutation.Set(colFam, "Symbol", ts, []byte(payload.Symbol[:]))
+			writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
+			if writeErr != nil {
+				log.Println("wrote TokenTransferPayload to bigtable!", rowKey)
+			}
+			return writeErr
+		} else {
+			// unknown payload type
+			log.Println("encountered unknown payload type for row ", rowKey)
+			return nil
 		}
-		jsonVaa, _ := json.MarshalIndent(signedVaa, "", "  ")
-		fmt.Printf("ProcessVAA Unmarshaled VAA: %q\n", string(jsonVaa))
+	} else if _, ok := NFTEmitters[emitterHex]; ok {
+		if payloadId == 1 {
+			// NFT transfer
+			payload, decodeErr := DecodeNFTTransfer(signedVaa.Payload)
+			if decodeErr != nil {
+				log.Println("failed decoding payload for row ", rowKey)
+				return decodeErr
+			}
+			// save payload to bigtable
+			colFam := columnFamilies[4]
+			mutation := bigtable.NewMutation()
+			ts := bigtable.Now()
+
+			mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
+			mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
+			mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
+			mutation.Set(colFam, "Symbol", ts, []byte(payload.Symbol[:]))
+			mutation.Set(colFam, "Name", ts, []byte(payload.Name[:]))
+			// TODO: find a better way of representing tokenId as a string
+			mutation.Set(colFam, "TokenId", ts, []byte(fmt.Sprint(payload.TokenId[3])))
+			mutation.Set(colFam, "URI", ts, []byte(payload.URI))
+			mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
+			mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
 
-		// TODO:
-		// decode payload
-		// save payload to bigtable
-		// publish pubsub message for token transfer messages
+			writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
+			if writeErr == nil {
+				log.Println("wrote NFTTransferPayload to bigtable!", rowKey)
+			}
+			return writeErr
+		} else {
+			// unknown payload type
+			log.Println("encountered unknown payload type for row ", rowKey)
+			return nil
+		}
 	}
+
+	// this is not a payload we are ready to decode & save. return success
 	return nil
 }