| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 |
- // Package p contains an HTTP Cloud Function.
- package p
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- "strconv"
- "sync"
- "time"
- "cloud.google.com/go/bigtable"
- )
- type transfersResult struct {
- Last24Hours map[string]map[string]map[string]float64
- WithinPeriod map[string]map[string]map[string]float64
- PeriodDurationDays int
- Daily map[string]map[string]map[string]map[string]float64
- }
- // an in-memory cache of previously calculated results
- var warmTransfersCache = map[string]map[string]map[string]map[string]map[string]float64{}
- var muWarmTransfersCache sync.RWMutex
- var warmTransfersCacheFilePath = "/notional-transferred-cache.json"
- // finds the daily amount of each symbol transferred from each chain, to each chain,
- // from the specified start to the present.
- func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]map[string]float64 {
- results := map[string]map[string]map[string]map[string]float64{}
- now := time.Now().UTC()
- numPrevDays := int(now.Sub(start).Hours() / 24)
- var intervalsWG sync.WaitGroup
- // there will be a query for each previous day, plus today
- intervalsWG.Add(numPrevDays + 1)
- // create the unique identifier for this query, for cache
- cachePrefix := createCachePrefix(prefix)
- cacheNeedsUpdate := false
- for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
- go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
- // start is the SOD, end is EOD
- // "0 daysAgo start" is 00:00:00 AM of the current day
- // "0 daysAgo end" is 23:59:59 of the current day (the future)
- // calulate the start and end times for the query
- hoursAgo := (24 * daysAgo)
- daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
- n := now.Add(daysAgoDuration)
- year := n.Year()
- month := n.Month()
- day := n.Day()
- loc := n.Location()
- start := time.Date(year, month, day, 0, 0, 0, 0, loc)
- end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
- dateStr := start.Format("2006-01-02")
- muWarmTransfersCache.Lock()
- // initialize the map for this date in the result set
- results[dateStr] = map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
- // check to see if there is cache data for this date/query
- if dates, ok := warmTransfersCache[cachePrefix]; ok {
- // have a cache for this query
- if dateCache, ok := dates[dateStr]; ok && len(dateCache) > 1 {
- // have a cache for this date
- if daysAgo >= 1 {
- // only use the cache for yesterday and older
- results[dateStr] = dateCache
- muWarmTransfersCache.Unlock()
- intervalsWG.Done()
- return
- }
- }
- } else {
- // no cache for this query, initialize the map
- warmTransfersCache[cachePrefix] = map[string]map[string]map[string]map[string]float64{}
- }
- muWarmTransfersCache.Unlock()
- defer intervalsWG.Done()
- queryResult := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
- // iterate through the rows and increment the amounts
- for _, row := range queryResult {
- if _, ok := results[dateStr][row.LeavingChain]; !ok {
- results[dateStr][row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
- }
- if _, ok := results[dateStr][row.LeavingChain][row.DestinationChain]; !ok {
- results[dateStr][row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
- }
- if _, ok := results[dateStr]["*"][row.DestinationChain]; !ok {
- results[dateStr]["*"][row.DestinationChain] = map[string]float64{"*": 0}
- }
- // add the transfer data to the result set every possible way:
- // by symbol, aggregated by: "leaving chain", "arriving at chain", "from any chain", "to any chain".
- // add to the total amount leaving this chain, going to any chain, for all symbols
- results[dateStr][row.LeavingChain]["*"]["*"] = results[dateStr][row.LeavingChain]["*"]["*"] + row.Notional
- // add to the total amount leaving this chain, going to the destination chain, for all symbols
- results[dateStr][row.LeavingChain][row.DestinationChain]["*"] = results[dateStr][row.LeavingChain][row.DestinationChain]["*"] + row.Notional
- // add to the total amount of this symbol leaving this chain, going to any chain
- results[dateStr][row.LeavingChain]["*"][row.TokenSymbol] = results[dateStr][row.LeavingChain]["*"][row.TokenSymbol] + row.Notional
- // add to the total amount of this symbol leaving this chain, going to the destination chain
- results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
- // add to the total amount arriving at the destination chain, coming from anywhere, including all symbols
- results[dateStr]["*"][row.DestinationChain]["*"] = results[dateStr]["*"][row.DestinationChain]["*"] + row.Notional
- // add to the total amount of this symbol arriving at the destination chain
- results[dateStr]["*"][row.DestinationChain][row.TokenSymbol] = results[dateStr]["*"][row.DestinationChain][row.TokenSymbol] + row.Notional
- // add to the total amount of this symbol transferred, from any chain, to any chain
- results[dateStr]["*"]["*"][row.TokenSymbol] = results[dateStr]["*"]["*"][row.TokenSymbol] + row.Notional
- // and finally, total/total/total: amount of all symbols transferred from any chain to any other chain
- results[dateStr]["*"]["*"]["*"] = results[dateStr]["*"]["*"]["*"] + row.Notional
- }
- if daysAgo >= 1 {
- // set the result in the cache
- muWarmTransfersCache.Lock()
- if cacheData, ok := warmTransfersCache[cachePrefix][dateStr]; !ok || len(cacheData) == 1 {
- // cache does not have this date, add the data, and mark the cache stale
- warmTransfersCache[cachePrefix][dateStr] = results[dateStr]
- cacheNeedsUpdate = true
- }
- muWarmTransfersCache.Unlock()
- }
- }(tbl, ctx, prefix, daysAgo)
- }
- intervalsWG.Wait()
- if cacheNeedsUpdate {
- persistInterfaceToJson(warmTransfersCacheFilePath, &muWarmTransfersCache, warmTransfersCache)
- }
- // having consistent keys in each object is helpful for clients, explorer GUI
- // create a set of all the keys from all dates/chains, to ensure the result objects all have the same chain keys
- seenChainSet := map[string]bool{}
- for _, chains := range results {
- for leaving, dests := range chains {
- seenChainSet[leaving] = true
- for dest := range dests {
- seenChainSet[dest] = true
- }
- }
- }
- var muResult sync.RWMutex
- // ensure each chain object has all the same symbol keys:
- for date, chains := range results {
- for chain := range seenChainSet {
- if _, ok := chains[chain]; !ok {
- muResult.Lock()
- results[date][chain] = map[string]map[string]float64{"*": {"*": 0}}
- muResult.Unlock()
- }
- }
- for leaving := range chains {
- for chain := range seenChainSet {
- // check that date has all the chains
- if _, ok := chains[chain]; !ok {
- muResult.Lock()
- results[date][leaving][chain] = map[string]float64{"*": 0}
- muResult.Unlock()
- }
- }
- }
- }
- return results
- }
- // calculates the amount of each symbol that has gone from each chain, to each other chain, since the specified day.
- func transferredSinceDate(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
- result := map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
- dailyTotals := createTransfersOfInterval(tbl, ctx, prefix, start)
- for _, leaving := range dailyTotals {
- for chain, dests := range leaving {
- // ensure the chain exists in the result map
- if _, ok := result[chain]; !ok {
- result[chain] = map[string]map[string]float64{"*": {"*": 0}}
- }
- for dest, tokens := range dests {
- if _, ok := result[chain][dest]; !ok {
- result[chain][dest] = map[string]float64{"*": 0}
- }
- for symbol, amount := range tokens {
- if _, ok := result[chain][dest][symbol]; !ok {
- result[chain][dest][symbol] = 0
- }
- // add the amount of this symbol transferred this day to the
- // amount already in the result (amount of this symbol prevoiusly transferred)
- result[chain][dest][symbol] = result[chain][dest][symbol] + amount
- }
- }
- }
- }
- // create a set of chainIDs, the union of source and destination chains,
- // to ensure the result objects all have the same keys.
- seenChainSet := map[string]bool{}
- for leaving, dests := range result {
- seenChainSet[leaving] = true
- for dest := range dests {
- seenChainSet[dest] = true
- }
- }
- // make sure the root of the map has all the chainIDs
- for chain := range seenChainSet {
- if _, ok := result[chain]; !ok {
- result[chain] = map[string]map[string]float64{"*": {"*": 0}}
- }
- }
- // make sure that each chain at the root (leaving) as a key (destination) for each chain
- for leaving, dests := range result {
- for chain := range seenChainSet {
- // check that date has all the chains
- if _, ok := dests[chain]; !ok {
- result[leaving][chain] = map[string]float64{"*": 0}
- }
- }
- }
- return result
- }
- // returns the count of the rows in the query response
- func transfersForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) map[string]map[string]map[string]float64 {
- // query for all rows in time range, return result count
- queryResults := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
- result := map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
- // iterate through the rows and increment the count for each index
- for _, row := range queryResults {
- if _, ok := result[row.LeavingChain]; !ok {
- result[row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
- }
- if _, ok := result[row.LeavingChain][row.DestinationChain]; !ok {
- result[row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
- }
- if _, ok := result["*"][row.DestinationChain]; !ok {
- result["*"][row.DestinationChain] = map[string]float64{"*": 0}
- }
- // add the transfer data to the result set every possible way:
- // by symbol, aggregated by: "leaving chain", "arriving at chain", "from any chain", "to any chain".
- // add to the total amount leaving this chain, going to any chain, for all symbols
- result[row.LeavingChain]["*"]["*"] = result[row.LeavingChain]["*"]["*"] + row.Notional
- // add to the total amount leaving this chain, going to the destination chain, for all symbols
- result[row.LeavingChain][row.DestinationChain]["*"] = result[row.LeavingChain][row.DestinationChain]["*"] + row.Notional
- // add to the total amount of this symbol leaving this chain, going to any chain
- result[row.LeavingChain]["*"][row.TokenSymbol] = result[row.LeavingChain]["*"][row.TokenSymbol] + row.Notional
- // add to the total amount of this symbol leaving this chain, going to the destination chain
- result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] = result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
- // add to the total amount arriving at the destination chain, coming from anywhere, including all symbols
- result["*"][row.DestinationChain]["*"] = result["*"][row.DestinationChain]["*"] + row.Notional
- // add to the total amount of this symbol arriving at the destination chain
- result["*"][row.DestinationChain][row.TokenSymbol] = result["*"][row.DestinationChain][row.TokenSymbol] + row.Notional
- // add to the total amount of this symbol transferred, from any chain, to any chain
- result["*"]["*"][row.TokenSymbol] = result["*"]["*"][row.TokenSymbol] + row.Notional
- // and finally, total/total/total: amount of all symbols transferred from any chain to any other chain
- result["*"]["*"]["*"] = result["*"]["*"]["*"] + row.Notional
- }
- // create a set of chainIDs, the union of source and destination chains,
- // to ensure the result objects all have the same keys.
- seenChainSet := map[string]bool{}
- for leaving, dests := range result {
- seenChainSet[leaving] = true
- for dest := range dests {
- seenChainSet[dest] = true
- }
- }
- // make sure the root of the map has all the chainIDs
- for chain := range seenChainSet {
- if _, ok := result[chain]; !ok {
- result[chain] = map[string]map[string]float64{"*": {"*": 0}}
- }
- }
- // make sure that each chain at the root (leaving) as a key (destination) for each chain
- for leaving, dests := range result {
- for chain := range seenChainSet {
- // check that date has all the chains
- if _, ok := dests[chain]; !ok {
- result[leaving][chain] = map[string]float64{"*": 0}
- }
- }
- }
- return result
- }
- // finds the value that has been transferred from each chain to each other, by symbol.
- func NotionalTransferred(w http.ResponseWriter, r *http.Request) {
- // Set CORS headers for the preflight request
- if r.Method == http.MethodOptions {
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Methods", "POST")
- w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
- w.Header().Set("Access-Control-Max-Age", "3600")
- w.WriteHeader(http.StatusNoContent)
- return
- }
- // Set CORS headers for the main request.
- w.Header().Set("Access-Control-Allow-Origin", "*")
- var numDays, forChain, forAddress, daily, last24Hours, forPeriod string
- // allow GET requests with querystring params, or POST requests with json body.
- switch r.Method {
- case http.MethodGet:
- queryParams := r.URL.Query()
- numDays = queryParams.Get("numDays")
- forChain = queryParams.Get("forChain")
- forAddress = queryParams.Get("forAddress")
- daily = queryParams.Get("daily")
- last24Hours = queryParams.Get("last24Hours")
- forPeriod = queryParams.Get("forPeriod")
- case http.MethodPost:
- // declare request body properties
- var d struct {
- NumDays string `json:"numDays"`
- ForChain string `json:"forChain"`
- ForAddress string `json:"forAddress"`
- Daily string `json:"daily"`
- Last24Hours string `json:"last24Hours"`
- ForPeriod string `json:"forPeriod"`
- }
- // deserialize request body
- if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
- switch err {
- case io.EOF:
- // do nothing, empty body is ok
- default:
- log.Printf("json.NewDecoder: %v", err)
- http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
- return
- }
- }
- numDays = d.NumDays
- forChain = d.ForChain
- forAddress = d.ForAddress
- daily = d.Daily
- last24Hours = d.Last24Hours
- forPeriod = d.ForPeriod
- default:
- http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
- log.Println("Method Not Allowed")
- return
- }
- if daily == "" && last24Hours == "" && forPeriod == "" {
- // none of the options were set, so set one
- last24Hours = "true"
- }
- var queryDays int
- if numDays == "" {
- queryDays = 30
- } else {
- var convErr error
- queryDays, convErr = strconv.Atoi(numDays)
- if convErr != nil {
- fmt.Fprint(w, "numDays must be an integer")
- http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
- return
- }
- }
- // create the rowkey prefix for querying
- prefix := ""
- if forChain != "" {
- prefix = forChain
- // if the request is forChain, always groupBy chain
- if forAddress != "" {
- // if the request is forAddress, always groupBy address
- prefix = forChain + ":" + forAddress
- }
- }
- ctx, cancel := context.WithTimeout(context.Background(), 600*time.Second)
- defer cancel()
- var wg sync.WaitGroup
- // total of last 24 hours
- last24HourCount := map[string]map[string]map[string]float64{}
- if last24Hours != "" {
- wg.Add(1)
- go func(prefix string) {
- last24HourInterval := -time.Duration(24) * time.Hour
- now := time.Now().UTC()
- start := now.Add(last24HourInterval)
- defer wg.Done()
- transfers := transfersForInterval(tbl, ctx, prefix, start, now)
- for chain, dests := range transfers {
- last24HourCount[chain] = map[string]map[string]float64{}
- for dest, tokens := range dests {
- last24HourCount[chain][dest] = map[string]float64{}
- for symbol, amount := range tokens {
- last24HourCount[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
- }
- }
- }
- }(prefix)
- }
- // transfers of the last numDays
- periodTransfers := map[string]map[string]map[string]float64{}
- if forPeriod != "" {
- wg.Add(1)
- go func(prefix string) {
- hours := (24 * queryDays)
- periodInterval := -time.Duration(hours) * time.Hour
- now := time.Now().UTC()
- prev := now.Add(periodInterval)
- start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
- defer wg.Done()
- transfers := transferredSinceDate(tbl, ctx, prefix, start)
- for chain, dests := range transfers {
- periodTransfers[chain] = map[string]map[string]float64{}
- for dest, tokens := range dests {
- periodTransfers[chain][dest] = map[string]float64{}
- for symbol, amount := range tokens {
- periodTransfers[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
- }
- }
- }
- }(prefix)
- }
- // daily totals
- dailyTransfers := map[string]map[string]map[string]map[string]float64{}
- if daily != "" {
- wg.Add(1)
- go func(prefix string, queryDays int) {
- hours := (24 * queryDays)
- periodInterval := -time.Duration(hours) * time.Hour
- now := time.Now().UTC()
- prev := now.Add(periodInterval)
- start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
- defer wg.Done()
- transfers := createTransfersOfInterval(tbl, ctx, prefix, start)
- for date, chains := range transfers {
- dailyTransfers[date] = map[string]map[string]map[string]float64{}
- for chain, dests := range chains {
- dailyTransfers[date][chain] = map[string]map[string]float64{}
- for destChain, tokens := range dests {
- dailyTransfers[date][chain][destChain] = map[string]float64{}
- for symbol, amount := range tokens {
- dailyTransfers[date][chain][destChain][symbol] = roundToTwoDecimalPlaces(amount)
- }
- }
- }
- }
- }(prefix, queryDays)
- }
- wg.Wait()
- result := &transfersResult{
- Last24Hours: last24HourCount,
- WithinPeriod: periodTransfers,
- PeriodDurationDays: queryDays,
- Daily: dailyTransfers,
- }
- jsonBytes, err := json.Marshal(result)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- w.Write([]byte(err.Error()))
- log.Println(err.Error())
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write(jsonBytes)
- }
|