notional-transferred-to.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/binary"
  7. "encoding/json"
  8. "fmt"
  9. "io"
  10. "log"
  11. "net/http"
  12. "strconv"
  13. "sync"
  14. "time"
  15. "cloud.google.com/go/bigtable"
  16. )
  17. type amountsResult struct {
  18. Last24Hours map[string]map[string]float64
  19. WithinPeriod map[string]map[string]float64
  20. PeriodDurationDays int
  21. Daily map[string]map[string]map[string]float64
  22. }
  23. // warmCache keeps some data around between invocations, so that we don't have
  24. // to do a full table scan with each request.
  25. // https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
  26. // TODO - make a struct for cache
  27. var warmAmountsCache = map[string]map[string]map[string]map[string]float64{}
  28. type TransferData struct {
  29. TokenSymbol string
  30. TokenAmount float64
  31. OriginChain string
  32. LeavingChain string
  33. DestinationChain string
  34. Notional float64
  35. }
  36. func fetchAmountRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]TransferData, error) {
  37. rows := []TransferData{}
  38. err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
  39. t := &TransferData{}
  40. if _, ok := row[transferDetailsFam]; ok {
  41. for _, item := range row[transferDetailsFam] {
  42. switch item.Column {
  43. case "TokenTransferDetails:Amount":
  44. amount, _ := strconv.ParseFloat(string(item.Value), 64)
  45. t.TokenAmount = amount
  46. case "TokenTransferDetails:NotionalUSD":
  47. reader := bytes.NewReader(item.Value)
  48. var notionalFloat float64
  49. if err := binary.Read(reader, binary.BigEndian, &notionalFloat); err != nil {
  50. log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err)
  51. }
  52. t.Notional = notionalFloat
  53. case "TokenTransferDetails:OriginSymbol":
  54. t.TokenSymbol = string(item.Value)
  55. }
  56. }
  57. if _, ok := row[transferPayloadFam]; ok {
  58. for _, item := range row[transferPayloadFam] {
  59. switch item.Column {
  60. case "TokenTransferPayload:OriginChain":
  61. t.OriginChain = string(item.Value)
  62. case "TokenTransferPayload:TargetChain":
  63. t.DestinationChain = string(item.Value)
  64. }
  65. }
  66. }
  67. t.LeavingChain = row.Key()[:1]
  68. rows = append(rows, *t)
  69. }
  70. return true
  71. }, bigtable.RowFilter(
  72. bigtable.ConditionFilter(
  73. bigtable.ChainFilters(
  74. bigtable.FamilyFilter(columnFamilies[1]),
  75. bigtable.CellsPerRowLimitFilter(1), // only the first cell in column
  76. bigtable.TimestampRangeFilter(start, end), // within time range
  77. bigtable.StripValueFilter(), // no columns/values, just the row.Key()
  78. ),
  79. bigtable.ChainFilters(
  80. bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])),
  81. bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginChain|TargetChain"),
  82. bigtable.LatestNFilter(1),
  83. ),
  84. bigtable.BlockAllFilter(),
  85. ),
  86. ))
  87. if err != nil {
  88. fmt.Println("failed reading rows to create RowList.", err)
  89. return nil, err
  90. }
  91. return rows, err
  92. }
  93. func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int) (map[string]map[string]map[string]float64, error) {
  94. var mu sync.RWMutex
  95. results := map[string]map[string]map[string]float64{}
  96. now := time.Now().UTC()
  97. var intervalsWG sync.WaitGroup
  98. // there will be a query for each previous day, plus today
  99. intervalsWG.Add(numPrevDays + 1)
  100. // create the unique identifier for this query, for cache
  101. cachePrefix := createCachePrefix(prefix)
  102. for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
  103. go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
  104. // start is the SOD, end is EOD
  105. // "0 daysAgo start" is 00:00:00 AM of the current day
  106. // "0 daysAgo end" is 23:59:59 of the current day (the future)
  107. // calulate the start and end times for the query
  108. hoursAgo := (24 * daysAgo)
  109. daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
  110. n := now.Add(daysAgoDuration)
  111. year := n.Year()
  112. month := n.Month()
  113. day := n.Day()
  114. loc := n.Location()
  115. start := time.Date(year, month, day, 0, 0, 0, 0, loc)
  116. end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
  117. dateStr := start.Format("2006-01-02")
  118. mu.Lock()
  119. // initialize the map for this date in the result set
  120. results[dateStr] = map[string]map[string]float64{"*": {"*": 0}}
  121. // check to see if there is cache data for this date/query
  122. if dates, ok := warmAmountsCache[cachePrefix]; ok {
  123. // have a cache for this query
  124. if dateCache, ok := dates[dateStr]; ok {
  125. // have a cache for this date
  126. if daysAgo >= 1 {
  127. // only use the cache for yesterday and older
  128. results[dateStr] = dateCache
  129. mu.Unlock()
  130. intervalsWG.Done()
  131. return
  132. }
  133. } else {
  134. // no cache for this query
  135. warmAmountsCache[cachePrefix][dateStr] = map[string]map[string]float64{}
  136. }
  137. } else {
  138. // no cache for this date, initialize the map
  139. warmAmountsCache[cachePrefix] = map[string]map[string]map[string]float64{}
  140. warmAmountsCache[cachePrefix][dateStr] = map[string]map[string]float64{}
  141. }
  142. mu.Unlock()
  143. var result []TransferData
  144. var fetchErr error
  145. defer intervalsWG.Done()
  146. result, fetchErr = fetchAmountRowsInInterval(tbl, ctx, prefix, start, end)
  147. if fetchErr != nil {
  148. log.Fatalf("fetchAmountRowsInInterval returned an error: %v\n", fetchErr)
  149. }
  150. // iterate through the rows and increment the count
  151. for _, row := range result {
  152. if _, ok := results[dateStr][row.DestinationChain]; !ok {
  153. results[dateStr][row.DestinationChain] = map[string]float64{"*": 0}
  154. }
  155. // add to the total count for the dest chain
  156. results[dateStr][row.DestinationChain]["*"] = results[dateStr][row.DestinationChain]["*"] + row.Notional
  157. // add to total for the day
  158. results[dateStr]["*"]["*"] = results[dateStr]["*"]["*"] + row.Notional
  159. // add to the symbol's daily total
  160. results[dateStr]["*"][row.TokenSymbol] = results[dateStr]["*"][row.TokenSymbol] + row.Notional
  161. // add to the count for chain/symbol
  162. results[dateStr][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.DestinationChain][row.TokenSymbol] + row.Notional
  163. }
  164. // set the result in the cache
  165. warmAmountsCache[cachePrefix][dateStr] = results[dateStr]
  166. }(tbl, ctx, prefix, daysAgo)
  167. }
  168. intervalsWG.Wait()
  169. // create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys
  170. seenSymbolSet := map[string]bool{}
  171. seenChainSet := map[string]bool{}
  172. for date, tokens := range results {
  173. for leaving := range tokens {
  174. seenChainSet[leaving] = true
  175. for key := range results[date][leaving] {
  176. seenSymbolSet[key] = true
  177. }
  178. }
  179. }
  180. // ensure each chain object has all the same symbol keys:
  181. for date := range results {
  182. for leaving := range results[date] {
  183. // loop through seen chains
  184. for chain := range seenChainSet {
  185. // check that date has all the chains
  186. if _, ok := results[date][chain]; !ok {
  187. results[date][chain] = map[string]float64{"*": 0}
  188. }
  189. }
  190. // loop through seen symbols
  191. for token := range seenSymbolSet {
  192. // check that the chain has all the symbols
  193. if _, ok := results[date][leaving][token]; !ok {
  194. // add the missing key to the map
  195. results[date][leaving][token] = 0
  196. }
  197. }
  198. }
  199. }
  200. return results, nil
  201. }
  202. // returns the count of the rows in the query response
  203. func amountsForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) (map[string]map[string]float64, error) {
  204. // query for all rows in time range, return result count
  205. results, fetchErr := fetchAmountRowsInInterval(tbl, ctx, prefix, start, end)
  206. if fetchErr != nil {
  207. log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
  208. return nil, fetchErr
  209. }
  210. var total = float64(0)
  211. for _, item := range results {
  212. total = total + item.Notional
  213. }
  214. result := map[string]map[string]float64{"*": {"*": total}}
  215. // iterate through the rows and increment the count for each index
  216. for _, row := range results {
  217. if _, ok := result[row.DestinationChain]; !ok {
  218. result[row.DestinationChain] = map[string]float64{"*": 0}
  219. }
  220. // add to total amount
  221. result[row.DestinationChain]["*"] = result[row.DestinationChain]["*"] + row.Notional
  222. // add to total per symbol
  223. result["*"][row.TokenSymbol] = result["*"][row.TokenSymbol] + row.Notional
  224. // add to symbol amount
  225. result[row.DestinationChain][row.TokenSymbol] = result[row.DestinationChain][row.TokenSymbol] + row.Notional
  226. }
  227. return result, nil
  228. }
  229. // get number of recent transactions in the last 24 hours, and daily for a period
  230. // optionally group by a EmitterChain or EmitterAddress
  231. // optionally query for recent rows of a given EmitterChain or EmitterAddress
  232. func NotionalTransferredTo(w http.ResponseWriter, r *http.Request) {
  233. // Set CORS headers for the preflight request
  234. if r.Method == http.MethodOptions {
  235. w.Header().Set("Access-Control-Allow-Origin", "*")
  236. w.Header().Set("Access-Control-Allow-Methods", "POST")
  237. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  238. w.Header().Set("Access-Control-Max-Age", "3600")
  239. w.WriteHeader(http.StatusNoContent)
  240. return
  241. }
  242. // Set CORS headers for the main request.
  243. w.Header().Set("Access-Control-Allow-Origin", "*")
  244. var numDays, forChain, forAddress, daily, last24Hours, forPeriod string
  245. // allow GET requests with querystring params, or POST requests with json body.
  246. switch r.Method {
  247. case http.MethodGet:
  248. queryParams := r.URL.Query()
  249. numDays = queryParams.Get("numDays")
  250. forChain = queryParams.Get("forChain")
  251. forAddress = queryParams.Get("forAddress")
  252. daily = queryParams.Get("daily")
  253. last24Hours = queryParams.Get("last24Hours")
  254. forPeriod = queryParams.Get("forPeriod")
  255. case http.MethodPost:
  256. // declare request body properties
  257. var d struct {
  258. NumDays string `json:"numDays"`
  259. ForChain string `json:"forChain"`
  260. ForAddress string `json:"forAddress"`
  261. Daily string `json:"daily"`
  262. Last24Hours string `json:"last24Hours"`
  263. ForPeriod string `json:"forPeriod"`
  264. }
  265. // deserialize request body
  266. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  267. switch err {
  268. case io.EOF:
  269. // do nothing, empty body is ok
  270. default:
  271. log.Printf("json.NewDecoder: %v", err)
  272. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  273. return
  274. }
  275. }
  276. numDays = d.NumDays
  277. forChain = d.ForChain
  278. forAddress = d.ForAddress
  279. daily = d.Daily
  280. last24Hours = d.Last24Hours
  281. forPeriod = d.ForPeriod
  282. default:
  283. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  284. log.Println("Method Not Allowed")
  285. return
  286. }
  287. if daily == "" && last24Hours == "" && forPeriod == "" {
  288. // none of the options were set, so set one
  289. last24Hours = "true"
  290. }
  291. var queryDays int
  292. if numDays == "" {
  293. queryDays = 30
  294. } else {
  295. var convErr error
  296. queryDays, convErr = strconv.Atoi(numDays)
  297. if convErr != nil {
  298. fmt.Fprint(w, "numDays must be an integer")
  299. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  300. return
  301. }
  302. }
  303. // create the rowkey prefix for querying
  304. prefix := ""
  305. if forChain != "" {
  306. prefix = forChain
  307. // if the request is forChain, always groupBy chain
  308. if forAddress != "" {
  309. // if the request is forAddress, always groupBy address
  310. prefix = forChain + ":" + forAddress
  311. }
  312. }
  313. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  314. defer cancel()
  315. var wg sync.WaitGroup
  316. // total of last 24 hours
  317. var last24HourCount map[string]map[string]float64
  318. if last24Hours != "" {
  319. wg.Add(1)
  320. go func(prefix string) {
  321. var err error
  322. last24HourInterval := -time.Duration(24) * time.Hour
  323. now := time.Now().UTC()
  324. start := now.Add(last24HourInterval)
  325. defer wg.Done()
  326. last24HourCount, err = amountsForInterval(tbl, ctx, prefix, start, now)
  327. for chain, tokens := range last24HourCount {
  328. for symbol, amount := range tokens {
  329. last24HourCount[chain][symbol] = roundToTwoDecimalPlaces(amount)
  330. }
  331. }
  332. if err != nil {
  333. log.Printf("failed getting count for 24h interval, err: %v", err)
  334. }
  335. }(prefix)
  336. }
  337. // total of the last numDays
  338. var periodCount map[string]map[string]float64
  339. if forPeriod != "" {
  340. wg.Add(1)
  341. go func(prefix string) {
  342. var err error
  343. hours := (24 * queryDays)
  344. periodInterval := -time.Duration(hours) * time.Hour
  345. now := time.Now().UTC()
  346. prev := now.Add(periodInterval)
  347. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  348. defer wg.Done()
  349. periodCount, err = amountsForInterval(tbl, ctx, prefix, start, now)
  350. for chain, tokens := range periodCount {
  351. for symbol, amount := range tokens {
  352. periodCount[chain][symbol] = roundToTwoDecimalPlaces(amount)
  353. }
  354. }
  355. if err != nil {
  356. log.Printf("failed getting count for numDays interval, err: %v\n", err)
  357. }
  358. }(prefix)
  359. }
  360. // daily totals
  361. var dailyTotals map[string]map[string]map[string]float64
  362. if daily != "" {
  363. wg.Add(1)
  364. go func(prefix string, queryDays int) {
  365. var err error
  366. defer wg.Done()
  367. dailyTotals, err = createAmountsOfInterval(tbl, ctx, prefix, queryDays)
  368. for date, chains := range dailyTotals {
  369. for chain, tokens := range chains {
  370. for symbol, amount := range tokens {
  371. dailyTotals[date][chain][symbol] = roundToTwoDecimalPlaces(amount)
  372. }
  373. }
  374. }
  375. if err != nil {
  376. log.Fatalf("failed getting createCountsOfInterval err %v", err)
  377. }
  378. }(prefix, queryDays)
  379. }
  380. wg.Wait()
  381. result := &amountsResult{
  382. Last24Hours: last24HourCount,
  383. WithinPeriod: periodCount,
  384. PeriodDurationDays: queryDays,
  385. Daily: dailyTotals,
  386. }
  387. jsonBytes, err := json.Marshal(result)
  388. if err != nil {
  389. w.WriteHeader(http.StatusInternalServerError)
  390. w.Write([]byte(err.Error()))
  391. log.Println(err.Error())
  392. return
  393. }
  394. w.WriteHeader(http.StatusOK)
  395. w.Write(jsonBytes)
  396. }