// addresses-transferred-to-cumulative.go
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net/http"
  10. "sort"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "cloud.google.com/go/bigtable"
  15. )
// cumulativeAddressesResult is the JSON response payload for
// AddressesTransferredToCumulative. Only the portions requested by the
// caller (allTime/daily, counts/amounts) are populated; the rest are
// left as empty maps.
type cumulativeAddressesResult struct {
	// notional amount received, all time: chain -> address -> amount
	AllTimeAmounts map[string]map[string]float64
	// unique receiving-address count per chain; key "*" holds the sum across chains
	AllTimeCounts map[string]int
	// whole days elapsed since releaseDay
	AllTimeDurationDays int
	// cumulative notional amounts by day: date -> chain -> address -> amount
	DailyAmounts map[string]map[string]map[string]float64
	// cumulative unique-address counts by day: date -> chain -> count ("*" = total)
	DailyCounts map[string]map[string]int
}
// an in-memory cache of previously calculated results.
// keyed by cache prefix, then date, then chain, then address.
var warmCumulativeAddressesCache = map[string]map[string]map[string]map[string]float64{}

// muWarmCumulativeAddressesCache guards warmCumulativeAddressesCache;
// also passed to persistInterfaceToJson when writing the cache to disk.
var muWarmCumulativeAddressesCache sync.RWMutex
var warmCumulativeAddressesCacheFilePath = "/addresses-transferred-to-cumulative-cache.json"

// cache of amounts received per address up through midnight yesterday,
// keyed by cache prefix, then the date string for "yesterday", then chain, then address.
var addressesToUpToYesterday = map[string]map[string]map[string]map[string]float64{}

