Przeglądaj źródła

cloud_functions: use daily token prices to compute cumulative TVL

Use daily token prices to compute TVL instead of the prices fetched
at token transfer time, if available.

Added inactive tokenlist to exclude tokens that have essentially been
deactivated by CoinGecko (market data no longer available).
Kevin Peters 3 lat temu
rodzic
commit
c5a7d3e517

+ 71 - 2
event_database/cloud_functions/external-data.go

@@ -343,13 +343,82 @@ func fetchTokenPrices(ctx context.Context, coinIds []string) map[string]float64
 			allPrices[coinId] = price
 		}
 
-		// CoinGecko rate limit is low (5/second), be very cautious about bursty requests
-		time.Sleep(time.Millisecond * 200)
+		coinGeckoRateLimitSleep()
 	}
 
 	return allPrices
 }
 
+func coinGeckoRateLimitSleep() {
+	// CoinGecko rate limit is low (5/second), be very cautious about bursty requests
+	time.Sleep(time.Millisecond * 200)
+}
+
+// fetchTokenPriceHistories returns the daily prices for coinIds from start to end
+func fetchTokenPriceHistories(ctx context.Context, coinIds []string, start time.Time, end time.Time) map[string]map[string]float64 {
+	log.Printf("Fetching price history for %d tokens\n", len(coinIds))
+	priceHistories := map[string]map[string]float64{}
+	baseUrl := cgBaseUrl
+	cgApiKey := os.Getenv("COINGECKO_API_KEY")
+	if cgApiKey != "" {
+		baseUrl = cgProBaseUrl
+	}
+	startTimestamp := start.Unix()
+	endTimestamp := end.Unix()
+	for _, coinId := range coinIds {
+		defer coinGeckoRateLimitSleep()
+		url := fmt.Sprintf("%vcoins/%v/market_chart/range?vs_currency=usd&from=%v&to=%v", baseUrl, coinId, startTimestamp, endTimestamp)
+		req, reqErr := http.NewRequest("GET", url, nil)
+		if reqErr != nil {
+			log.Fatalf("failed coins request, err: %v\n", reqErr)
+		}
+		if cgApiKey != "" {
+			req.Header.Set("X-Cg-Pro-Api-Key", cgApiKey)
+		}
+
+		res, resErr := http.DefaultClient.Do(req)
+		if resErr != nil {
+			log.Fatalf("failed get coins response, err: %v\n", resErr)
+		}
+		defer res.Body.Close()
+		if res.StatusCode >= 400 {
+			errorMsg := fmt.Sprintf("failed to get CoinGecko price history for %s, Status: %s", coinId, res.Status)
+			if res.StatusCode == 404 {
+				log.Println(errorMsg)
+				continue
+			} else {
+				log.Fatalln(errorMsg)
+			}
+		}
+
+		body, bodyErr := ioutil.ReadAll(res.Body)
+		if bodyErr != nil {
+			log.Fatalf("failed decoding coins body, err: %v\n", bodyErr)
+		}
+
+		var parsed CoinGeckoMarketRes
+		parseErr := json.Unmarshal(body, &parsed)
+		if parseErr != nil {
+			log.Printf("fetchTokenPriceHistories failed parsing body. err %v\n", parseErr)
+			var errRes CoinGeckoErrorRes
+			if err := json.Unmarshal(body, &errRes); err == nil {
+				log.Println("Failed calling CoinGecko, got err", errRes.Error)
+			}
+		} else {
+			for _, market := range parsed.Prices {
+				seconds := int64(market[0]) / 1e3
+				date := time.Unix(seconds, 0).Format("2006-01-02")
+				price := market[1]
+				if _, ok := priceHistories[date]; !ok {
+					priceHistories[date] = map[string]float64{}
+				}
+				priceHistories[date][coinId] = price
+			}
+		}
+	}
+	return priceHistories
+}
+
 const solanaTokenListURL = "https://raw.githubusercontent.com/solana-labs/token-list/main/src/tokens/solana.tokenlist.json"
 
 type SolanaToken struct {

+ 18 - 14
event_database/cloud_functions/notional-transferred-to.go

@@ -31,17 +31,18 @@ var muWarmTransfersToCache sync.RWMutex
 var warmTransfersToCacheFilePath = "notional-transferred-to-cache.json"
 
 type TransferData struct {
-	TokenSymbol      string
-	TokenName        string
-	TokenAddress     string
-	TokenAmount      float64
-	CoinGeckoCoinId  string
-	OriginChain      string
-	LeavingChain     string
-	DestinationChain string
-	Notional         float64
-	TokenPrice       float64
-	TokenDecimals    int
+	TokenSymbol       string
+	TokenName         string
+	TokenAddress      string
+	TokenAmount       float64
+	CoinGeckoCoinId   string
+	OriginChain       string
+	LeavingChain      string
+	DestinationChain  string
+	Notional          float64
+	TokenPrice        float64
+	TokenDecimals     int
+	TransferTimestamp string
 }
 
 // finds all the TokenTransfer rows within the specified period
@@ -83,6 +84,8 @@ func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefi
 					t.CoinGeckoCoinId = string(item.Value)
 				case "TokenTransferDetails:Decimals":
 					t.TokenDecimals, _ = strconv.Atoi(string(item.Value))
+				case "TokenTransferDetails:TransferTimestamp":
+					t.TransferTimestamp = string(item.Value)
 				}
 			}
 
