| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- // Package p contains an HTTP Cloud Function.
- package p
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- "sort"
- "strconv"
- "sync"
- "time"
- "cloud.google.com/go/bigtable"
- )
// cumulativeAddressesResult is the JSON response payload produced by
// AddressesTransferredToCumulative.
type cumulativeAddressesResult struct {
	// AllTimeAmounts maps chain -> receiving address -> total notional amount received.
	AllTimeAmounts map[string]map[string]float64
	// AllTimeCounts maps chain -> number of unique receiving addresses;
	// the "*" key holds the sum across all chains.
	AllTimeCounts map[string]int
	// AllTimeDurationDays is the whole number of days elapsed since releaseDay.
	AllTimeDurationDays int
	// DailyAmounts maps date -> chain -> address -> cumulative amount received as of that date.
	DailyAmounts map[string]map[string]map[string]float64
	// DailyCounts maps date -> chain -> unique address count; the "*" key
	// holds the per-date sum across all chains.
	DailyCounts map[string]map[string]int
}
// warmCumulativeAddressesCache is an in-memory cache of previously calculated
// results, keyed by cache prefix -> date -> chain -> address -> amount.
var warmCumulativeAddressesCache = map[string]map[string]map[string]map[string]float64{}

// muWarmCumulativeAddressesCache guards warmCumulativeAddressesCache.
var muWarmCumulativeAddressesCache sync.RWMutex

// warmCumulativeAddressesCacheFilePath is where warmCumulativeAddressesCache
// is persisted as JSON so other instances can load it.
var warmCumulativeAddressesCacheFilePath = "/addresses-transferred-to-cumulative-cache.json"

// addressesToUpToYesterday caches totals accumulated through midnight
// yesterday, keyed by cache prefix -> yesterday's date -> chain -> address -> amount.
var addressesToUpToYesterday = map[string]map[string]map[string]map[string]float64{}

// muAddressesToUpToYesterday guards addressesToUpToYesterday.
var muAddressesToUpToYesterday sync.RWMutex

