Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blooms): Reset error on LazyBloomIter.Seek #12806

Merged
merged 2 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (it *LazyBloomIter) ensureInit() {
// Seek positions the iterator at the bloom page containing the given
// offset, loading that page if it is not already the current one.
// It also clears a sticky ErrPageTooLarge from an earlier Seek/Next so
// that subsequent, normal-sized pages remain readable.
func (it *LazyBloomIter) Seek(offset BloomOffset) {
	it.ensureInit()

	// reset error from any previous seek/next that yielded pages too large
	if errors.Is(it.err, ErrPageTooLarge) {
		it.err = nil
	}

	// if we need a different page or the current page hasn't been loaded,
	// load the desired page
	if it.curPageIndex != offset.Page || it.curPage == nil {
Expand Down
101 changes: 101 additions & 0 deletions pkg/storage/bloom/v1/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package v1
import (
"bytes"
"context"
"fmt"
"sync"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
)

// TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't
Expand Down Expand Up @@ -44,6 +47,18 @@ func TestFusedQuerier(t *testing.T) {
numSeries := 1000
data, keys := MkBasicSeriesWithBlooms(numSeries, 0, 0x0000, 0xffff, 0, 10000)

// Make the first and third series blooms too big to fit into a single page so we skip them while reading
for i := 0; i < 10000; i++ {
tokenizer := NewNGramTokenizer(4, 0)
line := fmt.Sprintf("%04x:%04x", i, i+1)
it := tokenizer.Tokens(line)
for it.Next() {
key := it.At()
data[0].Bloom.Add(key)
data[2].Bloom.Add(key)
}
}

builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
Expand Down Expand Up @@ -130,6 +145,92 @@ func TestFusedQuerier(t *testing.T) {
}
}

func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
// references for linking in memory reader+writer
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := NewByteReader(indexBuf, bloomsBuf)

numSeries := 4
data := make([]SeriesWithBloom, 0, numSeries)
tokenizer := NewNGramTokenizer(4, 0)
for i := 0; i < numSeries; i++ {
var series Series
series.Fingerprint = model.Fingerprint(i)
series.Chunks = []ChunkRef{
{
From: 0,
Through: 100,
Checksum: uint32(i),
},
}

var bloom Bloom
bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)

nLines := 10
if i == 0 || i == 2 {
// Add enough lines to make the bloom page too large for series 1
nLines = 10000
}

for j := 0; j < nLines; j++ {
line := fmt.Sprintf("%04x:%04x", i, j)
it := tokenizer.Tokens(line)
for it.Next() {
key := it.At()
bloom.Add(key)
}
}

data = append(data, SeriesWithBloom{
Series: &series,
Bloom: &bloom,
})
}

builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
BloomPageSize: 10, // So we force one series per page
},
writer,
)
require.Nil(t, err)
itr := NewSliceIter[SeriesWithBloom](data)
_, err = builder.BuildFrom(itr)
require.NoError(t, err)
require.False(t, itr.Next())
block := NewBlock(reader, NewMetrics(nil))

querier := NewBlockQuerier(block, true, 1000)

for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ {
err := querier.Seek(fp)
require.NoError(t, err)

require.True(t, querier.series.Next())
series := querier.series.At()
require.Equal(t, fp, series.Fingerprint)

querier.blooms.Seek(series.Offset)

if fp == 0 || fp == 2 {
require.False(t, querier.blooms.Next())
require.Error(t, querier.blooms.Err())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also check the error type here?

continue
}

require.True(t, querier.blooms.Next())
require.NoError(t, querier.blooms.Err())
}
}

func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
Expand Down
Loading