totals.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "html"
  8. "io"
  9. "log"
  10. "net/http"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "cloud.google.com/go/bigtable"
  16. )
  17. const maxNano int = 999999999
  18. type totalsResult struct {
  19. LastDayCount map[string]int
  20. TotalCount map[string]int
  21. DailyTotals map[string]map[string]int
  22. }
  23. // warmCache keeps some data around between invocations, so that we don't have
  24. // to do a full table scan with each request.
  25. // https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
  26. var warmTotalsCache = map[string]map[string]map[string]int{}
  27. // derive the result index relevant to a row.
  28. func makeGroupKey(keySegments int, rowKey string) string {
  29. var countBy string
  30. if keySegments == 0 {
  31. countBy = "*"
  32. } else {
  33. keyParts := strings.Split(rowKey, ":")
  34. countBy = strings.Join(keyParts[:keySegments], ":")
  35. }
  36. return countBy
  37. }
  38. func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]bigtable.Row, error) {
  39. rows := []bigtable.Row{}
  40. err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
  41. rows = append(rows, row)
  42. return true
  43. }, bigtable.RowFilter(
  44. bigtable.ChainFilters(
  45. // combine filters to get only what we need:
  46. bigtable.FamilyFilter(columnFamilies[1]),
  47. bigtable.CellsPerRowLimitFilter(1), // only the first cell in each column (helps for devnet where sequence resets)
  48. bigtable.TimestampRangeFilter(start, end), // within time range
  49. bigtable.StripValueFilter(), // no columns/values, just the row.Key()
  50. )))
  51. return rows, err
  52. }
  53. func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
  54. var mu sync.RWMutex
  55. results := map[string]map[string]int{}
  56. now := time.Now().UTC()
  57. var intervalsWG sync.WaitGroup
  58. // there will be a query for each previous day, plus today
  59. intervalsWG.Add(numPrevDays + 1)
  60. // create the unique identifier for this query, for cache
  61. cachePrefix := prefix
  62. if prefix == "" {
  63. cachePrefix = "*"
  64. }
  65. cachePrefix = fmt.Sprintf("%v-%v", cachePrefix, keySegments)
  66. for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
  67. go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
  68. // start is the SOD, end is EOD
  69. // "0 daysAgo start" is 00:00:00 AM of the current day
  70. // "0 daysAgo end" is 23:59:59 of the current day (the future)
  71. // calulate the start and end times for the query
  72. hoursAgo := (24 * daysAgo)
  73. daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
  74. n := now.Add(daysAgoDuration)
  75. year := n.Year()
  76. month := n.Month()
  77. day := n.Day()
  78. loc := n.Location()
  79. start := time.Date(year, month, day, 0, 0, 0, 0, loc)
  80. end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
  81. dateStr := start.Format("2006-01-02")
  82. mu.Lock()
  83. // initialize the map for this date in the result set
  84. results[dateStr] = map[string]int{"*": 0}
  85. // check to see if there is cache data for this date/query
  86. if dateCache, ok := warmTotalsCache[dateStr]; ok {
  87. // have a cache for this date
  88. if val, ok := dateCache[cachePrefix]; ok {
  89. // have a cache for this query
  90. if daysAgo >= 1 {
  91. // only use the cache for yesterday and older
  92. results[dateStr] = val
  93. mu.Unlock()
  94. intervalsWG.Done()
  95. return
  96. }
  97. } else {
  98. // no cache for this query
  99. warmTotalsCache[dateStr][cachePrefix] = map[string]int{}
  100. }
  101. } else {
  102. // no cache for this date, initialize the map
  103. warmTotalsCache[dateStr] = map[string]map[string]int{}
  104. warmTotalsCache[dateStr][cachePrefix] = map[string]int{}
  105. }
  106. mu.Unlock()
  107. var result []bigtable.Row
  108. var fetchErr error
  109. defer intervalsWG.Done()
  110. result, fetchErr = fetchRowsInInterval(tbl, ctx, prefix, start, end)
  111. if fetchErr != nil {
  112. log.Fatalf("fetchRowsInInterval returned an error: %v", fetchErr)
  113. }
  114. // iterate through the rows and increment the count
  115. for _, row := range result {
  116. countBy := makeGroupKey(keySegments, row.Key())
  117. if keySegments != 0 {
  118. // increment the total count
  119. results[dateStr]["*"] = results[dateStr]["*"] + 1
  120. }
  121. results[dateStr][countBy] = results[dateStr][countBy] + 1
  122. }
  123. // set the result in the cache
  124. warmTotalsCache[dateStr][cachePrefix] = results[dateStr]
  125. }(tbl, ctx, prefix, daysAgo)
  126. }
  127. intervalsWG.Wait()
  128. // create a set of all the keys from all dates, to ensure the result objects all have the same keys
  129. seenKeySet := map[string]bool{}
  130. for _, v := range results {
  131. for key := range v {
  132. seenKeySet[key] = true
  133. }
  134. }
  135. // ensure each date object has the same keys:
  136. for date := range results {
  137. for key := range seenKeySet {
  138. if _, ok := results[date][key]; !ok {
  139. // add the missing key to the map
  140. results[date][key] = 0
  141. }
  142. }
  143. }
  144. return results, nil
  145. }
  146. // returns the count of the rows in the query response
  147. func messageCountForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time, keySegments int) (map[string]int, error) {
  148. // query for all rows in time range, return result count
  149. results, fetchErr := fetchRowsInInterval(tbl, ctx, prefix, start, end)
  150. if fetchErr != nil {
  151. log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
  152. return nil, fetchErr
  153. }
  154. result := map[string]int{"*": len(results)}
  155. // iterate through the rows and increment the count for each index
  156. if keySegments != 0 {
  157. for _, row := range results {
  158. countBy := makeGroupKey(keySegments, row.Key())
  159. result[countBy] = result[countBy] + 1
  160. }
  161. }
  162. return result, nil
  163. }
  164. // get number of recent transactions in the last 24 hours, and daily for a period
  165. // optionally group by a EmitterChain or EmitterAddress
  166. // optionally query for recent rows of a given EmitterChain or EmitterAddress
  167. func Totals(w http.ResponseWriter, r *http.Request) {
  168. // Set CORS headers for the preflight request
  169. if r.Method == http.MethodOptions {
  170. w.Header().Set("Access-Control-Allow-Origin", "*")
  171. w.Header().Set("Access-Control-Allow-Methods", "POST")
  172. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  173. w.Header().Set("Access-Control-Max-Age", "3600")
  174. w.WriteHeader(http.StatusNoContent)
  175. return
  176. }
  177. // Set CORS headers for the main request.
  178. w.Header().Set("Access-Control-Allow-Origin", "*")
  179. var numDays, groupBy, forChain, forAddress string
  180. // allow GET requests with querystring params, or POST requests with json body.
  181. switch r.Method {
  182. case http.MethodGet:
  183. queryParams := r.URL.Query()
  184. numDays = queryParams.Get("numDays")
  185. groupBy = queryParams.Get("groupBy")
  186. forChain = queryParams.Get("forChain")
  187. forAddress = queryParams.Get("forAddress")
  188. readyCheck := queryParams.Get("readyCheck")
  189. if readyCheck != "" {
  190. // for running in devnet
  191. w.WriteHeader(http.StatusOK)
  192. fmt.Fprint(w, html.EscapeString("ready"))
  193. return
  194. }
  195. case http.MethodPost:
  196. // declare request body properties
  197. var d struct {
  198. NumDays string `json:"numDays"`
  199. GroupBy string `json:"groupBy"`
  200. ForChain string `json:"forChain"`
  201. ForAddress string `json:"forAddress"`
  202. }
  203. // deserialize request body
  204. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  205. switch err {
  206. case io.EOF:
  207. // do nothing, empty body is ok
  208. default:
  209. log.Printf("json.NewDecoder: %v", err)
  210. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  211. return
  212. }
  213. }
  214. numDays = d.NumDays
  215. groupBy = d.GroupBy
  216. forChain = d.ForChain
  217. forAddress = d.ForAddress
  218. default:
  219. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  220. log.Println("Method Not Allowed")
  221. return
  222. }
  223. var queryDays int
  224. if numDays == "" {
  225. queryDays = 30
  226. } else {
  227. var convErr error
  228. queryDays, convErr = strconv.Atoi(numDays)
  229. if convErr != nil {
  230. fmt.Fprint(w, "numDays must be an integer")
  231. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  232. return
  233. }
  234. }
  235. // create the rowkey prefix for querying
  236. prefix := ""
  237. if forChain != "" {
  238. prefix = forChain
  239. if groupBy == "" {
  240. // if the request is forChain, and groupBy is empty, set it to groupBy chain
  241. groupBy = "chain"
  242. }
  243. if forAddress != "" {
  244. // if the request is forAddress, always groupBy address
  245. groupBy = "address"
  246. prefix = forChain + ":" + forAddress
  247. }
  248. }
  249. // use the groupBy value to determine how many segements of the rowkey should be used.
  250. keySegments := 0
  251. if groupBy == "chain" {
  252. keySegments = 1
  253. }
  254. if groupBy == "address" {
  255. keySegments = 2
  256. }
  257. ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  258. defer cancel()
  259. var wg sync.WaitGroup
  260. // total of last 24 hours
  261. var last24HourCount map[string]int
  262. wg.Add(1)
  263. go func(prefix string, keySegments int) {
  264. var err error
  265. last24HourInterval := -time.Duration(24) * time.Hour
  266. now := time.Now().UTC()
  267. start := now.Add(last24HourInterval)
  268. defer wg.Done()
  269. last24HourCount, err = messageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
  270. if err != nil {
  271. log.Printf("failed getting count for interval, err: %v", err)
  272. }
  273. }(prefix, keySegments)
  274. // total of the last 30 days
  275. var periodCount map[string]int
  276. wg.Add(1)
  277. go func(prefix string, keySegments int) {
  278. var err error
  279. hours := (24 * queryDays)
  280. periodInterval := -time.Duration(hours) * time.Hour
  281. now := time.Now().UTC()
  282. prev := now.Add(periodInterval)
  283. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  284. defer wg.Done()
  285. periodCount, err = messageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
  286. if err != nil {
  287. log.Fatalf("failed getting count for interval, err: %v", err)
  288. }
  289. }(prefix, keySegments)
  290. // daily totals
  291. var dailyTotals map[string]map[string]int
  292. wg.Add(1)
  293. go func(prefix string, keySegments int, queryDays int) {
  294. var err error
  295. defer wg.Done()
  296. dailyTotals, err = createCountsOfInterval(tbl, ctx, prefix, queryDays, keySegments)
  297. if err != nil {
  298. log.Fatalf("failed getting createCountsOfInterval err %v", err)
  299. }
  300. }(prefix, keySegments, queryDays)
  301. wg.Wait()
  302. result := &totalsResult{
  303. LastDayCount: last24HourCount,
  304. TotalCount: periodCount,
  305. DailyTotals: dailyTotals,
  306. }
  307. jsonBytes, err := json.Marshal(result)
  308. if err != nil {
  309. w.WriteHeader(http.StatusInternalServerError)
  310. w.Write([]byte(err.Error()))
  311. log.Println(err.Error())
  312. return
  313. }
  314. w.WriteHeader(http.StatusOK)
  315. w.Write(jsonBytes)
  316. }