// notional-transferred.go
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net/http"
  10. "strconv"
  11. "sync"
  12. "time"
  13. "cloud.google.com/go/bigtable"
  14. )
// transfersResult is the JSON payload returned by NotionalTransferred.
// The three-level maps are keyed [leavingChain][destinationChain][symbol]
// with "*" serving as the aggregate ("any") key at each level; Daily adds
// a leading date key formatted "2006-01-02".
type transfersResult struct {
	// Notional moved in the 24 hours before the request (when last24Hours is set).
	Last24Hours map[string]map[string]map[string]float64
	// Notional moved within the requested period (when forPeriod is set).
	WithinPeriod map[string]map[string]map[string]float64
	// Number of days covered by WithinPeriod/Daily (numDays, default 30).
	PeriodDurationDays int
	// Per-day breakdown, keyed by date string (when daily is set).
	Daily map[string]map[string]map[string]map[string]float64
}
// warmTransfersCache is an in-memory cache of previously calculated results,
// keyed [cachePrefix][date][leavingChain][destinationChain][symbol] = notional.
var warmTransfersCache = map[string]map[string]map[string]map[string]map[string]float64{}

// muWarmTransfersCache guards warmTransfersCache (and is passed to
// persistInterfaceToJson when the cache is flushed to disk).
var muWarmTransfersCache sync.RWMutex

