notional-transferred-to-cumulative.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. // "bytes"
  5. "context"
  6. // "encoding/binary"
  7. "encoding/json"
  8. "fmt"
  9. "io"
  10. "log"
  11. "net/http"
  12. "sort"
  13. "strconv"
  14. "sync"
  15. "time"
  16. "cloud.google.com/go/bigtable"
  17. )
  18. type cumulativeResult struct {
  19. AllTime map[string]map[string]float64
  20. AllTimeDurationDays int
  21. Daily map[string]map[string]map[string]float64
  22. }
  23. // an in-memory cache of previously calculated results
  24. var warmCumulativeCache = map[string]map[string]map[string]map[string]float64{}
  25. var muWarmCumulativeCache sync.RWMutex
  26. var warmCumulativeCacheFilePath = "/notional-transferred-to-cumulative-cache.json"
  27. var transferredToUpToYesterday = map[string]map[string]map[string]map[string]float64{}
  28. var muTransferredToUpToYesterday sync.RWMutex
  29. var transferredToUpToYesterdayFilePath = "/notional-transferred-to-up-to-yesterday-cache.json"
  30. // calculates the amount of each symbol transfered to each chain.
  31. func transferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
  32. now := time.Now().UTC()
  33. today := now.Format("2006-01-02")
  34. oneDayAgo := -time.Duration(24) * time.Hour
  35. yesterday := now.Add(oneDayAgo).Format("2006-01-02")
  36. result := map[string]map[string]float64{"*": {"*": 0}}
  37. // create the unique identifier for this query, for cache
  38. cachePrefix := createCachePrefix(prefix)
  39. muTransferredToUpToYesterday.Lock()
  40. if _, ok := transferredToUpToYesterday[cachePrefix]; !ok {
  41. transferredToUpToYesterday[cachePrefix] = map[string]map[string]map[string]float64{}
  42. }
  43. if cacheData, ok := transferredToUpToYesterday[cachePrefix][yesterday]; ok {
  44. // cache has data through midnight yesterday
  45. result = cacheData
  46. // set the start to be the start of today
  47. start = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  48. }
  49. muTransferredToUpToYesterday.Unlock()
  50. dailyTotals := amountsTransferredToInInterval(tbl, ctx, prefix, start)
  51. // loop through the query results to combine cache + fresh data
  52. for _, chains := range dailyTotals {
  53. for chain, tokens := range chains {
  54. // ensure the chain exists in the result map
  55. if _, ok := result[chain]; !ok {
  56. result[chain] = map[string]float64{"*": 0}
  57. }
  58. for symbol, amount := range tokens {
  59. if _, ok := result[chain][symbol]; !ok {
  60. result[chain][symbol] = 0
  61. }
  62. // add the amount of this symbol transferred this day to the
  63. // amount already in the result (amount of this symbol prevoiusly transferred)
  64. result[chain][symbol] = result[chain][symbol] + amount
  65. }
  66. }
  67. }
  68. muTransferredToUpToYesterday.Lock()
  69. if _, ok := transferredToUpToYesterday[cachePrefix][yesterday]; !ok {
  70. // no cache, populate it
  71. upToYesterday := result
  72. for chain, tokens := range dailyTotals[today] {
  73. for symbol, amount := range tokens {
  74. upToYesterday[chain][symbol] = upToYesterday[chain][symbol] - amount
  75. }
  76. }
  77. transferredToUpToYesterday[cachePrefix][yesterday] = upToYesterday
  78. muTransferredToUpToYesterday.Unlock()
  79. // write the updated cache to disc
  80. persistInterfaceToJson(transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, transferredToUpToYesterday)
  81. } else {
  82. muTransferredToUpToYesterday.Unlock()
  83. }
  84. return result
  85. }
  86. // returns a slice of dates (strings) for each day in the period. Dates formatted: "2021-12-30".
  87. func getDaysInRange(start, end time.Time) []string {
  88. now := time.Now().UTC()
  89. numDays := int(end.Sub(start).Hours() / 24)
  90. days := []string{}
  91. for daysAgo := 0; daysAgo <= numDays; daysAgo++ {
  92. hoursAgo := (24 * daysAgo)
  93. daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
  94. n := now.Add(daysAgoDuration)
  95. year := n.Year()
  96. month := n.Month()
  97. day := n.Day()
  98. loc := n.Location()
  99. start := time.Date(year, month, day, 0, 0, 0, 0, loc)
  100. dateStr := start.Format("2006-01-02")
  101. days = append(days, dateStr)
  102. }
  103. return days
  104. }
  105. // calcuates a running total of notional value transferred, by symbol, since the start time specified.
  106. func createCumulativeAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
  107. now := time.Now().UTC()
  108. today := now.Format("2006-01-02")
  109. cachePrefix := createCachePrefix(prefix)
  110. cacheNeedsUpdate := false
  111. muWarmCumulativeCache.Lock()
  112. if _, ok := warmCumulativeCache[cachePrefix]; !ok {
  113. warmCumulativeCache[cachePrefix] = map[string]map[string]map[string]float64{}
  114. }
  115. muWarmCumulativeCache.Unlock()
  116. results := map[string]map[string]map[string]float64{}
  117. // fetch the amounts of transfers by symbol, for each day since launch (releaseDay)
  118. dailyAmounts := amountsTransferredToInInterval(tbl, ctx, prefix, releaseDay)
  119. // create a slice of dates, order oldest first
  120. dateKeys := make([]string, 0, len(dailyAmounts))
  121. for k := range dailyAmounts {
  122. dateKeys = append(dateKeys, k)
  123. }
  124. sort.Strings(dateKeys)
  125. // iterate through the dates in the result set, and accumulate the amounts
  126. // of each token transfer by symbol, based on the destination of the transfer.
  127. for i, date := range dateKeys {
  128. muWarmCumulativeCache.RLock()
  129. if dateCache, ok := warmCumulativeCache[cachePrefix][date]; ok && dateCache != nil {
  130. // have a cached value for this day, use it.
  131. results[date] = dateCache
  132. muWarmCumulativeCache.RUnlock()
  133. } else {
  134. // no cached value for this day, must calculate it
  135. muWarmCumulativeCache.RUnlock()
  136. if i == 0 {
  137. // special case for first day, no need to sum.
  138. results[date] = dailyAmounts[date]
  139. } else {
  140. results[date] = map[string]map[string]float64{"*": {"*": 0}}
  141. // find the string of the previous day
  142. prevDate := dateKeys[i-1]
  143. prevDayAmounts := results[prevDate]
  144. thisDayAmounts := dailyAmounts[date]
  145. // iterate through all the transfers and add the previous day's amount, if it exists
  146. for chain, thisDaySymbols := range thisDayAmounts {
  147. // create a union of the symbols from this day, and previous days
  148. symbolsUnion := map[string]string{}
  149. for symbol := range prevDayAmounts[chain] {
  150. symbolsUnion[symbol] = symbol
  151. }
  152. for symbol := range thisDaySymbols {
  153. symbolsUnion[symbol] = symbol
  154. }
  155. // initalize the chain/symbol map for this date
  156. if _, ok := results[date][chain]; !ok {
  157. results[date][chain] = map[string]float64{"*": 0}
  158. }
  159. // iterate through the union of symbols, creating an amount for each one,
  160. // and adding it the the results.
  161. for symbol := range symbolsUnion {
  162. thisDayAmount := float64(0)
  163. if amt, ok := thisDaySymbols[symbol]; ok {
  164. thisDayAmount = amt
  165. }
  166. prevDayAmount := float64(0)
  167. if amt, ok := results[prevDate][chain][symbol]; ok {
  168. prevDayAmount = amt
  169. }
  170. cumulativeAmount := prevDayAmount + thisDayAmount
  171. results[date][chain][symbol] = cumulativeAmount
  172. }
  173. }
  174. }
  175. // dont cache today
  176. if date != today {
  177. // set the result in the cache
  178. muWarmCumulativeCache.Lock()
  179. if _, ok := warmCumulativeCache[cachePrefix][date]; !ok {
  180. // cache does not have this date, persist it for other instances.
  181. warmCumulativeCache[cachePrefix][date] = results[date]
  182. cacheNeedsUpdate = true
  183. }
  184. muWarmCumulativeCache.Unlock()
  185. }
  186. }
  187. }
  188. if cacheNeedsUpdate {
  189. persistInterfaceToJson(warmCumulativeCacheFilePath, &muWarmCumulativeCache, warmCumulativeCache)
  190. }
  191. // take the most recent n days, rather than returning all days since launch
  192. selectDays := map[string]map[string]map[string]float64{}
  193. days := getDaysInRange(start, now)
  194. for _, day := range days {
  195. selectDays[day] = results[day]
  196. }
  197. return selectDays
  198. }
  199. // calculates the cumulative value transferred each day since launch.
  200. func NotionalTransferredToCumulative(w http.ResponseWriter, r *http.Request) {
  201. // Set CORS headers for the preflight request
  202. if r.Method == http.MethodOptions {
  203. w.Header().Set("Access-Control-Allow-Origin", "*")
  204. w.Header().Set("Access-Control-Allow-Methods", "POST")
  205. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  206. w.Header().Set("Access-Control-Max-Age", "3600")
  207. w.WriteHeader(http.StatusNoContent)
  208. return
  209. }
  210. // Set CORS headers for the main request.
  211. w.Header().Set("Access-Control-Allow-Origin", "*")
  212. var numDays, forChain, forAddress, daily, allTime string
  213. // allow GET requests with querystring params, or POST requests with json body.
  214. switch r.Method {
  215. case http.MethodGet:
  216. queryParams := r.URL.Query()
  217. numDays = queryParams.Get("numDays")
  218. forChain = queryParams.Get("forChain")
  219. forAddress = queryParams.Get("forAddress")
  220. daily = queryParams.Get("daily")
  221. allTime = queryParams.Get("allTime")
  222. case http.MethodPost:
  223. // declare request body properties
  224. var d struct {
  225. NumDays string `json:"numDays"`
  226. ForChain string `json:"forChain"`
  227. ForAddress string `json:"forAddress"`
  228. Daily string `json:"daily"`
  229. AllTime string `json:"allTime"`
  230. }
  231. // deserialize request body
  232. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  233. switch err {
  234. case io.EOF:
  235. // do nothing, empty body is ok
  236. default:
  237. log.Printf("json.NewDecoder: %v", err)
  238. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  239. return
  240. }
  241. }
  242. numDays = d.NumDays
  243. forChain = d.ForChain
  244. forAddress = d.ForAddress
  245. daily = d.Daily
  246. allTime = d.AllTime
  247. default:
  248. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  249. log.Println("Method Not Allowed")
  250. return
  251. }
  252. if daily == "" && allTime == "" {
  253. // none of the options were set, so set one
  254. allTime = "true"
  255. }
  256. var queryDays int
  257. if numDays == "" {
  258. queryDays = 30
  259. } else {
  260. var convErr error
  261. queryDays, convErr = strconv.Atoi(numDays)
  262. if convErr != nil {
  263. fmt.Fprint(w, "numDays must be an integer")
  264. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  265. return
  266. }
  267. }
  268. // create the rowkey prefix for querying
  269. prefix := ""
  270. if forChain != "" {
  271. prefix = forChain
  272. // if the request is forChain, always groupBy chain
  273. if forAddress != "" {
  274. // if the request is forAddress, always groupBy address
  275. prefix = forChain + ":" + forAddress
  276. }
  277. }
  278. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  279. defer cancel()
  280. var wg sync.WaitGroup
  281. // total since launch
  282. periodTransfers := map[string]map[string]float64{}
  283. allTimeDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
  284. if allTime != "" {
  285. wg.Add(1)
  286. go func(prefix string) {
  287. defer wg.Done()
  288. transfers := transferredToSince(tbl, context.Background(), prefix, releaseDay)
  289. for chain, tokens := range transfers {
  290. periodTransfers[chain] = map[string]float64{}
  291. for symbol, amount := range tokens {
  292. periodTransfers[chain][symbol] = roundToTwoDecimalPlaces(amount)
  293. }
  294. }
  295. }(prefix)
  296. }
  297. // daily transfers by chain
  298. dailyTransfers := map[string]map[string]map[string]float64{}
  299. if daily != "" {
  300. wg.Add(1)
  301. go func(prefix string, queryDays int) {
  302. hours := (24 * queryDays)
  303. periodInterval := -time.Duration(hours) * time.Hour
  304. now := time.Now().UTC()
  305. prev := now.Add(periodInterval)
  306. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  307. defer wg.Done()
  308. transfers := createCumulativeAmountsOfInterval(tbl, ctx, prefix, start)
  309. for date, chains := range transfers {
  310. dailyTransfers[date] = map[string]map[string]float64{}
  311. for chain, tokens := range chains {
  312. dailyTransfers[date][chain] = map[string]float64{}
  313. for symbol, amount := range tokens {
  314. dailyTransfers[date][chain][symbol] = roundToTwoDecimalPlaces(amount)
  315. }
  316. }
  317. }
  318. }(prefix, queryDays)
  319. }
  320. wg.Wait()
  321. result := &cumulativeResult{
  322. AllTime: periodTransfers,
  323. AllTimeDurationDays: allTimeDays,
  324. Daily: dailyTransfers,
  325. }
  326. jsonBytes, err := json.Marshal(result)
  327. if err != nil {
  328. w.WriteHeader(http.StatusInternalServerError)
  329. w.Write([]byte(err.Error()))
  330. log.Println(err.Error())
  331. return
  332. }
  333. w.WriteHeader(http.StatusOK)
  334. w.Write(jsonBytes)
  335. }