// addressesToUpToYesterdayFilePath is where addressesToUpToYesterday is persisted as JSON.
var addressesToUpToYesterdayFilePath = "/addresses-transferred-to-up-to-yesterday-cache.json"
- // finds all the unique addresses that have received tokens since a particular moment.
- func addressesTransferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
- now := time.Now().UTC()
- today := now.Format("2006-01-02")
- oneDayAgo := -time.Duration(24) * time.Hour
- yesterday := now.Add(oneDayAgo).Format("2006-01-02")
- result := map[string]map[string]float64{}
- // create the unique identifier for this query, for cache
- cachePrefix := createCachePrefix(prefix)
- muAddressesToUpToYesterday.Lock()
- if _, ok := addressesToUpToYesterday[cachePrefix]; !ok {
- addressesToUpToYesterday[cachePrefix] = map[string]map[string]map[string]float64{}
- }
- if cacheData, ok := addressesToUpToYesterday[cachePrefix][yesterday]; ok {
- // cache has data through midnight yesterday
- result = cacheData
- // set the start to be the start of today
- start = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- }
- muAddressesToUpToYesterday.Unlock()
- // fetch data for days not in the cache
- dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, start)
- // loop through the query results to combine cache + fresh data
- for _, chains := range dailyAddresses {
- for chain, addresses := range chains {
- // ensure the chain exists in the result map
- if _, ok := result[chain]; !ok {
- result[chain] = map[string]float64{}
- }
- for address, amount := range addresses {
- if _, ok := result[chain][address]; !ok {
- result[chain][address] = 0
- }
- // add the amount the address received this day to the
- // amount already in the result (amount the address has recieved so far)
- result[chain][address] = result[chain][address] + amount
- }
- }
- }
- muAddressesToUpToYesterday.Lock()
- if _, ok := addressesToUpToYesterday[cachePrefix][yesterday]; !ok {
- // no cache, populate it
- upToYesterday := result
- for chain, addresses := range dailyAddresses[today] {
- for address, amount := range addresses {
- upToYesterday[chain][address] = upToYesterday[chain][address] - amount
- }
- }
- addressesToUpToYesterday[cachePrefix][yesterday] = upToYesterday
- muAddressesToUpToYesterday.Unlock()
- // write cache to disc
- persistInterfaceToJson(addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, addressesToUpToYesterday)
- } else {
- muAddressesToUpToYesterday.Unlock()
- }
- return result
- }
// createCumulativeAddressesOfInterval calculates a map of
// date -> chain -> recipient address -> cumulative notional value received.
// Totals are accumulated from releaseDay forward (so each day includes all
// prior days), then trimmed to only the dates in [start, now].
func createCumulativeAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
	now := time.Now().UTC()
	today := now.Format("2006-01-02")
	cachePrefix := createCachePrefix(prefix)
	cacheNeedsUpdate := false

	// ensure the top-level cache entry for this prefix exists
	muWarmCumulativeAddressesCache.Lock()
	if _, ok := warmCumulativeAddressesCache[cachePrefix]; !ok {
		warmCumulativeAddressesCache[cachePrefix] = map[string]map[string]map[string]float64{}
	}
	muWarmCumulativeAddressesCache.Unlock()

	results := map[string]map[string]map[string]float64{}

	// always query from releaseDay, because each day's cumulative total
	// is built on top of the previous day's total.
	dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, releaseDay)

	// sort the date keys so days are accumulated in chronological order
	dateKeys := make([]string, 0, len(dailyAddresses))
	for k := range dailyAddresses {
		dateKeys = append(dateKeys, k)
	}
	sort.Strings(dateKeys)

	// iterate through the dates in the result set, and accumulate the amounts
	// of each token transfer by symbol, based on the destination of the transfer.
	for i, date := range dateKeys {
		muWarmCumulativeAddressesCache.RLock()
		if dateCache, ok := warmCumulativeAddressesCache[cachePrefix][date]; ok && dateCache != nil {
			// have a cached value for this day, use it.
			// NOTE(review): this aliases the cached map into results (and into
			// the returned value) without copying — safe only while callers
			// treat the returned maps as read-only; confirm against callers.
			results[date] = dateCache
			muWarmCumulativeAddressesCache.RUnlock()
		} else {
			// no cached value for this day, must calculate it
			muWarmCumulativeAddressesCache.RUnlock()
			if i == 0 {
				// special case for first day, no need to sum.
				results[date] = dailyAddresses[date]
			} else {
				results[date] = map[string]map[string]float64{}
				// find the string of the previous day
				prevDate := dateKeys[i-1]
				prevDayChains := results[prevDate]
				thisDayChains := dailyAddresses[date]
				for chain, thisDayAddresses := range thisDayChains {
					// create a union of the addresses from this day, and previous days
					addressUnion := map[string]string{}
					for address := range prevDayChains[chain] {
						addressUnion[address] = address
					}
					for address := range thisDayAddresses {
						addressUnion[address] = address
					}
					// initialize the chain/symbol map for this date
					if _, ok := results[date][chain]; !ok {
						results[date][chain] = map[string]float64{}
					}
					// iterate through the union of addresses, creating an amount for
					// each one, and adding it to the results.
					for address := range addressUnion {
						thisDayAmount := float64(0)
						if amt, ok := thisDayAddresses[address]; ok {
							thisDayAmount = amt
						}
						prevDayAmount := float64(0)
						// the `!= 0` guard is redundant (zero would be kept anyway)
						// but harmless
						if prevAmount, ok := results[prevDate][chain][address]; ok && prevAmount != 0 {
							prevDayAmount = prevAmount
						}
						cumulativeAmount := prevDayAmount + thisDayAmount
						results[date][chain][address] = cumulativeAmount
					}
				}
			}
			// dont cache today, since today's data is still accumulating
			if date != today {
				// set the result in the cache
				muWarmCumulativeAddressesCache.Lock()
				if _, ok := warmCumulativeAddressesCache[cachePrefix][date]; !ok {
					// cache does not have this date, persist it for other instances.
					warmCumulativeAddressesCache[cachePrefix][date] = results[date]
					cacheNeedsUpdate = true
				}
				muWarmCumulativeAddressesCache.Unlock()
			}
		}
	}

	if cacheNeedsUpdate {
		persistInterfaceToJson(warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, warmCumulativeAddressesCache)
	}

	// trim the accumulated results down to only the days the caller asked for
	selectDays := map[string]map[string]map[string]float64{}
	days := getDaysInRange(start, now)
	for _, day := range days {
		selectDays[day] = results[day]
	}
	return selectDays
}
- // finds unique addresses that tokens have been transferred to.
- func AddressesTransferredToCumulative(w http.ResponseWriter, r *http.Request) {
- // Set CORS headers for the preflight request
- if r.Method == http.MethodOptions {
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Methods", "POST")
- w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
- w.Header().Set("Access-Control-Max-Age", "3600")
- w.WriteHeader(http.StatusNoContent)
- return
- }
- // Set CORS headers for the main request.
- w.Header().Set("Access-Control-Allow-Origin", "*")
- var numDays, forChain, forAddress, daily, allTime, counts, amounts string
- // allow GET requests with querystring params, or POST requests with json body.
- switch r.Method {
- case http.MethodGet:
- queryParams := r.URL.Query()
- numDays = queryParams.Get("numDays")
- forChain = queryParams.Get("forChain")
- forAddress = queryParams.Get("forAddress")
- daily = queryParams.Get("daily")
- allTime = queryParams.Get("allTime")
- counts = queryParams.Get("counts")
- amounts = queryParams.Get("amounts")
- case http.MethodPost:
- // declare request body properties
- var d struct {
- NumDays string `json:"numDays"`
- ForChain string `json:"forChain"`
- ForAddress string `json:"forAddress"`
- Daily string `json:"daily"`
- AllTime string `json:"allTime"`
- Counts string `json:"counts"`
- Amounts string `json:"amounts"`
- }
- // deserialize request body
- if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
- switch err {
- case io.EOF:
- // do nothing, empty body is ok
- default:
- log.Printf("json.NewDecoder: %v", err)
- http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
- return
- }
- }
- numDays = d.NumDays
- forChain = d.ForChain
- forAddress = d.ForAddress
- daily = d.Daily
- allTime = d.AllTime
- counts = d.Counts
- amounts = d.Amounts
- default:
- http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
- log.Println("Method Not Allowed")
- return
- }
- if daily == "" && allTime == "" {
- // none of the options were set, so set one
- allTime = "true"
- }
- if counts == "" && amounts == "" {
- // neither of the options were set, so set one
- counts = "true"
- }
- var queryDays int
- if numDays == "" {
- queryDays = 30
- } else {
- var convErr error
- queryDays, convErr = strconv.Atoi(numDays)
- if convErr != nil {
- fmt.Fprint(w, "numDays must be an integer")
- http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
- return
- }
- }
- // create the rowkey prefix for querying
- prefix := ""
- if forChain != "" {
- prefix = forChain
- // if the request is forChain, always groupBy chain
- if forAddress != "" {
- // if the request is forAddress, always groupBy address
- prefix = forChain + ":" + forAddress
- }
- }
- ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
- defer cancel()
- var wg sync.WaitGroup
- // total of the last numDays
- addressesDailyAmounts := map[string]map[string]float64{}
- addressesDailyCounts := map[string]int{}
- allTimeDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
- if allTime != "" {
- wg.Add(1)
- go func(prefix string) {
- defer wg.Done()
- periodAmounts := addressesTransferredToSince(tbl, ctx, prefix, releaseDay)
- if amounts != "" {
- for chain, addresses := range periodAmounts {
- addressesDailyAmounts[chain] = map[string]float64{}
- for address, amount := range addresses {
- addressesDailyAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
- }
- }
- }
- if counts != "" {
- for chain, addresses := range periodAmounts {
- // need to sum all the chains to get the total count of addresses,
- // since addresses are not unique across chains.
- numAddresses := len(addresses)
- addressesDailyCounts[chain] = len(addresses)
- addressesDailyCounts["*"] = addressesDailyCounts["*"] + numAddresses
- }
- }
- }(prefix)
- }
- // daily totals
- dailyAmounts := map[string]map[string]map[string]float64{}
- dailyCounts := map[string]map[string]int{}
- if daily != "" {
- wg.Add(1)
- go func(prefix string, queryDays int) {
- hours := (24 * queryDays)
- periodInterval := -time.Duration(hours) * time.Hour
- now := time.Now().UTC()
- prev := now.Add(periodInterval)
- start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
- defer wg.Done()
- dailyTotals := createCumulativeAddressesOfInterval(tbl, ctx, prefix, start)
- if amounts != "" {
- for date, chains := range dailyTotals {
- dailyAmounts[date] = map[string]map[string]float64{}
- for chain, addresses := range chains {
- dailyAmounts[date][chain] = map[string]float64{}
- for address, amount := range addresses {
- dailyAmounts[date][chain][address] = roundToTwoDecimalPlaces(amount)
- }
- }
- }
- }
- if counts != "" {
- for date, chains := range dailyTotals {
- dailyCounts[date] = map[string]int{}
- for chain, addresses := range chains {
- // need to sum all the chains to get the total count of addresses,
- // since addresses are not unique across chains.
- numAddresses := len(addresses)
- dailyCounts[date][chain] = numAddresses
- dailyCounts[date]["*"] = dailyCounts[date]["*"] + numAddresses
- }
- }
- }
- }(prefix, queryDays)
- }
- wg.Wait()
- result := &cumulativeAddressesResult{
- AllTimeAmounts: addressesDailyAmounts,
- AllTimeCounts: addressesDailyCounts,
- AllTimeDurationDays: allTimeDays,
- DailyAmounts: dailyAmounts,
- DailyCounts: dailyCounts,
- }
- jsonBytes, err := json.Marshal(result)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- w.Write([]byte(err.Error()))
- log.Println(err.Error())
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Header().Add("Content-Type", "application/json")
- w.Write(jsonBytes)
- }
|