Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/chunkenc: ignore duplicate lines pushed to a stream #1519

Merged
merged 2 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,20 @@ func init() {
prometheus.MustRegister(blocksPerChunk)
}

// line records the timestamp and content of the most recently appended
// entry. The stream keeps one of these (lastLine) so Push can detect and
// drop an entry that exactly duplicates the previous append — the check
// lives at the stream level so it survives chunk cuts and flushes.
type line struct {
ts time.Time
content string
}

type stream struct {
cfg *Config
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
factory func() chunkenc.Chunk
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
factory func() chunkenc.Chunk
lastLine line

tailers map[uint32]*tailer
tailerMtx sync.RWMutex
Expand Down Expand Up @@ -119,6 +125,18 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
for i := range entries {
// If this entry matches our last appended line's timestamp and contents,
// ignore it.
//
// This check is done at the stream level so it persists across cut and
// flushed chunks.
//
// NOTE: it's still possible for duplicates to be appended if a stream is
// deleted from inactivity.
if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content {
continue
}

chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk.chunk, synchronizePeriod, minUtilization) {
// If the chunk has no more space call Close to make sure anything in the head block is cut and compressed
Expand Down Expand Up @@ -146,6 +164,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
// send only stored entries to tailers
storedEntries = append(storedEntries, entries[i])
lastChunkTimestamp = entries[i].Timestamp
s.lastLine = line{ts: lastChunkTimestamp, content: entries[i].Line}
}
chunk.lastUpdated = time.Now()
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
}
}

// TestPushDeduplication verifies that Push drops an entry whose timestamp
// and content exactly match the previously appended line, while keeping an
// entry that shares only the timestamp.
func TestPushDeduplication(t *testing.T) {
	s := newStream(
		&Config{},
		model.Fingerprint(0),
		labels.Labels{
			{Name: "foo", Value: "bar"},
		},
		defaultFactory,
	)

	// Three entries at the same timestamp: the second is a byte-for-byte
	// duplicate of the first and must be ignored; the third has different
	// content and must be appended.
	err := s.Push(context.Background(), []logproto.Entry{
		{Timestamp: time.Unix(1, 0), Line: "test"},
		{Timestamp: time.Unix(1, 0), Line: "test"},
		{Timestamp: time.Unix(1, 0), Line: "newer, better test"},
	}, 0, 0)
	require.NoError(t, err)
	require.Len(t, s.chunks, 1)
	// NOTE: testify's require.Equal takes (t, expected, actual, ...);
	// the expected count goes first so failure output labels the values
	// correctly.
	require.Equal(t, 2, s.chunks[0].chunk.Size(),
		"expected exact duplicate to be dropped and newer content with same timestamp to be appended")
}

func TestStreamIterator(t *testing.T) {
const chunks = 3
const entries = 100
Expand Down