diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index 01c0216c36f0..c539f7c27193 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -45,6 +45,11 @@ func (it *LazyBloomIter) ensureInit() { func (it *LazyBloomIter) Seek(offset BloomOffset) { it.ensureInit() + // reset error from any previous seek/next that yield pages too large + if errors.Is(it.err, ErrPageTooLarge) { + it.err = nil + } + // if we need a different page or the current page hasn't been loaded, // load the desired page if it.curPageIndex != offset.Page || it.curPage == nil { diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 610a4ed7e6b8..81e646d96968 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -3,14 +3,17 @@ package v1 import ( "bytes" "context" + "fmt" "sync" "testing" "github.com/go-kit/log" "github.com/grafana/dskit/concurrency" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" ) // TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't @@ -44,6 +47,18 @@ func TestFusedQuerier(t *testing.T) { numSeries := 1000 data, keys := MkBasicSeriesWithBlooms(numSeries, 0, 0x0000, 0xffff, 0, 10000) + // Make the first and third series blooms too big to fit into a single page so we skip them while reading + for i := 0; i < 10000; i++ { + tokenizer := NewNGramTokenizer(4, 0) + line := fmt.Sprintf("%04x:%04x", i, i+1) + it := tokenizer.Tokens(line) + for it.Next() { + key := it.At() + data[0].Bloom.Add(key) + data[2].Bloom.Add(key) + } + } + builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ @@ -130,6 +145,92 @@ func TestFusedQuerier(t *testing.T) { } } +func TestLazyBloomIter_Seek_ResetError(t *testing.T) { + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + + numSeries := 4 + data := make([]SeriesWithBloom, 0, numSeries) + tokenizer := NewNGramTokenizer(4, 0) + for i := 0; i < numSeries; i++ { + var series Series + series.Fingerprint = model.Fingerprint(i) + series.Chunks = []ChunkRef{ + { + From: 0, + Through: 100, + Checksum: uint32(i), + }, + } + + var bloom Bloom + bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8) + + nLines := 10 + if i == 0 || i == 2 { + // Add enough lines to make the bloom page too large for series 1 + nLines = 10000 + } + + for j := 0; j < nLines; j++ { + line := fmt.Sprintf("%04x:%04x", i, j) + it := tokenizer.Tokens(line) + for it.Next() { + key := it.At() + bloom.Add(key) + } + } + + data = append(data, SeriesWithBloom{ + Series: &series, + Bloom: &bloom, + }) + } + + builder, err := NewBlockBuilder( + BlockOptions{ + Schema: Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncSnappy, + }, + SeriesPageSize: 100, + BloomPageSize: 10, // So we force one series per page + }, + writer, + ) + require.Nil(t, err) + itr := NewSliceIter[SeriesWithBloom](data) + _, err = builder.BuildFrom(itr) + require.NoError(t, err) + require.False(t, itr.Next()) + block := NewBlock(reader, NewMetrics(nil)) + + querier := NewBlockQuerier(block, true, 1000) + + for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ { + err := querier.Seek(fp) + require.NoError(t, err) + + require.True(t, querier.series.Next()) + series := querier.series.At() + require.Equal(t, fp, series.Fingerprint) + + querier.blooms.Seek(series.Offset) + + if fp == 0 || fp == 2 { + require.False(t, querier.blooms.Next()) + require.Error(t, querier.blooms.Err()) + continue + } + + require.True(t, querier.blooms.Next()) + require.NoError(t, querier.blooms.Err()) + } +} + func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) { indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil)