From 62d5122a11cf0d898e811dae4d3dd23a3bc9b573 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Fri, 30 Aug 2019 18:08:13 +0200
Subject: [PATCH] Promtail exports metrics on sent and dropped log entries

---
 docs/operations.md                 |  4 ++
 pkg/promtail/client/client.go      | 33 ++++++++++--
 pkg/promtail/client/client_test.go | 83 +++++++++++++++++++++++++++++-
 3 files changed, 114 insertions(+), 6 deletions(-)

diff --git a/docs/operations.md b/docs/operations.md
index 1103fedc6567..d6807745f0c3 100644
--- a/docs/operations.md
+++ b/docs/operations.md
@@ -39,7 +39,11 @@ Promtail metrics:
 - `promtail_read_bytes_total` Number of bytes read.
 - `promtail_read_lines_total` Number of lines read.
 - `promtail_request_duration_seconds_count` Number of send requests.
+- `promtail_encoded_bytes_total` Number of bytes encoded and ready to send.
 - `promtail_sent_bytes_total` Number of bytes sent.
+- `promtail_dropped_bytes_total` Number of bytes dropped because failed to be sent to the ingester after all retries.
+- `promtail_sent_entries_total` Number of log entries sent to the ingester.
+- `promtail_dropped_entries_total` Number of log entries dropped because failed to be sent to the ingester after all retries.
 
 Most of these metrics are counters and should continuously increase during normal operations:
 
diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go
index 0cf478aadf0b..0684fe1f30b2 100644
--- a/pkg/promtail/client/client.go
+++ b/pkg/promtail/client/client.go
@@ -40,6 +40,21 @@ var (
 		Name:      "sent_bytes_total",
 		Help:      "Number of bytes sent.",
 	}, []string{"host"})
+	droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "promtail",
+		Name:      "dropped_bytes_total",
+		Help:      "Number of bytes dropped because failed to be sent to the ingester after all retries.",
+	}, []string{"host"})
+	sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "promtail",
+		Name:      "sent_entries_total",
+		Help:      "Number of log entries sent to the ingester.",
+	}, []string{"host"})
+	droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "promtail",
+		Name:      "dropped_entries_total",
+		Help:      "Number of log entries dropped because failed to be sent to the ingester after all retries.",
+	}, []string{"host"})
 	requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "promtail",
 		Name:      "request_duration_seconds",
@@ -50,6 +65,9 @@ var (
 func init() {
 	prometheus.MustRegister(encodedBytes)
 	prometheus.MustRegister(sentBytes)
+	prometheus.MustRegister(droppedBytes)
+	prometheus.MustRegister(sentEntries)
+	prometheus.MustRegister(droppedEntries)
 	prometheus.MustRegister(requestDuration)
 }
 
@@ -153,7 +171,7 @@ func (c *client) run() {
 }
 
 func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
-	buf, err := encodeBatch(batch)
+	buf, entriesCount, err := encodeBatch(batch)
 	if err != nil {
 		level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
 		return
@@ -171,6 +189,7 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
 
 		if err == nil {
 			sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
+			sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
 			return
 		}
 
@@ -185,22 +204,28 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
 
 	if err != nil {
 		level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
+		droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
+		droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
 	}
 }
 
-func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
+func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, int, error) {
 	req := logproto.PushRequest{
 		Streams: make([]*logproto.Stream, 0, len(batch)),
 	}
+
+	entriesCount := 0
 	for _, stream := range batch {
 		req.Streams = append(req.Streams, stream)
+		entriesCount += len(stream.Entries)
 	}
+
 	buf, err := proto.Marshal(&req)
 	if err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 	buf = snappy.Encode(nil, buf)
-	return buf, nil
+	return buf, entriesCount, nil
 }
 
 func (c *client) send(ctx context.Context, buf []byte) (int, error) {
diff --git a/pkg/promtail/client/client_test.go b/pkg/promtail/client/client_test.go
index 0fe6edfa67c6..f03e06fd7c13 100644
--- a/pkg/promtail/client/client_test.go
+++ b/pkg/promtail/client/client_test.go
@@ -3,6 +3,7 @@ package client
 import (
 	"net/http"
 	"net/http/httptest"
+	"strings"
 	"testing"
 	"time"
 
@@ -14,17 +15,21 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/grafana/loki/pkg/logproto"
 	lokiflag "github.com/grafana/loki/pkg/util/flagext"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 )
 
-func TestClient_Handle(t *testing.T) {
-	logEntries := []entry{
+var (
+	logEntries = []entry{
 		{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
 		{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
 		{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
 	}
+)
 
+func TestClient_Handle(t *testing.T) {
 	tests := map[string]struct {
 		clientBatchSize int
 		clientBatchWait time.Duration
@@ -33,6 +38,7 @@ func TestClient_Handle(t *testing.T) {
 		inputEntries    []entry
 		inputDelay      time.Duration
 		expectedBatches [][]*logproto.Stream
+		expectedMetrics string
 	}{
 		"batch log entries together until the batch size is reached": {
 			clientBatchSize: 10,
@@ -48,6 +54,11 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
+				# TYPE promtail_sent_entries_total counter
+				promtail_sent_entries_total{host="__HOST__"} 3.0
+			`,
 		},
 		"batch log entries together until the batch wait time is reached": {
 			clientBatchSize: 10,
@@ -64,6 +75,11 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
+				# TYPE promtail_sent_entries_total counter
+				promtail_sent_entries_total{host="__HOST__"} 2.0
+			`,
 		},
 		"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
 			clientBatchSize: 10,
@@ -82,6 +98,11 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
+				# TYPE promtail_dropped_entries_total counter
+				promtail_dropped_entries_total{host="__HOST__"} 1.0
+			`,
 		},
 		"do not retry send a batch in case the server responds with a 4xx": {
 			clientBatchSize: 10,
@@ -94,11 +115,20 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
+				# TYPE promtail_dropped_entries_total counter
+				promtail_dropped_entries_total{host="__HOST__"} 1.0
+			`,
 		},
 	}
 
 	for testName, testData := range tests {
 		t.Run(testName, func(t *testing.T) {
+			// Reset metrics
+			sentEntries.Reset()
+			droppedEntries.Reset()
+
 			// Create a buffer channel where we do enqueue received requests
 			receivedReqsChan := make(chan logproto.PushRequest, 10)
 
@@ -156,6 +186,55 @@ func TestClient_Handle(t *testing.T) {
 			for i, batch := range receivedReqs {
 				assert.Equal(t, testData.expectedBatches[i], batch.Streams)
 			}
+
+			expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
+			err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
+			assert.NoError(t, err)
 		})
 	}
 }
+
+func TestClient_encodeBatch(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		inputBatch           map[model.Fingerprint]*logproto.Stream
+		expectedEntriesCount int
+	}{
+		"empty batch": {
+			inputBatch:           map[model.Fingerprint]*logproto.Stream{},
+			expectedEntriesCount: 0,
+		},
+		"single stream with single log entry": {
+			inputBatch: map[model.Fingerprint]*logproto.Stream{
+				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
+			},
+			expectedEntriesCount: 1,
+		},
+		"single stream with multiple log entries": {
+			inputBatch: map[model.Fingerprint]*logproto.Stream{
+				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
+			},
+			expectedEntriesCount: 2,
+		},
+		"multiple streams with multiple log entries": {
+			inputBatch: map[model.Fingerprint]*logproto.Stream{
+				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
+				model.Fingerprint(2): {Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
+			},
+			expectedEntriesCount: 3,
+		},
+	}
+
+	for testName, testData := range tests {
+		testData := testData
+
+		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
+
+			_, entriesCount, err := encodeBatch(testData.inputBatch)
+			require.NoError(t, err)
+			assert.Equal(t, testData.expectedEntriesCount, entriesCount)
+		})
+	}
+}
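Note (not part of the patch): the new test assertions lean on prometheus/client_golang's testutil package, which gathers registered collectors and compares them against the Prometheus text exposition format. The standalone sketch below shows that pattern in isolation. The package name, the test function name, the "localhost:3100" host label, and the counter value are illustrative assumptions only, not code from this change.

package client_test

import (
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestSentEntriesCounterSketch(t *testing.T) {
	// Use a private registry so the sketch does not depend on collectors
	// registered globally by promtail's client package.
	reg := prometheus.NewRegistry()

	sentEntries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "sent_entries_total",
		Help:      "Number of log entries sent to the ingester.",
	}, []string{"host"})
	reg.MustRegister(sentEntries)

	// Simulate a successful push of a batch containing 3 log entries.
	sentEntries.WithLabelValues("localhost:3100").Add(3)

	expected := `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="localhost:3100"} 3
`
	if err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "promtail_sent_entries_total"); err != nil {
		t.Fatal(err)
	}
}

The test in the patch compares against prometheus.DefaultGatherer rather than a private registry because the client registers its counters in init(), which is also why it resets sentEntries and droppedEntries at the start of every test case.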