Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Collect duplicate log line metrics #13084

Merged
merged 21 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
774f2bd
First pass at getting duplicate log line metrics to be outputted via …
paul1r May 15, 2024
1e63ac1
Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes
paul1r May 30, 2024
40063ab
WIP: PR comments: remove err, update metric name, put metric stuff in…
paul1r May 30, 2024
00b67f0
Refactor so we can get tenant information and such in the unordered h…
paul1r May 30, 2024
3b40971
make fmt
paul1r May 30, 2024
471059f
Remove commented out code from first pass
paul1r May 30, 2024
2070530
Pass metrics and such into MemChunk and UnorderedHeadBlock
paul1r May 31, 2024
faaa10a
Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes
paul1r Jun 6, 2024
074f1f9
Move back to a variant of version 1, using a bool instead of an err
paul1r Jun 7, 2024
5c5e631
Add a comment about what the new return val is
paul1r Jun 7, 2024
788b274
Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes
paul1r Jun 7, 2024
c80a90f
Fix removal of return val
paul1r Jun 7, 2024
6f321d4
Lint
paul1r Jun 7, 2024
3a43546
Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes
paul1r Jun 17, 2024
19cc158
Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes
paul1r Jun 20, 2024
ae10548
Rework writefailures logic to be easier to read
paul1r Jun 20, 2024
07a82f0
Break out logging of duplicate metrics/log into a function
paul1r Jun 20, 2024
6b6d3d6
Don't make a nested if, break out early if no tenant configs
paul1r Jun 20, 2024
1f33c60
Add size to duplicate log line info
paul1r Jun 20, 2024
982ebf6
PR review comments, add comments about what each implementation may r…
paul1r Jun 26, 2024
00f9b82
Merge branch 'main' into paul1r/add_metrics_for_duplicate_log_line_bytes
paul1r Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3789,6 +3789,14 @@ These are values which allow you to control aspects of Loki's operation, most co
# CLI flag: -operation-config.log-push-request-streams
[log_push_request_streams: <boolean> | default = false]

# Log metrics for duplicate lines received.
# CLI flag: -operation-config.log-duplicate-metrics
[log_duplicate_metrics: <boolean> | default = false]

# Log stream info for duplicate lines received.
# CLI flag: -operation-config.log-duplicate-stream-info
[log_duplicate_stream_info: <boolean> | default = false]
Comment on lines +3808 to +3814
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: I think it makes sense to unify both into a single config.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think i would leave these separate, I think generally just the metric is useful?


# Log push errors with a rate limited logger, will show client push errors
# without overly spamming logs.
# CLI flag: -operation-config.limited-log-push-errors
Expand Down
8 changes: 4 additions & 4 deletions pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool {
return len(c.entries) < tmpNumEntries
}

func (c *dumbChunk) Append(entry *logproto.Entry) error {
func (c *dumbChunk) Append(entry *logproto.Entry) (bool, error) {
if len(c.entries) == tmpNumEntries {
return ErrChunkFull
return false, ErrChunkFull
}

if len(c.entries) > 0 && c.entries[len(c.entries)-1].Timestamp.After(entry.Timestamp) {
return ErrOutOfOrder
return false, ErrOutOfOrder
}

c.entries = append(c.entries, *entry)
return nil
return false, nil
}

func (c *dumbChunk) Size() int {
Expand Down
3 changes: 2 additions & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func SupportedEncoding() string {
type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
// Append returns true if the entry appended was a duplicate
Append(*logproto.Entry) (bool, error)
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Expand Down
23 changes: 12 additions & 11 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ func (hb *headBlock) Reset() {

func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt }

func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error {
func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error) {
if !hb.IsEmpty() && hb.maxt > ts {
return ErrOutOfOrder
return false, ErrOutOfOrder
}

hb.entries = append(hb.entries, entry{t: ts, s: line})
Expand All @@ -193,7 +193,7 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error {
hb.maxt = ts
hb.size += len(line)

return nil
return false, nil
}

func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) {
Expand Down Expand Up @@ -340,7 +340,7 @@ func (hb *headBlock) Convert(version HeadBlockFmt, symbolizer *symbolizer) (Head
out := version.NewBlock(symbolizer)

for _, e := range hb.entries {
if err := out.Append(e.t, e.s, e.structuredMetadata); err != nil {
if _, err := out.Append(e.t, e.s, e.structuredMetadata); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -834,27 +834,28 @@ func (c *MemChunk) Utilization() float64 {
}

// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
func (c *MemChunk) Append(entry *logproto.Entry) (bool, error) {
entryTimestamp := entry.Timestamp.UnixNano()

// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.headFmt < UnorderedHeadBlockFmt && c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
return false, ErrOutOfOrder
}

if c.format < ChunkFormatV4 {
entry.StructuredMetadata = nil
}
if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)); err != nil {
return err
dup, err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata))
if err != nil {
return dup, err
}

if c.head.UncompressedSize() >= c.blockSize {
return c.cut()
return false, c.cut()
}

return nil
return dup, nil
}

// Close implements Chunk.
Expand Down Expand Up @@ -1122,7 +1123,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
if filter != nil && filter(entry.Timestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)...) {
continue
}
if err := newChunk.Append(&entry); err != nil {
if _, err := newChunk.Append(&entry); err != nil {
return nil, err
}
}
Expand Down
Loading
Loading