Skip to content

Commit

Permalink
feat(blooms): Add counter metric for blocks that are not available at…
Browse files Browse the repository at this point in the history
… query time (#12968)

When filtering chunks on the bloom gateway, bloom block may not be available and they will be downloaded asynchronously in the background.

This new metric `loki_bloom_gateway_blocks_not_available_total` counts the blocks that are not available at query time.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed May 28, 2024
1 parent 1432a3e commit d6374bc
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 16 deletions.
19 changes: 10 additions & 9 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())

var preFilterSeries, preFilterChunks int

preFilterSeries = len(req.Refs)
for _, series := range req.Refs {
preFilterChunks += len(series.Refs)
}

// Ideally we could use an unbuffered channel here, but since we return the
// request on the first error, there can be cases where the request context
// is not done yet and the consumeTask() function wants to send to the
Expand All @@ -316,13 +323,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

remaining := len(tasks)

preFilterSeries := len(req.Refs)
var preFilterChunks, postFilterChunks int

for _, series := range req.Refs {
preFilterChunks += len(series.Refs)
}

combinedRecorder := v1.NewBloomRecorder(ctx, "combined")
for remaining > 0 {
select {
Expand Down Expand Up @@ -353,11 +353,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
responsesPool.Put(resp)
}

postFilterSeries := len(filtered)

var postFilterSeries, postFilterChunks int
postFilterSeries = len(filtered)
for _, group := range filtered {
postFilterChunks += len(group.Refs)
}

g.metrics.requestedSeries.Observe(float64(preFilterSeries))
g.metrics.filteredSeries.Observe(float64(preFilterSeries - postFilterSeries))
g.metrics.requestedChunks.Observe(float64(preFilterChunks))
Expand Down
19 changes: 13 additions & 6 deletions pkg/bloomgateway/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,13 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
}

type workerMetrics struct {
dequeueDuration *prometheus.HistogramVec
queueDuration *prometheus.HistogramVec
processDuration *prometheus.HistogramVec
tasksDequeued *prometheus.CounterVec
tasksProcessed *prometheus.CounterVec
blockQueryLatency *prometheus.HistogramVec
dequeueDuration *prometheus.HistogramVec
queueDuration *prometheus.HistogramVec
processDuration *prometheus.HistogramVec
tasksDequeued *prometheus.CounterVec
tasksProcessed *prometheus.CounterVec
blocksNotAvailable *prometheus.CounterVec
blockQueryLatency *prometheus.HistogramVec
}

func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
Expand Down Expand Up @@ -158,6 +159,12 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
Name: "tasks_processed_total",
Help: "Total amount of tasks that the worker processed",
}, append(labels, "status")),
blocksNotAvailable: r.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "blocks_not_available_total",
Help: "Total amount of blocks that have been skipped because they were not found or not downloaded yet",
}, labels),
blockQueryLatency: r.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close
return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error {
bq := bqs[i]
if bq == nil {
// TODO(chaudum): Add metric for skipped blocks
p.metrics.blocksNotAvailable.WithLabelValues(p.id).Inc()
return nil
}

Expand Down

0 comments on commit d6374bc

Please sign in to comment.