| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- // Package p contains an HTTP Cloud Function.
- package p
- import (
- // "bytes"
- "context"
- // "encoding/binary"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- "sort"
- "strconv"
- "sync"
- "time"
- "cloud.google.com/go/bigtable"
- )
- type cumulativeResult struct {
- AllTime map[string]map[string]float64
- AllTimeDurationDays int
- Daily map[string]map[string]map[string]float64
- }
- // an in-memory cache of previously calculated results
- var warmCumulativeCache = map[string]map[string]map[string]map[string]float64{}
- var muWarmCumulativeCache sync.RWMutex
- var warmCumulativeCacheFilePath = "/notional-transferred-to-cumulative-cache.json"
- var transferredToUpToYesterday = map[string]map[string]map[string]map[string]float64{}
- var muTransferredToUpToYesterday sync.RWMutex
- var transferredToUpToYesterdayFilePath = "/notional-transferred-to-up-to-yesterday-cache.json"
- // calculates the amount of each symbol transfered to each chain.
- func transferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
- now := time.Now().UTC()
- today := now.Format("2006-01-02")
- oneDayAgo := -time.Duration(24) * time.Hour
- yesterday := now.Add(oneDayAgo).Format("2006-01-02")
- result := map[string]map[string]float64{"*": {"*": 0}}
- // create the unique identifier for this query, for cache
- cachePrefix := createCachePrefix(prefix)
- muTransferredToUpToYesterday.Lock()
- if _, ok := transferredToUpToYesterday[cachePrefix]; !ok {
- transferredToUpToYesterday[cachePrefix] = map[string]map[string]map[string]float64{}
- }
- if cacheData, ok := transferredToUpToYesterday[cachePrefix][yesterday]; ok {
- // cache has data through midnight yesterday
- result = cacheData
- // set the start to be the start of today
- start = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- }
- muTransferredToUpToYesterday.Unlock()
- dailyTotals := amountsTransferredToInInterval(tbl, ctx, prefix, start)
- // loop through the query results to combine cache + fresh data
- for _, chains := range dailyTotals {
- for chain, tokens := range chains {
- // ensure the chain exists in the result map
- if _, ok := result[chain]; !ok {
- result[chain] = map[string]float64{"*": 0}
- }
- for symbol, amount := range tokens {
- if _, ok := result[chain][symbol]; !ok {
- result[chain][symbol] = 0
- }
- // add the amount of this symbol transferred this day to the
- // amount already in the result (amount of this symbol prevoiusly transferred)
- result[chain][symbol] = result[chain][symbol] + amount
- }
- }
- }
- muTransferredToUpToYesterday.Lock()
- if _, ok := transferredToUpToYesterday[cachePrefix][yesterday]; !ok {
- // no cache, populate it
- upToYesterday := result
- for chain, tokens := range dailyTotals[today] {
- for symbol, amount := range tokens {
- upToYesterday[chain][symbol] = upToYesterday[chain][symbol] - amount
- }
- }
- transferredToUpToYesterday[cachePrefix][yesterday] = upToYesterday
- muTransferredToUpToYesterday.Unlock()
- // write the updated cache to disc
- persistInterfaceToJson(transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, transferredToUpToYesterday)
- } else {
- muTransferredToUpToYesterday.Unlock()
- }
- return result
- }
- // returns a slice of dates (strings) for each day in the period. Dates formatted: "2021-12-30".
- func getDaysInRange(start, end time.Time) []string {
- now := time.Now().UTC()
- numDays := int(end.Sub(start).Hours() / 24)
- days := []string{}
- for daysAgo := 0; daysAgo <= numDays; daysAgo++ {
- hoursAgo := (24 * daysAgo)
- daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
- n := now.Add(daysAgoDuration)
- year := n.Year()
- month := n.Month()
- day := n.Day()
- loc := n.Location()
- start := time.Date(year, month, day, 0, 0, 0, 0, loc)
- dateStr := start.Format("2006-01-02")
- days = append(days, dateStr)
- }
- return days
- }
- // calcuates a running total of notional value transferred, by symbol, since the start time specified.
- func createCumulativeAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
- now := time.Now().UTC()
- today := now.Format("2006-01-02")
- cachePrefix := createCachePrefix(prefix)
- cacheNeedsUpdate := false
- muWarmCumulativeCache.Lock()
- if _, ok := warmCumulativeCache[cachePrefix]; !ok {
- warmCumulativeCache[cachePrefix] = map[string]map[string]map[string]float64{}
- }
- muWarmCumulativeCache.Unlock()
- results := map[string]map[string]map[string]float64{}
- // fetch the amounts of transfers by symbol, for each day since launch (releaseDay)
- dailyAmounts := amountsTransferredToInInterval(tbl, ctx, prefix, releaseDay)
- // create a slice of dates, order oldest first
- dateKeys := make([]string, 0, len(dailyAmounts))
- for k := range dailyAmounts {
- dateKeys = append(dateKeys, k)
- }
- sort.Strings(dateKeys)
- // iterate through the dates in the result set, and accumulate the amounts
- // of each token transfer by symbol, based on the destination of the transfer.
- for i, date := range dateKeys {
- muWarmCumulativeCache.RLock()
- if dateCache, ok := warmCumulativeCache[cachePrefix][date]; ok && dateCache != nil {
- // have a cached value for this day, use it.
- results[date] = dateCache
- muWarmCumulativeCache.RUnlock()
- } else {
- // no cached value for this day, must calculate it
- muWarmCumulativeCache.RUnlock()
- if i == 0 {
- // special case for first day, no need to sum.
- results[date] = dailyAmounts[date]
- } else {
- results[date] = map[string]map[string]float64{"*": {"*": 0}}
- // find the string of the previous day
- prevDate := dateKeys[i-1]
- prevDayAmounts := results[prevDate]
- thisDayAmounts := dailyAmounts[date]
- // iterate through all the transfers and add the previous day's amount, if it exists
- for chain, thisDaySymbols := range thisDayAmounts {
- // create a union of the symbols from this day, and previous days
- symbolsUnion := map[string]string{}
- for symbol := range prevDayAmounts[chain] {
- symbolsUnion[symbol] = symbol
- }
- for symbol := range thisDaySymbols {
- symbolsUnion[symbol] = symbol
- }
- // initalize the chain/symbol map for this date
- if _, ok := results[date][chain]; !ok {
- results[date][chain] = map[string]float64{"*": 0}
- }
- // iterate through the union of symbols, creating an amount for each one,
- // and adding it the the results.
- for symbol := range symbolsUnion {
- thisDayAmount := float64(0)
- if amt, ok := thisDaySymbols[symbol]; ok {
- thisDayAmount = amt
- }
- prevDayAmount := float64(0)
- if amt, ok := results[prevDate][chain][symbol]; ok {
- prevDayAmount = amt
- }
- cumulativeAmount := prevDayAmount + thisDayAmount
- results[date][chain][symbol] = cumulativeAmount
- }
- }
- }
- // dont cache today
- if date != today {
- // set the result in the cache
- muWarmCumulativeCache.Lock()
- if _, ok := warmCumulativeCache[cachePrefix][date]; !ok {
- // cache does not have this date, persist it for other instances.
- warmCumulativeCache[cachePrefix][date] = results[date]
- cacheNeedsUpdate = true
- }
- muWarmCumulativeCache.Unlock()
- }
- }
- }
- if cacheNeedsUpdate {
- persistInterfaceToJson(warmCumulativeCacheFilePath, &muWarmCumulativeCache, warmCumulativeCache)
- }
- // take the most recent n days, rather than returning all days since launch
- selectDays := map[string]map[string]map[string]float64{}
- days := getDaysInRange(start, now)
- for _, day := range days {
- selectDays[day] = results[day]
- }
- return selectDays
- }
- // calculates the cumulative value transferred each day since launch.
- func NotionalTransferredToCumulative(w http.ResponseWriter, r *http.Request) {
- // Set CORS headers for the preflight request
- if r.Method == http.MethodOptions {
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Methods", "POST")
- w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
- w.Header().Set("Access-Control-Max-Age", "3600")
- w.WriteHeader(http.StatusNoContent)
- return
- }
- // Set CORS headers for the main request.
- w.Header().Set("Access-Control-Allow-Origin", "*")
- var numDays, forChain, forAddress, daily, allTime string
- // allow GET requests with querystring params, or POST requests with json body.
- switch r.Method {
- case http.MethodGet:
- queryParams := r.URL.Query()
- numDays = queryParams.Get("numDays")
- forChain = queryParams.Get("forChain")
- forAddress = queryParams.Get("forAddress")
- daily = queryParams.Get("daily")
- allTime = queryParams.Get("allTime")
- case http.MethodPost:
- // declare request body properties
- var d struct {
- NumDays string `json:"numDays"`
- ForChain string `json:"forChain"`
- ForAddress string `json:"forAddress"`
- Daily string `json:"daily"`
- AllTime string `json:"allTime"`
- }
- // deserialize request body
- if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
- switch err {
- case io.EOF:
- // do nothing, empty body is ok
- default:
- log.Printf("json.NewDecoder: %v", err)
- http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
- return
- }
- }
- numDays = d.NumDays
- forChain = d.ForChain
- forAddress = d.ForAddress
- daily = d.Daily
- allTime = d.AllTime
- default:
- http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
- log.Println("Method Not Allowed")
- return
- }
- if daily == "" && allTime == "" {
- // none of the options were set, so set one
- allTime = "true"
- }
- var queryDays int
- if numDays == "" {
- queryDays = 30
- } else {
- var convErr error
- queryDays, convErr = strconv.Atoi(numDays)
- if convErr != nil {
- fmt.Fprint(w, "numDays must be an integer")
- http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
- return
- }
- }
- // create the rowkey prefix for querying
- prefix := ""
- if forChain != "" {
- prefix = forChain
- // if the request is forChain, always groupBy chain
- if forAddress != "" {
- // if the request is forAddress, always groupBy address
- prefix = forChain + ":" + forAddress
- }
- }
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
- defer cancel()
- var wg sync.WaitGroup
- // total since launch
- periodTransfers := map[string]map[string]float64{}
- allTimeDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
- if allTime != "" {
- wg.Add(1)
- go func(prefix string) {
- defer wg.Done()
- transfers := transferredToSince(tbl, context.Background(), prefix, releaseDay)
- for chain, tokens := range transfers {
- periodTransfers[chain] = map[string]float64{}
- for symbol, amount := range tokens {
- periodTransfers[chain][symbol] = roundToTwoDecimalPlaces(amount)
- }
- }
- }(prefix)
- }
- // daily transfers by chain
- dailyTransfers := map[string]map[string]map[string]float64{}
- if daily != "" {
- wg.Add(1)
- go func(prefix string, queryDays int) {
- hours := (24 * queryDays)
- periodInterval := -time.Duration(hours) * time.Hour
- now := time.Now().UTC()
- prev := now.Add(periodInterval)
- start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
- defer wg.Done()
- transfers := createCumulativeAmountsOfInterval(tbl, ctx, prefix, start)
- for date, chains := range transfers {
- dailyTransfers[date] = map[string]map[string]float64{}
- for chain, tokens := range chains {
- dailyTransfers[date][chain] = map[string]float64{}
- for symbol, amount := range tokens {
- dailyTransfers[date][chain][symbol] = roundToTwoDecimalPlaces(amount)
- }
- }
- }
- }(prefix, queryDays)
- }
- wg.Wait()
- result := &cumulativeResult{
- AllTime: periodTransfers,
- AllTimeDurationDays: allTimeDays,
- Daily: dailyTransfers,
- }
- jsonBytes, err := json.Marshal(result)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- w.Write([]byte(err.Error()))
- log.Println(err.Error())
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write(jsonBytes)
- }
|