@@ -100,7 +103,8 @@ func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefi
 			keyParts := strings.Split(row.Key(), ":")
 			t.LeavingChain = keyParts[0]
 
-			if isTokenAllowed(t.OriginChain, t.TokenAddress) {
+			transferDateStr := t.TransferTimestamp[0:10]
+			if isTokenAllowed(t.OriginChain, t.TokenAddress) && isTokenActive(t.OriginChain, t.TokenAddress, transferDateStr) {
 				rows = append(rows, *t)
 			}
 		}
@@ -116,8 +120,8 @@ func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefi
 				bigtable.StripValueFilter(),               // no columns/values, just the row.Key()
 			),
 			bigtable.ChainFilters(
-				bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])),
-				bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginName|OriginChain|TargetChain|CoinGeckoCoinId|OriginTokenAddress|TokenPriceUSD|Decimals"),
+				bigtable.FamilyFilter(fmt.Sprintf("%v|%v", transferPayloadFam, transferDetailsFam)),
+				bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginName|OriginChain|TargetChain|CoinGeckoCoinId|OriginTokenAddress|TokenPriceUSD|Decimals|TransferTimestamp"),
 				bigtable.LatestNFilter(1),
 			),
 			bigtable.BlockAllFilter(),

+ 72 - 1
event_database/cloud_functions/notional-tvl-cumulative.go

@@ -27,12 +27,58 @@ var warmTvlCumulativeCacheFilePath = "tvl-cumulative-cache.json"
 
 var notionalTvlCumulativeResultPath = "notional-tvl-cumulative.json"
 
+var coinGeckoPriceCacheFilePath = "coingecko-price-cache.json"
+var coinGeckoPriceCache = map[string]map[string]float64{}
+var loadedCoinGeckoPriceCache bool
+
 // days to be excluded from the TVL result
 var skipDays = map[string]bool{
 	// for example:
 	// "2022-02-19": true,
 }
 
