shared.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package p
  2. import (
  3. "context"
  4. "log"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "sync"
  9. "cloud.google.com/go/bigtable"
  10. "cloud.google.com/go/pubsub"
  11. "github.com/certusone/wormhole/node/pkg/vaa"
  12. )
  13. // shared code for the various functions, primarily response formatting.
  14. // client is a global Bigtable client, to avoid initializing a new client for
  15. // every request.
  16. var client *bigtable.Client
  17. var clientOnce sync.Once
  18. var tbl *bigtable.Table
  19. var pubsubClient *pubsub.Client
  20. var pubSubTokenTransferDetailsTopic *pubsub.Topic
  21. var coinGeckoCoins = map[string][]CoinGeckoCoin{}
  22. var solanaTokens = map[string]SolanaToken{}
  23. // init runs during cloud function initialization. So, this will only run during an
  24. // an instance's cold start.
  25. // https://cloud.google.com/functions/docs/bestpractices/networking#accessing_google_apis
  26. func init() {
  27. clientOnce.Do(func() {
  28. // Declare a separate err variable to avoid shadowing client.
  29. var err error
  30. project := os.Getenv("GCP_PROJECT")
  31. instance := os.Getenv("BIGTABLE_INSTANCE")
  32. client, err = bigtable.NewClient(context.Background(), project, instance)
  33. if err != nil {
  34. // http.Error(w, "Error initializing client", http.StatusInternalServerError)
  35. log.Printf("bigtable.NewClient error: %v", err)
  36. return
  37. }
  38. var pubsubErr error
  39. pubsubClient, pubsubErr = pubsub.NewClient(context.Background(), project)
  40. if pubsubErr != nil {
  41. log.Printf("pubsub.NewClient error: %v", pubsubErr)
  42. return
  43. }
  44. })
  45. tbl = client.Open("v2Events")
  46. // create the topic that will be published to after decoding token transfer payloads
  47. tokenTransferDetailsTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC")
  48. if tokenTransferDetailsTopic != "" {
  49. pubSubTokenTransferDetailsTopic = pubsubClient.Topic(tokenTransferDetailsTopic)
  50. // fetch the token lists once at start up
  51. coinGeckoCoins = fetchCoinGeckoCoins()
  52. solanaTokens = fetchSolanaTokenList()
  53. }
  54. }
  // columnFamilies lists the Bigtable column family names used by this package.
  // The order is significant: makeSummary and makeDetails look families up by
  // position (index 0 and index 1), so new entries belong at the end.
  var columnFamilies = []string{
  	"MessagePublication", // index 0, read by makeSummary
  	"QuorumState",        // index 1, read by makeSummary and makeDetails
  	"TokenTransferPayload",
  	"AssetMetaPayload",
  	"NFTTransferPayload",
  	"TokenTransferDetails",
  	"ChainDetails",
  }
  64. type (
  65. Summary struct {
  66. EmitterChain string
  67. EmitterAddress string
  68. Sequence string
  69. InitiatingTxID string
  70. Payload []byte
  71. SignedVAABytes []byte
  72. QuorumTime string
  73. }
  74. // Details is a Summary, with the VAA decoded as SignedVAA
  75. Details struct {
  76. SignedVAA *vaa.VAA
  77. EmitterChain string
  78. EmitterAddress string
  79. Sequence string
  80. InitiatingTxID string
  81. Payload []byte
  82. SignedVAABytes []byte
  83. QuorumTime string
  84. }
  85. )
  86. func chainIdStringToType(chainId string) vaa.ChainID {
  87. switch chainId {
  88. case "1":
  89. return vaa.ChainIDSolana
  90. case "2":
  91. return vaa.ChainIDEthereum
  92. case "3":
  93. return vaa.ChainIDTerra
  94. case "4":
  95. return vaa.ChainIDBSC
  96. case "5":
  97. return vaa.ChainIDPolygon
  98. }
  99. return vaa.ChainIDUnset
  100. }
  101. func makeSummary(row bigtable.Row) *Summary {
  102. summary := &Summary{}
  103. if _, ok := row[columnFamilies[0]]; ok {
  104. for _, item := range row[columnFamilies[0]] {
  105. switch item.Column {
  106. case "MessagePublication:InitiatingTxID":
  107. summary.InitiatingTxID = string(item.Value)
  108. case "MessagePublication:Payload":
  109. summary.Payload = item.Value
  110. case "MessagePublication:EmitterChain":
  111. summary.EmitterChain = string(item.Value)
  112. case "MessagePublication:EmitterAddress":
  113. summary.EmitterAddress = string(item.Value)
  114. case "MessagePublication:Sequence":
  115. summary.Sequence = string(item.Value)
  116. }
  117. }
  118. } else {
  119. // Some rows have a QuorumState, but no MessagePublication,
  120. // so populate Summary values from the rowKey.
  121. keyParts := strings.Split(row.Key(), ":")
  122. chainId := chainIdStringToType(keyParts[0])
  123. summary.EmitterChain = chainId.String()
  124. summary.EmitterAddress = keyParts[1]
  125. seq := strings.TrimLeft(keyParts[2], "0")
  126. if seq == "" {
  127. seq = "0"
  128. }
  129. summary.Sequence = seq
  130. }
  131. if _, ok := row[columnFamilies[1]]; ok {
  132. item := row[columnFamilies[1]][0]
  133. summary.SignedVAABytes = item.Value
  134. summary.QuorumTime = item.Timestamp.Time().String()
  135. }
  136. return summary
  137. }
  138. func makeDetails(row bigtable.Row) *Details {
  139. sum := makeSummary(row)
  140. deets := &Details{
  141. EmitterChain: sum.EmitterChain,
  142. EmitterAddress: sum.EmitterAddress,
  143. Sequence: sum.Sequence,
  144. InitiatingTxID: sum.InitiatingTxID,
  145. Payload: sum.Payload,
  146. SignedVAABytes: sum.SignedVAABytes,
  147. QuorumTime: sum.QuorumTime,
  148. }
  149. if _, ok := row[columnFamilies[1]]; ok {
  150. item := row[columnFamilies[1]][0]
  151. deets.SignedVAA, _ = vaa.Unmarshal(item.Value)
  152. }
  153. return deets
  154. }
  155. var mux = newMux()
  156. // Entry is the cloud function entry point
  157. func Entry(w http.ResponseWriter, r *http.Request) {
  158. mux.ServeHTTP(w, r)
  159. }
  160. func newMux() *http.ServeMux {
  161. mux := http.NewServeMux()
  162. mux.HandleFunc("/totals", Totals)
  163. mux.HandleFunc("/recent", Recent)
  164. mux.HandleFunc("/transaction", Transaction)
  165. mux.HandleFunc("/readrow", ReadRow)
  166. mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })
  167. return mux
  168. }