diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index a3966db2f9af..145ab85144a0 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3805,6 +3805,14 @@ These are values which allow you to control aspects of Loki's operation, most co # CLI flag: -operation-config.log-push-request-streams [log_push_request_streams: | default = false] +# Log metrics for duplicate lines received. +# CLI flag: -operation-config.log-duplicate-metrics +[log_duplicate_metrics: | default = false] + +# Log stream info for duplicate lines received +# CLI flag: -operation-config.log-duplicate-stream-info +[log_duplicate_stream_info: | default = false] + # Log push errors with a rate limited logger, will show client push errors # without overly spamming logs. # CLI flag: -operation-config.limited-log-push-errors diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index e2d520df6e02..ef8548b1438d 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -36,17 +36,18 @@ func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool { return len(c.entries) < tmpNumEntries } -func (c *dumbChunk) Append(entry *logproto.Entry) error { +// The dumbChunk does not check for duplicates, and will always return false +func (c *dumbChunk) Append(entry *logproto.Entry) (bool, error) { if len(c.entries) == tmpNumEntries { - return ErrChunkFull + return false, ErrChunkFull } if len(c.entries) > 0 && c.entries[len(c.entries)-1].Timestamp.After(entry.Timestamp) { - return ErrOutOfOrder + return false, ErrOutOfOrder } c.entries = append(c.entries, *entry) - return nil + return false, nil } func (c *dumbChunk) Size() int { diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 3825a6520af5..f0b17c7750f3 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -132,7 +132,8 @@ func SupportedEncoding() string { type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool - Append(*logproto.Entry) error + // Append returns true if the entry appended was a duplicate + Append(*logproto.Entry) (bool, error) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator // Returns the list of blocks in the chunks. 
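For reference, the interface change above turns Append(*logproto.Entry) error into Append(*logproto.Entry) (bool, error), where the bool reports that the entry was silently dropped as an exact duplicate. The following is a minimal, illustrative Go sketch of the new calling convention; it is not part of this patch, and the entry, appender, and memAppender types are simplified stand-ins for logproto.Entry, the chunkenc.Chunk interface, and a real chunk implementation.

package main

import (
	"fmt"
	"time"
)

// entry is a simplified stand-in for logproto.Entry.
type entry struct {
	Timestamp time.Time
	Line      string
}

// appender mirrors the changed portion of the chunkenc.Chunk interface:
// Append now also reports whether the entry was dropped as an exact duplicate.
type appender interface {
	Append(e *entry) (bool, error)
}

// memAppender is a toy in-memory implementation that drops entries whose
// timestamp and line match a previously appended entry, mimicking the
// duplicate handling of the unordered head block.
type memAppender struct {
	seen map[string]struct{}
}

func (m *memAppender) Append(e *entry) (bool, error) {
	key := fmt.Sprintf("%d/%s", e.Timestamp.UnixNano(), e.Line)
	if _, ok := m.seen[key]; ok {
		return true, nil // exact duplicate: silently dropped
	}
	m.seen[key] = struct{}{}
	return false, nil
}

// appendAll shows the calling convention: check the error first, then
// account separately for the bytes dropped as duplicates.
func appendAll(a appender, entries []entry) (dupBytes int, err error) {
	for i := range entries {
		dup, err := a.Append(&entries[i])
		if err != nil {
			return dupBytes, fmt.Errorf("append: %w", err)
		}
		if dup {
			dupBytes += len(entries[i].Line)
		}
	}
	return dupBytes, nil
}

func main() {
	a := &memAppender{seen: map[string]struct{}{}}
	ts := time.Unix(1, 0)
	dupBytes, err := appendAll(a, []entry{
		{Timestamp: ts, Line: "test"},
		{Timestamp: ts, Line: "not a test"},
		{Timestamp: ts, Line: "test"}, // exact duplicate of the first entry
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("duplicate bytes dropped:", dupBytes) // prints 4
}

The three entries mirror TestPushDeduplicationExtraMetrics later in this patch, which expects 4 duplicate bytes (the repeated "test" line) to be counted by the ingester.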
diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 107e3c71a97d..f4e27255633d 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -181,9 +181,10 @@ func (hb *headBlock) Reset() { func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt } -func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error { +// The headBlock does not check for duplicates, and will always return false +func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error) { if !hb.IsEmpty() && hb.maxt > ts { - return ErrOutOfOrder + return false, ErrOutOfOrder } hb.entries = append(hb.entries, entry{t: ts, s: line}) @@ -193,7 +194,7 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error { hb.maxt = ts hb.size += len(line) - return nil + return false, nil } func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) { @@ -340,7 +341,7 @@ func (hb *headBlock) Convert(version HeadBlockFmt, symbolizer *symbolizer) (Head out := version.NewBlock(symbolizer) for _, e := range hb.entries { - if err := out.Append(e.t, e.s, e.structuredMetadata); err != nil { + if _, err := out.Append(e.t, e.s, e.structuredMetadata); err != nil { return nil, err } } @@ -834,27 +835,29 @@ func (c *MemChunk) Utilization() float64 { } // Append implements Chunk. -func (c *MemChunk) Append(entry *logproto.Entry) error { +// The MemChunk may return true or false, depending on what the head block returns. +func (c *MemChunk) Append(entry *logproto.Entry) (bool, error) { entryTimestamp := entry.Timestamp.UnixNano() // If the head block is empty but there are cut blocks, we have to make // sure the new entry is not out of order compared to the previous block if c.headFmt < UnorderedHeadBlockFmt && c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp { - return ErrOutOfOrder + return false, ErrOutOfOrder } if c.format < ChunkFormatV4 { entry.StructuredMetadata = nil } - if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)); err != nil { - return err + dup, err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)) + if err != nil { + return dup, err } if c.head.UncompressedSize() >= c.blockSize { - return c.cut() + return false, c.cut() } - return nil + return dup, nil } // Close implements Chunk. @@ -1122,7 +1125,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err if filter != nil && filter(entry.Timestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)...) 
{ continue } - if err := newChunk.Append(&entry); err != nil { + if _, err := newChunk.Append(&entry); err != nil { return nil, err } } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index f7ed66b7c890..1d9ef3eea21f 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -88,7 +88,8 @@ func TestBlocksInclusive(t *testing.T) { for _, format := range allPossibleFormats { chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt chk := NewMemChunk(chunkfmt, enc, headfmt, testBlockSize, testTargetSize) - err := chk.Append(logprotoEntry(1, "1")) + dup, err := chk.Append(logprotoEntry(1, "1")) + require.False(t, dup) require.Nil(t, err) err = chk.cut() require.Nil(t, err) @@ -178,7 +179,9 @@ func TestBlock(t *testing.T) { } for _, c := range cases { - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(c.ts, c.str, c.lbs))) + dup, err := chk.Append(logprotoEntryWithStructuredMetadata(c.ts, c.str, c.lbs)) + require.False(t, dup) + require.NoError(t, err) if c.cut { require.NoError(t, chk.cut()) } @@ -442,7 +445,9 @@ func TestSerialization(t *testing.T) { if appendWithStructuredMetadata { entry.StructuredMetadata = []logproto.LabelAdapter{{Name: "foo", Value: strconv.Itoa(i)}} } - require.NoError(t, chk.Append(entry)) + dup, err := chk.Append(entry) + require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chk.Close()) @@ -527,7 +532,9 @@ func TestChunkFilling(t *testing.T) { i := int64(0) for ; chk.SpaceFor(entry) && i < 30; i++ { entry.Timestamp = time.Unix(0, i) - require.NoError(t, chk.Append(entry)) + dup, err := chk.Append(entry) + require.False(t, dup) + require.NoError(t, err) } require.Equal(t, int64(lines), i) @@ -576,7 +583,9 @@ func TestGZIPChunkTargetSize(t *testing.T) { Line: string(logLine), } entry.Timestamp = time.Unix(0, i) - require.NoError(t, chk.Append(entry)) + dup, err := chk.Append(entry) + require.False(t, dup) + require.NoError(t, err) } // 5000 is a limit ot make sure the test doesn't run away, we shouldn't need this many log lines to make 1MB chunk @@ -606,37 +615,61 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { tests := map[string]tester{ "append out of order in the same block": func(t *testing.T, chk *MemChunk) { - assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) - assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + dup, err := chk.Append(logprotoEntry(5, "test")) + assert.False(t, dup) + assert.NoError(t, err) + dup, err = chk.Append(logprotoEntry(6, "test")) + assert.False(t, dup) + assert.NoError(t, err) if chk.headFmt == OrderedHeadBlockFmt { - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.EqualError(t, err, ErrOutOfOrder.Error()) + assert.False(t, dup) } else { - assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.NoError(t, err) } }, "append out of order in a new block right after cutting the previous one": func(t *testing.T, chk *MemChunk) { - assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) - assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + dup, err := chk.Append(logprotoEntry(5, "test")) + assert.False(t, dup) + assert.NoError(t, err) + dup, err = chk.Append(logprotoEntry(6, "test")) + assert.False(t, dup) + assert.NoError(t, err) assert.NoError(t, chk.cut()) if chk.headFmt == OrderedHeadBlockFmt { - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), 
ErrOutOfOrder.Error()) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.EqualError(t, err, ErrOutOfOrder.Error()) } else { - assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.NoError(t, err) } }, "append out of order in a new block after multiple cuts": func(t *testing.T, chk *MemChunk) { - assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) + dup, err := chk.Append(logprotoEntry(5, "test")) + assert.False(t, dup) + assert.NoError(t, err) assert.NoError(t, chk.cut()) - assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + dup, err = chk.Append(logprotoEntry(6, "test")) + assert.False(t, dup) + assert.NoError(t, err) assert.NoError(t, chk.cut()) if chk.headFmt == OrderedHeadBlockFmt { - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.EqualError(t, err, ErrOutOfOrder.Error()) } else { - assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.NoError(t, err) } }, } @@ -705,7 +738,7 @@ func TestChunkStats(t *testing.T) { if !c.SpaceFor(entry) { break } - if err := c.Append(entry); err != nil { + if _, err := c.Append(entry); err != nil { t.Fatal(err) } inserted++ @@ -826,7 +859,7 @@ func BenchmarkWrite(b *testing.B) { c := NewMemChunk(ChunkFormatV3, enc, f, testBlockSize, testTargetSize) // adds until full so we trigger cut which serialize using gzip for c.SpaceFor(entry) { - _ = c.Append(entry) + _, _ = c.Append(entry) entry.Timestamp = time.Unix(0, i) entry.Line = testdata.LogString(i) if withStructuredMetadata { @@ -977,7 +1010,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { } for i := 0; i < j; i++ { - if err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { + if _, err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { b.Fatal(err) } } @@ -1009,7 +1042,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { } for i := 0; i < j; i++ { - if err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { + if _, err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { b.Fatal(err) } } @@ -1034,13 +1067,13 @@ func TestMemChunk_IteratorBounds(t *testing.T) { t.Helper() c := NewMemChunk(ChunkFormatV3, EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6) - if err := c.Append(&logproto.Entry{ + if _, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 1), Line: "1", }); err != nil { t.Fatal(err) } - if err := c.Append(&logproto.Entry{ + if _, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 2), Line: "2", }); err != nil { @@ -1099,7 +1132,9 @@ func TestMemchunkLongLine(t *testing.T) { c := NewMemChunk(ChunkFormatV3, enc, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) for i := 1; i <= 10; i++ { - require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) + dup, err := c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)}) + require.False(t, dup) + require.NoError(t, err) } noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) @@ -1143,7 +1178,9 @@ func TestCheckpointEncoding(t *testing.T) { }}, } 
require.Equal(t, true, c.SpaceFor(entry)) - require.Nil(t, c.Append(entry)) + dup, err := c.Append(entry) + require.False(t, dup) + require.Nil(t, err) } // cut it @@ -1178,7 +1215,9 @@ func TestCheckpointEncoding(t *testing.T) { Line: fmt.Sprintf("hi there - %d", i), } require.Equal(t, true, c.SpaceFor(entry)) - require.Nil(t, c.Append(entry)) + dup, err := c.Append(entry) + require.False(t, dup) + require.Nil(t, err) } // ensure new blocks are not cut @@ -1321,7 +1360,9 @@ func Test_HeadIteratorReverse(t *testing.T) { } var i int64 for e := genEntry(i); c.SpaceFor(e); e, i = genEntry(i+1), i+1 { - require.NoError(t, c.Append(e)) + dup, err := c.Append(e) + require.False(t, dup) + require.NoError(t, err) } assertOrder := func(t *testing.T, total int64) { @@ -1427,7 +1468,7 @@ func TestMemChunk_Rebound(t *testing.T) { func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { chk := NewMemChunk(ChunkFormatV3, EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) for ; from.Before(through); from = from.Add(time.Second) { - err := chk.Append(&logproto.Entry{ + _, err := chk.Append(&logproto.Entry{ Line: from.String(), Timestamp: from, }) @@ -1558,7 +1599,7 @@ func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matching if matchingFrom != nil && matchingTo != nil && (from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) { t.Logf("%v matching line", from.String()) - err := chk.Append(&logproto.Entry{ + _, err := chk.Append(&logproto.Entry{ Line: fmt.Sprintf("matching %v", from.String()), Timestamp: from, StructuredMetadata: structuredMetadata, @@ -1570,7 +1611,7 @@ func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matching if withStructuredMetadata { structuredMetadata = push.LabelsAdapter{{Name: "ding", Value: "dong"}} } - err := chk.Append(&logproto.Entry{ + _, err := chk.Append(&logproto.Entry{ Line: from.String(), Timestamp: from, StructuredMetadata: structuredMetadata, @@ -1700,7 +1741,9 @@ func TestMemChunk_SpaceFor(t *testing.T) { chk.blocks = make([]block, tc.nBlocks) chk.cutBlockSize = tc.cutBlockSize for i := 0; i < tc.headSize; i++ { - require.NoError(t, chk.head.Append(int64(i), "a", nil)) + dup, err := chk.head.Append(int64(i), "a", nil) + require.False(t, dup) + require.NoError(t, err) } expect := tc.expect @@ -1724,23 +1767,31 @@ func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) { {Name: "job", Value: "fake"}, } chk := newMemChunkWithFormat(ChunkFormatV4, enc, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(1, "lineA", []logproto.LabelAdapter{ + dup, err := chk.Append(logprotoEntryWithStructuredMetadata(1, "lineA", []logproto.LabelAdapter{ {Name: "traceID", Value: "123"}, {Name: "user", Value: "a"}, - }))) - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(2, "lineB", []logproto.LabelAdapter{ + })) + require.False(t, dup) + require.NoError(t, err) + dup, err = chk.Append(logprotoEntryWithStructuredMetadata(2, "lineB", []logproto.LabelAdapter{ {Name: "traceID", Value: "456"}, {Name: "user", Value: "b"}, - }))) + })) + require.False(t, dup) + require.NoError(t, err) require.NoError(t, chk.cut()) - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(3, "lineC", []logproto.LabelAdapter{ + dup, err = chk.Append(logprotoEntryWithStructuredMetadata(3, "lineC", []logproto.LabelAdapter{ {Name: "traceID", Value: "789"}, {Name: "user", Value: "c"}, - }))) - 
require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(4, "lineD", []logproto.LabelAdapter{ + })) + require.False(t, dup) + require.NoError(t, err) + dup, err = chk.Append(logprotoEntryWithStructuredMetadata(4, "lineD", []logproto.LabelAdapter{ {Name: "traceID", Value: "123"}, {Name: "user", Value: "d"}, - }))) + })) + require.False(t, dup) + require.NoError(t, err) // The expected bytes is the sum of bytes decompressed and bytes read from the head chunk. // First we add the bytes read from the store (aka decompressed). That's diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 788f9c0a7c45..807f80b2c0f8 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "fmt" + "io" "math" "time" @@ -34,7 +35,7 @@ type HeadBlock interface { Entries() int UncompressedSize() int Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error) - Append(int64, string, labels.Labels) error + Append(int64, string, labels.Labels) (bool, error) Iterator( ctx context.Context, direction logproto.Direction, @@ -110,7 +111,8 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 { return e.ts } -func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) error { +// unorderedHeadBlock will return true if the entry is a duplicate, false otherwise +func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) (bool, error) { if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt { // structuredMetadata must be ignored for the previous head block formats structuredMetadata = nil @@ -135,7 +137,7 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l for _, et := range displaced[0].(*nsEntries).entries { if et.line == line { e.entries = displaced[0].(*nsEntries).entries - return nil + return true, nil } } e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, hb.symbolizer.Add(structuredMetadata)}) @@ -156,7 +158,7 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l hb.size += len(structuredMetadata) * 2 * 4 // 4 bytes per label and value pair as structuredMetadataSymbols hb.lines++ - return nil + return false, nil } func metaLabelsLen(metaLabels labels.Labels) int { @@ -443,7 +445,8 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt, symbolizer *symboliz 0, math.MaxInt64, func(_ *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error { - return out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)) + _, err := out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)) + return err }, ) return out, err @@ -583,7 +586,7 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { } } - if err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)); err != nil { + if _, err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)); err != nil { return err } } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index f4930952660f..43c07d0f835f 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -35,7 +35,9 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { func Test_forEntriesEarlyReturn(t *testing.T) { hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt, newSymbolizer()) for i := 0; i < 10; i++ { - require.Nil(t, hb.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "i", Value: fmt.Sprint(i)}})) + dup, err := 
hb.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "i", Value: fmt.Sprint(i)}}) + require.False(t, dup) + require.Nil(t, err) } // forward @@ -86,6 +88,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { desc string input, exp []entry dir logproto.Direction + hasDup bool }{ { desc: "simple forward", @@ -152,7 +155,8 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { exp: []entry{ {0, "a", nil}, {0, "b", nil}, {1, "c", nil}, }, - dir: logproto.FORWARD, + dir: logproto.FORWARD, + hasDup: true, }, { desc: "ts remove exact dupe backward", @@ -162,7 +166,8 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { exp: []entry{ {1, "c", nil}, {0, "b", nil}, {0, "a", nil}, }, - dir: logproto.BACKWARD, + dir: logproto.BACKWARD, + hasDup: true, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -172,9 +177,17 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { } { t.Run(format.String(), func(t *testing.T) { hb := newUnorderedHeadBlock(format, newSymbolizer()) + dup := false for _, e := range tc.input { - require.Nil(t, hb.Append(e.t, e.s, e.structuredMetadata)) + tmpdup, err := hb.Append(e.t, e.s, e.structuredMetadata) + if !dup { // only set dup if it's not already true + if tmpdup { // can't examine duplicates until we start getting all the data + dup = true + } + } + require.Nil(t, err) } + require.Equal(t, tc.hasDup, dup) itr := hb.Iterator( context.Background(), @@ -250,7 +263,9 @@ func Test_UnorderedBoundedIter(t *testing.T) { t.Run(format.String(), func(t *testing.T) { hb := newUnorderedHeadBlock(format, newSymbolizer()) for _, e := range tc.input { - require.Nil(t, hb.Append(e.t, e.s, e.structuredMetadata)) + dup, err := hb.Append(e.t, e.s, e.structuredMetadata) + require.False(t, dup) + require.Nil(t, err) } itr := hb.Iterator( @@ -281,9 +296,15 @@ func TestHeadBlockInterop(t *testing.T) { unorderedWithStructuredMetadata := newUnorderedHeadBlock(UnorderedWithStructuredMetadataHeadBlockFmt, newSymbolizer()) for i := 0; i < 100; i++ { metaLabels := labels.Labels{{Name: "foo", Value: fmt.Sprint(99 - i)}} - require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i), metaLabels)) - require.Nil(t, unorderedWithStructuredMetadata.Append(int64(99-i), fmt.Sprint(99-i), metaLabels)) - require.Nil(t, ordered.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "foo", Value: fmt.Sprint(i)}})) + dup, err := unordered.Append(int64(99-i), fmt.Sprint(99-i), metaLabels) + require.False(t, dup) + require.Nil(t, err) + dup, err = unorderedWithStructuredMetadata.Append(int64(99-i), fmt.Sprint(99-i), metaLabels) + require.False(t, dup) + require.Nil(t, err) + dup, err = ordered.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "foo", Value: fmt.Sprint(i)}}) + require.False(t, dup) + require.Nil(t, err) } // turn to bytes @@ -359,14 +380,14 @@ func BenchmarkHeadBlockWrites(b *testing.B) { headBlockFn := func() func(int64, string, labels.Labels) { hb := &headBlock{} return func(ts int64, line string, metaLabels labels.Labels) { - _ = hb.Append(ts, line, metaLabels) + _, _ = hb.Append(ts, line, metaLabels) } } unorderedHeadBlockFn := func() func(int64, string, labels.Labels) { hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt, nil) return func(ts int64, line string, metaLabels labels.Labels) { - _ = hb.Append(ts, line, metaLabels) + _, _ = hb.Append(ts, line, metaLabels) } } @@ -432,10 +453,12 @@ func TestUnorderedChunkIterators(t *testing.T) { c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for i := 0; i < 100; i++ { // push in 
reverse order - require.Nil(t, c.Append(&logproto.Entry{ + dup, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(int64(99-i), 0), Line: fmt.Sprint(99 - i), - })) + }) + require.False(t, dup) + require.Nil(t, err) // ensure we have a mix of cut blocks + head block. if i%30 == 0 { @@ -574,7 +597,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) { func chunkFrom(xs []logproto.Entry) ([]byte, error) { c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range xs { - if err := c.Append(&x); err != nil { + if _, err := c.Append(&x); err != nil { return nil, err } } @@ -634,7 +657,9 @@ func TestReorder(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range tc.input { - require.Nil(t, c.Append(&x)) + dup, err := c.Append(&x) + require.False(t, dup) + require.Nil(t, err) } require.Nil(t, c.Close()) b, err := c.Bytes() @@ -657,10 +682,12 @@ func TestReorderAcrossBlocks(t *testing.T) { {3, 7}, } { for _, x := range batch { - require.Nil(t, c.Append(&logproto.Entry{ + dup, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(int64(x), 0), Line: fmt.Sprint(x), - })) + }) + require.False(t, dup) + require.Nil(t, err) } require.Nil(t, c.cut()) } @@ -705,7 +732,9 @@ func Test_HeadIteratorHash(t *testing.T) { "ordered": &headBlock{}, } { t.Run(name, func(t *testing.T) { - require.NoError(t, b.Append(1, "foo", labels.Labels{{Name: "foo", Value: "bar"}})) + dup, err := b.Append(1, "foo", labels.Labels{{Name: "foo", Value: "bar"}}) + require.False(t, dup) + require.NoError(t, err) eit := b.Iterator(context.Background(), logproto.BACKWARD, 0, 2, log.NewNoopPipeline().ForStream(lbs)) for eit.Next() { diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index a1860f9ae297..de74f7946e2a 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -33,7 +33,7 @@ func generateData(enc Encoding, chunksCount, blockSize, targetSize int) ([]Chunk c := NewMemChunk(ChunkFormatV4, enc, UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) for c.SpaceFor(entry) { size += uint64(len(entry.Line)) - _ = c.Append(entry) + _, _ = c.Append(entry) i++ entry = logprotoEntry(i, testdata.LogString(i)) } @@ -55,7 +55,7 @@ func fillChunkClose(c Chunk, close bool) int64 { Line: testdata.LogString(i), } for c.SpaceFor(entry) { - err := c.Append(entry) + _, err := c.Append(entry) if err != nil { panic(err) } @@ -81,7 +81,7 @@ func fillChunkRandomOrder(c Chunk, close bool) { } for c.SpaceFor(entry) { - err := c.Append(entry) + _, err := c.Append(entry) if err != nil { panic(err) } diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index 6c261d34799e..a3f157dc7774 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -223,11 +223,13 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { - require.NoError(t, chunkEnc.Append(&logproto.Entry{ + dup, err := chunkEnc.Append(&logproto.Entry{ Timestamp: ts.Time(), Line: ts.String(), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", ts.String())), - })) + }) 
+ require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chunkEnc.Close()) diff --git a/pkg/distributor/writefailures/manager.go b/pkg/distributor/writefailures/manager.go index f02ab2e57d76..5a02a7f2c222 100644 --- a/pkg/distributor/writefailures/manager.go +++ b/pkg/distributor/writefailures/manager.go @@ -39,7 +39,8 @@ func (m *Manager) Log(tenantID string, err error) { return } - if !m.tenantCfgs.LimitedLogPushErrors(tenantID) { + if !(m.tenantCfgs.LimitedLogPushErrors(tenantID) || + m.tenantCfgs.LogDuplicateStreamInfo(tenantID)) { return } diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index 4523bc8cc1d8..9ceb3c740926 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -55,10 +55,11 @@ func TestIterator(t *testing.T) { t.Run(chk.name, func(t *testing.T) { chunk := chk.new() for i := int64(0); i < entries; i++ { - err := chunk.Append(&logproto.Entry{ + dup, err := chunk.Append(&logproto.Entry{ Timestamp: time.Unix(i, 0), Line: fmt.Sprintf("line %d", i), }) + require.False(t, dup) require.NoError(t, err) } diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 4bb1aab0b8da..458da1132c96 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -22,7 +22,9 @@ func fillChunk(t testing.TB, c chunkenc.Chunk) { } for c.SpaceFor(entry) { - require.NoError(t, c.Append(entry)) + dup, err := c.Append(entry) + require.False(t, dup) + require.NoError(t, err) i++ entry.Timestamp = time.Unix(0, i) entry.Line = fmt.Sprintf("entry for line %d", i) @@ -120,10 +122,12 @@ func Test_EncodingChunks(t *testing.T) { func Test_EncodingCheckpoint(t *testing.T) { conf := dummyConf() c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) - require.Nil(t, c.Append(&logproto.Entry{ + dup, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(1, 0), Line: "hi there", - })) + }) + require.False(t, dup) + require.Nil(t, err) data, err := c.Bytes() require.Nil(t, err) from, to := c.Bounds() diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 1d30e7e23ece..b8d4ed68e9a7 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -311,7 +311,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre return nil, fmt.Errorf("failed to create stream: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures) + s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). 
if record != nil { @@ -376,7 +376,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures) + s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) i.onStreamCreated(s) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index f5e959165481..80074f6391e9 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -316,9 +316,10 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) { require.NoError(t, err) chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream)) require.NoError(t, err) - chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk() + chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk() for _, entry := range testStream.Entries { - err = chunk.Append(&entry) + dup, err := chunk.Append(&entry) + require.False(t, dup) require.NoError(t, err) } stream.chunks = append(stream.chunks, chunkDesc{chunk: chunk}) @@ -575,7 +576,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { b.Run("addTailersToNewStream", func(b *testing.B) { for n := 0; n < b.N; n++ { - inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil)) + inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil)) } }) } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 756eba0ebea7..fd2a3e52bbb9 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -66,6 +66,8 @@ type ingesterMetrics struct { shutdownMarker prometheus.Gauge flushQueueLength prometheus.Gauge + + duplicateLogBytesTotal *prometheus.CounterVec } // setRecoveryBytesInUse bounds the bytes reports to >= 0. 
@@ -293,5 +295,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "flush_queue_length", Help: "The total number of series pending in the flush queue.", }), + + duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "ingester", + Name: "duplicate_log_bytes_total", + Help: "The total number of bytes that were discarded for duplicate log lines.", + }, []string{"tenant"}), } } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 0aa3c41ea619..7d37859b1541 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/grafana/loki/v3/pkg/runtime" + "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/opentracing/opentracing-go" @@ -78,6 +80,8 @@ type stream struct { chunkFormat byte chunkHeadBlockFormat chunkenc.HeadBlockFmt + + configs *runtime.TenantConfigs } type chunkDesc struct { @@ -107,6 +111,7 @@ func newStream( streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics, writeFailures *writefailures.Manager, + configs *runtime.TenantConfigs, ) *stream { hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName) return &stream{ @@ -126,6 +131,8 @@ func newStream( writeFailures: writeFailures, chunkFormat: chunkFormat, chunkHeadBlockFormat: headBlockFmt, + + configs: configs, } } @@ -333,7 +340,8 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa } chunk.lastUpdated = time.Now() - if err := chunk.chunk.Append(&entries[i]); err != nil { + dup, err := chunk.chunk.Append(&entries[i]) + if err != nil { invalid = append(invalid, entryWithError{&entries[i], err}) if chunkenc.IsOutOfOrderErr(err) { s.writeFailures.Log(s.tenant, err) @@ -342,6 +350,9 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa } continue } + if dup { + s.handleLoggingOfDuplicateEntry(entries[i]) + } s.entryCt++ s.lastLine.ts = entries[i].Timestamp @@ -357,6 +368,21 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa return bytesAdded, storedEntries, invalid } +func (s *stream) handleLoggingOfDuplicateEntry(entry logproto.Entry) { + if s.configs == nil { + return + } + if s.configs.LogDuplicateMetrics(s.tenant) { + s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entry.Line))) + } + if s.configs.LogDuplicateStreamInfo(s.tenant) { + errMsg := fmt.Sprintf("duplicate log entry with size=%d at timestamp %s for stream %s", len(entry.Line), entry.Timestamp.Format(time.RFC3339), s.labelsString) + dupErr := errors.New(errMsg) + s.writeFailures.Log(s.tenant, dupErr) + } + +} + func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, isReplay, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]logproto.Entry, []entryWithError) { var ( diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index e4dd4a37ab35..68974ae016b3 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -9,12 +9,20 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus/testutil" + + gokitlog "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/v3/pkg/runtime" + "github.com/grafana/dskit/httpgrpc" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + 
"github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -69,6 +77,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) _, err := s.Push(context.Background(), []logproto.Entry{ @@ -122,6 +131,7 @@ func TestPushDeduplication(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) written, err := s.Push(context.Background(), []logproto.Entry{ @@ -136,6 +146,76 @@ func TestPushDeduplication(t *testing.T) { require.Equal(t, len("test"+"newer, better test"), written) } +func TestPushDeduplicationExtraMetrics(t *testing.T) { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + + chunkfmt, headfmt := defaultChunkFormat(t) + + buf := bytes.NewBuffer(nil) + logger := gokitlog.NewLogfmtLogger(buf) + + provider := &providerMock{ + tenantConfig: func(tenantID string) *runtime.Config { + if tenantID == "fake" { + return &runtime.Config{ + LogDuplicateMetrics: true, + LogDuplicateStreamInfo: true, + } + } + + return &runtime.Config{} + }, + } + + runtimeCfg, err := runtime.NewTenantConfigs(provider) + + registry := prometheus.NewRegistry() + manager := writefailures.NewManager(logger, registry, writefailures.Cfg{LogRate: flagext.ByteSize(1000), AddInsightsLabel: true}, runtimeCfg, "ingester") + + require.NoError(t, err) + metrics := newIngesterMetrics(registry, "loki") + + s := newStream( + chunkfmt, + headfmt, + defaultConfig(), + limiter, + "fake", + model.Fingerprint(0), + labels.Labels{ + {Name: "foo", Value: "bar"}, + }, + true, + NewStreamRateCalculator(), + metrics, + manager, + runtimeCfg, + ) + + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + }, recordPool.GetRecord(), 0, true, false, nil) + require.NoError(t, err) + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "not a test"}, + }, recordPool.GetRecord(), 0, true, false, nil) + require.NoError(t, err) + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + }, recordPool.GetRecord(), 0, true, false, nil) + require.NoError(t, err) + require.Len(t, s.chunks, 1) + require.Equal(t, 2, s.chunks[0].chunk.Size(), "expected exact duplicate to be dropped and newer content with same timestamp to be appended") + require.Equal(t, float64(4), testutil.ToFloat64(metrics.duplicateLogBytesTotal.WithLabelValues("fake"))) + + content := buf.String() + require.NotEmpty(t, content) + require.Contains(t, content, "insight") + require.Contains(t, content, "duplicate") +} + func TestPushRejectOldCounter(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) @@ -157,6 +237,7 @@ func TestPushRejectOldCounter(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) // counter should be 2 now since the first line will be deduped @@ -204,10 +285,11 @@ func TestStreamIterator(t *testing.T) { chunk := chk.new() for j := int64(0); j < entries; j++ { k := i*entries + j - err := chunk.Append(&logproto.Entry{ + dup, err := chunk.Append(&logproto.Entry{ Timestamp: time.Unix(k, 0), Line: fmt.Sprintf("line %d", k), }) + require.False(t, dup) require.NoError(t, err) } s.chunks = append(s.chunks, chunkDesc{chunk: chunk}) @@ -263,6 +345,7 @@ func 
TestEntryErrorCorrectlyReported(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) s.highestTs = time.Now() @@ -301,6 +384,7 @@ func TestUnorderedPush(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) for _, x := range []struct { @@ -403,6 +487,7 @@ func TestPushRateLimit(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) entries := []logproto.Entry{ @@ -443,6 +528,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) entries := []logproto.Entry{ @@ -482,6 +568,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) base := time.Now() @@ -532,7 +619,7 @@ func Benchmark_PushStream(b *testing.B) { limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) chunkfmt, headfmt := defaultChunkFormat(b) - s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil) + s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil) expr, err := syntax.ParseLogSelector(`{namespace="loki-dev"}`, true) require.NoError(b, err) t, err := newTailer("foo", expr, &fakeTailServer{}, 10) @@ -566,3 +653,11 @@ func defaultChunkFormat(t testing.TB) (byte, chunkenc.HeadBlockFmt) { return chunkfmt, headfmt } + +type providerMock struct { + tenantConfig func(string) *runtime.Config +} + +func (m *providerMock) TenantConfig(userID string) *runtime.Config { + return m.tenantConfig(userID) +} diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go index d98369ff152a..b14b3e07e497 100644 --- a/pkg/ingester/streams_map_test.go +++ b/pkg/ingester/streams_map_test.go @@ -31,6 +31,7 @@ func TestStreamsMap(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ), newStream( chunkfmt, @@ -46,6 +47,7 @@ func TestStreamsMap(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ), } var s *stream diff --git a/pkg/loki/runtime_config_test.go b/pkg/loki/runtime_config_test.go index 36126841dc47..81081a856ca2 100644 --- a/pkg/loki/runtime_config_test.go +++ b/pkg/loki/runtime_config_test.go @@ -91,8 +91,12 @@ configs: "1": log_push_request: false limited_log_push_errors: false + log_duplicate_metrics: false + log_duplicate_stream_info: false "2": log_push_request: true + log_duplicate_metrics: true + log_duplicate_stream_info: true `) tenantConfigs, err := runtime.NewTenantConfigs(runtimeGetter) @@ -104,6 +108,12 @@ configs: require.Equal(t, true, tenantConfigs.LogPushRequest("2")) require.Equal(t, true, tenantConfigs.LimitedLogPushErrors("3")) require.Equal(t, false, tenantConfigs.LogPushRequest("3")) + require.Equal(t, false, tenantConfigs.LogDuplicateMetrics("1")) + require.Equal(t, true, tenantConfigs.LogDuplicateMetrics("2")) + require.Equal(t, false, tenantConfigs.LogDuplicateMetrics("3")) + require.Equal(t, false, tenantConfigs.LogDuplicateStreamInfo("1")) + require.Equal(t, true, tenantConfigs.LogDuplicateStreamInfo("2")) + require.Equal(t, false, tenantConfigs.LogDuplicateStreamInfo("3")) } func newTestRuntimeconfig(t *testing.T, yaml string) runtime.TenantConfigProvider { diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 85f8dc3d81b4..1655789dae71 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -5,9 +5,11 @@ import ( ) type Config struct { - 
LogStreamCreation bool `yaml:"log_stream_creation"` - LogPushRequest bool `yaml:"log_push_request"` - LogPushRequestStreams bool `yaml:"log_push_request_streams"` + LogStreamCreation bool `yaml:"log_stream_creation"` + LogPushRequest bool `yaml:"log_push_request"` + LogPushRequestStreams bool `yaml:"log_push_request_streams"` + LogDuplicateMetrics bool `yaml:"log_duplicate_metrics"` + LogDuplicateStreamInfo bool `yaml:"log_duplicate_stream_info"` // LimitedLogPushErrors is to be implemented and will allow logging push failures at a controlled pace. LimitedLogPushErrors bool `yaml:"limited_log_push_errors"` @@ -18,6 +20,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.LogStreamCreation, "operation-config.log-stream-creation", false, "Log every new stream created by a push request (very verbose, recommend to enable via runtime config only).") f.BoolVar(&cfg.LogPushRequest, "operation-config.log-push-request", false, "Log every push request (very verbose, recommend to enable via runtime config only).") f.BoolVar(&cfg.LogPushRequestStreams, "operation-config.log-push-request-streams", false, "Log every stream in a push request (very verbose, recommend to enable via runtime config only).") + f.BoolVar(&cfg.LogDuplicateMetrics, "operation-config.log-duplicate-metrics", false, "Log metrics for duplicate lines received.") + f.BoolVar(&cfg.LogDuplicateStreamInfo, "operation-config.log-duplicate-stream-info", false, "Log stream info for duplicate lines received") f.BoolVar(&cfg.LimitedLogPushErrors, "operation-config.limited-log-push-errors", true, "Log push errors with a rate limited logger, will show client push errors without overly spamming logs.") } @@ -94,6 +98,14 @@ func (o *TenantConfigs) LogPushRequestStreams(userID string) bool { return o.getOverridesForUser(userID).LogPushRequestStreams } +func (o *TenantConfigs) LogDuplicateMetrics(userID string) bool { + return o.getOverridesForUser(userID).LogDuplicateMetrics +} + +func (o *TenantConfigs) LogDuplicateStreamInfo(userID string) bool { + return o.getOverridesForUser(userID).LogDuplicateStreamInfo +} + func (o *TenantConfigs) LimitedLogPushErrors(userID string) bool { return o.getOverridesForUser(userID).LimitedLogPushErrors } diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index 7685faaa9242..9bef1ab2ca20 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -101,7 +101,7 @@ func TestTokenizerPopulate(t *testing.T) { sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) - _ = memChunk.Append(&push.Entry{ + _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, }) @@ -140,7 +140,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics) memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) - _ = memChunk.Append(&push.Entry{ + _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, }) @@ -174,7 +174,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { func chunkRefItrFromLines(lines ...string) (iter.EntryIterator, error) { memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, 
chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) for i, line := range lines { - if err := memChunk.Append(&push.Entry{ + if _, err := memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, int64(i)), Line: line, }); err != nil { @@ -261,7 +261,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) - _ = memChunk.Append(&push.Entry{ + _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, }) diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 23550dd34965..c6ab61666b88 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -36,7 +36,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) - err := cs.Append(&logproto.Entry{ + _, err := cs.Append(&logproto.Entry{ Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts), }) diff --git a/pkg/storage/chunk/client/testutils/testutils.go b/pkg/storage/chunk/client/testutils/testutils.go index 2b35b612badc..b34e75a6a166 100644 --- a/pkg/storage/chunk/client/testutils/testutils.go +++ b/pkg/storage/chunk/client/testutils/testutils.go @@ -89,7 +89,7 @@ func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk { cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) for ts := from; ts <= through; ts = ts.Add(15 * time.Second) { - err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)}) + _, err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)}) if err != nil { panic(err) } diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index 902b0dae1d74..03efc9afdc80 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -314,7 +314,7 @@ func makeChunks(now time.Time, tpls ...c) []chunk.Chunk { memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) // To make sure the fetcher doesn't swap keys and buffers each chunk is built with different, but deterministic data for i := 0; i < from; i++ { - _ = memChk.Append(&logproto.Entry{ + _, _ = memChk.Append(&logproto.Entry{ Timestamp: time.Unix(int64(i), 0), Line: fmt.Sprintf("line ts=%d", i), }) diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index f85e44a41ac5..74257a8ba6ad 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -110,7 +110,7 @@ func fillStore(cm storage.ClientMetrics) error { Line: randString(250), } if chunkEnc.SpaceFor(entry) { - _ = chunkEnc.Append(entry) + _, _ = chunkEnc.Append(entry) } else { from, to := chunkEnc.Bounds() c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc, 0, 0), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano())) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 51f04538cc18..13bcaa9688a9 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2049,7 +2049,9 @@ func TestQueryReferencingStructuredMetadata(t 
*testing.T) { }, } } - require.NoError(t, chunkEnc.Append(&entry)) + dup, err := chunkEnc.Append(&entry) + require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chunkEnc.Close()) diff --git a/pkg/storage/stores/series/series_store_test.go b/pkg/storage/stores/series/series_store_test.go index 15ecb1623eeb..553ea945f94f 100644 --- a/pkg/storage/stores/series/series_store_test.go +++ b/pkg/storage/stores/series/series_store_test.go @@ -755,7 +755,8 @@ func dummyChunkWithFormat(t testing.TB, now model.Time, metric labels.Labels, fo chk := chunkenc.NewMemChunk(format, chunkenc.EncGZIP, headfmt, 256*1024, 0) for i := 0; i < samples; i++ { ts := time.Duration(i) * 15 * time.Second - err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)}) + dup, err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)}) + require.False(t, dup) require.NoError(t, err) } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go index 25ccb52e9b18..6f1b0326a5cc 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go @@ -34,10 +34,12 @@ func createChunk(t testing.TB, chunkFormat byte, headBlockFmt chunkenc.HeadBlock chunkEnc := chunkenc.NewMemChunk(chunkFormat, chunkenc.EncSnappy, headBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { - require.NoError(t, chunkEnc.Append(&logproto.Entry{ + dup, err := chunkEnc.Append(&logproto.Entry{ Timestamp: ts.Time(), Line: ts.String(), - })) + }) + require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chunkEnc.Close()) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 7c325cc4da6b..5ef02e74b1ca 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -110,7 +110,7 @@ func newChunk(chunkFormat byte, headBlockFmt chunkenc.HeadBlockFmt, stream logpr from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp) chk := chunkenc.NewMemChunk(chunkFormat, chunkenc.EncGZIP, headBlockFmt, 256*1024, 0) for _, e := range stream.Entries { - _ = chk.Append(&e) + _, _ = chk.Append(&e) } chk.Close() c := chunk.NewChunk("fake", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through)
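The duplicate handling added above is opt-in per tenant: pkg/runtime/config.go introduces log_duplicate_metrics and log_duplicate_stream_info (with matching -operation-config.log-duplicate-metrics and -operation-config.log-duplicate-stream-info flags), the ingester counts dropped bytes in the per-tenant duplicate_log_bytes_total counter, and the write-failures manager emits a rate-limited log line per duplicate when stream info logging is enabled. As a rough illustration (tenant IDs are placeholders), a runtime overrides file following the shape exercised in pkg/loki/runtime_config_test.go could enable it like this:

configs:
  "tenant-a":
    # count dropped duplicate bytes in the per-tenant duplicate_log_bytes_total counter
    log_duplicate_metrics: true
  "tenant-b":
    # also log stream info for each duplicate through the write-failures logger
    log_duplicate_metrics: true
    log_duplicate_stream_info: true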