|
|
@@ -193,14 +193,16 @@ func makeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence u
|
|
|
// left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically
|
|
|
return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence)
|
|
|
}
|
|
|
-func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, mutation *bigtable.Mutation) error {
|
|
|
-
|
|
|
- filter := bigtable.ChainFilters(
|
|
|
- bigtable.FamilyFilter(colFam),
|
|
|
- bigtable.ColumnFilter("PayloadId"))
|
|
|
- conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
|
|
|
+func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, mutation *bigtable.Mutation, forceWrite bool) error {
|
|
|
+ mut := mutation
|
|
|
+ if !forceWrite {
|
|
|
+ filter := bigtable.ChainFilters(
|
|
|
+ bigtable.FamilyFilter(colFam),
|
|
|
+ bigtable.ColumnFilter("PayloadId"))
|
|
|
+ mut = bigtable.NewCondMutation(filter, nil, mutation)
|
|
|
+ }
|
|
|
|
|
|
- err := tbl.Apply(ctx, rowKey, conditionalMutation)
|
|
|
+ err := tbl.Apply(ctx, rowKey, mut)
|
|
|
if err != nil {
|
|
|
log.Printf("Failed to write payload for %v to BigTable. err: %v", rowKey, err)
|
|
|
return err
|
|
|
@@ -217,6 +219,19 @@ func TrimUnicodeFromByteArray(b []byte) []byte {
|
|
|
return bytes.Trim(b, null+start+ack+tab+control)
|
|
|
}
|
|
|
|
|
|
+func addReceiverAddressToMutation(mut *bigtable.Mutation, ts bigtable.Timestamp, chainID uint16, hexAddress string) {
|
|
|
+ nativeAddress := transformHexAddressToNative(vaa.ChainID(chainID), hexAddress)
|
|
|
+ if vaa.ChainID(chainID) == vaa.ChainIDSolana {
|
|
|
+ ownerAddress := fetchSolanaAccountOwner(nativeAddress)
|
|
|
+ if ownerAddress == "" {
|
|
|
+ // exit with a failure code so the pubsub message is retried.
|
|
|
+ log.Fatalf("failed to find owner address for Solana account.")
|
|
|
+ }
|
|
|
+ nativeAddress = ownerAddress
|
|
|
+ }
|
|
|
+ mut.Set(columnFamilies[6], "ReceiverAddress", ts, []byte(nativeAddress))
|
|
|
+}
|
|
|
+
|
|
|
// ProcessVAA is triggered by a PubSub message, emitted after row is saved to BigTable by guardiand
|
|
|
func ProcessVAA(ctx context.Context, m PubSubMessage) error {
|
|
|
data := string(m.Data)
|
|
|
@@ -249,6 +264,8 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
|
|
|
log.Println("failed decoding payload for row ", rowKey)
|
|
|
return decodeErr
|
|
|
}
|
|
|
+ log.Printf("Processing Transfer: Amount %v\n", fmt.Sprint(payload.Amount[3]))
|
|
|
+
|
|
|
// save payload to bigtable, then publish a new PubSub message for further processing
|
|
|
colFam := columnFamilies[2]
|
|
|
mutation := bigtable.NewMutation()
|
|
|
@@ -260,13 +277,16 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
|
|
|
log.Printf("payload.Amount is larger than uint64 for row %v", rowKey)
|
|
|
amount = payload.Amount.Bytes()
|
|
|
}
|
|
|
+ targetAddressHex := hex.EncodeToString(payload.TargetAddress[:])
|
|
|
mutation.Set(colFam, "Amount", ts, amount)
|
|
|
mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
|
|
|
mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
|
|
|
- mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
|
|
|
+ mutation.Set(colFam, "TargetAddress", ts, []byte(targetAddressHex))
|
|
|
mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
|
|
|
|
|
|
- writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
|
|
|
+ addReceiverAddressToMutation(mutation, ts, payload.TargetChain, targetAddressHex)
|
|
|
+
|
|
|
+ writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation, false)
|
|
|
if writeErr != nil {
|
|
|
return writeErr
|
|
|
}
|
|
|
@@ -299,6 +319,8 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
|
|
|
name = foundName
|
|
|
}
|
|
|
|
|
|
+ log.Printf("Processing AssetMeta: Name %v, Symbol %v, coingeckoId %v\n", name, symbol, coinGeckoCoinId)
|
|
|
+
|
|
|
// save payload to bigtable
|
|
|
colFam := columnFamilies[3]
|
|
|
mutation := bigtable.NewMutation()
|
|
|
@@ -313,7 +335,7 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
|
|
|
mutation.Set(colFam, "CoinGeckoCoinId", ts, []byte(coinGeckoCoinId))
|
|
|
mutation.Set(colFam, "NativeAddress", ts, []byte(nativeAddress))
|
|
|
|
|
|
- writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
|
|
|
+ writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation, false)
|
|
|
return writeErr
|
|
|
} else {
|
|
|
// unknown payload type
|
|
|
@@ -328,11 +350,14 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
|
|
|
log.Println("failed decoding payload for row ", rowKey)
|
|
|
return decodeErr
|
|
|
}
|
|
|
+ log.Printf("Processing NTF: Name %v, Symbol %v\n", string(TrimUnicodeFromByteArray(payload.Name[:])), string(TrimUnicodeFromByteArray(payload.Symbol[:])))
|
|
|
+
|
|
|
// save payload to bigtable
|
|
|
colFam := columnFamilies[4]
|
|
|
mutation := bigtable.NewMutation()
|
|
|
ts := bigtable.Now()
|
|
|
|
|
|
+ targetAddressHex := hex.EncodeToString(payload.TargetAddress[:])
|
|
|
mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
|
|
|
mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
|
|
|
mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
|
|
|
@@ -340,10 +365,12 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
|
|
|
mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:]))
|
|
|
mutation.Set(colFam, "TokenId", ts, payload.TokenId.Bytes())
|
|
|
mutation.Set(colFam, "URI", ts, TrimUnicodeFromByteArray(payload.URI))
|
|
|
- mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
|
|
|
+ mutation.Set(colFam, "TargetAddress", ts, []byte(targetAddressHex))
|
|
|
mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
|
|
|
|
|
|
- writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
|
|
|
+ addReceiverAddressToMutation(mutation, ts, payload.TargetChain, targetAddressHex)
|
|
|
+
|
|
|
+ writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation, false)
|
|
|
return writeErr
|
|
|
} else {
|
|
|
// unknown payload type
|