http.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package ccq
  2. import (
  3. "crypto/ecdsa"
  4. "encoding/hex"
  5. "encoding/json"
  6. "fmt"
  7. "math"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "time"
  12. "github.com/certusone/wormhole/node/pkg/common"
  13. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  14. "github.com/certusone/wormhole/node/pkg/query"
  15. "github.com/gorilla/mux"
  16. pubsub "github.com/libp2p/go-libp2p-pubsub"
  17. "go.uber.org/zap"
  18. "google.golang.org/protobuf/proto"
  19. )
  20. const MAX_BODY_SIZE = 5 * 1024 * 1024
  21. type queryRequest struct {
  22. Bytes string `json:"bytes"`
  23. Signature string `json:"signature"`
  24. }
  25. type queryResponse struct {
  26. Bytes string `json:"bytes"`
  27. Signatures []string `json:"signatures"`
  28. }
  29. type httpServer struct {
  30. topic *pubsub.Topic
  31. logger *zap.Logger
  32. env common.Environment
  33. permissions *Permissions
  34. signerKey *ecdsa.PrivateKey
  35. pendingResponses *PendingResponses
  36. loggingMap *LoggingMap
  37. }
  38. func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
  39. // Set CORS headers for all requests.
  40. w.Header().Set("Access-Control-Allow-Origin", "*")
  41. // Set CORS headers for the preflight request
  42. if r.Method == http.MethodOptions {
  43. w.Header().Set("Access-Control-Allow-Methods", "PUT, POST")
  44. w.Header().Set("Access-Control-Allow-Headers", "Content-Type, X-Api-Key")
  45. w.Header().Set("Access-Control-Max-Age", "3600")
  46. w.WriteHeader(http.StatusNoContent)
  47. return
  48. }
  49. start := time.Now()
  50. allQueryRequestsReceived.Inc()
  51. // Decode the body first. This is because the library seems to hang if we receive a large body and return without decoding it.
  52. // This could be a slight waste of resources, but should not be a DoS risk because we cap the max body size.
  53. var q queryRequest
  54. err := json.NewDecoder(http.MaxBytesReader(w, r.Body, MAX_BODY_SIZE)).Decode(&q)
  55. if err != nil {
  56. s.logger.Error("failed to decode body", zap.Error(err))
  57. http.Error(w, err.Error(), http.StatusBadRequest)
  58. invalidQueryRequestReceived.WithLabelValues("failed_to_decode_body").Inc()
  59. return
  60. }
  61. // There should be one and only one API key in the header.
  62. apiKeys, exists := r.Header["X-Api-Key"]
  63. if !exists || len(apiKeys) != 1 {
  64. s.logger.Error("received a request with the wrong number of api keys", zap.Stringer("url", r.URL), zap.Int("numApiKeys", len(apiKeys)))
  65. http.Error(w, "api key is missing", http.StatusUnauthorized)
  66. invalidQueryRequestReceived.WithLabelValues("missing_api_key").Inc()
  67. return
  68. }
  69. apiKey := strings.ToLower(apiKeys[0])
  70. // Make sure the user is authorized before we go any farther.
  71. permEntry, exists := s.permissions.GetUserEntry(apiKey)
  72. if !exists {
  73. s.logger.Error("invalid api key", zap.String("apiKey", apiKey))
  74. http.Error(w, "invalid api key", http.StatusForbidden)
  75. invalidQueryRequestReceived.WithLabelValues("invalid_api_key").Inc()
  76. return
  77. }
  78. if permEntry.rateLimiter != nil && !permEntry.rateLimiter.Allow() {
  79. s.logger.Debug("denying request due to rate limit", zap.String("userId", permEntry.userName))
  80. http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
  81. rateLimitExceededByUser.WithLabelValues(permEntry.userName).Inc()
  82. return
  83. }
  84. totalRequestsByUser.WithLabelValues(permEntry.userName).Inc()
  85. queryRequestBytes, err := hex.DecodeString(q.Bytes)
  86. if err != nil {
  87. s.logger.Error("failed to decode request bytes", zap.String("userId", permEntry.userName), zap.Error(err))
  88. http.Error(w, err.Error(), http.StatusBadRequest)
  89. invalidQueryRequestReceived.WithLabelValues("failed_to_decode_request").Inc()
  90. invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
  91. return
  92. }
  93. signature, err := hex.DecodeString(q.Signature)
  94. if err != nil {
  95. s.logger.Error("failed to decode signature bytes", zap.String("userId", permEntry.userName), zap.Error(err))
  96. http.Error(w, err.Error(), http.StatusBadRequest)
  97. invalidQueryRequestReceived.WithLabelValues("failed_to_decode_signature").Inc()
  98. invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
  99. return
  100. }
  101. signedQueryRequest := &gossipv1.SignedQueryRequest{
  102. QueryRequest: queryRequestBytes,
  103. Signature: signature,
  104. }
  105. status, queryReq, err := validateRequest(s.logger, s.env, s.permissions, s.signerKey, apiKey, signedQueryRequest)
  106. if err != nil {
  107. s.logger.Error("failed to validate request", zap.String("userId", permEntry.userName), zap.String("requestId", hex.EncodeToString(signedQueryRequest.Signature)), zap.Int("status", status), zap.Error(err))
  108. http.Error(w, err.Error(), status)
  109. // Error specific metric has already been pegged.
  110. invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
  111. return
  112. }
  113. requestId := hex.EncodeToString(signedQueryRequest.Signature)
  114. s.logger.Info("received request from client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
  115. m := gossipv1.GossipMessage{
  116. Message: &gossipv1.GossipMessage_SignedQueryRequest{
  117. SignedQueryRequest: signedQueryRequest,
  118. },
  119. }
  120. b, err := proto.Marshal(&m)
  121. if err != nil {
  122. s.logger.Error("failed to marshal gossip message", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
  123. http.Error(w, err.Error(), http.StatusInternalServerError)
  124. invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_gossip_msg").Inc()
  125. invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
  126. return
  127. }
  128. pendingResponse := NewPendingResponse(signedQueryRequest, permEntry.userName, queryReq)
  129. added := s.pendingResponses.Add(pendingResponse)
  130. if !added {
  131. s.logger.Info("duplicate request", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
  132. http.Error(w, "Duplicate request", http.StatusBadRequest)
  133. invalidQueryRequestReceived.WithLabelValues("duplicate_request").Inc()
  134. invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
  135. return
  136. }
  137. if permEntry.logResponses {
  138. s.loggingMap.AddRequest(requestId)
  139. }
  140. s.logger.Info("posting request to gossip", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
  141. err = s.topic.Publish(r.Context(), b)
  142. if err != nil {
  143. s.logger.Error("failed to publish gossip message", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
  144. http.Error(w, err.Error(), http.StatusInternalServerError)
  145. invalidQueryRequestReceived.WithLabelValues("failed_to_publish_gossip_msg").Inc()
  146. invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc()
  147. s.pendingResponses.Remove(pendingResponse)
  148. return
  149. }
  150. // Wait for the response or timeout
  151. outer:
  152. select {
  153. case <-time.After(query.RequestTimeout + 5*time.Second):
  154. maxMatchingResponses, outstandingResponses, quorum := pendingResponse.getStats()
  155. s.logger.Info("publishing time out to client",
  156. zap.String("userId", permEntry.userName),
  157. zap.String("requestId", requestId),
  158. zap.Int("maxMatchingResponses", maxMatchingResponses),
  159. zap.Int("outstandingResponses", outstandingResponses),
  160. zap.Int("quorum", quorum),
  161. )
  162. http.Error(w, "Timed out waiting for response", http.StatusGatewayTimeout)
  163. queryTimeoutsByUser.WithLabelValues(permEntry.userName).Inc()
  164. failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
  165. case res := <-pendingResponse.ch:
  166. s.logger.Info("publishing response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
  167. resBytes, err := res.Response.Marshal()
  168. if err != nil {
  169. s.logger.Error("failed to marshal response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
  170. http.Error(w, err.Error(), http.StatusInternalServerError)
  171. invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_response").Inc()
  172. failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
  173. break
  174. }
  175. // Signature indices must be ascending for on-chain verification
  176. sort.Slice(res.Signatures, func(i, j int) bool {
  177. return res.Signatures[i].Index < res.Signatures[j].Index
  178. })
  179. signatures := make([]string, 0, len(res.Signatures))
  180. for _, sig := range res.Signatures {
  181. if sig.Index > math.MaxUint8 {
  182. s.logger.Error("Signature index out of bounds", zap.Int("sig.Index", sig.Index))
  183. http.Error(w, err.Error(), http.StatusInternalServerError)
  184. invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_response").Inc()
  185. failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
  186. break outer
  187. }
  188. // ECDSA signature + a byte for the index of the guardian in the guardian set
  189. signature := fmt.Sprintf("%s%02x", sig.Signature, uint8(sig.Index)) // #nosec G115 -- This is validated above
  190. signatures = append(signatures, signature)
  191. }
  192. w.Header().Add("Content-Type", "application/json")
  193. err = json.NewEncoder(w).Encode(&queryResponse{
  194. Signatures: signatures,
  195. Bytes: hex.EncodeToString(resBytes),
  196. })
  197. if err != nil {
  198. s.logger.Error("failed to encode response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
  199. http.Error(w, err.Error(), http.StatusInternalServerError)
  200. invalidQueryRequestReceived.WithLabelValues("failed_to_encode_response").Inc()
  201. failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
  202. break
  203. }
  204. successfulQueriesByUser.WithLabelValues(permEntry.userName).Inc()
  205. case errEntry := <-pendingResponse.errCh:
  206. s.logger.Info("publishing error response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Int("status", errEntry.status), zap.Error(errEntry.err))
  207. http.Error(w, errEntry.err.Error(), errEntry.status)
  208. // Metrics have already been pegged.
  209. break
  210. }
  211. totalQueryTime.Observe(float64(time.Since(start).Milliseconds()))
  212. validQueryRequestsReceived.Inc()
  213. s.pendingResponses.Remove(pendingResponse)
  214. }
  215. func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment, loggingMap *LoggingMap) *http.Server {
  216. s := &httpServer{
  217. topic: t,
  218. permissions: permissions,
  219. signerKey: signerKey,
  220. pendingResponses: p,
  221. logger: logger,
  222. env: env,
  223. loggingMap: loggingMap,
  224. }
  225. r := mux.NewRouter()
  226. r.HandleFunc("/v1/query", s.handleQuery).Methods("PUT", "POST", "OPTIONS")
  227. return &http.Server{
  228. Addr: addr,
  229. Handler: r,
  230. ReadHeaderTimeout: 5 * time.Second,
  231. }
  232. }