
BigTable: misc code cleanup and log improvements

Change-Id: Ibd601c358fbee3ed0a1a74673158cb861b631660

commit-id:533a8b35
justinschuldt 4 years ago
parent
commit
6583771813

+ 9 - 0
event_database/cloud_functions/.gcloudignore

@@ -0,0 +1,9 @@
+
+.git
+.gitignore
+
+cmd
+
+*.md
+
+*.txt

+ 2 - 0
event_database/cloud_functions/.gitignore

@@ -1,3 +1,5 @@
 vendor
 *.json
 *.log
+
+*.txt

+ 2 - 2
event_database/cloud_functions/external-data.go

@@ -233,10 +233,10 @@ func fetchCoinGeckoPrice(coinId string, timestamp time.Time) (float64, error) {
 			priceIndex = numPrices / 2
 		}
 		price := parsed.Prices[priceIndex][1]
-		fmt.Printf("found a price for %v! %v\n", coinId, price)
+		log.Printf("found a price of $%f for %v!\n", price, coinId)
 		return price, nil
 	}
-	fmt.Println("no price found in coinGecko for", coinId)
+	log.Println("no price found in coinGecko for", coinId)
 	return 0, fmt.Errorf("no price found for %v", coinId)
 }
 

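The change above replaces fmt printing with the log package. In a Cloud Function, log.Printf writes to stderr and prefixes each line with a timestamp, which reads much better in Cloud Logging than bare fmt output on stdout; the reworded message also puts the price before the coin id. A minimal sketch of the difference, assuming default log flags:

package main

import (
	"fmt"
	"log"
)

func main() {
	coinId, price := "bitcoin", 40123.45
	// fmt.Printf: plain stdout, no timestamp or severity metadata
	fmt.Printf("found a price for %v! %v\n", coinId, price)
	// log.Printf: stderr with a timestamp prefix, easier to correlate in Cloud Logging
	log.Printf("found a price of $%f for %v!\n", price, coinId)
}
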
+ 33 - 24
event_database/cloud_functions/notional-transferred-to.go

@@ -9,7 +9,6 @@ import (
 	"fmt"
 	"io"
 	"log"
-	"math"
 	"net/http"
 	"strconv"
 	"sync"
@@ -40,10 +39,6 @@ type TransferData struct {
 	Notional         float64
 }
 
-func roundToTwoDecimalPlaces(num float64) float64 {
-	return math.Round(num*100) / 100
-}
-
 func fetchAmountRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]TransferData, error) {
 	rows := []TransferData{}
 	err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
@@ -119,10 +114,7 @@ func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix st
 	intervalsWG.Add(numPrevDays + 1)
 
 	// create the unique identifier for this query, for cache
-	cachePrefix := prefix
-	if prefix == "" {
-		cachePrefix = "*"
-	}
+	cachePrefix := createCachePrefix(prefix)
 
 	for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
 		go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
@@ -146,28 +138,28 @@ func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix st
 
 			mu.Lock()
 			// initialize the map for this date in the result set
-			results[dateStr] = map[string]map[string]float64{}
+			results[dateStr] = map[string]map[string]float64{"*": {"*": 0}}
 			// check to see if there is cache data for this date/query
-			if dateCache, ok := warmAmountsCache[dateStr]; ok {
-				// have a cache for this date
+			if dates, ok := warmAmountsCache[cachePrefix]; ok {
+				// have a cache for this query
 
-				if val, ok := dateCache[cachePrefix]; ok {
-					// have a cache for this query
+				if dateCache, ok := dates[dateStr]; ok {
+					// have a cache for this date
 					if daysAgo >= 1 {
 						// only use the cache for yesterday and older
-						results[dateStr] = val
+						results[dateStr] = dateCache
 						mu.Unlock()
 						intervalsWG.Done()
 						return
 					}
 				} else {
 					// no cache for this query
-					warmAmountsCache[dateStr][cachePrefix] = map[string]map[string]float64{}
+					warmAmountsCache[cachePrefix][dateStr] = map[string]map[string]float64{}
 				}
 			} else {
 				// no cache for this date, initialize the map
-				warmAmountsCache[dateStr] = map[string]map[string]map[string]float64{}
-				warmAmountsCache[dateStr][cachePrefix] = map[string]map[string]float64{}
+				warmAmountsCache[cachePrefix] = map[string]map[string]map[string]float64{}
+				warmAmountsCache[cachePrefix][dateStr] = map[string]map[string]float64{}
 			}
 			mu.Unlock()
 
@@ -187,32 +179,47 @@ func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix st
 				if _, ok := results[dateStr][row.DestinationChain]; !ok {
 					results[dateStr][row.DestinationChain] = map[string]float64{"*": 0}
 				}
-				// add to the total count
+				// add to the total count for the dest chain
 				results[dateStr][row.DestinationChain]["*"] = results[dateStr][row.DestinationChain]["*"] + row.Notional
+				// add to total for the day
+				results[dateStr]["*"]["*"] = results[dateStr]["*"]["*"] + row.Notional
+				// add to the symbol's daily total
+				results[dateStr]["*"][row.TokenSymbol] = results[dateStr]["*"][row.TokenSymbol] + row.Notional
 				// add to the count for chain/symbol
 				results[dateStr][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.DestinationChain][row.TokenSymbol] + row.Notional
 
 			}
 			// set the result in the cache
-			warmAmountsCache[dateStr][cachePrefix] = results[dateStr]
+			warmAmountsCache[cachePrefix][dateStr] = results[dateStr]
 		}(tbl, ctx, prefix, daysAgo)
 	}
 
 	intervalsWG.Wait()
 
 	// create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys
-	seenKeySet := map[string]bool{}
+	seenSymbolSet := map[string]bool{}
+	seenChainSet := map[string]bool{}
 	for date, tokens := range results {
-		for leaving, _ := range tokens {
+		for leaving := range tokens {
+			seenChainSet[leaving] = true
 			for key := range results[date][leaving] {
-				seenKeySet[key] = true
+				seenSymbolSet[key] = true
 			}
 		}
 	}
 	// ensure each chain object has all the same symbol keys:
 	for date := range results {
 		for leaving := range results[date] {
-			for token := range seenKeySet {
+			// loop through seen chains
+			for chain := range seenChainSet {
+				// check that date has all the chains
+				if _, ok := results[date][chain]; !ok {
+					results[date][chain] = map[string]float64{"*": 0}
+				}
+			}
+			// loop through seen symbols
+			for token := range seenSymbolSet {
+				// check that the chain has all the symbols
 				if _, ok := results[date][leaving][token]; !ok {
 					// add the missing key to the map
 					results[date][leaving][token] = 0
@@ -246,6 +253,8 @@ func amountsForInterval(tbl *bigtable.Table, ctx context.Context, prefix string,
 		}
 		// add to total amount
 		result[row.DestinationChain]["*"] = result[row.DestinationChain]["*"] + row.Notional
+		// add to total per symbol
+		result["*"][row.TokenSymbol] = result["*"][row.TokenSymbol] + row.Notional
 		// add to symbol amount
 		result[row.DestinationChain][row.TokenSymbol] = result[row.DestinationChain][row.TokenSymbol] + row.Notional
 	}

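The main change in this file re-keys the warm cache from date-first to query-first, so every cached date for a given query prefix now lives under a single top-level key. A standalone sketch of the new shape under that assumption; the amountsCache type and its lookup method are hypothetical illustrations, not declarations from the package:

// cache[cachePrefix][dateStr][chain][symbol] = notional
type amountsCache map[string]map[string]map[string]map[string]float64

// lookup checks the query prefix first, then the date, matching the new
// key order in the hunks above.
func (c amountsCache) lookup(cachePrefix, dateStr string) (map[string]map[string]float64, bool) {
	if dates, ok := c[cachePrefix]; ok {
		if dateCache, ok := dates[dateStr]; ok {
			return dateCache, true
		}
	}
	return nil, false
}
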
+ 59 - 38
event_database/cloud_functions/notional-transferred.go

@@ -105,10 +105,7 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
 	intervalsWG.Add(numPrevDays + 1)
 
 	// create the unique identifier for this query, for cache
-	cachePrefix := prefix
-	if prefix == "" {
-		cachePrefix = "*"
-	}
+	cachePrefix := createCachePrefix(prefix)
 
 	for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
 		go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
@@ -134,26 +131,27 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
 			// initialize the map for this date in the result set
 			results[dateStr] = map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
 			// check to see if there is cache data for this date/query
-			if dateCache, ok := warmTransfersCache[dateStr]; ok {
-				// have a cache for this date
+			if dates, ok := warmTransfersCache[cachePrefix]; ok {
+				// have a cache for this query
+
+				if dateCache, ok := dates[dateStr]; ok {
+					// have a cache for this date
 
-				if val, ok := dateCache[cachePrefix]; ok {
-					// have a cache for this query
 					if daysAgo >= 1 {
 						// only use the cache for yesterday and older
-						results[dateStr] = val
+						results[dateStr] = dateCache
 						mu.Unlock()
 						intervalsWG.Done()
 						return
 					}
 				} else {
 					// no cache for this query
-					warmTransfersCache[dateStr][cachePrefix] = map[string]map[string]map[string]float64{}
+					warmTransfersCache[cachePrefix][dateStr] = map[string]map[string]map[string]float64{}
 				}
 			} else {
 				// no cache for this date, initialize the map
-				warmTransfersCache[dateStr] = map[string]map[string]map[string]map[string]float64{}
-				warmTransfersCache[dateStr][cachePrefix] = map[string]map[string]map[string]float64{}
+				warmTransfersCache[cachePrefix] = map[string]map[string]map[string]map[string]float64{}
+				warmTransfersCache[cachePrefix][dateStr] = map[string]map[string]map[string]float64{}
 			}
 			mu.Unlock()
 
@@ -185,37 +183,45 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
 
 			}
 			// set the result in the cache
-			warmTransfersCache[dateStr][cachePrefix] = results[dateStr]
+			warmTransfersCache[cachePrefix][dateStr] = results[dateStr]
 		}(tbl, ctx, prefix, daysAgo)
 	}
 
 	intervalsWG.Wait()
 
-	// not sure if having consistent keys is helpful or not, commenting out until this endpoint is consumed by GUIs
-	// // create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys
-	// seenKeySet := map[string]bool{}
-	// for date, tokens := range results {
-	// 	for leaving, dests := range tokens {
-	// 		for dest := range dests {
-	// 			for key := range results[date][leaving][dest] {
-	// 				seenKeySet[key] = true
-	// 			}
-	// 		}
-	// 	}
-	// }
-	// // ensure each chain object has all the same symbol keys:
-	// for date := range results {
-	// 	for leaving := range results[date] {
-	// 		for dest := range results[date][leaving] {
-	// 			for token := range seenKeySet {
-	// 				if _, ok := results[date][leaving][token]; !ok {
-	// 					// add the missing key to the map
-	// 					results[date][leaving][dest][token] = 0
-	// 				}
-	// 			}
-	// 		}
-	// 	}
-	// }
+	// having consistent keys in each object is helpful for clients such as the explorer GUI
+	// create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys
+	seenSymbolSet := map[string]bool{}
+	seenChainSet := map[string]bool{}
+	for date, tokens := range results {
+		for leaving, dests := range tokens {
+			seenChainSet[leaving] = true
+			for dest := range dests {
+				for key := range results[date][leaving][dest] {
+					seenSymbolSet[key] = true
+				}
+			}
+		}
+	}
+	// ensure each chain object has all the same symbol keys:
+	for date := range results {
+		for leaving := range results[date] {
+			for dest := range results[date][leaving] {
+				for chain := range seenChainSet {
+					// check that the leaving chain has all the destination chains
+					if _, ok := results[date][leaving][chain]; !ok {
+						results[date][leaving][chain] = map[string]float64{"*": 0}
+					}
+				}
+				for token := range seenSymbolSet {
+					if _, ok := results[date][leaving][dest][token]; !ok {
+						// add the missing key to the map
+						results[date][leaving][dest][token] = 0
+					}
+				}
+			}
+		}
+	}
 
 	return results, nil
 }
@@ -249,6 +255,21 @@ func transfersForInterval(tbl *bigtable.Table, ctx context.Context, prefix strin
 		// add to symbol amount
 		result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] = result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
 	}
+
+	// create a set of all the leaving-chain keys, to ensure the result objects all have the same destination-chain keys.
+	seenChainSet := map[string]bool{}
+	for leaving := range result {
+		seenChainSet[leaving] = true
+	}
+
+	for leaving := range result {
+		for chain := range seenChainSet {
+			// check that the leaving chain has all the destination chains
+			if _, ok := result[leaving][chain]; !ok {
+				result[leaving][chain] = map[string]float64{"*": 0}
+			}
+		}
+	}
 	return result, nil
 }
 

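The normalization restored above is a collect-then-backfill pattern: one pass gathers every chain and symbol key seen anywhere in the results, a second pass adds zero entries wherever a key is missing, so each object a client receives has an identical shape. A self-contained sketch of the pattern on a two-level map; normalizeKeys is a hypothetical name for illustration:

func normalizeKeys(result map[string]map[string]float64) {
	// first pass: collect every symbol key seen under any chain
	seen := map[string]bool{}
	for _, symbols := range result {
		for symbol := range symbols {
			seen[symbol] = true
		}
	}
	// second pass: backfill a zero for any chain missing a seen symbol
	for _, symbols := range result {
		for symbol := range seen {
			if _, ok := symbols[symbol]; !ok {
				symbols[symbol] = 0
			}
		}
	}
}
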
+ 2 - 1
event_database/cloud_functions/process-transfer.go

@@ -198,6 +198,8 @@ func ProcessTransfer(ctx context.Context, m PubSubMessage) error {
 	notional := amountFloat * price
 	notionalStr := fmt.Sprintf("%f", notional)
 
+	log.Printf("processed transfer of $%0.2f = %v %v * $%0.2f\n", notional, calculatedAmount, symbol, price)
+
 	// write to BigTable
 	colFam := columnFamilies[5]
 	mutation := bigtable.NewMutation()
@@ -233,7 +235,6 @@ func ProcessTransfer(ctx context.Context, m PubSubMessage) error {
 		log.Printf("Failed to write TokenTransferDetails for %v to BigTable. err: %v\n", rowKey, writeErr)
 		return writeErr
 	}
-	log.Println("done writing TokenTransferDetails to bigtable", rowKey)
 
 	// success
 	return nil

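The added log line captures the notional arithmetic (notional = amount * price) at the point it is computed, which makes the "done writing" line removed below redundant. A sketch of the resulting output with placeholder values, not real message data:

func logTransferExample() {
	// placeholder values for illustration only
	calculatedAmount, symbol, price := 2.5, "ETH", 40000.0
	notional := calculatedAmount * price
	log.Printf("processed transfer of $%0.2f = %v %v * $%0.2f\n",
		notional, calculatedAmount, symbol, price)
	// e.g. 2021/07/20 12:00:00 processed transfer of $100000.00 = 2.5 ETH * $40000.00
}
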
+ 0 - 6
event_database/cloud_functions/process-vaa.go

@@ -314,9 +314,6 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
 			mutation.Set(colFam, "NativeAddress", ts, []byte(nativeAddress))
 
 			writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
-			if writeErr != nil {
-				log.Println("wrote TokenTransferPayload to bigtable!", rowKey)
-			}
 			return writeErr
 		} else {
 			// unknown payload type
@@ -347,9 +344,6 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
 			mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
 
 			writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
-			if writeErr == nil {
-				log.Println("wrote NFTTransferPayload to bigtable!", rowKey)
-			}
 			return writeErr
 		} else {
 			// unknown payload type

+ 20 - 7
event_database/cloud_functions/shared.go

@@ -3,6 +3,7 @@ package p
 import (
 	"context"
 	"log"
+	"math"
 	"net/http"
 	"os"
 	"strings"
@@ -231,13 +232,14 @@ func makeDetails(row bigtable.Row) *Details {
 	deets := &Details{}
 	sum := makeSummary(row)
 	deets.Summary = Summary{
-		EmitterChain:   sum.EmitterChain,
-		EmitterAddress: sum.EmitterAddress,
-		Sequence:       sum.Sequence,
-		InitiatingTxID: sum.InitiatingTxID,
-		Payload:        sum.Payload,
-		SignedVAABytes: sum.SignedVAABytes,
-		QuorumTime:     sum.QuorumTime,
+		EmitterChain:    sum.EmitterChain,
+		EmitterAddress:  sum.EmitterAddress,
+		Sequence:        sum.Sequence,
+		InitiatingTxID:  sum.InitiatingTxID,
+		Payload:         sum.Payload,
+		SignedVAABytes:  sum.SignedVAABytes,
+		QuorumTime:      sum.QuorumTime,
+		TransferDetails: sum.TransferDetails,
 	}
 
 	if _, ok := row[quorumStateFam]; ok {
@@ -346,6 +348,17 @@ func makeDetails(row bigtable.Row) *Details {
 	return deets
 }
 
+func roundToTwoDecimalPlaces(num float64) float64 {
+	return math.Round(num*100) / 100
+}
+func createCachePrefix(prefix string) string {
+	cachePrefix := prefix
+	if prefix == "" {
+		cachePrefix = "*"
+	}
+	return cachePrefix
+}
+
 var mux = newMux()
 
 // Entry is the cloud function entry point

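Both helpers consolidated here are small pure functions, so moving them into shared.go makes them available to every function in package p. Hypothetical usage, with expected results in comments; exampleHelperUsage is for illustration only:

func exampleHelperUsage() {
	rounded := roundToTwoDecimalPlaces(123.456) // 123.46 (math.Round, half away from zero)

	allKey := createCachePrefix("")  // "*" (the catch-all query gets a stable cache key)
	oneKey := createCachePrefix("2") // "2" (non-empty prefixes pass through unchanged)

	log.Println(rounded, allKey, oneKey)
}
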
+ 4 - 2
event_database/cloud_functions/totals.go

@@ -278,8 +278,10 @@ func Totals(w http.ResponseWriter, r *http.Request) {
 	prefix := ""
 	if forChain != "" {
 		prefix = forChain
-		// if the request is forChain, always groupBy chain
-		groupBy = "chain"
+		if groupBy == "" {
+			// if the request is forChain, and groupBy is empty, set it to groupBy chain
+			groupBy = "chain"
+		}
 		if forAddress != "" {
 			// if the request is forAddress, always groupBy address
 			groupBy = "address"

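With this change a caller's explicit groupBy now survives a forChain query; only an empty groupBy is defaulted to "chain", while forAddress still forces grouping by address. A hypothetical helper distilling the precedence in the hunk above:

func resolveGroupBy(groupBy, forChain, forAddress string) string {
	if forChain != "" {
		if groupBy == "" {
			groupBy = "chain" // default only when the caller did not choose
		}
		if forAddress != "" {
			groupBy = "address" // forAddress always groups by address
		}
	}
	return groupBy
}
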
+ 4 - 3
explorer_data/bigtable_event_schema.md

@@ -53,7 +53,7 @@ Each column qualifier below is prefixed with its column family.
 - `TokenTransferPayload:Amount` the amount of the transfer.
 - `TokenTransferPayload:OriginAddress` the address the transfer originates from.
 - `TokenTransferPayload:OriginChain` the chain identifier of the chain the transfer originates from.
-- `TokenTransferPayload:TargetAdress` the destination address of the transfer.
+- `TokenTransferPayload:TargetAddress` the destination address of the transfer.
 - `TokenTransferPayload:TargetChain` the destination chain identifier of the transfer.
 
 #### AssetMetaPayload
@@ -77,10 +77,11 @@ Each column qualifier below is prefixed with its column family.
 
 #### TokenTransferDetails
 - `TokenTransferDetails:Amount` the amount transferred.
+- `TokenTransferDetails:NotionalUSD` the notional value of the transfer in USD.
 - `TokenTransferDetails:OriginSymbol` the symbol of the token sent to wormhole.
 - `TokenTransferDetails:OriginName` the name of the token sent to wormhole.
-- `TokenTransferDetails:TargetSymbol` the symbol of the token disbursed by wormhole.
-- `TokenTransferDetails:TargetName` the name of the token disbursed by wormhole.
+- `TokenTransferDetails:OriginTokenAddress` the address of the token sent to wormhole.
 
 #### ChainDetails
 - `ChainDetails:SenderAddress` the native address that sent the message.
+- `ChainDetails:ReceiverAddress` the native address that received the message.
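
For orientation, a hedged sketch of reading the TokenTransferDetails columns documented above with the cloud.google.com/go/bigtable client; the project, instance, table, and row-key values are placeholders, not names from this repo:

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigtable"
)

func main() {
	ctx := context.Background()
	client, err := bigtable.NewClient(ctx, "my-project", "my-instance")
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}
	defer client.Close()

	tbl := client.Open("my-table")
	// fetch just the TokenTransferDetails family for one row
	row, err := tbl.ReadRow(ctx, "some-row-key",
		bigtable.RowFilter(bigtable.FamilyFilter("TokenTransferDetails")))
	if err != nil {
		log.Fatalf("ReadRow: %v", err)
	}
	for _, item := range row["TokenTransferDetails"] {
		// item.Column is "family:qualifier", e.g. "TokenTransferDetails:NotionalUSD"
		fmt.Printf("%s = %s\n", item.Column, item.Value)
	}
}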