recent.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "html"
  8. "io"
  9. "log"
  10. "net/http"
  11. "sort"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "cloud.google.com/go/bigtable"
  16. )
  17. // warmCache keeps some data around between invocations, so that we don't have
  18. // to do a full table scan with each request.
  19. // https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
  20. var warmCache = map[string]map[string]string{}
  21. var lastCacheReset = time.Now()
  22. // query for last of each rowKey prefix
  23. func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int) map[string]string {
  24. // get cache data for query
  25. cachePrefix := prefix
  26. if prefix == "" {
  27. cachePrefix = "*"
  28. }
  29. var rowSet bigtable.RowSet
  30. rowSet = bigtable.PrefixRange(prefix)
  31. now := time.Now()
  32. oneHourAgo := now.Add(-time.Duration(1) * time.Hour)
  33. if oneHourAgo.Before(lastCacheReset) {
  34. // cache is less than one hour old, use it
  35. if cached, ok := warmCache[cachePrefix]; ok {
  36. // use the highest possible sequence number as the range end.
  37. maxSeq := "9999999999999999"
  38. rowSets := bigtable.RowRangeList{}
  39. for k, v := range cached {
  40. start := fmt.Sprintf("%v:%v", k, v)
  41. end := fmt.Sprintf("%v:%v", k, maxSeq)
  42. rowSets = append(rowSets, bigtable.NewRange(start, end))
  43. }
  44. if len(rowSets) >= 1 {
  45. rowSet = rowSets
  46. }
  47. }
  48. } else {
  49. // cache is more than hour old, don't use it, reset it
  50. warmCache = map[string]map[string]string{}
  51. lastCacheReset = now
  52. }
  53. // create a time range for query: last 30 days
  54. thirtyDays := -time.Duration(24*30) * time.Hour
  55. prev := now.Add(thirtyDays)
  56. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  57. end := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, maxNano, now.Location())
  58. mostRecentByKeySegment := map[string]string{}
  59. err := tbl.ReadRows(ctx, rowSet, func(row bigtable.Row) bool {
  60. keyParts := strings.Split(row.Key(), ":")
  61. groupByKey := strings.Join(keyParts[:2], ":")
  62. mostRecentByKeySegment[groupByKey] = keyParts[2]
  63. return true
  64. }, bigtable.RowFilter(
  65. bigtable.ChainFilters(
  66. bigtable.CellsPerRowLimitFilter(1),
  67. bigtable.TimestampRangeFilter(start, end),
  68. bigtable.StripValueFilter(),
  69. )))
  70. if err != nil {
  71. log.Fatalf("failed to read recent rows: %v", err)
  72. }
  73. // update the cache with the latest rows
  74. warmCache[cachePrefix] = mostRecentByKeySegment
  75. return mostRecentByKeySegment
  76. }
  77. func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int, numRowsToFetch int) (map[string][]bigtable.Row, error) {
  78. // returns { key: []bigtable.Row }, key either being "*", "chainID", "chainID:address"
  79. latest := getLatestOfEachEmitterAddress(tbl, ctx, prefix, keySegments)
  80. // key/value pairs are the start/stop rowKeys for range queries
  81. rangePairs := map[string]string{}
  82. for prefixGroup, highestSequence := range latest {
  83. rowKeyParts := strings.Split(prefixGroup, ":")
  84. // convert the sequence part of the rowkey from a string to an int, so it can be used for math
  85. highSequence, _ := strconv.Atoi(highestSequence)
  86. lowSequence := highSequence - numRowsToFetch
  87. // create a rowKey to use as the start of the range query
  88. rangeQueryStart := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], lowSequence)
  89. // create a rowKey with the highest seen sequence + 1, because range end is exclusive
  90. rangeQueryEnd := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], highSequence+1)
  91. rangePairs[rangeQueryStart] = rangeQueryEnd
  92. }
  93. rangeList := bigtable.RowRangeList{}
  94. for k, v := range rangePairs {
  95. rangeList = append(rangeList, bigtable.NewRange(k, v))
  96. }
  97. results := map[string][]bigtable.Row{}
  98. err := tbl.ReadRows(ctx, rangeList, func(row bigtable.Row) bool {
  99. var groupByKey string
  100. if keySegments == 0 {
  101. groupByKey = "*"
  102. } else {
  103. keyParts := strings.Split(row.Key(), ":")
  104. groupByKey = strings.Join(keyParts[:keySegments], ":")
  105. }
  106. results[groupByKey] = append(results[groupByKey], row)
  107. return true
  108. })
  109. if err != nil {
  110. log.Printf("failed reading row ranges. err: %v", err)
  111. return nil, err
  112. }
  113. return results, nil
  114. }
  115. // fetch recent rows.
  116. // optionally group by a EmitterChain or EmitterAddress
  117. // optionally query for recent rows of a given EmitterChain or EmitterAddress
  118. func Recent(w http.ResponseWriter, r *http.Request) {
  119. // Set CORS headers for the preflight request
  120. if r.Method == http.MethodOptions {
  121. w.Header().Set("Access-Control-Allow-Origin", "*")
  122. w.Header().Set("Access-Control-Allow-Methods", "POST")
  123. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  124. w.Header().Set("Access-Control-Max-Age", "3600")
  125. w.WriteHeader(http.StatusNoContent)
  126. return
  127. }
  128. // Set CORS headers for the main request.
  129. w.Header().Set("Access-Control-Allow-Origin", "*")
  130. var numRows, groupBy, forChain, forAddress string
  131. // allow GET requests with querystring params, or POST requests with json body.
  132. switch r.Method {
  133. case http.MethodGet:
  134. queryParams := r.URL.Query()
  135. numRows = queryParams.Get("numRows")
  136. groupBy = queryParams.Get("groupBy")
  137. forChain = queryParams.Get("forChain")
  138. forAddress = queryParams.Get("forAddress")
  139. readyCheck := queryParams.Get("readyCheck")
  140. if readyCheck != "" {
  141. // for running in devnet
  142. w.WriteHeader(http.StatusOK)
  143. fmt.Fprint(w, html.EscapeString("ready"))
  144. return
  145. }
  146. case http.MethodPost:
  147. // declare request body properties
  148. var d struct {
  149. NumRows string `json:"numRows"`
  150. GroupBy string `json:"groupBy"`
  151. ForChain string `json:"forChain"`
  152. ForAddress string `json:"forAddress"`
  153. }
  154. // deserialize request body
  155. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  156. switch err {
  157. case io.EOF:
  158. // do nothing, empty body is ok
  159. default:
  160. log.Printf("json.NewDecoder: %v", err)
  161. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  162. return
  163. }
  164. }
  165. numRows = d.NumRows
  166. groupBy = d.GroupBy
  167. forChain = d.ForChain
  168. forAddress = d.ForAddress
  169. default:
  170. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  171. log.Println("Method Not Allowed")
  172. return
  173. }
  174. var resultCount int
  175. if numRows == "" {
  176. resultCount = 30
  177. } else {
  178. var convErr error
  179. resultCount, convErr = strconv.Atoi(numRows)
  180. if convErr != nil {
  181. fmt.Fprint(w, "numRows must be an integer")
  182. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  183. return
  184. }
  185. }
  186. // use the groupBy value to determine how many segements of the rowkey should be used for indexing results.
  187. keySegments := 0
  188. if groupBy == "chain" {
  189. keySegments = 1
  190. }
  191. if groupBy == "address" {
  192. keySegments = 2
  193. }
  194. // create the rowkey prefix for querying, and the keySegments to use for indexing results.
  195. prefix := ""
  196. if forChain != "" {
  197. prefix = forChain
  198. if groupBy == "" {
  199. // groupBy was not set, but forChain was, so set the keySegments to index by chain
  200. keySegments = 1
  201. }
  202. if forAddress != "" {
  203. prefix = forChain + ":" + forAddress
  204. if groupBy == "" {
  205. // groupBy was not set, but forAddress was, so set the keySegments to index by address
  206. keySegments = 2
  207. }
  208. }
  209. }
  210. recent, err := fetchMostRecentRows(tbl, r.Context(), prefix, keySegments, resultCount)
  211. if err != nil {
  212. w.WriteHeader(http.StatusInternalServerError)
  213. w.Write([]byte(err.Error()))
  214. log.Println(err.Error())
  215. return
  216. }
  217. res := map[string][]*Summary{}
  218. for k, v := range recent {
  219. sort.Slice(v, func(i, j int) bool {
  220. // bigtable rows dont have timestamps, use a cell timestamp all rows will have.
  221. var iTimestamp bigtable.Timestamp
  222. var jTimestamp bigtable.Timestamp
  223. // rows may have: only MessagePublication, only QuorumState, or both.
  224. // find a timestamp for each row, try to use MessagePublication, if it exists:
  225. if len(v[i]["MessagePublication"]) >= 1 {
  226. iTimestamp = v[i]["MessagePublication"][0].Timestamp
  227. } else if len(v[i]["QuorumState"]) >= 1 {
  228. iTimestamp = v[i]["QuorumState"][0].Timestamp
  229. }
  230. if len(v[j]["MessagePublication"]) >= 1 {
  231. jTimestamp = v[j]["MessagePublication"][0].Timestamp
  232. } else if len(v[j]["QuorumState"]) >= 1 {
  233. jTimestamp = v[j]["QuorumState"][0].Timestamp
  234. }
  235. return iTimestamp > jTimestamp
  236. })
  237. // trim the result down to the requested amount now that sorting is complete
  238. num := len(v)
  239. var rows []bigtable.Row
  240. if num > resultCount {
  241. rows = v[:resultCount]
  242. } else {
  243. rows = v[:]
  244. }
  245. res[k] = make([]*Summary, len(rows))
  246. for i, r := range rows {
  247. res[k][i] = makeSummary(r)
  248. }
  249. }
  250. jsonBytes, err := json.Marshal(res)
  251. if err != nil {
  252. w.WriteHeader(http.StatusInternalServerError)
  253. w.Write([]byte(err.Error()))
  254. log.Println(err.Error())
  255. return
  256. }
  257. w.WriteHeader(http.StatusOK)
  258. w.Write(jsonBytes)
  259. }