
Node: Buffer log messages (#4294)

* Node: Buffer log messages

* Should shutdown the new go routine

* Go back to using a delay to get shutdown messages

* Redesign shutdown

* Tweak shutdown logic

* Remove wormhole_loki_write_failed metric

* Code review rework
bruce-riley 8 months ago
parent
commit
60e2b6b658
3 changed files with 169 additions and 16 deletions
  1. node/pkg/telemetry/loki.go (+165, -11)
  2. node/pkg/telemetry/telemetry.go (+3, -3)
  3. node/pkg/telemetry/telemetry_test.go (+1, -2)
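
The change described above introduces a small pipeline: callers write log entries to a locally buffered channel and never block (entries are dropped when the buffer is full), a worker goroutine drains that buffer into the unbuffered Loki client channel, and shutdown cancels the worker and flushes what it can within a timeout. The following is a minimal, self-contained sketch of that pattern under those assumptions; the names (bufferedLogger, sink, the sizes and timeouts) are illustrative only and are not part of the actual diff, which follows below.

package main

import (
	"context"
	"fmt"
	"time"
)

// bufferedLogger sketches the pattern introduced by this commit: callers write
// to a buffered channel and never block, a worker goroutine drains that buffer
// into a slow, unbuffered sink, and shutdown flushes within a bounded time.
type bufferedLogger struct {
	buf     chan string
	cancel  context.CancelFunc
	exited  chan struct{}
	dropped int
}

func newBufferedLogger(sink chan<- string, size int) *bufferedLogger {
	ctx, cancel := context.WithCancel(context.Background())
	l := &bufferedLogger{
		buf:    make(chan string, size),
		cancel: cancel,
		exited: make(chan struct{}, 1),
	}
	go l.worker(ctx, sink)
	return l
}

// log never blocks: if the buffer is full the message is dropped
// (the real code pegs a Prometheus counter and still logs locally).
func (l *bufferedLogger) log(msg string) {
	select {
	case l.buf <- msg:
	default:
		l.dropped++
	}
}

// worker drains the buffer into the sink, blocking on the sink until canceled.
func (l *bufferedLogger) worker(ctx context.Context, sink chan<- string) {
	defer func() { l.exited <- struct{}{} }()
	for {
		select {
		case msg := <-l.buf:
			select {
			case sink <- msg:
			case <-ctx.Done():
				// Shutting down; the real code saves this entry and tries to flush it.
				return
			}
		case <-ctx.Done():
			// Best-effort flush of whatever is still buffered, bounded by a deadline.
			deadline := time.After(250 * time.Millisecond)
			for len(l.buf) > 0 {
				msg := <-l.buf
				select {
				case sink <- msg:
				case <-deadline:
					return
				}
			}
			return
		}
	}
}

// close cancels the worker and waits for it to exit, but only for so long.
func (l *bufferedLogger) close() {
	l.cancel()
	select {
	case <-l.exited:
	case <-time.After(time.Second):
	}
}

func main() {
	sink := make(chan string) // stands in for the unbuffered Loki client channel
	go func() {
		for msg := range sink {
			time.Sleep(10 * time.Millisecond) // simulate a slow remote write
			fmt.Println("sent:", msg)
		}
	}()

	l := newBufferedLogger(sink, 1000)
	for i := 0; i < 5; i++ {
		l.log(fmt.Sprintf("message %d", i))
	}
	l.close()
}

The actual implementation in the diff below follows the same shape, with the Loki client's channel as the sink, Prometheus counters for sent/dropped messages, and separate timeouts for flushing the buffer and stopping the client.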

+ 165 - 11
node/pkg/telemetry/loki.go

@@ -7,6 +7,7 @@ package telemetry
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"time"
 
@@ -28,6 +29,14 @@ import (
 	"github.com/prometheus/common/model"
 )
 
+const (
+	// bufferSize specifies how many log messages can be queued up locally before we start dropping them.
+	bufferSize = 1000
+
+	// clientTimeout is how long we are willing to wait for Loki on shutdown. Note that this is an UPPER LIMIT. In the sunny day scenario, we won't need to wait.
+	clientTimeout = 250 * time.Millisecond
+)
+
 // ExternalLoggerLoki implements ExternalLogger for the Grafana Loki cloud logging.
 type ExternalLoggerLoki struct {
 	// c is the promtail client.
@@ -38,6 +47,17 @@ type ExternalLoggerLoki struct {
 
 	// localLogger is the zap localLogger used to log errors generated by the loki adapter. It does not use telemetry.
 	localLogger *zap.Logger
+
+	// bufferedChan is used to buffer log messages so that the app does not block on Loki. The Loki internal channel is unbuffered, so we
+	// write things to this channel. If this write would block, we peg a metric and drop the log message (although it still gets logged locally).
+	// There is then a worker routine picking messages off of this local channel and writing them to the Loki channel in a blocking manner.
+	bufferedChan chan api.Entry
+
+	// cancelWorker is used to cancel the worker routine on shutdown.
+	cancelWorker context.CancelFunc
+
+	// workerExitedC is used by the worker to signal that it has exited.
+	workerExitedC chan struct{}
 }
 
 var (
@@ -54,7 +74,7 @@ var (
 		})
 )
 
-func (logger *ExternalLoggerLoki) log(time time.Time, message json.RawMessage, level zapcore.Level) {
+func (logger *ExternalLoggerLoki) log(ts time.Time, message json.RawMessage, level zapcore.Level) {
 	lokiLabels := logger.labels[level]
 
 	bytes, err := message.MarshalJSON()
@@ -64,7 +84,7 @@ func (logger *ExternalLoggerLoki) log(time time.Time, message json.RawMessage, l
 	}
 	entry := api.Entry{
 		Entry: logproto.Entry{
-			Timestamp: time,
+			Timestamp: ts,
 			Line:      string(bytes),
 		},
 
@@ -72,21 +92,21 @@ func (logger *ExternalLoggerLoki) log(time time.Time, message json.RawMessage, l
 	}
 
 	select {
-	case logger.c.Chan() <- entry:
+	case logger.bufferedChan <- entry:
 		lokiMessagesSent.Inc()
 	default:
 		lokiMessagesDropped.Inc()
 	}
 
-	// A fatal error exits, which can cause us to lose messages. Flush everything.
+	// A fatal error exits, which can cause us to lose messages. Shut down the worker so it will flush the logs.
 	if level == zapcore.FatalLevel {
-		logger.c.StopNow()
+		logger.stopWorkerWithTimeout()
 	}
 }
 
-func (logger *ExternalLoggerLoki) close() error {
-	logger.c.Stop()
-	return nil
+func (logger *ExternalLoggerLoki) close() {
+	// Shut down the worker and wait for it to exit. It has a timeout so we won't wait forever.
+	logger.stopWorkerWithTimeout()
 }
 
 // NewLokiCloudLogger creates a new Telemetry logger using Grafana Loki Cloud Logging.
@@ -142,15 +162,149 @@ func NewLokiCloudLogger(ctx context.Context, logger *zap.Logger, url string, pro
 		lokiLabels[level] = levLabels
 	}
 
+	// Create a buffered channel so the application does not block in the logger.
+	bufferedChan := make(chan api.Entry, bufferSize)
+
+	// Create a local context with a cancel function so we can signal our worker to shutdown when the time comes.
+	// Cancelling the worker also closes the Loki client.
+	workerContext, cancelWorker := context.WithCancel(ctx)
+
+	// Create a channel used by the worker to signal that it has exited.
+	workerExitedC := make(chan struct{}, 1)
+
+	// Kick off the worker to read from the local buffered channel and write to the Loki unbuffered channel.
+	go logWriter(workerContext, localLogger, bufferedChan, workerExitedC, c)
+
 	return &Telemetry{
 		encoder: &guardianTelemetryEncoder{
 			Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()),
 			logger: &ExternalLoggerLoki{
-				c:           c,
-				labels:      lokiLabels,
-				localLogger: localLogger,
+				c:             c,
+				labels:        lokiLabels,
+				localLogger:   localLogger,
+				bufferedChan:  bufferedChan,
+				cancelWorker:  cancelWorker,
+				workerExitedC: workerExitedC,
 			},
 			skipPrivateLogs: skipPrivateLogs,
 		},
 	}, nil
 }
+
+// logWriter is the go routine that takes log messages off the buffered channel and posts them to the Loki client. It can block
+// on the Loki client until our context is canceled, meaning we are shutting down. On shutdown, it tries to flush buffered messages
+// and shutdown the Loki client using a timeout for both actions.
+func logWriter(ctx context.Context, logger *zap.Logger, localC chan api.Entry, workerExitedC chan struct{}, c client.Client) {
+	// pendingEntry is used to save the last log message if the write to Loki is interrupted by the context being canceled. We will attempt to flush it.
+	var pendingEntry *api.Entry
+
+	for {
+		select {
+		case entry, ok := <-localC:
+			if !ok {
+				logger.Error("Loki log writer is exiting because the buffered channel has been closed")
+				cleanUpWorker(logger, workerExitedC, c)
+				return
+			}
+
+			// Write to Loki in a blocking manner unless we are signaled to shutdown.
+			select {
+			case c.Chan() <- entry:
+				pendingEntry = nil
+			case <-ctx.Done():
+				// Time to shutdown. We probably failed to write this message, save it so we can try to flush it.
+				pendingEntry = &entry
+			}
+		case <-ctx.Done():
+			logger.Info("Loki log writer shutting down")
+
+			// Flush as much as we can in our allowed time.
+			if numRemaining, err := flushLogsWithTimeout(localC, c, pendingEntry); err != nil {
+				logger.Error("worker failed to flush logs", zap.Error(err), zap.Int("numEventsRemaining", numRemaining))
+			}
+
+			cleanUpWorker(logger, workerExitedC, c)
+			return
+		}
+	}
+}
+
+// flushLogsWithTimeout is used to flush any buffered log messages on shutdown.
+// It uses a timeout so that we only delay guardian shutdown for so long.
+func flushLogsWithTimeout(localC chan api.Entry, c client.Client, pendingEntry *api.Entry) (int, error) {
+	// Create a timeout context. Base it on the background one since ours has been canceled.
+	// We are using a timeout context rather than `time.After` here because that is the maximum
+	// we want to wait, rather than a per-event timeout.
+	timeout, cancel := context.WithTimeout(context.Background(), clientTimeout)
+	defer cancel()
+
+	if pendingEntry != nil {
+		select {
+		case c.Chan() <- *pendingEntry:
+		case <-timeout.Done():
+			// If we timeout, we didn't write the pending one, so count that as remaining.
+			return (1 + len(localC)), errors.New("timeout writing pending entry")
+		}
+	}
+
+	for len(localC) > 0 {
+		select {
+		case entry := <-localC:
+			c.Chan() <- entry
+		case <-timeout.Done():
+			// If we timeout, we didn't write the current one, so count that as remaining.
+			return (1 + len(localC)), errors.New("timeout flushing buffered entry")
+		}
+	}
+
+	return 0, nil
+}
+
+// cleanUpWorker is called when the worker is shutting down. It closes the Loki client connection and signals that the worker has exited.
+func cleanUpWorker(logger *zap.Logger, workerExitedC chan struct{}, c client.Client) {
+	// Stop the client without blocking indefinitely.
+	if err := stopClientWithTimeout(c); err != nil {
+		logger.Error("worker failed to stop Loki client", zap.Error(err))
+	}
+
+	// Signal that we are done.
+	select {
+	case workerExitedC <- struct{}{}:
+		logger.Info("Loki log writer exiting")
+	default:
+		logger.Error("Loki log writer failed to write the exited flag, exiting anyway")
+	}
+}
+
+// stopClientWithTimeout calls the Loki client shutdown function using a timeout so that we only delay guardian shutdown for so long.
+func stopClientWithTimeout(c client.Client) error {
+	// Call the stop function in a go routine so we can use a timeout.
+	stopExitedC := make(chan struct{}, 1)
+	go func(c client.Client) {
+		c.StopNow()
+		stopExitedC <- struct{}{}
+	}(c)
+
+	// Wait for the go routine to exit or the timer to expire. Using `time.After` since this is a one shot and we don't have the context.
+	select {
+	case <-stopExitedC:
+		return nil
+	case <-time.After(clientTimeout):
+		return errors.New("timeout")
+	}
+}
+
+// stopWorkerWithTimeout stops the log writer and waits for it to exit. It only waits a finite length of time.
+func (logger *ExternalLoggerLoki) stopWorkerWithTimeout() {
+	// Shut down the worker.
+	logger.cancelWorker()
+
+	// Wait for the worker to signal that it has exited. Use a timeout so we don't wait forever.
+	// It could take up to twice the client timeout for the worker to exit. Wait a little longer than that.
+	// Using `time.After` since this is a one shot and we don't have the context.
+	select {
+	case <-logger.workerExitedC:
+	case <-time.After(3 * clientTimeout):
+		logger.localLogger.Error("log writer failed to exit, giving up")
+	}
+}

+ 3 - 3
node/pkg/telemetry/telemetry.go

@@ -20,7 +20,7 @@ type Telemetry struct {
 
 type ExternalLogger interface {
 	log(time time.Time, message json.RawMessage, level zapcore.Level)
-	close() error
+	close()
 }
 
 // guardianTelemetryEncoder is a wrapper around zapcore.jsonEncoder that logs to cloud based logging
@@ -86,6 +86,6 @@ func (s *Telemetry) WrapLogger(logger *zap.Logger) *zap.Logger {
 	}))
 }
 
-func (s *Telemetry) Close() error {
-	return s.encoder.logger.close()
+func (s *Telemetry) Close() {
+	s.encoder.logger.close()
 }

+ 1 - 2
node/pkg/telemetry/telemetry_test.go

@@ -35,8 +35,7 @@ func (logger *externalLoggerMock) log(time time.Time, message json.RawMessage, l
 	}
 
 }
-func (logger *externalLoggerMock) close() error {
-	return nil
+func (logger *externalLoggerMock) close() {
 }
 
 func TestTelemetryWithPrivate(t *testing.T) {