recent.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "html"
  8. "io"
  9. "log"
  10. "net/http"
  11. "os"
  12. "sort"
  13. "strconv"
  14. "strings"
  15. "cloud.google.com/go/bigtable"
  16. )
  17. // query for last of each rowKey prefix
  18. func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int) map[string]string {
  19. mostRecentByKeySegment := map[string]string{}
  20. err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
  21. keyParts := strings.Split(row.Key(), ":")
  22. groupByKey := strings.Join(keyParts[:2], ":")
  23. mostRecentByKeySegment[groupByKey] = row.Key()
  24. return true
  25. // TODO - add filter to only return rows created within the last 30(?) days
  26. }, bigtable.RowFilter(bigtable.StripValueFilter()))
  27. if err != nil {
  28. log.Fatalf("failed to read recent rows: %v", err)
  29. }
  30. return mostRecentByKeySegment
  31. }
  32. func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int, numRowsToFetch int) (map[string][]bigtable.Row, error) {
  33. // returns { key: []bigtable.Row }, key either being "*", "chainID", "chainID:address"
  34. latest := getLatestOfEachEmitterAddress(tbl, ctx, prefix, keySegments)
  35. // key/value pairs are the start/stop rowKeys for range queries
  36. rangePairs := map[string]string{}
  37. for _, highestSequenceKey := range latest {
  38. rowKeyParts := strings.Split(highestSequenceKey, ":")
  39. // convert the sequence part of the rowkey from a string to an int, so it can be used for math
  40. highSequence, _ := strconv.Atoi(rowKeyParts[2])
  41. lowSequence := highSequence - numRowsToFetch
  42. // create a rowKey to use as the start of the range query
  43. rangeQueryStart := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], lowSequence)
  44. // create a rowKey with the highest seen sequence + 1, because range end is exclusive
  45. rangeQueryEnd := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], highSequence+1)
  46. rangePairs[rangeQueryStart] = rangeQueryEnd
  47. }
  48. rangeList := bigtable.RowRangeList{}
  49. for k, v := range rangePairs {
  50. rangeList = append(rangeList, bigtable.NewRange(k, v))
  51. }
  52. results := map[string][]bigtable.Row{}
  53. err := tbl.ReadRows(ctx, rangeList, func(row bigtable.Row) bool {
  54. var groupByKey string
  55. if keySegments == 0 {
  56. groupByKey = "*"
  57. } else {
  58. keyParts := strings.Split(row.Key(), ":")
  59. groupByKey = strings.Join(keyParts[:keySegments], ":")
  60. }
  61. results[groupByKey] = append(results[groupByKey], row)
  62. return true
  63. })
  64. if err != nil {
  65. log.Printf("failed reading row ranges. err: %v", err)
  66. return nil, err
  67. }
  68. return results, nil
  69. }
  70. // fetch recent rows.
  71. // optionally group by a EmitterChain or EmitterAddress
  72. // optionally query for recent rows of a given EmitterChain or EmitterAddress
  73. func Recent(w http.ResponseWriter, r *http.Request) {
  74. // Set CORS headers for the preflight request
  75. if r.Method == http.MethodOptions {
  76. w.Header().Set("Access-Control-Allow-Origin", "*")
  77. w.Header().Set("Access-Control-Allow-Methods", "POST")
  78. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  79. w.Header().Set("Access-Control-Max-Age", "3600")
  80. w.WriteHeader(http.StatusNoContent)
  81. return
  82. }
  83. // Set CORS headers for the main request.
  84. w.Header().Set("Access-Control-Allow-Origin", "*")
  85. var numRows, groupBy, forChain, forAddress string
  86. // allow GET requests with querystring params, or POST requests with json body.
  87. switch r.Method {
  88. case http.MethodGet:
  89. queryParams := r.URL.Query()
  90. numRows = queryParams.Get("numRows")
  91. groupBy = queryParams.Get("groupBy")
  92. forChain = queryParams.Get("forChain")
  93. forAddress = queryParams.Get("forAddress")
  94. readyCheck := queryParams.Get("readyCheck")
  95. if readyCheck != "" {
  96. // for running in devnet
  97. w.WriteHeader(http.StatusOK)
  98. fmt.Fprint(w, html.EscapeString("ready"))
  99. return
  100. }
  101. case http.MethodPost:
  102. // declare request body properties
  103. var d struct {
  104. NumRows string `json:"numRows"`
  105. GroupBy string `json:"groupBy"`
  106. ForChain string `json:"forChain"`
  107. ForAddress string `json:"forAddress"`
  108. }
  109. // deserialize request body
  110. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  111. switch err {
  112. case io.EOF:
  113. // do nothing, empty body is ok
  114. default:
  115. log.Printf("json.NewDecoder: %v", err)
  116. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  117. return
  118. }
  119. }
  120. numRows = d.NumRows
  121. groupBy = d.GroupBy
  122. forChain = d.ForChain
  123. forAddress = d.ForAddress
  124. default:
  125. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  126. log.Println("Method Not Allowed")
  127. return
  128. }
  129. var resultCount int
  130. if numRows == "" {
  131. resultCount = 30
  132. } else {
  133. var convErr error
  134. resultCount, convErr = strconv.Atoi(numRows)
  135. if convErr != nil {
  136. fmt.Fprint(w, "numRows must be an integer")
  137. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  138. return
  139. }
  140. }
  141. // create bibtable client and open table
  142. clientOnce.Do(func() {
  143. // Declare a separate err variable to avoid shadowing client.
  144. var err error
  145. project := os.Getenv("GCP_PROJECT")
  146. instance := os.Getenv("BIGTABLE_INSTANCE")
  147. client, err = bigtable.NewClient(context.Background(), project, instance)
  148. if err != nil {
  149. http.Error(w, "Error initializing client", http.StatusInternalServerError)
  150. log.Printf("bigtable.NewClient: %v", err)
  151. return
  152. }
  153. })
  154. tbl := client.Open("v2Events")
  155. // use the groupBy value to determine how many segements of the rowkey should be used for indexing results.
  156. keySegments := 0
  157. if groupBy == "chain" {
  158. keySegments = 1
  159. }
  160. if groupBy == "address" {
  161. keySegments = 2
  162. }
  163. // create the rowkey prefix for querying, and the keySegments to use for indexing results.
  164. prefix := ""
  165. if forChain != "" {
  166. prefix = forChain
  167. if groupBy == "" {
  168. // groupBy was not set, but forChain was, so set the keySegments to index by chain
  169. keySegments = 1
  170. }
  171. if forAddress != "" {
  172. prefix = forChain + ":" + forAddress
  173. if groupBy == "" {
  174. // groupBy was not set, but forAddress was, so set the keySegments to index by address
  175. keySegments = 2
  176. }
  177. }
  178. }
  179. recent, err := fetchMostRecentRows(tbl, r.Context(), prefix, keySegments, resultCount)
  180. if err != nil {
  181. w.WriteHeader(http.StatusInternalServerError)
  182. w.Write([]byte(err.Error()))
  183. log.Println(err.Error())
  184. return
  185. }
  186. res := map[string][]*Summary{}
  187. for k, v := range recent {
  188. sort.Slice(v, func(i, j int) bool {
  189. // bigtable rows dont have timestamps, use a cell timestamp all rows will have.
  190. return v[i]["MessagePublication"][0].Timestamp > v[j]["MessagePublication"][0].Timestamp
  191. })
  192. // trim the result down to the requested amount now that sorting is complete
  193. num := len(v)
  194. var rows []bigtable.Row
  195. if num > resultCount {
  196. rows = v[:resultCount]
  197. } else {
  198. rows = v[:]
  199. }
  200. res[k] = make([]*Summary, len(rows))
  201. for i, r := range rows {
  202. res[k][i] = makeSummary(r)
  203. }
  204. }
  205. jsonBytes, err := json.Marshal(res)
  206. if err != nil {
  207. w.WriteHeader(http.StatusInternalServerError)
  208. w.Write([]byte(err.Error()))
  209. log.Println(err.Error())
  210. return
  211. }
  212. w.WriteHeader(http.StatusOK)
  213. w.Write(jsonBytes)
  214. }