// muAddressesToUpToYesterday guards addressesToUpToYesterday.
var muAddressesToUpToYesterday sync.RWMutex
var addressesToUpToYesterdayFilePath = "/addresses-transferred-to-up-to-yesterday-cache.json"
  30. // finds all the unique addresses that have received tokens since a particular moment.
  31. func addressesTransferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
  32. now := time.Now().UTC()
  33. today := now.Format("2006-01-02")
  34. oneDayAgo := -time.Duration(24) * time.Hour
  35. yesterday := now.Add(oneDayAgo).Format("2006-01-02")
  36. result := map[string]map[string]float64{}
  37. // create the unique identifier for this query, for cache
  38. cachePrefix := createCachePrefix(prefix)
  39. muAddressesToUpToYesterday.Lock()
  40. if _, ok := addressesToUpToYesterday[cachePrefix]; !ok {
  41. addressesToUpToYesterday[cachePrefix] = map[string]map[string]map[string]float64{}
  42. }
  43. if cacheData, ok := addressesToUpToYesterday[cachePrefix][yesterday]; ok {
  44. // cache has data through midnight yesterday
  45. result = cacheData
  46. // set the start to be the start of today
  47. start = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  48. }
  49. muAddressesToUpToYesterday.Unlock()
  50. // fetch data for days not in the cache
  51. dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, start)
  52. // loop through the query results to combine cache + fresh data
  53. for _, chains := range dailyAddresses {
  54. for chain, addresses := range chains {
  55. // ensure the chain exists in the result map
  56. if _, ok := result[chain]; !ok {
  57. result[chain] = map[string]float64{}
  58. }
  59. for address, amount := range addresses {
  60. if _, ok := result[chain][address]; !ok {
  61. result[chain][address] = 0
  62. }
  63. // add the amount the address received this day to the
  64. // amount already in the result (amount the address has recieved so far)
  65. result[chain][address] = result[chain][address] + amount
  66. }
  67. }
  68. }
  69. muAddressesToUpToYesterday.Lock()
  70. if _, ok := addressesToUpToYesterday[cachePrefix][yesterday]; !ok {
  71. // no cache, populate it
  72. upToYesterday := result
  73. for chain, addresses := range dailyAddresses[today] {
  74. for address, amount := range addresses {
  75. upToYesterday[chain][address] = upToYesterday[chain][address] - amount
  76. }
  77. }
  78. addressesToUpToYesterday[cachePrefix][yesterday] = upToYesterday
  79. muAddressesToUpToYesterday.Unlock()
  80. // write cache to disc
  81. persistInterfaceToJson(addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, addressesToUpToYesterday)
  82. } else {
  83. muAddressesToUpToYesterday.Unlock()
  84. }
  85. return result
  86. }
  87. // calcuates a map of recepient address to notional value received, by chain, since the start time specified.
  88. func createCumulativeAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
  89. now := time.Now().UTC()
  90. today := now.Format("2006-01-02")
  91. cachePrefix := createCachePrefix(prefix)
  92. cacheNeedsUpdate := false
  93. muWarmCumulativeAddressesCache.Lock()
  94. if _, ok := warmCumulativeAddressesCache[cachePrefix]; !ok {
  95. warmCumulativeAddressesCache[cachePrefix] = map[string]map[string]map[string]float64{}
  96. }
  97. muWarmCumulativeAddressesCache.Unlock()
  98. results := map[string]map[string]map[string]float64{}
  99. dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, releaseDay)
  100. dateKeys := make([]string, 0, len(dailyAddresses))
  101. for k := range dailyAddresses {
  102. dateKeys = append(dateKeys, k)
  103. }
  104. sort.Strings(dateKeys)
  105. // iterate through the dates in the result set, and accumulate the amounts
  106. // of each token transfer by symbol, based on the destination of the transfer.
  107. for i, date := range dateKeys {
  108. muWarmCumulativeAddressesCache.RLock()
  109. if dateCache, ok := warmCumulativeAddressesCache[cachePrefix][date]; ok && dateCache != nil {
  110. // have a cached value for this day, use it.
  111. results[date] = dateCache
  112. muWarmCumulativeAddressesCache.RUnlock()
  113. } else {
  114. // no cached value for this day, must calculate it
  115. muWarmCumulativeAddressesCache.RUnlock()
  116. if i == 0 {
  117. // special case for first day, no need to sum.
  118. results[date] = dailyAddresses[date]
  119. } else {
  120. results[date] = map[string]map[string]float64{}
  121. // find the string of the previous day
  122. prevDate := dateKeys[i-1]
  123. prevDayChains := results[prevDate]
  124. thisDayChains := dailyAddresses[date]
  125. for chain, thisDayAddresses := range thisDayChains {
  126. // create a union of the addresses from this day, and previous days
  127. addressUnion := map[string]string{}
  128. for address := range prevDayChains[chain] {
  129. addressUnion[address] = address
  130. }
  131. for address := range thisDayAddresses {
  132. addressUnion[address] = address
  133. }
  134. // initalize the chain/symbol map for this date
  135. if _, ok := results[date][chain]; !ok {
  136. results[date][chain] = map[string]float64{}
  137. }
  138. // iterate through the union of addresses, creating an amount for each one,
  139. // and adding it the the results.
  140. for address := range addressUnion {
  141. thisDayAmount := float64(0)
  142. if amt, ok := thisDayAddresses[address]; ok {
  143. thisDayAmount = amt
  144. }
  145. prevDayAmount := float64(0)
  146. if prevAmount, ok := results[prevDate][chain][address]; ok && prevAmount != 0 {
  147. prevDayAmount = prevAmount
  148. }
  149. cumulativeAmount := prevDayAmount + thisDayAmount
  150. results[date][chain][address] = cumulativeAmount
  151. }
  152. }
  153. }
  154. // dont cache today
  155. if date != today {
  156. // set the result in the cache
  157. muWarmCumulativeAddressesCache.Lock()
  158. if _, ok := warmCumulativeAddressesCache[cachePrefix][date]; !ok {
  159. // cache does not have this date, persist it for other instances.
  160. warmCumulativeAddressesCache[cachePrefix][date] = results[date]
  161. cacheNeedsUpdate = true
  162. }
  163. muWarmCumulativeAddressesCache.Unlock()
  164. }
  165. }
  166. }
  167. if cacheNeedsUpdate {
  168. persistInterfaceToJson(warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, warmCumulativeAddressesCache)
  169. }
  170. selectDays := map[string]map[string]map[string]float64{}
  171. days := getDaysInRange(start, now)
  172. for _, day := range days {
  173. selectDays[day] = results[day]
  174. }
  175. return selectDays
  176. }
  177. // finds unique addresses that tokens have been transferred to.
  178. func AddressesTransferredToCumulative(w http.ResponseWriter, r *http.Request) {
  179. // Set CORS headers for the preflight request
  180. if r.Method == http.MethodOptions {
  181. w.Header().Set("Access-Control-Allow-Origin", "*")
  182. w.Header().Set("Access-Control-Allow-Methods", "POST")
  183. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  184. w.Header().Set("Access-Control-Max-Age", "3600")
  185. w.WriteHeader(http.StatusNoContent)
  186. return
  187. }
  188. // Set CORS headers for the main request.
  189. w.Header().Set("Access-Control-Allow-Origin", "*")
  190. var numDays, forChain, forAddress, daily, allTime, counts, amounts string
  191. // allow GET requests with querystring params, or POST requests with json body.
  192. switch r.Method {
  193. case http.MethodGet:
  194. queryParams := r.URL.Query()
  195. numDays = queryParams.Get("numDays")
  196. forChain = queryParams.Get("forChain")
  197. forAddress = queryParams.Get("forAddress")
  198. daily = queryParams.Get("daily")
  199. allTime = queryParams.Get("allTime")
  200. counts = queryParams.Get("counts")
  201. amounts = queryParams.Get("amounts")
  202. case http.MethodPost:
  203. // declare request body properties
  204. var d struct {
  205. NumDays string `json:"numDays"`
  206. ForChain string `json:"forChain"`
  207. ForAddress string `json:"forAddress"`
  208. Daily string `json:"daily"`
  209. AllTime string `json:"allTime"`
  210. Counts string `json:"counts"`
  211. Amounts string `json:"amounts"`
  212. }
  213. // deserialize request body
  214. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  215. switch err {
  216. case io.EOF:
  217. // do nothing, empty body is ok
  218. default:
  219. log.Printf("json.NewDecoder: %v", err)
  220. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  221. return
  222. }
  223. }
  224. numDays = d.NumDays
  225. forChain = d.ForChain
  226. forAddress = d.ForAddress
  227. daily = d.Daily
  228. allTime = d.AllTime
  229. counts = d.Counts
  230. amounts = d.Amounts
  231. default:
  232. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  233. log.Println("Method Not Allowed")
  234. return
  235. }
  236. if daily == "" && allTime == "" {
  237. // none of the options were set, so set one
  238. allTime = "true"
  239. }
  240. if counts == "" && amounts == "" {
  241. // neither of the options were set, so set one
  242. counts = "true"
  243. }
  244. var queryDays int
  245. if numDays == "" {
  246. queryDays = 30
  247. } else {
  248. var convErr error
  249. queryDays, convErr = strconv.Atoi(numDays)
  250. if convErr != nil {
  251. fmt.Fprint(w, "numDays must be an integer")
  252. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  253. return
  254. }
  255. }
  256. // create the rowkey prefix for querying
  257. prefix := ""
  258. if forChain != "" {
  259. prefix = forChain
  260. // if the request is forChain, always groupBy chain
  261. if forAddress != "" {
  262. // if the request is forAddress, always groupBy address
  263. prefix = forChain + ":" + forAddress
  264. }
  265. }
  266. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  267. defer cancel()
  268. var wg sync.WaitGroup
  269. // total of the last numDays
  270. addressesDailyAmounts := map[string]map[string]float64{}
  271. addressesDailyCounts := map[string]int{}
  272. allTimeDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
  273. if allTime != "" {
  274. wg.Add(1)
  275. go func(prefix string) {
  276. defer wg.Done()
  277. periodAmounts := addressesTransferredToSince(tbl, ctx, prefix, releaseDay)
  278. if amounts != "" {
  279. for chain, addresses := range periodAmounts {
  280. addressesDailyAmounts[chain] = map[string]float64{}
  281. for address, amount := range addresses {
  282. addressesDailyAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
  283. }
  284. }
  285. }
  286. if counts != "" {
  287. for chain, addresses := range periodAmounts {
  288. // need to sum all the chains to get the total count of addresses,
  289. // since addresses are not unique across chains.
  290. numAddresses := len(addresses)
  291. addressesDailyCounts[chain] = len(addresses)
  292. addressesDailyCounts["*"] = addressesDailyCounts["*"] + numAddresses
  293. }
  294. }
  295. }(prefix)
  296. }
  297. // daily totals
  298. dailyAmounts := map[string]map[string]map[string]float64{}
  299. dailyCounts := map[string]map[string]int{}
  300. if daily != "" {
  301. wg.Add(1)
  302. go func(prefix string, queryDays int) {
  303. hours := (24 * queryDays)
  304. periodInterval := -time.Duration(hours) * time.Hour
  305. now := time.Now().UTC()
  306. prev := now.Add(periodInterval)
  307. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  308. defer wg.Done()
  309. dailyTotals := createCumulativeAddressesOfInterval(tbl, ctx, prefix, start)
  310. if amounts != "" {
  311. for date, chains := range dailyTotals {
  312. dailyAmounts[date] = map[string]map[string]float64{}
  313. for chain, addresses := range chains {
  314. dailyAmounts[date][chain] = map[string]float64{}
  315. for address, amount := range addresses {
  316. dailyAmounts[date][chain][address] = roundToTwoDecimalPlaces(amount)
  317. }
  318. }
  319. }
  320. }
  321. if counts != "" {
  322. for date, chains := range dailyTotals {
  323. dailyCounts[date] = map[string]int{}
  324. for chain, addresses := range chains {
  325. // need to sum all the chains to get the total count of addresses,
  326. // since addresses are not unique across chains.
  327. numAddresses := len(addresses)
  328. dailyCounts[date][chain] = numAddresses
  329. dailyCounts[date]["*"] = dailyCounts[date]["*"] + numAddresses
  330. }
  331. }
  332. }
  333. }(prefix, queryDays)
  334. }
  335. wg.Wait()
  336. result := &cumulativeAddressesResult{
  337. AllTimeAmounts: addressesDailyAmounts,
  338. AllTimeCounts: addressesDailyCounts,
  339. AllTimeDurationDays: allTimeDays,
  340. DailyAmounts: dailyAmounts,
  341. DailyCounts: dailyCounts,
  342. }
  343. jsonBytes, err := json.Marshal(result)
  344. if err != nil {
  345. w.WriteHeader(http.StatusInternalServerError)
  346. w.Write([]byte(err.Error()))
  347. log.Println(err.Error())
  348. return
  349. }
  350. w.WriteHeader(http.StatusOK)
  351. w.Header().Add("Content-Type", "application/json")
  352. w.Write(jsonBytes)
  353. }