// warmTransfersCacheFilePath is the file the cache is persisted to as JSON.
var warmTransfersCacheFilePath = "/notional-transferred-cache.json"
  25. // finds the daily amount of each symbol transferred from each chain, to each chain,
  26. // from the specified start to the present.
  27. func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]map[string]float64 {
  28. results := map[string]map[string]map[string]map[string]float64{}
  29. now := time.Now().UTC()
  30. numPrevDays := int(now.Sub(start).Hours() / 24)
  31. var intervalsWG sync.WaitGroup
  32. // there will be a query for each previous day, plus today
  33. intervalsWG.Add(numPrevDays + 1)
  34. // create the unique identifier for this query, for cache
  35. cachePrefix := createCachePrefix(prefix)
  36. cacheNeedsUpdate := false
  37. for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
  38. go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
  39. // start is the SOD, end is EOD
  40. // "0 daysAgo start" is 00:00:00 AM of the current day
  41. // "0 daysAgo end" is 23:59:59 of the current day (the future)
  42. // calulate the start and end times for the query
  43. hoursAgo := (24 * daysAgo)
  44. daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
  45. n := now.Add(daysAgoDuration)
  46. year := n.Year()
  47. month := n.Month()
  48. day := n.Day()
  49. loc := n.Location()
  50. start := time.Date(year, month, day, 0, 0, 0, 0, loc)
  51. end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
  52. dateStr := start.Format("2006-01-02")
  53. muWarmTransfersCache.Lock()
  54. // initialize the map for this date in the result set
  55. results[dateStr] = map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
  56. // check to see if there is cache data for this date/query
  57. if dates, ok := warmTransfersCache[cachePrefix]; ok {
  58. // have a cache for this query
  59. if dateCache, ok := dates[dateStr]; ok && len(dateCache) > 1 {
  60. // have a cache for this date
  61. if daysAgo >= 1 {
  62. // only use the cache for yesterday and older
  63. results[dateStr] = dateCache
  64. muWarmTransfersCache.Unlock()
  65. intervalsWG.Done()
  66. return
  67. }
  68. }
  69. } else {
  70. // no cache for this query, initialize the map
  71. warmTransfersCache[cachePrefix] = map[string]map[string]map[string]map[string]float64{}
  72. }
  73. muWarmTransfersCache.Unlock()
  74. defer intervalsWG.Done()
  75. queryResult := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
  76. // iterate through the rows and increment the amounts
  77. for _, row := range queryResult {
  78. if _, ok := results[dateStr][row.LeavingChain]; !ok {
  79. results[dateStr][row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
  80. }
  81. if _, ok := results[dateStr][row.LeavingChain][row.DestinationChain]; !ok {
  82. results[dateStr][row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
  83. }
  84. if _, ok := results[dateStr]["*"][row.DestinationChain]; !ok {
  85. results[dateStr]["*"][row.DestinationChain] = map[string]float64{"*": 0}
  86. }
  87. // add the transfer data to the result set every possible way:
  88. // by symbol, aggregated by: "leaving chain", "arriving at chain", "from any chain", "to any chain".
  89. // add to the total amount leaving this chain, going to any chain, for all symbols
  90. results[dateStr][row.LeavingChain]["*"]["*"] = results[dateStr][row.LeavingChain]["*"]["*"] + row.Notional
  91. // add to the total amount leaving this chain, going to the destination chain, for all symbols
  92. results[dateStr][row.LeavingChain][row.DestinationChain]["*"] = results[dateStr][row.LeavingChain][row.DestinationChain]["*"] + row.Notional
  93. // add to the total amount of this symbol leaving this chain, going to any chain
  94. results[dateStr][row.LeavingChain]["*"][row.TokenSymbol] = results[dateStr][row.LeavingChain]["*"][row.TokenSymbol] + row.Notional
  95. // add to the total amount of this symbol leaving this chain, going to the destination chain
  96. results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
  97. // add to the total amount arriving at the destination chain, coming from anywhere, including all symbols
  98. results[dateStr]["*"][row.DestinationChain]["*"] = results[dateStr]["*"][row.DestinationChain]["*"] + row.Notional
  99. // add to the total amount of this symbol arriving at the destination chain
  100. results[dateStr]["*"][row.DestinationChain][row.TokenSymbol] = results[dateStr]["*"][row.DestinationChain][row.TokenSymbol] + row.Notional
  101. // add to the total amount of this symbol transferred, from any chain, to any chain
  102. results[dateStr]["*"]["*"][row.TokenSymbol] = results[dateStr]["*"]["*"][row.TokenSymbol] + row.Notional
  103. // and finally, total/total/total: amount of all symbols transferred from any chain to any other chain
  104. results[dateStr]["*"]["*"]["*"] = results[dateStr]["*"]["*"]["*"] + row.Notional
  105. }
  106. if daysAgo >= 1 {
  107. // set the result in the cache
  108. muWarmTransfersCache.Lock()
  109. if cacheData, ok := warmTransfersCache[cachePrefix][dateStr]; !ok || len(cacheData) == 1 {
  110. // cache does not have this date, add the data, and mark the cache stale
  111. warmTransfersCache[cachePrefix][dateStr] = results[dateStr]
  112. cacheNeedsUpdate = true
  113. }
  114. muWarmTransfersCache.Unlock()
  115. }
  116. }(tbl, ctx, prefix, daysAgo)
  117. }
  118. intervalsWG.Wait()
  119. if cacheNeedsUpdate {
  120. persistInterfaceToJson(warmTransfersCacheFilePath, &muWarmTransfersCache, warmTransfersCache)
  121. }
  122. // having consistent keys in each object is helpful for clients, explorer GUI
  123. // create a set of all the keys from all dates/chains, to ensure the result objects all have the same chain keys
  124. seenChainSet := map[string]bool{}
  125. for _, chains := range results {
  126. for leaving, dests := range chains {
  127. seenChainSet[leaving] = true
  128. for dest := range dests {
  129. seenChainSet[dest] = true
  130. }
  131. }
  132. }
  133. var muResult sync.RWMutex
  134. // ensure each chain object has all the same symbol keys:
  135. for date, chains := range results {
  136. for chain := range seenChainSet {
  137. if _, ok := chains[chain]; !ok {
  138. muResult.Lock()
  139. results[date][chain] = map[string]map[string]float64{"*": {"*": 0}}
  140. muResult.Unlock()
  141. }
  142. }
  143. for leaving := range chains {
  144. for chain := range seenChainSet {
  145. // check that date has all the chains
  146. if _, ok := chains[chain]; !ok {
  147. muResult.Lock()
  148. results[date][leaving][chain] = map[string]float64{"*": 0}
  149. muResult.Unlock()
  150. }
  151. }
  152. }
  153. }
  154. return results
  155. }
  156. // calculates the amount of each symbol that has gone from each chain, to each other chain, since the specified day.
  157. func transferredSinceDate(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
  158. result := map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
  159. dailyTotals := createTransfersOfInterval(tbl, ctx, prefix, start)
  160. for _, leaving := range dailyTotals {
  161. for chain, dests := range leaving {
  162. // ensure the chain exists in the result map
  163. if _, ok := result[chain]; !ok {
  164. result[chain] = map[string]map[string]float64{"*": {"*": 0}}
  165. }
  166. for dest, tokens := range dests {
  167. if _, ok := result[chain][dest]; !ok {
  168. result[chain][dest] = map[string]float64{"*": 0}
  169. }
  170. for symbol, amount := range tokens {
  171. if _, ok := result[chain][dest][symbol]; !ok {
  172. result[chain][dest][symbol] = 0
  173. }
  174. // add the amount of this symbol transferred this day to the
  175. // amount already in the result (amount of this symbol prevoiusly transferred)
  176. result[chain][dest][symbol] = result[chain][dest][symbol] + amount
  177. }
  178. }
  179. }
  180. }
  181. // create a set of chainIDs, the union of source and destination chains,
  182. // to ensure the result objects all have the same keys.
  183. seenChainSet := map[string]bool{}
  184. for leaving, dests := range result {
  185. seenChainSet[leaving] = true
  186. for dest := range dests {
  187. seenChainSet[dest] = true
  188. }
  189. }
  190. // make sure the root of the map has all the chainIDs
  191. for chain := range seenChainSet {
  192. if _, ok := result[chain]; !ok {
  193. result[chain] = map[string]map[string]float64{"*": {"*": 0}}
  194. }
  195. }
  196. // make sure that each chain at the root (leaving) as a key (destination) for each chain
  197. for leaving, dests := range result {
  198. for chain := range seenChainSet {
  199. // check that date has all the chains
  200. if _, ok := dests[chain]; !ok {
  201. result[leaving][chain] = map[string]float64{"*": 0}
  202. }
  203. }
  204. }
  205. return result
  206. }
  207. // returns the count of the rows in the query response
  208. func transfersForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) map[string]map[string]map[string]float64 {
  209. // query for all rows in time range, return result count
  210. queryResults := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
  211. result := map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
  212. // iterate through the rows and increment the count for each index
  213. for _, row := range queryResults {
  214. if _, ok := result[row.LeavingChain]; !ok {
  215. result[row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
  216. }
  217. if _, ok := result[row.LeavingChain][row.DestinationChain]; !ok {
  218. result[row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
  219. }
  220. if _, ok := result["*"][row.DestinationChain]; !ok {
  221. result["*"][row.DestinationChain] = map[string]float64{"*": 0}
  222. }
  223. // add the transfer data to the result set every possible way:
  224. // by symbol, aggregated by: "leaving chain", "arriving at chain", "from any chain", "to any chain".
  225. // add to the total amount leaving this chain, going to any chain, for all symbols
  226. result[row.LeavingChain]["*"]["*"] = result[row.LeavingChain]["*"]["*"] + row.Notional
  227. // add to the total amount leaving this chain, going to the destination chain, for all symbols
  228. result[row.LeavingChain][row.DestinationChain]["*"] = result[row.LeavingChain][row.DestinationChain]["*"] + row.Notional
  229. // add to the total amount of this symbol leaving this chain, going to any chain
  230. result[row.LeavingChain]["*"][row.TokenSymbol] = result[row.LeavingChain]["*"][row.TokenSymbol] + row.Notional
  231. // add to the total amount of this symbol leaving this chain, going to the destination chain
  232. result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] = result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
  233. // add to the total amount arriving at the destination chain, coming from anywhere, including all symbols
  234. result["*"][row.DestinationChain]["*"] = result["*"][row.DestinationChain]["*"] + row.Notional
  235. // add to the total amount of this symbol arriving at the destination chain
  236. result["*"][row.DestinationChain][row.TokenSymbol] = result["*"][row.DestinationChain][row.TokenSymbol] + row.Notional
  237. // add to the total amount of this symbol transferred, from any chain, to any chain
  238. result["*"]["*"][row.TokenSymbol] = result["*"]["*"][row.TokenSymbol] + row.Notional
  239. // and finally, total/total/total: amount of all symbols transferred from any chain to any other chain
  240. result["*"]["*"]["*"] = result["*"]["*"]["*"] + row.Notional
  241. }
  242. // create a set of chainIDs, the union of source and destination chains,
  243. // to ensure the result objects all have the same keys.
  244. seenChainSet := map[string]bool{}
  245. for leaving, dests := range result {
  246. seenChainSet[leaving] = true
  247. for dest := range dests {
  248. seenChainSet[dest] = true
  249. }
  250. }
  251. // make sure the root of the map has all the chainIDs
  252. for chain := range seenChainSet {
  253. if _, ok := result[chain]; !ok {
  254. result[chain] = map[string]map[string]float64{"*": {"*": 0}}
  255. }
  256. }
  257. // make sure that each chain at the root (leaving) as a key (destination) for each chain
  258. for leaving, dests := range result {
  259. for chain := range seenChainSet {
  260. // check that date has all the chains
  261. if _, ok := dests[chain]; !ok {
  262. result[leaving][chain] = map[string]float64{"*": 0}
  263. }
  264. }
  265. }
  266. return result
  267. }
  268. // finds the value that has been transferred from each chain to each other, by symbol.
  269. func NotionalTransferred(w http.ResponseWriter, r *http.Request) {
  270. // Set CORS headers for the preflight request
  271. if r.Method == http.MethodOptions {
  272. w.Header().Set("Access-Control-Allow-Origin", "*")
  273. w.Header().Set("Access-Control-Allow-Methods", "POST")
  274. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  275. w.Header().Set("Access-Control-Max-Age", "3600")
  276. w.WriteHeader(http.StatusNoContent)
  277. return
  278. }
  279. // Set CORS headers for the main request.
  280. w.Header().Set("Access-Control-Allow-Origin", "*")
  281. var numDays, forChain, forAddress, daily, last24Hours, forPeriod string
  282. // allow GET requests with querystring params, or POST requests with json body.
  283. switch r.Method {
  284. case http.MethodGet:
  285. queryParams := r.URL.Query()
  286. numDays = queryParams.Get("numDays")
  287. forChain = queryParams.Get("forChain")
  288. forAddress = queryParams.Get("forAddress")
  289. daily = queryParams.Get("daily")
  290. last24Hours = queryParams.Get("last24Hours")
  291. forPeriod = queryParams.Get("forPeriod")
  292. case http.MethodPost:
  293. // declare request body properties
  294. var d struct {
  295. NumDays string `json:"numDays"`
  296. ForChain string `json:"forChain"`
  297. ForAddress string `json:"forAddress"`
  298. Daily string `json:"daily"`
  299. Last24Hours string `json:"last24Hours"`
  300. ForPeriod string `json:"forPeriod"`
  301. }
  302. // deserialize request body
  303. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  304. switch err {
  305. case io.EOF:
  306. // do nothing, empty body is ok
  307. default:
  308. log.Printf("json.NewDecoder: %v", err)
  309. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  310. return
  311. }
  312. }
  313. numDays = d.NumDays
  314. forChain = d.ForChain
  315. forAddress = d.ForAddress
  316. daily = d.Daily
  317. last24Hours = d.Last24Hours
  318. forPeriod = d.ForPeriod
  319. default:
  320. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  321. log.Println("Method Not Allowed")
  322. return
  323. }
  324. if daily == "" && last24Hours == "" && forPeriod == "" {
  325. // none of the options were set, so set one
  326. last24Hours = "true"
  327. }
  328. var queryDays int
  329. if numDays == "" {
  330. queryDays = 30
  331. } else {
  332. var convErr error
  333. queryDays, convErr = strconv.Atoi(numDays)
  334. if convErr != nil {
  335. fmt.Fprint(w, "numDays must be an integer")
  336. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  337. return
  338. }
  339. }
  340. // create the rowkey prefix for querying
  341. prefix := ""
  342. if forChain != "" {
  343. prefix = forChain
  344. // if the request is forChain, always groupBy chain
  345. if forAddress != "" {
  346. // if the request is forAddress, always groupBy address
  347. prefix = forChain + ":" + forAddress
  348. }
  349. }
  350. ctx, cancel := context.WithTimeout(context.Background(), 600*time.Second)
  351. defer cancel()
  352. var wg sync.WaitGroup
  353. // total of last 24 hours
  354. last24HourCount := map[string]map[string]map[string]float64{}
  355. if last24Hours != "" {
  356. wg.Add(1)
  357. go func(prefix string) {
  358. last24HourInterval := -time.Duration(24) * time.Hour
  359. now := time.Now().UTC()
  360. start := now.Add(last24HourInterval)
  361. defer wg.Done()
  362. transfers := transfersForInterval(tbl, ctx, prefix, start, now)
  363. for chain, dests := range transfers {
  364. last24HourCount[chain] = map[string]map[string]float64{}
  365. for dest, tokens := range dests {
  366. last24HourCount[chain][dest] = map[string]float64{}
  367. for symbol, amount := range tokens {
  368. last24HourCount[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
  369. }
  370. }
  371. }
  372. }(prefix)
  373. }
  374. // transfers of the last numDays
  375. periodTransfers := map[string]map[string]map[string]float64{}
  376. if forPeriod != "" {
  377. wg.Add(1)
  378. go func(prefix string) {
  379. hours := (24 * queryDays)
  380. periodInterval := -time.Duration(hours) * time.Hour
  381. now := time.Now().UTC()
  382. prev := now.Add(periodInterval)
  383. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  384. defer wg.Done()
  385. transfers := transferredSinceDate(tbl, ctx, prefix, start)
  386. for chain, dests := range transfers {
  387. periodTransfers[chain] = map[string]map[string]float64{}
  388. for dest, tokens := range dests {
  389. periodTransfers[chain][dest] = map[string]float64{}
  390. for symbol, amount := range tokens {
  391. periodTransfers[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
  392. }
  393. }
  394. }
  395. }(prefix)
  396. }
  397. // daily totals
  398. dailyTransfers := map[string]map[string]map[string]map[string]float64{}
  399. if daily != "" {
  400. wg.Add(1)
  401. go func(prefix string, queryDays int) {
  402. hours := (24 * queryDays)
  403. periodInterval := -time.Duration(hours) * time.Hour
  404. now := time.Now().UTC()
  405. prev := now.Add(periodInterval)
  406. start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
  407. defer wg.Done()
  408. transfers := createTransfersOfInterval(tbl, ctx, prefix, start)
  409. for date, chains := range transfers {
  410. dailyTransfers[date] = map[string]map[string]map[string]float64{}
  411. for chain, dests := range chains {
  412. dailyTransfers[date][chain] = map[string]map[string]float64{}
  413. for destChain, tokens := range dests {
  414. dailyTransfers[date][chain][destChain] = map[string]float64{}
  415. for symbol, amount := range tokens {
  416. dailyTransfers[date][chain][destChain][symbol] = roundToTwoDecimalPlaces(amount)
  417. }
  418. }
  419. }
  420. }
  421. }(prefix, queryDays)
  422. }
  423. wg.Wait()
  424. result := &transfersResult{
  425. Last24Hours: last24HourCount,
  426. WithinPeriod: periodTransfers,
  427. PeriodDurationDays: queryDays,
  428. Daily: dailyTransfers,
  429. }
  430. jsonBytes, err := json.Marshal(result)
  431. if err != nil {
  432. w.WriteHeader(http.StatusInternalServerError)
  433. w.Write([]byte(err.Error()))
  434. log.Println(err.Error())
  435. return
  436. }
  437. w.WriteHeader(http.StatusOK)
  438. w.Write(jsonBytes)
  439. }