grpc.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package common
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  7. grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
  8. grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
  9. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  10. "go.uber.org/zap"
  11. "go.uber.org/zap/zapcore"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/metadata"
  14. "google.golang.org/protobuf/encoding/protojson"
  15. "google.golang.org/protobuf/reflect/protoreflect"
  16. )
  17. type GrpcLogDetail string
  18. // initMutex is used during initialization to prevent concurrent writes to the prometheus registry.
  19. // This is only relevant during testing of node/node.go where multiple guardians are created in the same process.
  20. var initMutex sync.Mutex
  21. const (
  22. GrpcLogDetailNone GrpcLogDetail = "none"
  23. GrpcLogDetailMinimal GrpcLogDetail = "minimal"
  24. GrpcLogDetailFull GrpcLogDetail = "full"
  25. )
  26. func truncateStr(str string, maxLen int) string {
  27. if len(str) > maxLen {
  28. return str[:maxLen] + "..."
  29. }
  30. return str
  31. }
  32. func addDetail(ctx context.Context, logDetail GrpcLogDetail) {
  33. if logDetail == GrpcLogDetailNone {
  34. return
  35. }
  36. if md, ok := metadata.FromIncomingContext(ctx); ok {
  37. tags := grpc_ctxtags.Extract(ctx)
  38. tags.Set("x-forwarded-for", md.Get("x-forwarded-for"))
  39. if logDetail == GrpcLogDetailFull {
  40. if len(md.Get("grpcgateway-user-agent")) > 0 {
  41. tags.Set("user-agent", truncateStr(md.Get("grpcgateway-user-agent")[0], 200))
  42. }
  43. }
  44. }
  45. }
  46. // newMetadataStreamServerInterceptor returns stream interceptor that
  47. // adds the `x-forwarded-for` and, if logDetail == "full", the `user-agent` metadata as a tag to cause it to be logged by grpc_zap.StreamServerInterceptor().
  48. // Note that `x-forwarded-for` can only be trusted if the latest hop in the proxy chain is trusted.
  49. // For JSON-Web requests, the latest hop is the guardian itself (`grpc-gateway`), which is listening on TCP and forwarding to the gRPC publicrpc UNIX socket.
  50. // This can be identified by `"peer.address": "@"` in the logs and `grpc-gateway` correctly sets the `x-forwarded-for` metadata.
  51. func newMetadataStreamServerInterceptor(logDetail GrpcLogDetail) func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  52. return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  53. ctx := stream.Context()
  54. addDetail(ctx, logDetail)
  55. return handler(srv, stream)
  56. }
  57. }
  58. // newMetadataServerInterceptor returns a unary interceptor that
  59. // adds the `x-forwarded-for` and, if logDetail == "full", the `user-agent` metadata as a tag to cause it to be logged by grpc_zap.StreamServerInterceptor().
  60. // Note that `x-forwarded-for` can only be trusted if the latest hop in the proxy chain is trusted.
  61. // For JSON-Web requests, the latest hop is the guardian itself (`grpc-gateway`), which is listening on TCP and forwarding to the gRPC publicrpc UNIX socket.
  62. // This can be identified by `"peer.address": "@"` in the logs and `grpc-gateway` correctly sets the `x-forwarded-for` metadata.
  63. func newMetadataServerInterceptor(logDetail GrpcLogDetail) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  64. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  65. addDetail(ctx, logDetail)
  66. return handler(ctx, req)
  67. }
  68. }
  69. func requestPayloadServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  70. if p, ok := req.(protoreflect.ProtoMessage); ok {
  71. if b, err := protojson.Marshal(p); err == nil {
  72. if len(b) <= 200 {
  73. grpc_ctxtags.Extract(ctx).Set("grpc.requestbody", &protojsonObjectMarshaler{pb: p})
  74. } else {
  75. grpc_ctxtags.Extract(ctx).Set("grpc.requestbody", "too long")
  76. }
  77. }
  78. }
  79. return handler(ctx, req)
  80. }
  81. func NewInstrumentedGRPCServer(logger *zap.Logger, rpcLogDetail GrpcLogDetail) *grpc.Server {
  82. initMutex.Lock()
  83. defer initMutex.Unlock()
  84. streamInterceptors := []grpc.StreamServerInterceptor{
  85. grpc_ctxtags.StreamServerInterceptor(),
  86. grpc_prometheus.StreamServerInterceptor,
  87. }
  88. unaryInterceptors := []grpc.UnaryServerInterceptor{
  89. grpc_ctxtags.UnaryServerInterceptor(),
  90. grpc_prometheus.UnaryServerInterceptor,
  91. }
  92. if rpcLogDetail != GrpcLogDetailNone {
  93. logger = logger.With(zap.Bool("_privateLogEntry", true))
  94. streamInterceptors = append(streamInterceptors,
  95. newMetadataStreamServerInterceptor(rpcLogDetail),
  96. grpc_zap.StreamServerInterceptor(logger),
  97. )
  98. // if logging detail is "full", also log the request payload (only applicable to unary)
  99. if rpcLogDetail == GrpcLogDetailFull {
  100. unaryInterceptors = append(unaryInterceptors, requestPayloadServerInterceptor)
  101. }
  102. unaryInterceptors = append(unaryInterceptors,
  103. newMetadataServerInterceptor(rpcLogDetail),
  104. grpc_zap.UnaryServerInterceptor(logger),
  105. )
  106. }
  107. server := grpc.NewServer(
  108. grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
  109. grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
  110. )
  111. grpc_prometheus.EnableHandlingTimeHistogram()
  112. grpc_prometheus.Register(server)
  113. return server
  114. }
  115. // this helper type and associated functions are such that the ZAP jsonEncoder will properly encode the gRPC request payload.
  116. // We could instead just encode the payload to a string here, but then that string will be encoded again by the ZAP jsonEncoder, making downstream processing more difficult
  117. type protojsonObjectMarshaler struct {
  118. pb protoreflect.ProtoMessage
  119. }
  120. func (j *protojsonObjectMarshaler) MarshalLogObject(e zapcore.ObjectEncoder) error {
  121. // ZAP jsonEncoder deals with AddReflect by using json.MarshalObject. The same thing applies for consoleEncoder.
  122. return e.AddReflected("msg", j)
  123. }
  124. func (j *protojsonObjectMarshaler) MarshalJSON() ([]byte, error) {
  125. b, err := protojson.Marshal(j.pb)
  126. if err != nil {
  127. return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
  128. }
  129. return b, nil
  130. }