From ad8bcde2dfeaa558eefd9a4be2f5d57eef765acd Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 14 Jan 2020 08:48:24 -0500 Subject: [PATCH 1/2] pkg/chunkenc: ignore duplicate lines pushed to a stream This commit changes the behavior of appending to a stream to ignore an incoming line if its timestamp and contents match the previous line received. This should reduce the chances of ingesters storing duplicate log lines when clients retry push requests whenever a 500 is returned. Fixes #1517 --- pkg/ingester/stream.go | 27 +++++++++++++++++++++++---- pkg/ingester/stream_test.go | 21 +++++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 060eac76d81f..ec7a6743b3b1 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -51,14 +51,20 @@ func init() { prometheus.MustRegister(blocksPerChunk) } +type line struct { + ts time.Time + content string +} + type stream struct { cfg *Config // Newest chunk at chunks[n-1]. // Not thread-safe; assume accesses to this are locked by caller. - chunks []chunkDesc - fp model.Fingerprint // possibly remapped fingerprint, used in the streams map - labels labels.Labels - factory func() chunkenc.Chunk + chunks []chunkDesc + fp model.Fingerprint // possibly remapped fingerprint, used in the streams map + labels labels.Labels + factory func() chunkenc.Chunk + lastLine line tailers map[uint32]*tailer tailerMtx sync.RWMutex @@ -119,6 +125,18 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe // Don't fail on the first append error - if samples are sent out of order, // we still want to append the later ones. for i := range entries { + // If this entry matches our last appended line's timestamp and contents, + // ignore it. + // + // This check is done at the stream level so it persists across cut and + // flushed chunks. + // + // N.B.: it's still possible for duplicates to be appended if a stream is + // deleted from inactivity. + if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content { + continue + } + chunk := &s.chunks[len(s.chunks)-1] if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk.chunk, synchronizePeriod, minUtilization) { // If the chunk has no more space call Close to make sure anything in the head block is cut and compressed @@ -146,6 +164,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe // send only stored entries to tailers storedEntries = append(storedEntries, entries[i]) lastChunkTimestamp = entries[i].Timestamp + s.lastLine = line{ts: lastChunkTimestamp, content: entries[i].Line} } chunk.lastUpdated = time.Now() } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 5f1775dd3ada..e3f8f8b34185 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -68,6 +68,27 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { } } +func TestPushDeduplication(t *testing.T) { + s := newStream( + &Config{}, + model.Fingerprint(0), + labels.Labels{ + {Name: "foo", Value: "bar"}, + }, + defaultFactory, + ) + + err := s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + {Timestamp: time.Unix(1, 0), Line: "test"}, + {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, + }, 0, 0) + require.NoError(t, err) + require.Len(t, s.chunks, 1) + require.Equal(t, s.chunks[0].chunk.Size(), 2, + "expected exact duplicate to be dropped and newer content with same timestamp to be appended") +} + func TestStreamIterator(t *testing.T) { const chunks = 3 const entries = 100 From 183f6c1591a4b488022aeba9d347653808d77801 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 14 Jan 2020 14:07:03 -0500 Subject: [PATCH 2/2] s/N.B./NOTE --- pkg/ingester/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index ec7a6743b3b1..51ed70814676 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -131,7 +131,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe // This check is done at the stream level so it persists across cut and // flushed chunks. // - // N.B.: it's still possible for duplicates to be appended if a stream is + // NOTE: it's still possible for duplicates to be appended if a stream is // deleted from inactivity. if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content { continue