Skip to content

Commit

Permalink
pkg/chunkenc: ignore duplicate lines pushed to a stream
Browse files Browse the repository at this point in the history
This commit changes the behavior of appending to a stream to ignore
an incoming line if its timestamp and contents match the previous line
received. This should reduce the chances of ingesters storing duplicate
log lines when clients retry push requests whenever a 500 is returned.

Fixes #1517
  • Loading branch information
rfratto committed Jan 14, 2020
1 parent b43039e commit ad8bcde
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
27 changes: 23 additions & 4 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,20 @@ func init() {
prometheus.MustRegister(blocksPerChunk)
}

// line records the timestamp and content of the most recently appended
// log entry for a stream, so an exact duplicate (same timestamp and same
// content) arriving next can be detected and dropped.
type line struct {
	ts      time.Time
	content string
}

type stream struct {
cfg *Config
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
factory func() chunkenc.Chunk
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
factory func() chunkenc.Chunk
lastLine line

tailers map[uint32]*tailer
tailerMtx sync.RWMutex
Expand Down Expand Up @@ -119,6 +125,18 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
for i := range entries {
// If this entry matches our last appended line's timestamp and contents,
// ignore it.
//
// This check is done at the stream level so it persists across cut and
// flushed chunks.
//
// N.B.: it's still possible for duplicates to be appended if a stream is
// deleted due to inactivity.
if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content {
continue
}

chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk.chunk, synchronizePeriod, minUtilization) {
// If the chunk has no more space call Close to make sure anything in the head block is cut and compressed
Expand Down Expand Up @@ -146,6 +164,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
// send only stored entries to tailers
storedEntries = append(storedEntries, entries[i])
lastChunkTimestamp = entries[i].Timestamp
s.lastLine = line{ts: lastChunkTimestamp, content: entries[i].Line}
}
chunk.lastUpdated = time.Now()
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
}
}

// TestPushDeduplication verifies that pushing an entry whose timestamp and
// content exactly match the previously appended line is ignored, while an
// entry with the same timestamp but different content is still stored.
func TestPushDeduplication(t *testing.T) {
	s := newStream(
		&Config{},
		model.Fingerprint(0),
		labels.Labels{
			{Name: "foo", Value: "bar"},
		},
		defaultFactory,
	)

	err := s.Push(context.Background(), []logproto.Entry{
		{Timestamp: time.Unix(1, 0), Line: "test"},
		{Timestamp: time.Unix(1, 0), Line: "test"}, // exact duplicate: should be dropped
		{Timestamp: time.Unix(1, 0), Line: "newer, better test"},
	}, 0, 0)
	require.NoError(t, err)
	require.Len(t, s.chunks, 1)
	// require.Equal takes (t, expected, actual); the expected value goes
	// first so a failure message reports the values the right way around.
	require.Equal(t, 2, s.chunks[0].chunk.Size(),
		"expected exact duplicate to be dropped and newer content with same timestamp to be appended")
}

func TestStreamIterator(t *testing.T) {
const chunks = 3
const entries = 100
Expand Down

0 comments on commit ad8bcde

Please sign in to comment.