fetch_row.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. // Package p contains an HTTP Cloud Function.
  2. package p
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "html"
  8. "io"
  9. "log"
  10. "net/http"
  11. "os"
  12. "strings"
  13. "sync"
  14. "cloud.google.com/go/bigtable"
  15. )
  16. // client is a global Bigtable client, to avoid initializing a new client for
  17. // every request.
  18. var client *bigtable.Client
  19. var clientOnce sync.Once
  20. var columnFamilies = []string{"MessagePublication", "Signatures", "VAAState", "QuorumState"}
  // MessagePub holds the values taken from a row's MessagePublication
  // column family.
  type MessagePub struct {
  	// InitiatingTxID is the value of the MessagePublication:InitiatingTxID column.
  	InitiatingTxID string
  	// Payload is the raw value of the MessagePublication:Payload column.
  	Payload []byte
  }

  // Summary is the condensed view of a single row that is serialized to JSON
  // and returned to the caller.
  type Summary struct {
  	// Message carries the MessagePublication values of the row.
  	Message MessagePub
  	// GuardianAddresses holds one entry per column found in the Signatures family.
  	GuardianAddresses []string
  	// SignedVAA is the raw value of the QuorumState:SignedVAA column.
  	SignedVAA []byte
  	// QuorumTime is the timestamp of the QuorumState:SignedVAA cell, as a string.
  	QuorumTime string
  }
  33. func makeSummary(row bigtable.Row) *Summary {
  34. summary := &Summary{}
  35. if _, ok := row[columnFamilies[0]]; ok {
  36. message := &MessagePub{}
  37. for _, item := range row[columnFamilies[0]] {
  38. switch item.Column {
  39. case "MessagePublication:InitiatingTxID":
  40. message.InitiatingTxID = string(item.Value)
  41. case "MessagePublication:Payload":
  42. message.Payload = item.Value
  43. }
  44. }
  45. summary.Message = *message
  46. }
  47. if _, ok := row[columnFamilies[1]]; ok {
  48. for _, item := range row[columnFamilies[1]] {
  49. column := strings.Split(item.Column, ":")
  50. summary.GuardianAddresses = append(summary.GuardianAddresses, column[1])
  51. }
  52. }
  53. if _, ok := row[columnFamilies[3]]; ok {
  54. for _, item := range row[columnFamilies[3]] {
  55. if item.Column == "QuorumState:SignedVAA" {
  56. summary.SignedVAA = item.Value
  57. summary.QuorumTime = item.Timestamp.Time().String()
  58. }
  59. }
  60. }
  61. return summary
  62. }
  63. func ReadRow(w http.ResponseWriter, r *http.Request) {
  64. // Set CORS headers for the preflight request
  65. if r.Method == http.MethodOptions {
  66. w.Header().Set("Access-Control-Allow-Origin", "*")
  67. w.Header().Set("Access-Control-Allow-Methods", "POST")
  68. w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  69. w.Header().Set("Access-Control-Max-Age", "3600")
  70. w.WriteHeader(http.StatusNoContent)
  71. return
  72. }
  73. // Set CORS headers for the main request.
  74. w.Header().Set("Access-Control-Allow-Origin", "*")
  75. var rowKey string
  76. // allow GET requests with querystring params, or POST requests with json body.
  77. switch r.Method {
  78. case http.MethodGet:
  79. queryParams := r.URL.Query()
  80. emitterChain := queryParams.Get("emitterChain")
  81. emitterAddress := queryParams.Get("emitterAddress")
  82. sequence := queryParams.Get("sequence")
  83. readyCheck := queryParams.Get("readyCheck")
  84. if readyCheck != "" {
  85. // for running in devnet
  86. w.WriteHeader(http.StatusOK)
  87. fmt.Fprint(w, html.EscapeString("ready"))
  88. return
  89. }
  90. // check for empty values
  91. if emitterChain == "" || emitterAddress == "" || sequence == "" {
  92. fmt.Fprint(w, "body values cannot be empty")
  93. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  94. return
  95. }
  96. rowKey = emitterChain + ":" + emitterAddress + ":" + sequence
  97. case http.MethodPost:
  98. // declare request body properties
  99. var d struct {
  100. EmitterChain string `json:"emitterChain"`
  101. EmitterAddress string `json:"emitterAddress"`
  102. Sequence string `json:"sequence"`
  103. }
  104. // deserialize request body
  105. if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
  106. switch err {
  107. case io.EOF:
  108. fmt.Fprint(w, "request body required")
  109. return
  110. default:
  111. log.Printf("json.NewDecoder: %v", err)
  112. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  113. return
  114. }
  115. }
  116. // check for empty values
  117. if d.EmitterChain == "" || d.EmitterAddress == "" || d.Sequence == "" {
  118. fmt.Fprint(w, "body values cannot be empty")
  119. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  120. return
  121. }
  122. rowKey = d.EmitterChain + ":" + d.EmitterAddress + ":" + d.Sequence
  123. default:
  124. http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
  125. log.Println("Method Not Allowed")
  126. return
  127. }
  128. clientOnce.Do(func() {
  129. // Declare a separate err variable to avoid shadowing client.
  130. var err error
  131. project := os.Getenv("GCP_PROJECT")
  132. client, err = bigtable.NewClient(context.Background(), project, "wormhole")
  133. if err != nil {
  134. http.Error(w, "Error initializing client", http.StatusInternalServerError)
  135. log.Printf("bigtable.NewClient: %v", err)
  136. return
  137. }
  138. })
  139. tbl := client.Open("v2Events")
  140. row, err := tbl.ReadRow(r.Context(), rowKey)
  141. if err != nil {
  142. http.Error(w, "Error reading rows", http.StatusInternalServerError)
  143. log.Printf("tbl.ReadRows(): %v", err)
  144. return
  145. }
  146. if row == nil {
  147. http.NotFound(w, r)
  148. log.Printf("did not find row for key %v", rowKey)
  149. return
  150. }
  151. summary := makeSummary(row)
  152. jsonBytes, err := json.Marshal(summary)
  153. if err != nil {
  154. w.WriteHeader(http.StatusInternalServerError)
  155. w.Write([]byte(err.Error()))
  156. log.Println(err.Error())
  157. return
  158. }
  159. w.WriteHeader(http.StatusOK)
  160. w.Write(jsonBytes)
  161. }