notional-transferred.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/binary"
  7. "encoding/json"
  8. "fmt"
  9. "io"
  10. "log"
  11. "net/http"
  12. "strconv"
  13. "sync"
  14. "time"
  15. "cloud.google.com/go/bigtable"
  16. )
// transfersResult is the JSON payload returned by NotionalTransferred.
// All leaf values are USD notional amounts. At every level of the nested
// maps the key "*" is an aggregate bucket meaning "all" (all chains /
// all destination chains / all token symbols).
type transfersResult struct {
	// Last24Hours: leavingChain -> destinationChain -> tokenSymbol -> USD notional.
	Last24Hours map[string]map[string]map[string]float64
	// WithinPeriod: same nesting as Last24Hours, summed over the whole query period.
	WithinPeriod map[string]map[string]map[string]float64
	// PeriodDurationDays is the number of days the period queries covered.
	PeriodDurationDays int
	// Daily: date ("2006-01-02") -> leavingChain -> destinationChain -> tokenSymbol -> USD notional.
	Daily map[string]map[string]map[string]map[string]float64
}
// warmTransfersCache keeps some data around between invocations, so that we don't have
// to do a full table scan with each request.
// Keyed: cachePrefix -> date ("2006-01-02") -> leavingChain -> destinationChain -> tokenSymbol -> USD notional.
// Only completed (yesterday-and-older) days are served from cache; the current day is always re-queried.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
// TODO - make a struct for cache
var warmTransfersCache = map[string]map[string]map[string]map[string]map[string]float64{}
  28. func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]TransferData, error) {
  29. rows := []TransferData{}
  30. err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
  31. t := &TransferData{}
  32. if _, ok := row[transferDetailsFam]; ok {
  33. for _, item := range row[transferDetailsFam] {
  34. switch item.Column {
  35. case "TokenTransferDetails:Amount":
  36. amount, _ := strconv.ParseFloat(string(item.Value), 64)
  37. t.TokenAmount = amount
  38. case "TokenTransferDetails:NotionalUSD":
  39. reader := bytes.NewReader(item.Value)
  40. var notionalFloat float64
  41. if err := binary.Read(reader, binary.BigEndian, &notionalFloat); err != nil {
  42. log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err)
  43. }
  44. t.Notional = notionalFloat
  45. case "TokenTransferDetails:OriginSymbol":
  46. t.TokenSymbol = string(item.Value)
  47. }
  48. }
  49. if _, ok := row[transferPayloadFam]; ok {
  50. for _, item := range row[transferPayloadFam] {
  51. switch item.Column {
  52. case "TokenTransferPayload:OriginChain":
  53. t.OriginChain = string(item.Value)
  54. case "TokenTransferPayload:TargetChain":
  55. t.DestinationChain = string(item.Value)
  56. }
  57. }
  58. }
  59. t.LeavingChain = row.Key()[:1]
  60. rows = append(rows, *t)
  61. }
  62. return true
  63. }, bigtable.RowFilter(
  64. bigtable.ConditionFilter(
  65. bigtable.ChainFilters(
  66. bigtable.FamilyFilter(columnFamilies[1]),
  67. bigtable.CellsPerRowLimitFilter(1), // only the first cell in column
  68. bigtable.TimestampRangeFilter(start, end), // within time range
  69. bigtable.StripValueFilter(), // no columns/values, just the row.Key()
  70. ),
  71. bigtable.ChainFilters(
  72. bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])),
  73. bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginChain|TargetChain"),
  74. bigtable.LatestNFilter(1),
  75. ),
  76. bigtable.BlockAllFilter(),
  77. ),
  78. ))
  79. if err != nil {
  80. fmt.Println("failed reading rows to create RowList.", err)
  81. return nil, err
  82. }
  83. return rows, err
  84. }
  85. func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int) (map[string]map[string]map[string]map[string]float64, error) {
  86. var mu sync.RWMutex
  87. results := map[string]map[string]map[string]map[string]float64{}
  88. now := time.Now().UTC()
  89. var intervalsWG sync.WaitGroup
  90. // there will be a query for each previous day, plus today
  91. intervalsWG.Add(numPrevDays + 1)
  92. // create the unique identifier for this query, for cache
  93. cachePrefix := createCachePrefix(prefix)
  94. for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
  95. go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
  96. // start is the SOD, end is EOD
  97. // "0 daysAgo start" is 00:00:00 AM of the current day
  98. // "0 daysAgo end" is 23:59:59 of the current day (the future)
  99. // calulate the start and end times for the query
  100. hoursAgo := (24 * daysAgo)
  101. daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
  102. n := now.Add(daysAgoDuration)
  103. year := n.Year()
  104. month := n.Month()
  105. day := n.Day()
  106. loc := n.Location()
  107. start := time.Date(year, month, day, 0, 0, 0, 0, loc)
  108. end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
  109. dateStr := start.Format("2006-01-02")
  110. mu.Lock()
  111. // initialize the map for this date in the result set
  112. results[dateStr] = map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
  113. // check to see if there is cache data for this date/query
  114. if dates, ok := warmTransfersCache[cachePrefix]; ok {
  115. // have a cache for this query
  116. if dateCache, ok := dates[dateStr]; ok {
  117. // have a cache for this date
  118. if daysAgo >= 1 {
  119. // only use the cache for yesterday and older
  120. results[dateStr] = dateCache
  121. mu.Unlock()
  122. intervalsWG.Done()
  123. return
  124. }
  125. } else {
  126. // no cache for this query
  127. warmTransfersCache[cachePrefix][dateStr] = map[string]map[string]map[string]float64{}
  128. }
  129. } else {
  130. // no cache for this date, initialize the map
  131. warmTransfersCache[cachePrefix] = map[string]map[string]map[string]map[string]float64{}
  132. warmTransfersCache[cachePrefix][dateStr] = map[string]map[string]map[string]float64{}
  133. }
  134. mu.Unlock()
  135. var result []TransferData
  136. var fetchErr error
  137. defer intervalsWG.Done()
  138. result, fetchErr = fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
  139. if fetchErr != nil {
  140. log.Fatalf("fetchTransferRowsInInterval returned an error: %v\n", fetchErr)
  141. }
  142. // iterate through the rows and increment the count
  143. for _, row := range result {
  144. if _, ok := results[dateStr][row.LeavingChain]; !ok {
  145. results[dateStr][row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
  146. }
  147. if _, ok := results[dateStr][row.LeavingChain][row.DestinationChain]; !ok {
  148. results[dateStr][row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
  149. }
  150. // add to the total count
  151. results[dateStr][row.LeavingChain]["*"]["*"] = results[dateStr][row.LeavingChain]["*"]["*"] + row.Notional
  152. results[dateStr][row.LeavingChain][row.DestinationChain]["*"] = results[dateStr][row.LeavingChain][row.DestinationChain]["*"] + row.Notional
  153. // add to the count for chain/symbol
  154. results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
  155. }
  156. // set the result in the cache
  157. warmTransfersCache[cachePrefix][dateStr] = results[dateStr]
  158. }(tbl, ctx, prefix, daysAgo)
  159. }
  160. intervalsWG.Wait()
  161. // having consistent keys in each object is helpful for clients, explorer GUI
  162. // create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys
  163. seenSymbolSet := map[string]bool{}
  164. seenChainSet := map[string]bool{}
  165. for date, tokens := range results {
  166. for leaving, dests := range tokens {
  167. seenChainSet[leaving] = true
  168. for dest := range dests {
  169. for key := range results[date][leaving][dest] {
  170. seenSymbolSet[key] = true
  171. }
  172. }
  173. }
  174. }
  175. // ensure each chain object has all the same symbol keys:
  176. for date := range results {
  177. for leaving := range results[date] {
  178. for dest := range results[date][leaving] {
  179. for chain := range seenChainSet {
  180. // check that date has all the chains
  181. if _, ok := results[date][leaving][chain]; !ok {
  182. results[date][leaving][chain] = map[string]float64{"*": 0}
  183. }
  184. }
  185. for token := range seenSymbolSet {
  186. if _, ok := results[date][leaving][token]; !ok {
  187. // add the missing key to the map
  188. results[date][leaving][dest][token] = 0
  189. }
  190. }
  191. }
  192. }
  193. }
  194. return results, nil
  195. }
  196. // returns the count of the rows in the query response
  197. func transfersForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) (map[string]map[string]map[string]float64, error) {
  198. // query for all rows in time range, return result count
  199. results, fetchErr := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
  200. if fetchErr != nil {
  201. log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
  202. return nil, fetchErr
  203. }
  204. var total = float64(0)
  205. for _, item := range results {
  206. total = total + item.Notional
  207. }
  208. result := map[string]map[string]map[string]float64{"*": {"*": {"*": total}}}
  209. // iterate through the rows and increment the count for each index
  210. for _, row := range results {
  211. if _, ok := result[row.LeavingChain]; !ok {
  212. result[row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
  213. }
  214. if _, ok := result[row.LeavingChain][row.DestinationChain]; !ok {
  215. result[row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
  216. }
  217. // add to total amount
  218. result[row.LeavingChain]["*"]["*"] = result[row.LeavingChain]["*"]["*"] + row.Notional
  219. result[row.LeavingChain][row.DestinationChain]["*"] = result[row.LeavingChain][row.DestinationChain]["*"] + row.Notional
  220. // add to symbol amount
  221. result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] = result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
  222. }
  223. // create a set of all the keys from all dates/chains, to ensure the result objects all have the same keys.
  224. seenChainSet := map[string]bool{}
  225. for leaving := range result {
  226. seenChainSet[leaving] = true
  227. }
  228. for leaving := range result {
  229. for chain := range seenChainSet {
  230. // check that date has all the chains
  231. if _, ok := result[leaving][chain]; !ok {
  232. result[leaving][chain] = map[string]float64{"*": 0}
  233. }
  234. }
  235. }
  236. return result, nil
  237. }
  238. // get number of recent transactions in the last 24 hours, and daily for a period
  239. // optionally group by a EmitterChain or EmitterAddress
  240. // optionally query for recent rows of a given EmitterChain or EmitterAddress
  241. func NotionalTransferred(w http.ResponseWriter, r *http.Request) {
  242. // Set CORS headers for the preflight request
  243. if r.Method == http.MethodOptions {
  244. w.Header().Set("Access-Control-Allow-Origin", "*")
  245. w.Header().Set("Access-Control-Allow-Methods", "POST")
  246. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  247. w.Header().Set("Access-Control-Max-Age", "3600")
  248. w.WriteHeader(http.StatusNoContent)
  249. return
  250. }
  251. // Set CORS headers for the main request.
  252. w.Header().Set("Access-Control-Allow-Origin", "*")
  253. var numDays, forChain, forAddress, daily, last24Hours, forPeriod string
  254. // allow GET requests with querystring params, or POST requests with json body.
  255. switch r.Method {
  256. case http.MethodGet:
  257. queryParams := r.URL.Query()
  258. numDays = queryParams.Get("numDays")
  259. forChain = queryParams.Get("forChain")
  260. forAddress = queryParams.Get("forAddress")
  261. daily = queryParams.Get("daily")
  262. last24Hours = queryParams.Get("last24Hours")
  263. forPeriod = queryParams.Get("forPeriod")
  264. case http.MethodPost:
  265. // declare request body properties
  266. var d struct {
  267. NumDays string `json:"numDays"`
  268. ForChain string `json:"forChain"`
  269. ForAddress string `json:"forAddress"`
  270. Daily string `json:"daily"`
  271. Last24Hours string `json:"last24Hours"`
  272. ForPeriod string `json:"forPeriod"`
  273. }
  274. // deserialize request body
  275. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  276. switch err {
  277. case io.EOF:
  278. // do nothing, empty body is ok
  279. default:
  280. log.Printf("json.NewDecoder: %v", err)
  281. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  282. return
  283. }
  284. }
  285. numDays = d.NumDays
  286. forChain = d.ForChain
  287. forAddress = d.ForAddress
  288. daily = d.Daily
  289. last24Hours = d.Last24Hours
  290. forPeriod = d.ForPeriod
  291. default:
  292. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  293. log.Println("Method Not Allowed")
  294. return
  295. }
  296. if daily == "" && last24Hours == "" && forPeriod == "" {
  297. // none of the options were set, so set one
  298. last24Hours = "true"
  299. }
  300. var queryDays int
  301. if numDays == "" {
  302. queryDays = 30
  303. } else {
  304. var convErr error
  305. queryDays, convErr = strconv.Atoi(numDays)
  306. if convErr != nil {
  307. fmt.Fprint(w, "numDays must be an integer")
  308. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  309. return
  310. }
  311. }
  312. // create the rowkey prefix for querying
  313. prefix := ""
  314. if forChain != "" {
  315. prefix = forChain
  316. // if the request is forChain, always groupBy chain
  317. if forAddress != "" {
  318. // if the request is forAddress, always groupBy address
  319. prefix = forChain + ":" + forAddress
  320. }
  321. }
  322. ctx, cancel := context.WithTimeout(context.Background(), 600*time.Second)
  323. defer cancel()
  324. var wg sync.WaitGroup
  325. // total of last 24 hours
  326. var last24HourCount map[string]map[string]map[string]float64
  327. if last24Hours != "" {
  328. wg.Add(1)
  329. go func(prefix string) {
  330. var err error
  331. last24HourInterval := -time.Duration(24) * time.Hour
  332. now := time.Now().UTC()
  333. start := now.Add(last24HourInterval)
  334. defer wg.Done()
  335. last24HourCount, err = transfersForInterval(tbl, ctx, prefix, start, now)
  336. for chain, dests := range last24HourCount {
  337. for dest, tokens := range dests {
  338. for symbol, amount := range tokens {
  339. last24HourCount[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
  340. }
  341. }
  342. }
  343. if err != nil {
  344. log.Printf("failed getting count for 24h interval, err: %v", err)
  345. }
  346. }(prefix)
  347. }
  348. // total of the last numDays
  349. var periodCount map[string]map[string]map[string]float64
  350. if forPeriod != "" {
  351. wg.Add(1)
  352. go func(prefix string) {
  353. var err error
  354. hours := (24 * queryDays)
  355. periodInterval := -time.Duration(hours) * time.Hour
  356. now := time.Now().UTC()
  357. prev := now.Add(periodInterval)
  358. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  359. defer wg.Done()
  360. periodCount, err = transfersForInterval(tbl, ctx, prefix, start, now)
  361. for chain, dests := range periodCount {
  362. for dest, tokens := range dests {
  363. for symbol, amount := range tokens {
  364. periodCount[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
  365. }
  366. }
  367. }
  368. if err != nil {
  369. log.Printf("failed getting count for numDays interval, err: %v\n", err)
  370. }
  371. }(prefix)
  372. }
  373. // daily totals
  374. var dailyTotals map[string]map[string]map[string]map[string]float64
  375. if daily != "" {
  376. wg.Add(1)
  377. go func(prefix string, queryDays int) {
  378. var err error
  379. defer wg.Done()
  380. dailyTotals, err = createTransfersOfInterval(tbl, ctx, prefix, queryDays)
  381. for date, chains := range dailyTotals {
  382. for chain, dests := range chains {
  383. for destChain, tokens := range dests {
  384. for symbol, amount := range tokens {
  385. dailyTotals[date][chain][destChain][symbol] = roundToTwoDecimalPlaces(amount)
  386. }
  387. }
  388. }
  389. }
  390. if err != nil {
  391. log.Fatalf("failed getting createCountsOfInterval err %v", err)
  392. }
  393. }(prefix, queryDays)
  394. }
  395. wg.Wait()
  396. result := &transfersResult{
  397. Last24Hours: last24HourCount,
  398. WithinPeriod: periodCount,
  399. PeriodDurationDays: queryDays,
  400. Daily: dailyTotals,
  401. }
  402. jsonBytes, err := json.Marshal(result)
  403. if err != nil {
  404. w.WriteHeader(http.StatusInternalServerError)
  405. w.Write([]byte(err.Error()))
  406. log.Println(err.Error())
  407. return
  408. }
  409. w.WriteHeader(http.StatusOK)
  410. w.Write(jsonBytes)
  411. }