addresses-transferred-to.go

// Package p contains an HTTP Cloud Function.
package p

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strconv"
	"sync"
	"time"

	"cloud.google.com/go/bigtable"
)
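
// NOTE: this file relies on identifiers declared in other files of package p
// (not shown here): tbl, columnFamilies, transferDetailsFam, transferPayloadFam,
// maxNano, createCachePrefix, persistInterfaceToJson, transformHexAddressToNative,
// chainIdStringToType and roundToTwoDecimalPlaces.

// addressesResult is the response payload encoded as JSON by AddressesTransferredTo.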
type addressesResult struct {
	Last24HoursAmounts  map[string]map[string]float64
	Last24HoursCounts   map[string]int
	WithinPeriodAmounts map[string]map[string]float64
	WithinPeriodCounts  map[string]int
	PeriodDurationDays  int
	DailyAmounts        map[string]map[string]map[string]float64
	DailyCounts         map[string]map[string]int
}

// an in-memory cache of previously calculated results
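// keyed by cachePrefix -> date -> destination chain -> destination address -> notional amount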
var warmAddressesCache = map[string]map[string]map[string]map[string]float64{}
var muWarmAddressesCache sync.RWMutex
var warmAddressesCacheFilePath = "/addresses-transferred-to-cache.json"
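
// AddressData holds the details of a single token-transfer row read from Bigtable.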
type AddressData struct {
	TokenSymbol        string
	TokenAmount        float64
	OriginChain        string
	LeavingChain       string
	DestinationChain   string
	DestinationAddress string
	Notional           float64
}
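
// fetchAddressRowsInInterval reads rows matching the given key prefix whose cells in
// columnFamilies[1] fall within the given time range; for those rows it fetches the
// latest Amount, NotionalUSD, OriginSymbol, OriginChain, TargetChain and TargetAddress
// cells and decodes them into AddressData values.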
func fetchAddressRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) []AddressData {
	rows := []AddressData{}
	err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
		t := &AddressData{}
		if _, ok := row[transferDetailsFam]; ok {
			for _, item := range row[transferDetailsFam] {
				switch item.Column {
				case "TokenTransferDetails:Amount":
					amount, _ := strconv.ParseFloat(string(item.Value), 64)
					t.TokenAmount = amount
				case "TokenTransferDetails:NotionalUSD":
					reader := bytes.NewReader(item.Value)
					var notionalFloat float64
					if err := binary.Read(reader, binary.BigEndian, &notionalFloat); err != nil {
						log.Fatalf("failed to read NotionalUSD of row %v. err %v", row.Key(), err)
					}
					t.Notional = notionalFloat
				case "TokenTransferDetails:OriginSymbol":
					t.TokenSymbol = string(item.Value)
				}
			}
			if _, ok := row[transferPayloadFam]; ok {
				for _, item := range row[transferPayloadFam] {
					switch item.Column {
					case "TokenTransferPayload:OriginChain":
						t.OriginChain = string(item.Value)
					case "TokenTransferPayload:TargetChain":
						t.DestinationChain = string(item.Value)
					case "TokenTransferPayload:TargetAddress":
						t.DestinationAddress = string(item.Value)
					}
				}
				t.DestinationAddress = transformHexAddressToNative(chainIdStringToType(t.DestinationChain), t.DestinationAddress)
			}
			t.LeavingChain = row.Key()[:1]
			rows = append(rows, *t)
		}
		return true
	}, bigtable.RowFilter(
		bigtable.ConditionFilter(
			bigtable.ChainFilters(
				bigtable.FamilyFilter(columnFamilies[1]),
				bigtable.CellsPerRowLimitFilter(1),        // only the first cell of the row
				bigtable.TimestampRangeFilter(start, end), // within the time range
				bigtable.StripValueFilter(),               // no columns/values, just the row.Key()
			),
			bigtable.ChainFilters(
				bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])),
				bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginChain|TargetChain|TargetAddress"),
				bigtable.LatestNFilter(1),
			),
			bigtable.BlockAllFilter(),
		),
	))
	if err != nil {
		log.Fatalln("failed reading address rows:", err)
	}
	return rows
}

// createAddressesOfInterval finds the unique addresses tokens have been sent to, for each day since the given start time.
func createAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
	results := map[string]map[string]map[string]float64{}
	now := time.Now().UTC()
	numPrevDays := int(now.Sub(start).Hours() / 24)
	var intervalsWG sync.WaitGroup
	// there will be a query for each previous day, plus today
	intervalsWG.Add(numPrevDays + 1)
	// create the unique identifier for this query, for the cache
	cachePrefix := createCachePrefix(prefix)
	cacheNeedsUpdate := false
	for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
		go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
			// start is the start of day (SOD), end is the end of day (EOD):
			// "0 daysAgo start" is 00:00:00 of the current day,
			// "0 daysAgo end" is 23:59:59 of the current day (the future).
			// calculate the start and end times for the query
			hoursAgo := (24 * daysAgo)
			daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
			n := now.Add(daysAgoDuration)
			year := n.Year()
			month := n.Month()
			day := n.Day()
			loc := n.Location()
			start := time.Date(year, month, day, 0, 0, 0, 0, loc)
			end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
			dateStr := start.Format("2006-01-02")
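			// the cache mutex also guards concurrent writes to the shared results map below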
			muWarmAddressesCache.Lock()
			// initialize the map for this date in the result set
			results[dateStr] = map[string]map[string]float64{}
			// check to see if there is cache data for this date/query
			if dates, ok := warmAddressesCache[cachePrefix]; ok {
				// have a cache for this query
				if dateCache, ok := dates[dateStr]; ok {
					// have a cache for this date
					if daysAgo >= 1 {
						// only use the cache for yesterday and older
						results[dateStr] = dateCache
						muWarmAddressesCache.Unlock()
						intervalsWG.Done()
						return
					}
				}
			} else {
				// no cache for this query, initialize the map
				warmAddressesCache[cachePrefix] = map[string]map[string]map[string]float64{}
			}
			muWarmAddressesCache.Unlock()
			defer intervalsWG.Done()
			queryResult := fetchAddressRowsInInterval(tbl, ctx, prefix, start, end)
			// iterate through the rows and accumulate the notional amount per destination address
			for _, row := range queryResult {
				if _, ok := results[dateStr][row.DestinationChain]; !ok {
					results[dateStr][row.DestinationChain] = map[string]float64{}
				}
				results[dateStr][row.DestinationChain][row.DestinationAddress] = results[dateStr][row.DestinationChain][row.DestinationAddress] + row.Notional
			}
			if daysAgo >= 1 {
				// set the result in the cache
				muWarmAddressesCache.Lock()
				if _, ok := warmAddressesCache[cachePrefix][dateStr]; !ok {
					// cache does not have this date, persist it for other instances.
					warmAddressesCache[cachePrefix][dateStr] = results[dateStr]
					cacheNeedsUpdate = true
				}
				muWarmAddressesCache.Unlock()
			}
		}(tbl, ctx, prefix, daysAgo)
	}
	intervalsWG.Wait()
	if cacheNeedsUpdate {
		persistInterfaceToJson(warmAddressesCacheFilePath, &muWarmAddressesCache, warmAddressesCache)
	}
	// create a set of all the chains seen across all dates, to ensure the result objects all have the same keys
	seenChainSet := map[string]bool{}
	for _, chains := range results {
		for leaving := range chains {
			seenChainSet[leaving] = true
		}
	}
	// ensure each date has an entry for every chain seen in the result set
	for date := range results {
		for chain := range seenChainSet {
			// check that the date has all the chains
			if _, ok := results[date][chain]; !ok {
				results[date][chain] = map[string]float64{}
			}
		}
	}
	return results
}

// addressesTransferredToSinceDate finds all the unique addresses that have received tokens since a particular moment.
func addressesTransferredToSinceDate(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
	result := map[string]map[string]float64{}
	// fetch data for days not in the cache
	dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, start)
	// loop through the query results to combine cached + fresh data
	for _, chains := range dailyAddresses {
		for chain, addresses := range chains {
			// ensure the chain exists in the result map
			if _, ok := result[chain]; !ok {
				result[chain] = map[string]float64{}
			}
			for address, amount := range addresses {
				if _, ok := result[chain][address]; !ok {
					result[chain][address] = 0
				}
				// add the amount the address received this day to the
				// amount already in the result (amount the address has received so far)
				result[chain][address] = result[chain][address] + amount
			}
		}
	}
	return result
}

// addressesForInterval returns the addresses that received tokens within the specified time range.
func addressesForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) map[string]map[string]float64 {
	// query for all rows in the time range
	queryResult := fetchAddressRowsInInterval(tbl, ctx, prefix, start, end)
	result := map[string]map[string]float64{}
	// iterate through the rows and accumulate the notional amount per destination address
	for _, row := range queryResult {
		if _, ok := result[row.DestinationChain]; !ok {
			result[row.DestinationChain] = map[string]float64{}
		}
		result[row.DestinationChain][row.DestinationAddress] = result[row.DestinationChain][row.DestinationAddress] + row.Notional
	}
	return result
}
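
// Example request, as a sketch (the exact URL depends on where the function is deployed):
//
//	curl -X POST https://REGION-PROJECT.cloudfunctions.net/AddressesTransferredTo \
//	  -H "Content-Type: application/json" \
//	  -d '{"numDays": "7", "forPeriod": "true", "counts": "true"}'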

// AddressesTransferredTo is an HTTP Cloud Function that finds the addresses tokens have been transferred to.
func AddressesTransferredTo(w http.ResponseWriter, r *http.Request) {
	// Set CORS headers for the preflight request
	if r.Method == http.MethodOptions {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Access-Control-Allow-Methods", "POST")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
		w.Header().Set("Access-Control-Max-Age", "3600")
		w.WriteHeader(http.StatusNoContent)
		return
	}
	// Set CORS headers for the main request.
	w.Header().Set("Access-Control-Allow-Origin", "*")

	var numDays, forChain, forAddress, daily, last24Hours, forPeriod, counts, amounts string
	// allow GET requests with querystring params, or POST requests with a JSON body.
	switch r.Method {
	case http.MethodGet:
		queryParams := r.URL.Query()
		numDays = queryParams.Get("numDays")
		forChain = queryParams.Get("forChain")
		forAddress = queryParams.Get("forAddress")
		daily = queryParams.Get("daily")
		last24Hours = queryParams.Get("last24Hours")
		forPeriod = queryParams.Get("forPeriod")
		counts = queryParams.Get("counts")
		amounts = queryParams.Get("amounts")
	case http.MethodPost:
		// declare request body properties
		var d struct {
			NumDays     string `json:"numDays"`
			ForChain    string `json:"forChain"`
			ForAddress  string `json:"forAddress"`
			Daily       string `json:"daily"`
			Last24Hours string `json:"last24Hours"`
			ForPeriod   string `json:"forPeriod"`
			Counts      string `json:"counts"`
			Amounts     string `json:"amounts"`
		}
		// deserialize the request body
		if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
			switch err {
			case io.EOF:
				// do nothing, an empty body is ok
			default:
				log.Printf("json.NewDecoder: %v", err)
				http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
				return
			}
		}
		numDays = d.NumDays
		forChain = d.ForChain
		forAddress = d.ForAddress
		daily = d.Daily
		last24Hours = d.Last24Hours
		forPeriod = d.ForPeriod
		counts = d.Counts
		amounts = d.Amounts
	default:
		http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
		log.Println("Method Not Allowed")
		return
	}

	if daily == "" && last24Hours == "" && forPeriod == "" {
		// none of the options were set, so default to last24Hours
		last24Hours = "true"
	}
	if counts == "" && amounts == "" {
		// neither option was set, so default to counts
		counts = "true"
	}

	var queryDays int
	if numDays == "" {
		queryDays = 30
	} else {
		var convErr error
		queryDays, convErr = strconv.Atoi(numDays)
		if convErr != nil {
			http.Error(w, "numDays must be an integer", http.StatusBadRequest)
			return
		}
	}

	// create the rowkey prefix for querying
	prefix := ""
	if forChain != "" {
		prefix = forChain
		// if the request is forChain, always groupBy chain
		if forAddress != "" {
			// if the request is forAddress, always groupBy address
			prefix = forChain + ":" + forAddress
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	var wg sync.WaitGroup

	// total of last 24 hours
	last24HourAmounts := map[string]map[string]float64{}
	last24HourCounts := map[string]int{}
	if last24Hours != "" {
		wg.Add(1)
		go func(prefix string) {
			last24HourInterval := -time.Duration(24) * time.Hour
			now := time.Now().UTC()
			start := now.Add(last24HourInterval)
			defer wg.Done()
			last24HourAddresses := addressesForInterval(tbl, ctx, prefix, start, now)
			if amounts != "" {
				for chain, addresses := range last24HourAddresses {
					last24HourAmounts[chain] = map[string]float64{}
					for address, amount := range addresses {
						last24HourAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
					}
				}
			}
			if counts != "" {
				for chain, addresses := range last24HourAddresses {
					// need to sum all the chains to get the total count of addresses,
					// since addresses are not unique across chains.
					numAddresses := len(addresses)
					last24HourCounts[chain] = numAddresses
					last24HourCounts["*"] = last24HourCounts["*"] + numAddresses
				}
			}
		}(prefix)
	}

	// total of the last numDays
	addressesDailyAmounts := map[string]map[string]float64{}
	addressesDailyCounts := map[string]int{}
	if forPeriod != "" {
		wg.Add(1)
		go func(prefix string) {
			hours := (24 * queryDays)
			periodInterval := -time.Duration(hours) * time.Hour
			now := time.Now().UTC()
			prev := now.Add(periodInterval)
			start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
			defer wg.Done()
			periodAmounts := addressesTransferredToSinceDate(tbl, ctx, prefix, start)
			if amounts != "" {
				for chain, addresses := range periodAmounts {
					addressesDailyAmounts[chain] = map[string]float64{}
					for address, amount := range addresses {
						addressesDailyAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
					}
				}
			}
			if counts != "" {
				for chain, addresses := range periodAmounts {
					// need to sum all the chains to get the total count of addresses,
					// since addresses are not unique across chains.
					numAddresses := len(addresses)
					addressesDailyCounts[chain] = numAddresses
					addressesDailyCounts["*"] = addressesDailyCounts["*"] + numAddresses
				}
			}
		}(prefix)
	}

	// daily totals
	dailyAmounts := map[string]map[string]map[string]float64{}
	dailyCounts := map[string]map[string]int{}
	if daily != "" {
		wg.Add(1)
		go func(prefix string, queryDays int) {
			hours := (24 * queryDays)
			periodInterval := -time.Duration(hours) * time.Hour
			now := time.Now().UTC()
			prev := now.Add(periodInterval)
			start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
			defer wg.Done()
			dailyTotals := createAddressesOfInterval(tbl, ctx, prefix, start)
			if amounts != "" {
				for date, chains := range dailyTotals {
					dailyAmounts[date] = map[string]map[string]float64{}
					for chain, addresses := range chains {
						dailyAmounts[date][chain] = map[string]float64{}
						for address, amount := range addresses {
							dailyAmounts[date][chain][address] = roundToTwoDecimalPlaces(amount)
						}
					}
				}
			}
			if counts != "" {
				for date, chains := range dailyTotals {
					dailyCounts[date] = map[string]int{}
					for chain, addresses := range chains {
						// need to sum all the chains to get the total count of addresses,
						// since addresses are not unique across chains.
						numAddresses := len(addresses)
						dailyCounts[date][chain] = numAddresses
						dailyCounts[date]["*"] = dailyCounts[date]["*"] + numAddresses
					}
				}
			}
		}(prefix, queryDays)
	}

	wg.Wait()

	result := &addressesResult{
		Last24HoursAmounts:  last24HourAmounts,
		Last24HoursCounts:   last24HourCounts,
		WithinPeriodAmounts: addressesDailyAmounts,
		WithinPeriodCounts:  addressesDailyCounts,
		PeriodDurationDays:  queryDays,
		DailyAmounts:        dailyAmounts,
		DailyCounts:         dailyCounts,
	}

	// set the Content-Type before writing the status, so the header is not ignored
	w.Header().Add("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(result)
}
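
// A minimal local invocation sketch (e.g. from a test in this package, assuming the
// package's other files initialize the Bigtable client and helpers this handler uses):
//
//	req := httptest.NewRequest(http.MethodGet, "/?last24Hours=true&counts=true", nil)
//	rec := httptest.NewRecorder()
//	AddressesTransferredTo(rec, req)
//	fmt.Println(rec.Code, rec.Body.String())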