+func loadAndUpdateCoinGeckoPriceCache(ctx context.Context, coinIds []string, now time.Time) {
+	// at cold-start, load the price cache into memory, and fetch any missing token price histories and add them to the cache
+	if !loadedCoinGeckoPriceCache {
+		// load the price cache
+		loadJsonToInterface(ctx, coinGeckoPriceCacheFilePath, &muWarmTvlCumulativeCache, &coinGeckoPriceCache)
+		loadedCoinGeckoPriceCache = true
+
+		// find tokens missing price history
+		missing := []string{}
+		for _, coinId := range coinIds {
+			found := false
+			for _, prices := range coinGeckoPriceCache {
+				if _, ok := prices[coinId]; ok {
+					found = true
+					break
+				}
+			}
+			if !found {
+				missing = append(missing, coinId)
+			}
+		}
+
+		// fetch missing price histories and add them to the cache
+		priceHistories := fetchTokenPriceHistories(ctx, missing, releaseDay, now)
+		for date, prices := range priceHistories {
+			for coinId, price := range prices {
+				if _, ok := coinGeckoPriceCache[date]; !ok {
+					coinGeckoPriceCache[date] = map[string]float64{}
+				}
+				coinGeckoPriceCache[date][coinId] = price
+			}
+		}
+	}
+
+	// fetch today's latest prices
+	today := now.Format("2006-01-02")
+	coinGeckoPriceCache[today] = fetchTokenPrices(ctx, coinIds)
+
+	// write to the cache file
+	persistInterfaceToJson(ctx, coinGeckoPriceCacheFilePath, &muWarmCumulativeAddressesCache, coinGeckoPriceCache)
+}
+
 // calculates a running total of notional value transferred, by symbol, since the start time specified.
 func createTvlCumulativeOfInterval(tbl *bigtable.Table, ctx context.Context, start time.Time) map[string]map[string]map[string]LockedAsset {
 	if len(warmTvlCumulativeCache) == 0 {
@@ -238,6 +284,22 @@ func ComputeTvlCumulative(w http.ResponseWriter, r *http.Request) {
 
 	transfers := createTvlCumulativeOfInterval(tbl, ctx, start)
 
+	coinIdSet := map[string]bool{}
+	for _, chains := range transfers {
+		for _, assets := range chains {
+			for _, asset := range assets {
+				if asset.CoinGeckoId != "*" {
+					coinIdSet[asset.CoinGeckoId] = true
+				}
+			}
+		}
+	}
+	coinIds := []string{}
+	for coinId := range coinIdSet {
+		coinIds = append(coinIds, coinId)
+	}
+	loadAndUpdateCoinGeckoPriceCache(ctx, coinIds, now)
+
 	// calculate the notional tvl based on the price of the tokens each day
 	for date, chains := range transfers {
 		if _, ok := skipDays[date]; ok {
@@ -265,7 +327,16 @@ func ComputeTvlCumulative(w http.ResponseWriter, r *http.Request) {
 					continue
 				}
 
-				notional := asset.Amount * asset.TokenPrice
+				// asset.TokenPrice is the price that was fetched when this token was last transferred, possibly before this date
+				// prefer to use the cached price for this date if it's available, because it might be newer
+				tokenPrice := asset.TokenPrice
+				if prices, ok := coinGeckoPriceCache[date]; ok {
+					if price, ok := prices[asset.CoinGeckoId]; ok {
+						// use the cached price
+						tokenPrice = price
+					}
+				}
+				notional := asset.Amount * tokenPrice
 				if notional <= 0 {
 					continue
 				}

+ 6 - 1
event_database/cloud_functions/notional-tvl.go

@@ -306,6 +306,9 @@ func ComputeTVL(w http.ResponseWriter, r *http.Request) {
 	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
 	defer cancel()
 
+	now := time.Now().UTC()
+	todaysDateStr := now.Format("2006-01-02")
+
 	getNotionalAmounts := func(ctx context.Context, tokensLocked map[string]map[string]LockedAsset) map[string]map[string]LockedAsset {
 		// create a map of all the coinIds
 		seenCoinIds := map[string]bool{}
@@ -340,6 +343,9 @@ func ComputeTVL(w http.ResponseWriter, r *http.Request) {
 				Address: "*",
 			}
 			for address, lockedAsset := range tokens {
+				if !isTokenActive(chain, address, todaysDateStr) {
+					continue
+				}
 
 				coinId := lockedAsset.CoinGeckoId
 				amount := lockedAsset.Amount
@@ -391,7 +397,6 @@ func ComputeTVL(w http.ResponseWriter, r *http.Request) {
 	wg.Add(1)
 	go func() {
 		last24HourInterval := -time.Duration(24) * time.Hour
-		now := time.Now().UTC()
 		start := now.Add(last24HourInterval)
 		defer wg.Done()
 		transfers := tvlForInterval(tbl, ctx, start, now)

+ 21 - 0
event_database/cloud_functions/shared.go

@@ -9,6 +9,7 @@ import (
 	"log"
 	"math"
 	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -264,6 +265,10 @@ func chainIdStringToType(chainId string) vaa.ChainID {
 	return vaa.ChainIDUnset
 }
 
+func chainIDToNumberString(c vaa.ChainID) string {
+	return strconv.FormatUint(uint64(c), 10)
+}
+
 func makeSummary(row bigtable.Row) *Summary {
 	summary := &Summary{}
 	if _, ok := row[messagePubFam]; ok {
@@ -460,3 +465,19 @@ func isTokenAllowed(chainId string, tokenAddress string) bool {
 	}
 	return false
 }
+
+// tokens with no trading activity recorded by exchanges integrated on CoinGecko since the specified date
+var inactiveTokens = map[string]map[string]string{
+	chainIDToNumberString(vaa.ChainIDEthereum): {
+		"0x707f9118e33a9b8998bea41dd0d46f38bb963fc8": "2022-06-15", // Anchor bETH token
+	},
+}
+
+func isTokenActive(chainId string, tokenAddress string, date string) bool {
+	if deactivatedDates, ok := inactiveTokens[chainId]; ok {
+		if deactivatedDate, ok := deactivatedDates[tokenAddress]; ok {
+			return date < deactivatedDate
+		}
+	}
+	return true
+}