totals.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "html"
  8. "io"
  9. "log"
  10. "net/http"
  11. "os"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "cloud.google.com/go/bigtable"
  17. )
  18. const maxNano int = 999999999
  19. type totalsResult struct {
  20. LastDayCount map[string]int
  21. TotalCount map[string]int
  22. DailyTotals map[string]map[string]int
  23. }
  24. // derive the result index relevant to a row.
  25. func makeGroupKey(keySegments int, rowKey string) string {
  26. var countBy string
  27. if keySegments == 0 {
  28. countBy = "*"
  29. } else {
  30. keyParts := strings.Split(rowKey, ":")
  31. countBy = strings.Join(keyParts[:keySegments], ":")
  32. }
  33. return countBy
  34. }
  35. func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]bigtable.Row, error) {
  36. rows := []bigtable.Row{}
  37. err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
  38. rows = append(rows, row)
  39. return true
  40. }, bigtable.RowFilter(
  41. bigtable.ChainFilters(
  42. // combine filters to get only what we need:
  43. bigtable.CellsPerRowLimitFilter(1), // only the first cell in each column (helps for devnet where sequence resets)
  44. bigtable.TimestampRangeFilter(start, end), // within time range
  45. bigtable.StripValueFilter(), // no columns/values, just the row.Key()
  46. )))
  47. return rows, err
  48. }
  49. func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
  50. results := map[string]map[string]int{}
  51. // key track of all the keys seen, to ensure the result objects all have the same keys
  52. seenKeySet := map[string]bool{}
  53. now := time.Now()
  54. for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
  55. // start is the SOD, end is EOD
  56. // "0 daysAgo start" is 00:00:00 AM of the current day
  57. // "0 daysAgo end" is 23:59:59 of the current day (the future)
  58. // calulate the start and end times for the query
  59. hoursAgo := (24 * daysAgo)
  60. daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
  61. n := now.Add(daysAgoDuration)
  62. year := n.Year()
  63. month := n.Month()
  64. day := n.Day()
  65. loc := n.Location()
  66. start := time.Date(year, month, day, 0, 0, 0, 0, loc)
  67. end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
  68. result, fetchErr := fetchRowsInInterval(tbl, ctx, prefix, start, end)
  69. if fetchErr != nil {
  70. log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
  71. return nil, fetchErr
  72. }
  73. dateStr := start.Format("2006-01-02")
  74. // initialize the map for this date in the result set
  75. if results[dateStr] == nil {
  76. results[dateStr] = map[string]int{"*": 0}
  77. }
  78. // iterate through the rows and increment the count
  79. for _, row := range result {
  80. countBy := makeGroupKey(keySegments, row.Key())
  81. if keySegments != 0 {
  82. // increment the total count
  83. results[dateStr]["*"] = results[dateStr]["*"] + 1
  84. }
  85. results[dateStr][countBy] = results[dateStr][countBy] + 1
  86. // add this key to the set
  87. seenKeySet[countBy] = true
  88. }
  89. }
  90. // ensure each date object has the same keys:
  91. for _, v := range results {
  92. for key := range seenKeySet {
  93. if _, ok := v[key]; !ok {
  94. // add the missing key to the map
  95. v[key] = 0
  96. }
  97. }
  98. }
  99. return results, nil
  100. }
  101. // returns the count of the rows in the query response
  102. func messageCountForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, interval time.Duration, keySegments int) (map[string]int, error) {
  103. now := time.Now()
  104. // calulate the start and end times for the query
  105. n := now.Add(interval)
  106. year := n.Year()
  107. month := n.Month()
  108. day := n.Day()
  109. loc := n.Location()
  110. start := time.Date(year, month, day, 0, 0, 0, 0, loc)
  111. end := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, maxNano, loc)
  112. // query for all rows in time range, return result count
  113. results, fetchErr := fetchRowsInInterval(tbl, ctx, prefix, start, end)
  114. if fetchErr != nil {
  115. log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
  116. return nil, fetchErr
  117. }
  118. result := map[string]int{"*": len(results)}
  119. // iterate through the rows and increment the count for each index
  120. if keySegments != 0 {
  121. for _, row := range results {
  122. countBy := makeGroupKey(keySegments, row.Key())
  123. result[countBy] = result[countBy] + 1
  124. }
  125. }
  126. return result, nil
  127. }
  128. // get number of recent transactions in the last 24 hours, and daily for a period
  129. // optionally group by a EmitterChain or EmitterAddress
  130. // optionally query for recent rows of a given EmitterChain or EmitterAddress
  131. func Totals(w http.ResponseWriter, r *http.Request) {
  132. // Set CORS headers for the preflight request
  133. if r.Method == http.MethodOptions {
  134. w.Header().Set("Access-Control-Allow-Origin", "*")
  135. w.Header().Set("Access-Control-Allow-Methods", "POST")
  136. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  137. w.Header().Set("Access-Control-Max-Age", "3600")
  138. w.WriteHeader(http.StatusNoContent)
  139. return
  140. }
  141. // Set CORS headers for the main request.
  142. w.Header().Set("Access-Control-Allow-Origin", "*")
  143. var numDays, groupBy, forChain, forAddress string
  144. // allow GET requests with querystring params, or POST requests with json body.
  145. switch r.Method {
  146. case http.MethodGet:
  147. queryParams := r.URL.Query()
  148. numDays = queryParams.Get("numDays")
  149. groupBy = queryParams.Get("groupBy")
  150. forChain = queryParams.Get("forChain")
  151. forAddress = queryParams.Get("forAddress")
  152. readyCheck := queryParams.Get("readyCheck")
  153. if readyCheck != "" {
  154. // for running in devnet
  155. w.WriteHeader(http.StatusOK)
  156. fmt.Fprint(w, html.EscapeString("ready"))
  157. return
  158. }
  159. case http.MethodPost:
  160. // declare request body properties
  161. var d struct {
  162. NumDays string `json:"numDays"`
  163. GroupBy string `json:"groupBy"`
  164. ForChain string `json:"forChain"`
  165. ForAddress string `json:"forAddress"`
  166. }
  167. // deserialize request body
  168. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  169. switch err {
  170. case io.EOF:
  171. // do nothing, empty body is ok
  172. default:
  173. log.Printf("json.NewDecoder: %v", err)
  174. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  175. return
  176. }
  177. }
  178. numDays = d.NumDays
  179. groupBy = d.GroupBy
  180. forChain = d.ForChain
  181. forAddress = d.ForAddress
  182. default:
  183. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  184. log.Println("Method Not Allowed")
  185. return
  186. }
  187. var queryDays int
  188. if numDays == "" {
  189. queryDays = 30
  190. } else {
  191. var convErr error
  192. queryDays, convErr = strconv.Atoi(numDays)
  193. if convErr != nil {
  194. fmt.Fprint(w, "numDays must be an integer")
  195. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  196. return
  197. }
  198. }
  199. // create bibtable client and open table
  200. clientOnce.Do(func() {
  201. // Declare a separate err variable to avoid shadowing client.
  202. var err error
  203. project := os.Getenv("GCP_PROJECT")
  204. instance := os.Getenv("BIGTABLE_INSTANCE")
  205. client, err = bigtable.NewClient(context.Background(), project, instance)
  206. if err != nil {
  207. http.Error(w, "Error initializing client", http.StatusInternalServerError)
  208. log.Printf("bigtable.NewClient: %v", err)
  209. return
  210. }
  211. })
  212. tbl := client.Open("v2Events")
  213. // create the rowkey prefix for querying
  214. prefix := ""
  215. if forChain != "" {
  216. prefix = forChain
  217. if forAddress != "" {
  218. prefix = forChain + ":" + forAddress
  219. }
  220. }
  221. // use the groupBy value to determine how many segements of the rowkey should be used.
  222. keySegments := 0
  223. if groupBy == "chain" {
  224. keySegments = 1
  225. }
  226. if groupBy == "address" {
  227. keySegments = 2
  228. }
  229. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  230. defer cancel()
  231. var wg sync.WaitGroup
  232. // total of last 24 hours
  233. var last24HourCount map[string]int
  234. wg.Add(1)
  235. go func(prefix string, keySegments int) {
  236. var err error
  237. last24HourInterval := -time.Duration(24) * time.Hour
  238. defer wg.Done()
  239. last24HourCount, err = messageCountForInterval(tbl, ctx, prefix, last24HourInterval, keySegments)
  240. if err != nil {
  241. log.Printf("failed getting count for interval, err: %v", err)
  242. }
  243. }(prefix, keySegments)
  244. // total of the last 30 days
  245. var periodCount map[string]int
  246. wg.Add(1)
  247. go func(prefix string, keySegments int) {
  248. var err error
  249. hours := (24 * queryDays)
  250. periodInterval := -time.Duration(hours) * time.Hour
  251. defer wg.Done()
  252. periodCount, err = messageCountForInterval(tbl, ctx, prefix, periodInterval, keySegments)
  253. if err != nil {
  254. log.Fatalf("failed getting count for interval, err: %v", err)
  255. }
  256. }(prefix, keySegments)
  257. // daily totals
  258. var dailyTotals map[string]map[string]int
  259. wg.Add(1)
  260. go func(prefix string, keySegments int, queryDays int) {
  261. var err error
  262. defer wg.Done()
  263. dailyTotals, err = createCountsOfInterval(tbl, ctx, prefix, queryDays, keySegments)
  264. if err != nil {
  265. log.Fatalf("failed getting createCountsOfInterval err %v", err)
  266. }
  267. }(prefix, keySegments, queryDays)
  268. wg.Wait()
  269. result := &totalsResult{
  270. LastDayCount: last24HourCount,
  271. TotalCount: periodCount,
  272. DailyTotals: dailyTotals,
  273. }
  274. jsonBytes, err := json.Marshal(result)
  275. if err != nil {
  276. w.WriteHeader(http.StatusInternalServerError)
  277. w.Write([]byte(err.Error()))
  278. log.Println(err.Error())
  279. return
  280. }
  281. w.WriteHeader(http.StatusOK)
  282. w.Write(jsonBytes)
  283. }