nfts.go

// Package p contains an HTTP Cloud Function.
package p

import (
	"context"
	"encoding/json"
	"fmt"
	"html"
	"io"
	"log"
	"net/http"
	"strconv"
	"sync"
	"time"

	"cloud.google.com/go/bigtable"
)

// warmNFTCache keeps some data around between invocations, so that we don't have
// to do a full table scan with each request.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
var warmNFTCache = map[string]map[string]map[string]int{}
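
// fetchNFTRowsInInterval reads rows whose keys begin with prefix, keeping only
// rows whose first cell in columnFamilies[1] has a timestamp within [start, end]
// and whose columnFamilies[4] "PayloadId" column holds the value "1"; all other
// rows are blocked by the condition filter.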
func fetchNFTRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]bigtable.Row, error) {
	rows := []bigtable.Row{}
	err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
		rows = append(rows, row)
		return true
	}, bigtable.RowFilter(
		bigtable.ConditionFilter(
			bigtable.ChainFilters(
				bigtable.FamilyFilter(columnFamilies[1]),
				bigtable.CellsPerRowLimitFilter(1),        // only the first cell in column
				bigtable.TimestampRangeFilter(start, end), // within time range
				bigtable.StripValueFilter(),               // no columns/values, just the row.Key()
			),
			bigtable.ChainFilters(
				bigtable.FamilyFilter(columnFamilies[4]),
				bigtable.ColumnFilter("PayloadId"),
				bigtable.ValueFilter("1"),
			),
			bigtable.BlockAllFilter(),
		)))
	return rows, err
}
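
// createNFTCountsOfInterval returns a map of date strings to per-group counts for
// the given prefix, covering today plus numPrevDays previous days. Counts are
// grouped by the first keySegments segments of the row key, with "*" holding each
// day's total; completed (past) days are served from warmNFTCache when available.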
func createNFTCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
	var mu sync.RWMutex
	results := map[string]map[string]int{}

	now := time.Now().UTC()

	var intervalsWG sync.WaitGroup
	// there will be a query for each previous day, plus today
	intervalsWG.Add(numPrevDays + 1)

	// create the unique identifier for this query, for cache
	cachePrefix := prefix
	if prefix == "" {
		cachePrefix = "*"
	}
	cachePrefix = fmt.Sprintf("%v-%v", cachePrefix, keySegments)

	for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
		go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
			// start is the SOD, end is EOD
			// "0 daysAgo start" is 00:00:00 of the current day
			// "0 daysAgo end" is 23:59:59 of the current day (the future)

			// calculate the start and end times for the query
			hoursAgo := (24 * daysAgo)
			daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
			n := now.Add(daysAgoDuration)
			year := n.Year()
			month := n.Month()
			day := n.Day()
			loc := n.Location()

			start := time.Date(year, month, day, 0, 0, 0, 0, loc)
			end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)

			dateStr := start.Format("2006-01-02")

			mu.Lock()
			// initialize the map for this date in the result set
			results[dateStr] = map[string]int{"*": 0}
			// check to see if there is cache data for this date/query
			if dateCache, ok := warmNFTCache[dateStr]; ok {
				// have a cache for this date
				if val, ok := dateCache[cachePrefix]; ok {
					// have a cache for this query
					if daysAgo >= 1 {
						// only use the cache for yesterday and older
						results[dateStr] = val
						mu.Unlock()
						intervalsWG.Done()
						return
					}
				} else {
					// no cache for this query
					warmNFTCache[dateStr][cachePrefix] = map[string]int{}
				}
			} else {
				// no cache for this date, initialize the map
				warmNFTCache[dateStr] = map[string]map[string]int{}
				warmNFTCache[dateStr][cachePrefix] = map[string]int{}
			}
			mu.Unlock()

			var result []bigtable.Row
			var fetchErr error

			defer intervalsWG.Done()

			result, fetchErr = fetchNFTRowsInInterval(tbl, ctx, prefix, start, end)
			if fetchErr != nil {
				log.Fatalf("fetchNFTRowsInInterval returned an error: %v", fetchErr)
			}

			// iterate through the rows and increment the count
			for _, row := range result {
				countBy := makeGroupKey(keySegments, row.Key())
				if keySegments != 0 {
					// increment the total count
					results[dateStr]["*"] = results[dateStr]["*"] + 1
				}
				results[dateStr][countBy] = results[dateStr][countBy] + 1
			}

			// set the result in the cache
			warmNFTCache[dateStr][cachePrefix] = results[dateStr]
		}(tbl, ctx, prefix, daysAgo)
	}

	intervalsWG.Wait()

	// create a set of all the keys from all dates, to ensure the result objects all have the same keys
	seenKeySet := map[string]bool{}
	for _, v := range results {
		for key := range v {
			seenKeySet[key] = true
		}
	}
	// ensure each date object has the same keys:
	for date := range results {
		for key := range seenKeySet {
			if _, ok := results[date][key]; !ok {
				// add the missing key to the map
				results[date][key] = 0
			}
		}
	}

	return results, nil
}

// nftMessageCountForInterval returns the count of the rows in the query response,
// optionally grouped by row-key segment.
func nftMessageCountForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time, keySegments int) (map[string]int, error) {
	// query for all rows in time range, return result count
	results, fetchErr := fetchNFTRowsInInterval(tbl, ctx, prefix, start, end)
	if fetchErr != nil {
		log.Printf("fetchNFTRowsInInterval returned an error: %v", fetchErr)
		return nil, fetchErr
	}
	result := map[string]int{"*": len(results)}

	// iterate through the rows and increment the count for each index
	if keySegments != 0 {
		for _, row := range results {
			countBy := makeGroupKey(keySegments, row.Key())
			result[countBy] = result[countBy] + 1
		}
	}
	return result, nil
}

// NFTs returns the number of recent transactions in the last 24 hours, and daily totals for a period.
// Optionally group by an EmitterChain or EmitterAddress.
// Optionally query for recent rows of a given EmitterChain or EmitterAddress.
func NFTs(w http.ResponseWriter, r *http.Request) {
	// Set CORS headers for the preflight request
	if r.Method == http.MethodOptions {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Access-Control-Allow-Methods", "POST")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
		w.Header().Set("Access-Control-Max-Age", "3600")
		w.WriteHeader(http.StatusNoContent)
		return
	}
	// Set CORS headers for the main request.
	w.Header().Set("Access-Control-Allow-Origin", "*")

	var numDays, groupBy, forChain, forAddress string

	// allow GET requests with querystring params, or POST requests with json body.
	switch r.Method {
	case http.MethodGet:
		queryParams := r.URL.Query()
		numDays = queryParams.Get("numDays")
		groupBy = queryParams.Get("groupBy")
		forChain = queryParams.Get("forChain")
		forAddress = queryParams.Get("forAddress")

		readyCheck := queryParams.Get("readyCheck")
		if readyCheck != "" {
			// for running in devnet
			w.WriteHeader(http.StatusOK)
			fmt.Fprint(w, html.EscapeString("ready"))
			return
		}
	case http.MethodPost:
		// declare request body properties
		var d struct {
			NumDays    string `json:"numDays"`
			GroupBy    string `json:"groupBy"`
			ForChain   string `json:"forChain"`
			ForAddress string `json:"forAddress"`
		}

		// deserialize request body
		if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
			switch err {
			case io.EOF:
				// do nothing, empty body is ok
			default:
				log.Printf("json.NewDecoder: %v", err)
				http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
				return
			}
		}

		numDays = d.NumDays
		groupBy = d.GroupBy
		forChain = d.ForChain
		forAddress = d.ForAddress
	default:
		http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
		log.Println("Method Not Allowed")
		return
	}

	var queryDays int
	if numDays == "" {
		queryDays = 30
	} else {
		var convErr error
		queryDays, convErr = strconv.Atoi(numDays)
		if convErr != nil {
			http.Error(w, "numDays must be an integer", http.StatusBadRequest)
			return
		}
	}

	// create the rowkey prefix for querying
	prefix := ""
	if forChain != "" {
		prefix = forChain
		if groupBy == "" {
			// if the request is forChain, and groupBy is empty, set it to groupBy chain
			groupBy = "chain"
		}
		if forAddress != "" {
			// if the request is forAddress, always groupBy address
			groupBy = "address"
			prefix = forChain + ":" + forAddress
		}
	}

	// use the groupBy value to determine how many segments of the rowkey should be used.
	keySegments := 0
	if groupBy == "chain" {
		keySegments = 1
	}
	if groupBy == "address" {
		keySegments = 2
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	var wg sync.WaitGroup

	// total of last 24 hours
	var last24HourCount map[string]int
	wg.Add(1)
	go func(prefix string, keySegments int) {
		var err error
		last24HourInterval := -time.Duration(24) * time.Hour
		now := time.Now().UTC()
		start := now.Add(last24HourInterval)
		defer wg.Done()
		last24HourCount, err = nftMessageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
		if err != nil {
			log.Printf("failed getting count for interval, err: %v", err)
		}
	}(prefix, keySegments)

	// total of the last 30 days
	var periodCount map[string]int
	wg.Add(1)
	go func(prefix string, keySegments int) {
		var err error
		hours := (24 * queryDays)
		periodInterval := -time.Duration(hours) * time.Hour
		now := time.Now().UTC()
		prev := now.Add(periodInterval)
		start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
		defer wg.Done()
		periodCount, err = nftMessageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
		if err != nil {
			log.Fatalf("failed getting count for interval, err: %v", err)
		}
	}(prefix, keySegments)

	// daily totals
	var dailyTotals map[string]map[string]int
	wg.Add(1)
	go func(prefix string, keySegments int, queryDays int) {
		var err error
		defer wg.Done()
		dailyTotals, err = createNFTCountsOfInterval(tbl, ctx, prefix, queryDays, keySegments)
		if err != nil {
			log.Fatalf("failed getting createNFTCountsOfInterval, err: %v", err)
		}
	}(prefix, keySegments, queryDays)

	wg.Wait()

	result := &totalsResult{
		LastDayCount: last24HourCount,
		TotalCount:   periodCount,
		DailyTotals:  dailyTotals,
	}

	jsonBytes, err := json.Marshal(result)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		log.Println(err.Error())
		return
	}
	w.WriteHeader(http.StatusOK)
	w.Write(jsonBytes)
}
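
// Example requests (hypothetical endpoint URL; the handler accepts either
// querystring params on GET or a JSON body on POST):
//
//	GET  https://REGION-PROJECT.cloudfunctions.net/nfts?numDays=7&groupBy=chain
//	POST https://REGION-PROJECT.cloudfunctions.net/nfts
//	     {"numDays": "7", "groupBy": "chain"}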