Ver código fonte

Node/CCQServer: log responses (#3752)

bruce-riley 1 ano atrás
pai
commit
e98bb64436

+ 7 - 1
node/cmd/ccq/http.go

@@ -38,6 +38,7 @@ type httpServer struct {
 	permissions      *Permissions
 	signerKey        *ecdsa.PrivateKey
 	pendingResponses *PendingResponses
+	loggingMap       *LoggingMap
 }
 
 func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
@@ -147,6 +148,10 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	if permEntry.logResponses {
+		s.loggingMap.AddRequest(requestId)
+	}
+
 	s.logger.Info("posting request to gossip", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
 	err = s.topic.Publish(r.Context(), b)
 	if err != nil {
@@ -202,7 +207,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
 	s.pendingResponses.Remove(pendingResponse)
 }
 
-func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment) *http.Server {
+func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment, loggingMap *LoggingMap) *http.Server {
 	s := &httpServer{
 		topic:            t,
 		permissions:      permissions,
@@ -210,6 +215,7 @@ func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signe
 		pendingResponses: p,
 		logger:           logger,
 		env:              env,
+		loggingMap:       loggingMap,
 	}
 	r := mux.NewRouter()
 	r.HandleFunc("/v1/query", s.handleQuery).Methods("PUT", "POST", "OPTIONS")

+ 69 - 0
node/cmd/ccq/loggingMap.go

@@ -0,0 +1,69 @@
+package ccq
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"go.uber.org/zap"
+)
+
+// LoggingMap is used to track the requests for which we should log response. It contains a map keyed by the request signature
+// where the payload is time the request was received. Requests will be removed from the map after two minutes.
+type LoggingMap struct {
+	loggingLock sync.Mutex
+	loggingMap  map[string]time.Time
+}
+
+// NewLoggingMap creates the map used to track requests for which we should log responses.
+func NewLoggingMap() *LoggingMap {
+	return &LoggingMap{
+		loggingMap: make(map[string]time.Time),
+	}
+}
+
+// Start starts a go routine to clean up requests that have been in the map for two minutes.
+func (lm *LoggingMap) Start(ctx context.Context, logger *zap.Logger, errC chan error) {
+	common.RunWithScissors(ctx, errC, "logging_cleanup", func(ctx context.Context) error {
+		ticker := time.NewTicker(1 * time.Minute)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-ticker.C:
+				lm.CleanUp(logger)
+			}
+		}
+	})
+}
+
+// CleanUp iterates over the map and removes all entries that are more than two minutes old.
+func (lm *LoggingMap) CleanUp(logger *zap.Logger) {
+	lm.loggingLock.Lock()
+	defer lm.loggingLock.Unlock()
+	for requestId, cleanUpTime := range lm.loggingMap {
+		if time.Now().After(cleanUpTime) {
+			delete(lm.loggingMap, requestId)
+		}
+	}
+}
+
+// AddRequest adds a request to the map, giving it an expiration time two minutes into the future.
+func (lm *LoggingMap) AddRequest(requestSignature string) {
+	lm.loggingLock.Lock()
+	defer lm.loggingLock.Unlock()
+	lm.loggingMap[requestSignature] = time.Now().Add(2 * time.Minute)
+}
+
+// ShouldLogResponse returns true if the request is in the map.
+func (lm *LoggingMap) ShouldLogResponse(requestSignature string) bool {
+	lm.loggingLock.Lock()
+	defer lm.loggingLock.Unlock()
+	if _, exists := lm.loggingMap[requestSignature]; exists {
+		return true
+	}
+	return false
+}

+ 4 - 1
node/cmd/ccq/p2p.go

@@ -37,7 +37,7 @@ type P2PSub struct {
 	host       host.Host
 }
 
-func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrapPeers, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger, monitorPeers bool) (*P2PSub, error) {
+func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrapPeers, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger, monitorPeers bool, loggingMap *LoggingMap) (*P2PSub, error) {
 	// p2p setup
 	components := p2p.DefaultComponents()
 	components.Port = port
@@ -168,6 +168,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
 				}
 				requestSignature := hex.EncodeToString(queryResponse.Request.Signature)
 				logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
+				if loggingMap.ShouldLogResponse(requestSignature) {
+					logger.Info("logging response", zap.Any("requestId", requestSignature), zap.Any("response", queryResponse))
+				}
 				// Check that we're handling the request for this response
 				pendingResponse := pendingResponses.Get(requestSignature)
 				if pendingResponse == nil {

+ 3 - 0
node/cmd/ccq/permissions.go

@@ -28,6 +28,7 @@ type (
 		UserName      string        `json:"userName"`
 		ApiKey        string        `json:"apiKey"`
 		AllowUnsigned bool          `json:"allowUnsigned"`
+		LogResponses  bool          `json:"logResponses"`
 		AllowedCalls  []AllowedCall `json:"allowedCalls"`
 	}
 
@@ -67,6 +68,7 @@ type (
 		userName      string
 		apiKey        string
 		allowUnsigned bool
+		logResponses  bool
 		allowedCalls  allowedCallsForUser // Key is something like "ethCall:2:000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d6:06fdde03"
 	}
 
@@ -267,6 +269,7 @@ func parseConfig(byteValue []byte) (PermissionsMap, error) {
 			userName:      user.UserName,
 			apiKey:        apiKey,
 			allowUnsigned: user.AllowUnsigned,
+			logResponses:  user.LogResponses,
 			allowedCalls:  allowedCalls,
 		}
 

+ 7 - 2
node/cmd/ccq/query_server.go

@@ -148,6 +148,8 @@ func runQueryServer(cmd *cobra.Command, args []string) {
 		logger.Fatal("Failed to load permissions file", zap.String("permFile", *permFile), zap.Error(err))
 	}
 
+	loggingMap := NewLoggingMap()
+
 	// Load p2p private key
 	var priv crypto.PrivKey
 	priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
@@ -170,14 +172,14 @@ func runQueryServer(cmd *cobra.Command, args []string) {
 
 	// Run p2p
 	pendingResponses := NewPendingResponses()
-	p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers)
+	p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap)
 	if err != nil {
 		logger.Fatal("Failed to start p2p", zap.Error(err))
 	}
 
 	// Start the HTTP server
 	go func() {
-		s := NewHTTPServer(*listenAddr, p2p.topic_req, permissions, signerKey, pendingResponses, logger, env)
+		s := NewHTTPServer(*listenAddr, p2p.topic_req, permissions, signerKey, pendingResponses, logger, env, loggingMap)
 		logger.Sugar().Infof("Server listening on %s", *listenAddr)
 		err := s.ListenAndServe()
 		if err != nil && err != http.ErrServerClosed {
@@ -249,6 +251,9 @@ func runQueryServer(cmd *cobra.Command, args []string) {
 	errC := make(chan error)
 	permissions.StartWatcher(ctx, logger, errC)
 
+	// Star logging cleanup process.
+	loggingMap.Start(ctx, logger, errC)
+
 	// Wait for either a shutdown or a fatal error from the permissions watcher.
 	select {
 	case <-ctx.Done():