shared.go 16 KB


  1. package p
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log"
  6. "math"
  7. "net/http"
  8. "os"
  9. "strings"
  10. "sync"
  11. "time"
  12. "cloud.google.com/go/bigtable"
  13. "cloud.google.com/go/pubsub"
  14. "github.com/certusone/wormhole/node/pkg/vaa"
  15. )
  16. // shared code for the various functions, primarily response formatting.
  17. // client is a global Bigtable client, to avoid initializing a new client for
  18. // every request.
  19. var client *bigtable.Client
  20. var clientOnce sync.Once
  21. var tbl *bigtable.Table
  22. var pubsubClient *pubsub.Client
  23. var pubSubTokenTransferDetailsTopic *pubsub.Topic
  24. var coinGeckoCoins = map[string][]CoinGeckoCoin{}
  25. var solanaTokens = map[string]SolanaToken{}
  26. var releaseDay = time.Date(2021, 9, 13, 0, 0, 0, 0, time.UTC)
  27. var pwd string
  28. func initCache(waitgroup *sync.WaitGroup, filePath string, mutex *sync.RWMutex, cacheInterface interface{}) {
  29. defer waitgroup.Done()
  30. loadJsonToInterface(filePath, mutex, cacheInterface)
  31. }
  32. // init runs during cloud function initialization. So, this will only run during an
  33. // an instance's cold start.
  34. // https://cloud.google.com/functions/docs/bestpractices/networking#accessing_google_apis
  35. func init() {
  36. clientOnce.Do(func() {
  37. // Declare a separate err variable to avoid shadowing client.
  38. var err error
  39. project := os.Getenv("GCP_PROJECT")
  40. instance := os.Getenv("BIGTABLE_INSTANCE")
  41. client, err = bigtable.NewClient(context.Background(), project, instance)
  42. if err != nil {
  43. // http.Error(w, "Error initializing client", http.StatusInternalServerError)
  44. log.Printf("bigtable.NewClient error: %v", err)
  45. return
  46. }
  47. var pubsubErr error
  48. pubsubClient, pubsubErr = pubsub.NewClient(context.Background(), project)
  49. if pubsubErr != nil {
  50. log.Printf("pubsub.NewClient error: %v", pubsubErr)
  51. return
  52. }
  53. })
  54. tbl = client.Open("v2Events")
  55. // create the topic that will be published to after decoding token transfer payloads
  56. tokenTransferDetailsTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC")
  57. if tokenTransferDetailsTopic != "" {
  58. pubSubTokenTransferDetailsTopic = pubsubClient.Topic(tokenTransferDetailsTopic)
  59. // fetch the token lists once at start up
  60. coinGeckoCoins = fetchCoinGeckoCoins()
  61. solanaTokens = fetchSolanaTokenList()
  62. }
  63. pwd, _ = os.Getwd()
  64. // initialize in-memory caches
  65. var initWG sync.WaitGroup
  66. initWG.Add(1)
  67. // populates cache used by amountsTransferredToInInterval
  68. go initCache(&initWG, warmTransfersToCacheFilePath, &muWarmTransfersToCache, &warmTransfersToCache)
  69. initWG.Add(1)
  70. // populates cache used by createTransfersOfInterval
  71. go initCache(&initWG, warmTransfersCacheFilePath, &muWarmTransfersCache, &warmTransfersCache)
  72. initWG.Add(1)
  73. // populates cache used by createAddressesOfInterval
  74. go initCache(&initWG, warmAddressesCacheFilePath, &muWarmAddressesCache, &warmAddressesCache)
  75. initWG.Add(1)
  76. // populates cache used by transferredToSince
  77. go initCache(&initWG, transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, &transferredToUpToYesterday)
  78. // initWG.Add(1)
  79. // populates cache used by transferredSince
  80. // initCache(initWG, transferredUpToYesterdayFilePath, &muTransferredToUpYesterday, &transferredUpToYesterday)
  81. initWG.Add(1)
  82. // populates cache used by addressesTransferredToSince
  83. go initCache(&initWG, addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, &addressesToUpToYesterday)
  84. initWG.Add(1)
  85. // populates cache used by createCumulativeAmountsOfInterval
  86. go initCache(&initWG, warmCumulativeCacheFilePath, &muWarmCumulativeCache, &warmCumulativeCache)
  87. initWG.Add(1)
  88. // populates cache used by createCumulativeAddressesOfInterval
  89. go initCache(&initWG, warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, &warmCumulativeAddressesCache)
  90. initWG.Wait()
  91. log.Println("done initializing caches, starting.")
  92. }
  93. var gcpCachePath = "/workspace/src/p/cache"
  94. func loadJsonToInterface(filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
  95. // create path to the static cache dir
  96. path := gcpCachePath + filePath
  97. // create path to the "hot" cache dir
  98. hotPath := "/tmp" + filePath
  99. if strings.HasSuffix(pwd, "cmd") {
  100. // alter the path to be correct when running locally, and in Tilt devnet
  101. path = "../cache" + filePath
  102. hotPath = ".." + hotPath
  103. }
  104. mutex.Lock()
  105. // first check to see if there is a cache file in the tmp dir of the cloud function.
  106. // if so, this is a long running instance with a recently generated cache available.
  107. fileData, readErrTmp := os.ReadFile(hotPath)
  108. if readErrTmp != nil {
  109. log.Printf("failed reading from tmp cache %v, err: %v", hotPath, readErrTmp)
  110. var readErr error
  111. fileData, readErr = os.ReadFile(path)
  112. if readErr != nil {
  113. log.Printf("failed reading %v, err: %v", path, readErr)
  114. } else {
  115. log.Printf("successfully read from cache: %v", path)
  116. }
  117. } else {
  118. log.Printf("successfully read from tmp cache: %v", hotPath)
  119. }
  120. unmarshalErr := json.Unmarshal(fileData, &cacheMap)
  121. mutex.Unlock()
  122. if unmarshalErr != nil {
  123. log.Printf("failed unmarshaling %v, err: %v", path, unmarshalErr)
  124. }
  125. }
  126. func persistInterfaceToJson(filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
  127. path := "/tmp" + filePath
  128. if strings.HasSuffix(pwd, "cmd") {
  129. // alter the path to be correct when running locally, and in Tilt devnet
  130. path = "../cache" + filePath
  131. }
  132. mutex.Lock()
  133. cacheBytes, marshalErr := json.MarshalIndent(cacheMap, "", " ")
  134. if marshalErr != nil {
  135. log.Fatal("failed marshaling cacheMap.", marshalErr)
  136. }
  137. writeErr := os.WriteFile(path, cacheBytes, 0666)
  138. mutex.Unlock()
  139. if writeErr != nil {
  140. log.Fatalf("failed writing to file %v, err: %v", path, writeErr)
  141. }
  142. log.Printf("successfully wrote cache to file: %v", path)
  143. }
  144. var columnFamilies = []string{
  145. "MessagePublication",
  146. "QuorumState",
  147. "TokenTransferPayload",
  148. "AssetMetaPayload",
  149. "NFTTransferPayload",
  150. "TokenTransferDetails",
  151. "ChainDetails",
  152. }
  153. var messagePubFam = columnFamilies[0]
  154. var quorumStateFam = columnFamilies[1]
  155. var transferPayloadFam = columnFamilies[2]
  156. var metaPayloadFam = columnFamilies[3]
  157. var nftPayloadFam = columnFamilies[4]
  158. var transferDetailsFam = columnFamilies[5]
  159. var chainDetailsFam = columnFamilies[6]
  160. type (
  161. // Summary is MessagePublication data & QuorumState data
  162. Summary struct {
  163. EmitterChain string
  164. EmitterAddress string
  165. Sequence string
  166. InitiatingTxID string
  167. Payload []byte
  168. SignedVAABytes []byte
  169. QuorumTime string
  170. TransferDetails *TransferDetails
  171. }
  172. // Details is a Summary extended with all the post-processing ColumnFamilies
  173. Details struct {
  174. Summary
  175. SignedVAA *vaa.VAA
  176. TokenTransferPayload *TokenTransferPayload
  177. AssetMetaPayload *AssetMetaPayload
  178. NFTTransferPayload *NFTTransferPayload
  179. ChainDetails *ChainDetails
  180. }
  181. // The following structs match the ColumnFamiles they are named after
  182. TokenTransferPayload struct {
  183. Amount string
  184. OriginAddress string
  185. OriginChain string
  186. TargetAddress string
  187. TargetChain string
  188. }
  189. AssetMetaPayload struct {
  190. TokenAddress string
  191. TokenChain string
  192. Decimals string
  193. Symbol string
  194. Name string
  195. CoinGeckoCoinId string
  196. NativeAddress string
  197. }
  198. NFTTransferPayload struct {
  199. OriginAddress string
  200. OriginChain string
  201. Symbol string
  202. Name string
  203. TokenId string
  204. URI string
  205. TargetAddress string
  206. TargetChain string
  207. }
  208. TransferDetails struct {
  209. Amount string
  210. Decimals string
  211. NotionalUSDStr string
  212. TokenPriceUSDStr string
  213. TransferTimestamp string
  214. OriginSymbol string
  215. OriginName string
  216. OriginTokenAddress string
  217. // fields below exist on the row, but no need to return them currently.
  218. // NotionalUSD uint64
  219. // TokenPriceUSD uint64
  220. }
  221. ChainDetails struct {
  222. SenderAddress string
  223. ReceiverAddress string
  224. }
  225. )
  226. func chainIdStringToType(chainId string) vaa.ChainID {
  227. switch chainId {
  228. case "1":
  229. return vaa.ChainIDSolana
  230. case "2":
  231. return vaa.ChainIDEthereum
  232. case "3":
  233. return vaa.ChainIDTerra
  234. case "4":
  235. return vaa.ChainIDBSC
  236. case "5":
  237. return vaa.ChainIDPolygon
  238. }
  239. return vaa.ChainIDUnset
  240. }
  241. func makeSummary(row bigtable.Row) *Summary {
  242. summary := &Summary{}
  243. if _, ok := row[messagePubFam]; ok {
  244. for _, item := range row[messagePubFam] {
  245. switch item.Column {
  246. case "MessagePublication:InitiatingTxID":
  247. summary.InitiatingTxID = string(item.Value)
  248. case "MessagePublication:Payload":
  249. summary.Payload = item.Value
  250. case "MessagePublication:EmitterChain":
  251. summary.EmitterChain = string(item.Value)
  252. case "MessagePublication:EmitterAddress":
  253. summary.EmitterAddress = string(item.Value)
  254. case "MessagePublication:Sequence":
  255. summary.Sequence = string(item.Value)
  256. }
  257. }
  258. } else {
  259. // Some rows have a QuorumState, but no MessagePublication,
  260. // so populate Summary values from the rowKey.
  261. keyParts := strings.Split(row.Key(), ":")
  262. chainId := chainIdStringToType(keyParts[0])
  263. summary.EmitterChain = chainId.String()
  264. summary.EmitterAddress = keyParts[1]
  265. seq := strings.TrimLeft(keyParts[2], "0")
  266. if seq == "" {
  267. seq = "0"
  268. }
  269. summary.Sequence = seq
  270. }
  271. if _, ok := row[quorumStateFam]; ok {
  272. item := row[quorumStateFam][0]
  273. summary.SignedVAABytes = item.Value
  274. summary.QuorumTime = item.Timestamp.Time().String()
  275. }
  276. if _, ok := row[transferDetailsFam]; ok {
  277. transferDetails := &TransferDetails{}
  278. for _, item := range row[transferDetailsFam] {
  279. switch item.Column {
  280. case "TokenTransferDetails:Amount":
  281. transferDetails.Amount = string(item.Value)
  282. case "TokenTransferDetails:Decimals":
  283. transferDetails.Decimals = string(item.Value)
  284. case "TokenTransferDetails:NotionalUSDStr":
  285. transferDetails.NotionalUSDStr = string(item.Value)
  286. case "TokenTransferDetails:TokenPriceUSDStr":
  287. transferDetails.TokenPriceUSDStr = string(item.Value)
  288. case "TokenTransferDetails:TransferTimestamp":
  289. transferDetails.TransferTimestamp = string(item.Value)
  290. case "TokenTransferDetails:OriginSymbol":
  291. transferDetails.OriginSymbol = string(item.Value)
  292. case "TokenTransferDetails:OriginName":
  293. transferDetails.OriginName = string(item.Value)
  294. case "TokenTransferDetails:OriginTokenAddress":
  295. transferDetails.OriginTokenAddress = string(item.Value)
  296. }
  297. }
  298. summary.TransferDetails = transferDetails
  299. }
  300. return summary
  301. }
  302. func makeDetails(row bigtable.Row) *Details {
  303. deets := &Details{}
  304. sum := makeSummary(row)
  305. deets.Summary = Summary{
  306. EmitterChain: sum.EmitterChain,
  307. EmitterAddress: sum.EmitterAddress,
  308. Sequence: sum.Sequence,
  309. InitiatingTxID: sum.InitiatingTxID,
  310. Payload: sum.Payload,
  311. SignedVAABytes: sum.SignedVAABytes,
  312. QuorumTime: sum.QuorumTime,
  313. TransferDetails: sum.TransferDetails,
  314. }
  315. if _, ok := row[quorumStateFam]; ok {
  316. item := row[quorumStateFam][0]
  317. deets.SignedVAA, _ = vaa.Unmarshal(item.Value)
  318. }
  319. if _, ok := row[transferPayloadFam]; ok {
  320. tokenTransferPayload := &TokenTransferPayload{}
  321. for _, item := range row[transferPayloadFam] {
  322. switch item.Column {
  323. case "TokenTransferPayload:Amount":
  324. tokenTransferPayload.Amount = string(item.Value)
  325. case "TokenTransferPayload:OriginAddress":
  326. tokenTransferPayload.OriginAddress = string(item.Value)
  327. case "TokenTransferPayload:OriginChain":
  328. tokenTransferPayload.OriginChain = string(item.Value)
  329. case "TokenTransferPayload:TargetAddress":
  330. tokenTransferPayload.TargetAddress = string(item.Value)
  331. case "TokenTransferPayload:TargetChain":
  332. tokenTransferPayload.TargetChain = string(item.Value)
  333. }
  334. }
  335. deets.TokenTransferPayload = tokenTransferPayload
  336. }
  337. if _, ok := row[metaPayloadFam]; ok {
  338. assetMetaPayload := &AssetMetaPayload{}
  339. for _, item := range row[metaPayloadFam] {
  340. switch item.Column {
  341. case "AssetMetaPayload:TokenAddress":
  342. assetMetaPayload.TokenAddress = string(item.Value)
  343. case "AssetMetaPayload:TokenChain":
  344. assetMetaPayload.TokenChain = string(item.Value)
  345. case "AssetMetaPayload:Decimals":
  346. assetMetaPayload.Decimals = string(item.Value)
  347. case "AssetMetaPayload:Symbol":
  348. assetMetaPayload.Symbol = string(item.Value)
  349. case "AssetMetaPayload:Name":
  350. assetMetaPayload.Name = string(item.Value)
  351. case "AssetMetaPayload:CoinGeckoCoinId":
  352. assetMetaPayload.CoinGeckoCoinId = string(item.Value)
  353. case "AssetMetaPayload:NativeAddress":
  354. assetMetaPayload.NativeAddress = string(item.Value)
  355. }
  356. }
  357. deets.AssetMetaPayload = assetMetaPayload
  358. }
  359. if _, ok := row[nftPayloadFam]; ok {
  360. nftTransferPayload := &NFTTransferPayload{}
  361. for _, item := range row[nftPayloadFam] {
  362. switch item.Column {
  363. case "NFTTransferPayload:OriginAddress":
  364. nftTransferPayload.OriginAddress = string(item.Value)
  365. case "NFTTransferPayload:OriginChain":
  366. nftTransferPayload.OriginChain = string(item.Value)
  367. case "NFTTransferPayload:Symbol":
  368. nftTransferPayload.Symbol = string(item.Value)
  369. case "NFTTransferPayload:Name":
  370. nftTransferPayload.Name = string(item.Value)
  371. case "NFTTransferPayload:TokenId":
  372. nftTransferPayload.TokenId = string(item.Value)
  373. case "NFTTransferPayload:URI":
  374. nftTransferPayload.URI = string(TrimUnicodeFromByteArray(item.Value))
  375. case "NFTTransferPayload:TargetAddress":
  376. nftTransferPayload.TargetAddress = string(item.Value)
  377. case "NFTTransferPayload:TargetChain":
  378. nftTransferPayload.TargetChain = string(item.Value)
  379. }
  380. }
  381. deets.NFTTransferPayload = nftTransferPayload
  382. }
  383. // NotionalUSD and TokenPriceUSD are more percise than the string versions returned,
  384. // however the precision is not required, so leaving this commented out for now.
  385. // if _, ok := row[transferDetailsFam]; ok {
  386. // for _, item := range row[transferDetailsFam] {
  387. // switch item.Column {
  388. // case "TokenTransferDetails:NotionalUSD":
  389. // reader := bytes.NewReader(item.Value)
  390. // var notionalUSD uint64
  391. // if err := binary.Read(reader, binary.BigEndian, &notionalUSD); err != nil {
  392. // log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err)
  393. // }
  394. // deets.TransferDetails.NotionalUSD = notionalUSD
  395. // case "TokenTransferDetails:TokenPriceUSD":
  396. // reader := bytes.NewReader(item.Value)
  397. // var tokenPriceUSD uint64
  398. // if err := binary.Read(reader, binary.BigEndian, &tokenPriceUSD); err != nil {
  399. // log.Fatalf("failed to read TokenPriceUSD of row: %v. err %v", row.Key(), err)
  400. // }
  401. // deets.TransferDetails.TokenPriceUSD = tokenPriceUSD
  402. // }
  403. // }
  404. // }
  405. if _, ok := row[chainDetailsFam]; ok {
  406. chainDetails := &ChainDetails{}
  407. for _, item := range row[chainDetailsFam] {
  408. switch item.Column {
  409. case "ChainDetails:SenderAddress":
  410. chainDetails.SenderAddress = string(item.Value)
  411. case "ChainDetails:ReceiverAddress":
  412. chainDetails.ReceiverAddress = string(item.Value)
  413. }
  414. }
  415. deets.ChainDetails = chainDetails
  416. }
  417. return deets
  418. }
  419. func roundToTwoDecimalPlaces(num float64) float64 {
  420. return math.Round(num*100) / 100
  421. }
  422. func createCachePrefix(prefix string) string {
  423. cachePrefix := prefix
  424. if prefix == "" {
  425. cachePrefix = "*"
  426. }
  427. return cachePrefix
  428. }
  429. var mux = newMux()
  430. // Entry is the cloud function entry point
  431. func Entry(w http.ResponseWriter, r *http.Request) {
  432. mux.ServeHTTP(w, r)
  433. }
  434. func newMux() *http.ServeMux {
  435. mux := http.NewServeMux()
  436. mux.HandleFunc("/notionaltransferred", NotionalTransferred)
  437. mux.HandleFunc("/notionaltransferredto", NotionalTransferredTo)
  438. mux.HandleFunc("/notionaltransferredtocumulative", NotionalTransferredToCumulative)
  439. mux.HandleFunc("/addressestransferredto", AddressesTransferredTo)
  440. mux.HandleFunc("/addressestransferredtocumulative", AddressesTransferredToCumulative)
  441. mux.HandleFunc("/totals", Totals)
  442. mux.HandleFunc("/recent", Recent)
  443. mux.HandleFunc("/transaction", Transaction)
  444. mux.HandleFunc("/readrow", ReadRow)
  445. mux.HandleFunc("/findvalues", FindValues)
  446. mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })
  447. return mux
  448. }