shared.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package p
  2. import (
  3. "context"
  4. "log"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "sync"
  9. "cloud.google.com/go/bigtable"
  10. "cloud.google.com/go/pubsub"
  11. "github.com/certusone/wormhole/node/pkg/vaa"
  12. )
  13. // shared code for the various functions, primarily response formatting.
  14. // client is a global Bigtable client, to avoid initializing a new client for
  15. // every request.
  16. var client *bigtable.Client
  17. var clientOnce sync.Once
  18. var tbl *bigtable.Table
  19. var pubsubClient *pubsub.Client
  20. var pubSubTokenTransferDetailsTopic *pubsub.Topic
  21. // init runs during cloud function initialization. So, this will only run during an
  22. // an instance's cold start.
  23. // https://cloud.google.com/functions/docs/bestpractices/networking#accessing_google_apis
  24. func init() {
  25. clientOnce.Do(func() {
  26. // Declare a separate err variable to avoid shadowing client.
  27. var err error
  28. project := os.Getenv("GCP_PROJECT")
  29. instance := os.Getenv("BIGTABLE_INSTANCE")
  30. client, err = bigtable.NewClient(context.Background(), project, instance)
  31. if err != nil {
  32. // http.Error(w, "Error initializing client", http.StatusInternalServerError)
  33. log.Printf("bigtable.NewClient error: %v", err)
  34. return
  35. }
  36. var pubsubErr error
  37. pubsubClient, pubsubErr = pubsub.NewClient(context.Background(), project)
  38. if pubsubErr != nil {
  39. log.Printf("pubsub.NewClient error: %v", pubsubErr)
  40. return
  41. }
  42. })
  43. tbl = client.Open("v2Events")
  44. // create the topic that will be published to after decoding token transfer payloads
  45. tokenTransferDetailsTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC")
  46. pubSubTokenTransferDetailsTopic = pubsubClient.Topic(tokenTransferDetailsTopic)
  47. }
  48. var columnFamilies = []string{
  49. "MessagePublication",
  50. "QuorumState",
  51. "TokenTransferPayload",
  52. "AssetMetaPayload",
  53. "NFTTransferPayload",
  54. "TokenTransferDetails",
  55. "ChainDetails",
  56. }
  57. type (
  58. Summary struct {
  59. EmitterChain string
  60. EmitterAddress string
  61. Sequence string
  62. InitiatingTxID string
  63. Payload []byte
  64. SignedVAABytes []byte
  65. QuorumTime string
  66. }
  67. // Details is a Summary, with the VAA decoded as SignedVAA
  68. Details struct {
  69. SignedVAA *vaa.VAA
  70. EmitterChain string
  71. EmitterAddress string
  72. Sequence string
  73. InitiatingTxID string
  74. Payload []byte
  75. SignedVAABytes []byte
  76. QuorumTime string
  77. }
  78. )
  79. func chainIdStringToType(chainId string) vaa.ChainID {
  80. switch chainId {
  81. case "1":
  82. return vaa.ChainIDSolana
  83. case "2":
  84. return vaa.ChainIDEthereum
  85. case "3":
  86. return vaa.ChainIDTerra
  87. case "4":
  88. return vaa.ChainIDBSC
  89. case "5":
  90. return vaa.ChainIDPolygon
  91. }
  92. return vaa.ChainIDUnset
  93. }
  94. func makeSummary(row bigtable.Row) *Summary {
  95. summary := &Summary{}
  96. if _, ok := row[columnFamilies[0]]; ok {
  97. for _, item := range row[columnFamilies[0]] {
  98. switch item.Column {
  99. case "MessagePublication:InitiatingTxID":
  100. summary.InitiatingTxID = string(item.Value)
  101. case "MessagePublication:Payload":
  102. summary.Payload = item.Value
  103. case "MessagePublication:EmitterChain":
  104. summary.EmitterChain = string(item.Value)
  105. case "MessagePublication:EmitterAddress":
  106. summary.EmitterAddress = string(item.Value)
  107. case "MessagePublication:Sequence":
  108. summary.Sequence = string(item.Value)
  109. }
  110. }
  111. } else {
  112. // Some rows have a QuorumState, but no MessagePublication,
  113. // so populate Summary values from the rowKey.
  114. keyParts := strings.Split(row.Key(), ":")
  115. chainId := chainIdStringToType(keyParts[0])
  116. summary.EmitterChain = chainId.String()
  117. summary.EmitterAddress = keyParts[1]
  118. seq := strings.TrimLeft(keyParts[2], "0")
  119. if seq == "" {
  120. seq = "0"
  121. }
  122. summary.Sequence = seq
  123. }
  124. if _, ok := row[columnFamilies[1]]; ok {
  125. item := row[columnFamilies[1]][0]
  126. summary.SignedVAABytes = item.Value
  127. summary.QuorumTime = item.Timestamp.Time().String()
  128. }
  129. return summary
  130. }
  131. func makeDetails(row bigtable.Row) *Details {
  132. sum := makeSummary(row)
  133. deets := &Details{
  134. EmitterChain: sum.EmitterChain,
  135. EmitterAddress: sum.EmitterAddress,
  136. Sequence: sum.Sequence,
  137. InitiatingTxID: sum.InitiatingTxID,
  138. Payload: sum.Payload,
  139. SignedVAABytes: sum.SignedVAABytes,
  140. QuorumTime: sum.QuorumTime,
  141. }
  142. if _, ok := row[columnFamilies[1]]; ok {
  143. item := row[columnFamilies[1]][0]
  144. deets.SignedVAA, _ = vaa.Unmarshal(item.Value)
  145. }
  146. return deets
  147. }
  148. var mux = newMux()
  149. // Entry is the cloud function entry point
  150. func Entry(w http.ResponseWriter, r *http.Request) {
  151. mux.ServeHTTP(w, r)
  152. }
  153. func newMux() *http.ServeMux {
  154. mux := http.NewServeMux()
  155. mux.HandleFunc("/totals", Totals)
  156. mux.HandleFunc("/recent", Recent)
  157. mux.HandleFunc("/transaction", Transaction)
  158. mux.HandleFunc("/readrow", ReadRow)
  159. mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })
  160. return mux
  161. }