feat(blooms): Blooms/v2 encoding multipart series #13093

Merged · 39 commits · Jun 6, 2024
Changes shown from 36 of 39 commits.

Commits
18d6cc5
SeriesWithOffsets encoding
owen-d May 13, 2024
c9fb232
removes unused fn
owen-d May 13, 2024
c65c466
[wip] adds v2, decodes multipart blooms
owen-d May 14, 2024
e7dc0d1
[wip] mergebuilder multipart bloom integration
owen-d May 15, 2024
3f1edcf
fusedquerier supports multipart blooms
owen-d May 15, 2024
0a248ff
sbf library max size support
owen-d May 16, 2024
9b488f0
[wip] working on multipart bloom tokenizer population
owen-d May 16, 2024
8bf00bc
tokenizer populate multipart blooms + removes tokenType dimension fro…
owen-d May 17, 2024
c8160a3
[wip] work on adjusting tests for v2 bloom fmt
owen-d May 17, 2024
c6ee9a3
[wip] BlockQuerierIter impl
owen-d May 28, 2024
1fab355
test alignment
owen-d May 28, 2024
c297404
encoding fixes
owen-d May 29, 2024
06c828f
small fixes + test updates
owen-d May 29, 2024
f63b2d1
bloomtokenizer retries lines across blooms when full
owen-d May 29, 2024
2ce12be
fused querier handling of multipage blooms
owen-d May 29, 2024
01e2fa1
clears tokenizer cache across multipart blooms + re-allows short-circ…
owen-d May 30, 2024
27895a4
bloom populate fix when there are no pre-existing blooms
owen-d May 30, 2024
2c0f050
fingerprint collision test
owen-d May 30, 2024
c38cb3e
exports some v1 bloom fields for reuse
owen-d May 31, 2024
4d2f648
specific type for bloom version
owen-d May 31, 2024
bbfc8e2
bloomcompactor integration to new bloom lib changes
owen-d May 31, 2024
fb93b7c
versioned builder init
owen-d May 31, 2024
4b25889
remove old (unused) bloom tester pkg
owen-d May 31, 2024
25be717
rearranging builders
owen-d May 31, 2024
33f8829
re-add v1 impl
owen-d May 31, 2024
cf5a66d
version specific type & testware for versioned builders
owen-d May 31, 2024
b00c419
Merge remote-tracking branch 'upstream/main' into blooms/v2-encoding-…
owen-d May 31, 2024
f29daf5
make format
owen-d May 31, 2024
4625589
method rename adjustments
owen-d May 31, 2024
be6f549
addtl linting
owen-d May 31, 2024
c2a1ae8
version specific offset encoding impls
owen-d Jun 3, 2024
5c6d4ae
metric integration
owen-d Jun 4, 2024
a202fe4
Merge remote-tracking branch 'upstream/main' into blooms/v2-encoding-…
owen-d Jun 4, 2024
b682a8c
integrates to new bloombuild pkg
owen-d Jun 4, 2024
fa51dc6
handles meta expiry when no sources present
owen-d Jun 4, 2024
8a12055
bloomshipper no longer returns empty metas on fetch
owen-d Jun 4, 2024
59bf6a4
better chunk inclusion calculations in bloomgw
owen-d Jun 5, 2024
80ed22b
chunks skipped instrumentation
owen-d Jun 6, 2024
fe03194
CI job bump
owen-d Jun 6, 2024
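
The commits above converge on one central type change: a series used to carry exactly one bloom (v1.SeriesWithBloom) and may now carry several (v1.SeriesWithBlooms), so a bloom that hits the configured max size is split across multiple parts. A minimal sketch of that shape change, using stand-in types inferred from the diff below (the real definitions live in pkg/storage/bloom/v1):

```go
package sketch

// Stand-ins inferred from the diff below; not the exact Loki definitions.
type Series struct{ /* fingerprint, chunk refs, ... */ }
type Bloom struct{ /* scalable bloom filter */ }

// SizedIterator mirrors the v1.SizedIterator[*Bloom] the new code passes around.
type SizedIterator[T any] interface {
	Next() bool
	At() T
	Remaining() int
}

// Before: exactly one bloom per series.
type SeriesWithBloom struct {
	Series *Series
	Bloom  *Bloom
}

// After: a series whose bloom overflows the max size is split across
// several blooms, streamed to callers as a sized iterator.
type SeriesWithBlooms struct {
	Series *Series
	Blooms SizedIterator[*Bloom]
}
```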
28 changes: 14 additions & 14 deletions pkg/bloombuild/builder/batch.go
@@ -168,9 +168,9 @@ func newBatchedBlockLoader(
 }
 
 // compiler checks
-var _ v1.Iterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
-var _ v1.CloseableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
-var _ v1.ResettableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
+var _ v1.Iterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
+var _ v1.CloseableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
+var _ v1.ResettableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
 
 // TODO(chaudum): testware
 func newBlockLoadingIter(ctx context.Context, blocks []bloomshipper.BlockRef, fetcher FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier], batchSize int) *blockLoadingIter {
@@ -196,13 +196,13 @@ type blockLoadingIter struct {
     // internals
     initialized bool
     err         error
-    iter        v1.Iterator[*v1.SeriesWithBloom]
+    iter        v1.Iterator[*v1.SeriesWithBlooms]
     loader      *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier]
     loaded      map[io.Closer]struct{}
 }
 
 // At implements v1.Iterator.
-func (i *blockLoadingIter) At() *v1.SeriesWithBloom {
+func (i *blockLoadingIter) At() *v1.SeriesWithBlooms {
     if !i.initialized {
         panic("iterator not initialized")
     }
@@ -229,7 +229,7 @@ func (i *blockLoadingIter) init() {
     i.overlapping = overlappingBlocksIter(i.inputs)
 
     // set initial iter
-    i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
+    i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
 
     // set "match all" filter function if not present
     if i.filter == nil {
@@ -249,22 +249,22 @@ func (i *blockLoadingIter) loadNext() bool {
         loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
         filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
 
-        iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
+        iters := make([]v1.PeekingIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
         for filtered.Next() {
             bq := filtered.At()
             i.loaded[bq] = struct{}{}
             iter, err := bq.SeriesIter()
             if err != nil {
                 i.err = err
-                i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
+                i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
                 return false
             }
             iters = append(iters, iter)
         }
 
         if err := filtered.Err(); err != nil {
             i.err = err
-            i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
+            i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
             return false
         }
 
@@ -278,12 +278,12 @@ func (i *blockLoadingIter) loadNext() bool {
         // two overlapping blocks can conceivably have the same series, so we need to dedupe,
         // preferring the one with the most chunks already indexed since we'll have
         // to add fewer chunks to the bloom
-        i.iter = v1.NewDedupingIter[*v1.SeriesWithBloom, *v1.SeriesWithBloom](
-            func(a, b *v1.SeriesWithBloom) bool {
+        i.iter = v1.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
+            func(a, b *v1.SeriesWithBlooms) bool {
                 return a.Series.Fingerprint == b.Series.Fingerprint
             },
-            v1.Identity[*v1.SeriesWithBloom],
-            func(a, b *v1.SeriesWithBloom) *v1.SeriesWithBloom {
+            v1.Identity[*v1.SeriesWithBlooms],
+            func(a, b *v1.SeriesWithBlooms) *v1.SeriesWithBlooms {
                 if len(a.Series.Chunks) > len(b.Series.Chunks) {
                     return a
                 }
@@ -294,7 +294,7 @@ func (i *blockLoadingIter) loadNext() bool {
         return i.iter.Next()
     }
 
-    i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
+    i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
     i.err = i.overlapping.Err()
     return false
 }
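
The dedupe merge above prefers whichever duplicate already indexes more chunks. A self-contained sketch of that rule with toy types (not the real v1.SeriesWithBlooms):

```go
package main

import "fmt"

type series struct {
	fingerprint uint64
	chunks      int // chunks already indexed in the bloom
}

// merge mirrors the function passed to v1.NewDedupingIter above: of two
// duplicates with the same fingerprint, keep the one that already indexes
// more chunks, so fewer chunks need to be re-tokenized.
func merge(a, b series) series {
	if a.chunks > b.chunks {
		return a
	}
	return b
}

func main() {
	a := series{fingerprint: 0xbeef, chunks: 12}
	b := series{fingerprint: 0xbeef, chunks: 7}
	fmt.Println(merge(a, b).chunks) // prints 12: a wins with more chunks indexed
}
```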
10 changes: 10 additions & 0 deletions pkg/bloombuild/builder/batch_test.go
@@ -5,6 +5,7 @@ import (
     "errors"
     "testing"
 
+    "github.com/prometheus/common/model"
     "github.com/stretchr/testify/require"
 
     v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -208,3 +209,12 @@ func TestOverlappingBlocksIter(t *testing.T) {
         })
     }
 }
+
+func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
+    bounds := v1.NewBounds(min, max)
+    return bloomshipper.BlockRef{
+        Ref: bloomshipper.Ref{
+            Bounds: bounds,
+        },
+    }
+}
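
genBlockRef moves here from spec_test.go so the overlap tests can build refs spanning a fingerprint range. A hypothetical call producing two overlapping refs (illustrative only, not taken from the diff):

```go
// Illustrative only: two refs whose fingerprint bounds overlap on
// [0x8000, 0xffff], the kind of input TestOverlappingBlocksIter exercises.
a := genBlockRef(0x0000, 0xffff)
b := genBlockRef(0x8000, 0x1ffff)
```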
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder.go
@@ -363,7 +363,7 @@ func (b *Builder) loadWorkForGap(
     tenant string,
     id tsdb.Identifier,
     gap protos.GapWithBlocks,
-) (v1.Iterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) {
+) (v1.Iterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBlooms], error) {
     // load a series iterator for the gap
     seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
     if err != nil {
57 changes: 21 additions & 36 deletions pkg/bloombuild/builder/spec.go
@@ -4,7 +4,6 @@ import (
     "context"
     "fmt"
     "io"
-    "time"
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
@@ -45,7 +44,7 @@ type SimpleBloomGenerator struct {
     userID      string
     store       v1.Iterator[*v1.Series]
     chunkLoader ChunkLoader
-    blocksIter  v1.ResettableIterator[*v1.SeriesWithBloom]
+    blocksIter  v1.ResettableIterator[*v1.SeriesWithBlooms]
 
     // options to build blocks with
     opts v1.BlockOptions
@@ -68,7 +67,7 @@ func NewSimpleBloomGenerator(
     opts v1.BlockOptions,
     store v1.Iterator[*v1.Series],
     chunkLoader ChunkLoader,
-    blocksIter v1.ResettableIterator[*v1.SeriesWithBloom],
+    blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms],
     readWriterFn func() (v1.BlockWriter, v1.BlockReader),
     reporter func(model.Fingerprint),
     metrics *Metrics,
@@ -98,44 +97,30 @@
     }
 }
 
-func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
-    return func(series *v1.Series, bloom *v1.Bloom) (int, bool, error) {
-        start := time.Now()
+func (s *SimpleBloomGenerator) populator(ctx context.Context) v1.BloomPopulatorFunc {
+    return func(
+        series *v1.Series,
+        srcBlooms v1.SizedIterator[*v1.Bloom],
+        toAdd v1.ChunkRefs,
+        ch chan *v1.BloomCreation,
+    ) {
         level.Debug(s.logger).Log(
             "msg", "populating bloom filter",
             "stage", "before",
             "fp", series.Fingerprint,
             "chunks", len(series.Chunks),
         )
-        chunkItersWithFP, err := s.chunkLoader.Load(ctx, s.userID, series)
-        if err != nil {
-            return 0, false, errors.Wrapf(err, "failed to load chunks for series: %+v", series)
-        }
 
-        bytesAdded, skip, err := s.tokenizer.Populate(
-            &v1.SeriesWithBloom{
-                Series: series,
-                Bloom:  bloom,
-            },
-            chunkItersWithFP.itr,
-        )
+        chunkItersWithFP := s.chunkLoader.Load(ctx, s.userID, &v1.Series{
+            Fingerprint: series.Fingerprint,
+            Chunks:      toAdd,
+        })
 
-        level.Debug(s.logger).Log(
-            "msg", "populating bloom filter",
-            "stage", "after",
-            "fp", series.Fingerprint,
-            "chunks", len(series.Chunks),
-            "series_bytes", bytesAdded,
-            "duration", time.Since(start),
-            "err", err,
-        )
+        s.tokenizer.Populate(srcBlooms, chunkItersWithFP.itr, ch)
 
         if s.reporter != nil {
             s.reporter(series.Fingerprint)
         }
-        return bytesAdded, skip, err
     }
-
 }
 
 func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIterator {
@@ -179,10 +164,10 @@ type LazyBlockBuilderIterator struct {
     ctx          context.Context
     opts         v1.BlockOptions
     metrics      *Metrics
-    populate     func(*v1.Series, *v1.Bloom) (int, bool, error)
+    populate     v1.BloomPopulatorFunc
     readWriterFn func() (v1.BlockWriter, v1.BlockReader)
     series       v1.PeekingIterator[*v1.Series]
-    blocks       v1.ResettableIterator[*v1.SeriesWithBloom]
+    blocks       v1.ResettableIterator[*v1.SeriesWithBlooms]
 
     bytesAdded int
     curr       *v1.Block
@@ -193,10 +178,10 @@ func NewLazyBlockBuilderIterator(
     ctx context.Context,
     opts v1.BlockOptions,
     metrics *Metrics,
-    populate func(*v1.Series, *v1.Bloom) (int, bool, error),
+    populate v1.BloomPopulatorFunc,
     readWriterFn func() (v1.BlockWriter, v1.BlockReader),
     series v1.PeekingIterator[*v1.Series],
-    blocks v1.ResettableIterator[*v1.SeriesWithBloom],
+    blocks v1.ResettableIterator[*v1.SeriesWithBlooms],
 ) *LazyBlockBuilderIterator {
     return &LazyBlockBuilderIterator{
         ctx: ctx,
@@ -270,7 +255,7 @@ type ChunkItersByFingerprint struct {
 
 // ChunkLoader loads chunks from a store
 type ChunkLoader interface {
-    Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error)
+    Load(ctx context.Context, userID string, series *v1.Series) *ChunkItersByFingerprint
 }
 
 // StoreChunkLoader loads chunks from a store
@@ -286,7 +271,7 @@ func NewStoreChunkLoader(fetcherProvider stores.ChunkFetcherProvider, metrics *M
     }
 }
 
-func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error) {
+func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) *ChunkItersByFingerprint {
     // NB(owen-d): This is probably unnecessary as we should only have one fetcher
     // because we'll only be working on a single index period at a time, but this should protect
     // us in the case of refactoring/changing this and likely isn't a perf bottleneck.
@@ -317,5 +302,5 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
     return &ChunkItersByFingerprint{
         fp:  series.Fingerprint,
         itr: newBatchedChunkLoader(ctx, fetchers, inputs, s.metrics, batchedLoaderDefaultBatchSize),
-    }, nil
+    }
 }
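
With this change the populator no longer returns (bytesAdded, skip, err); it streams completed blooms over a channel. A hedged sketch of the consuming side, with stand-in types since v1.BloomCreation's fields are not shown in this diff and the producer is assumed to close the channel when done:

```go
package sketch

// Stand-ins for v1.Bloom and v1.BloomCreation; field names are assumptions,
// not the actual definitions in pkg/storage/bloom/v1.
type bloom struct{}

type bloomCreation struct {
	bloom *bloom
	err   error
}

// collect drains the channel a populator writes to. The producer is assumed
// to close the channel once the series is fully tokenized, so the range
// loop terminates naturally.
func collect(ch <-chan *bloomCreation) ([]*bloom, error) {
	var out []*bloom
	for c := range ch {
		if c.err != nil {
			return out, c.err
		}
		out = append(out, c.bloom)
	}
	return out, nil
}
```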
30 changes: 11 additions & 19 deletions pkg/bloombuild/builder/spec_test.go
@@ -15,19 +15,19 @@ import (
     "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 )
 
-func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
+func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
     return blocksFromSchemaWithRange(t, n, options, 0, 0xffff)
 }
 
 // splits 100 series across `n` non-overlapping blocks.
 // uses options to build blocks with.
-func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
+func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
     if 100%n != 0 {
         panic("100 series must be evenly divisible by n")
     }
 
     numSeries := 100
-    data, _ = v1.MkBasicSeriesWithBlooms(numSeries, 0, fromFP, throughFp, 0, 10000)
+    data, _ = v1.MkBasicSeriesWithBlooms(numSeries, fromFP, throughFp, 0, 10000)
 
     seriesPerBlock := numSeries / n
 
@@ -46,7 +46,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
 
         minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock
 
-        itr := v1.NewSliceIter[v1.SeriesWithBloom](data[minIdx:maxIdx])
+        itr := v1.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
         _, err = builder.BuildFrom(itr)
         require.Nil(t, err)
 
@@ -62,11 +62,11 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
 // doesn't actually load any chunks
 type dummyChunkLoader struct{}
 
-func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) (*ChunkItersByFingerprint, error) {
+func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) *ChunkItersByFingerprint {
     return &ChunkItersByFingerprint{
         fp:  series.Fingerprint,
         itr: v1.NewEmptyIter[v1.ChunkRefWithIter](),
-    }, nil
+    }
 }
 
 func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block, refs []bloomshipper.BlockRef) *SimpleBloomGenerator {
@@ -132,9 +132,9 @@ func TestSimpleBloomGenerator(t *testing.T) {
     } {
         t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
             sourceBlocks, data, refs := blocksFromSchemaWithRange(t, 2, tc.fromSchema, 0x00000, 0x6ffff)
-            storeItr := v1.NewMapIter[v1.SeriesWithBloom, *v1.Series](
-                v1.NewSliceIter[v1.SeriesWithBloom](data),
-                func(swb v1.SeriesWithBloom) *v1.Series {
+            storeItr := v1.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
+                v1.NewSliceIter[v1.SeriesWithBlooms](data),
+                func(swb v1.SeriesWithBlooms) *v1.Series {
                     return swb.Series
                 },
             )
@@ -150,9 +150,9 @@
 
             // Check all the input series are present in the output blocks.
             expectedRefs := v1.PointerSlice(data)
-            outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
+            outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
             for _, block := range outputBlocks {
-                bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
+                bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
                 for bq.Next() {
                     outputRefs = append(outputRefs, bq.At())
                 }
@@ -164,13 +164,5 @@ func TestSimpleBloomGenerator(t *testing.T) {
         })
     }
 }
 }
-
-func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
-    bounds := v1.NewBounds(min, max)
-    return bloomshipper.BlockRef{
-        Ref: bloomshipper.Ref{
-            Bounds: bounds,
-        },
-    }
-}
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner.go
@@ -252,7 +252,7 @@ func (p *Planner) loadWork(
     if err != nil {
         return nil, fmt.Errorf("error loading tenants: %w", err)
     }
-    level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Len())
+    level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Remaining())
 
     for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
         p.metrics.tenantsDiscovered.Inc()
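
The Len to Remaining rename reflects that tenants here is an iterator consumed by the loop that follows, so only the not-yet-consumed count is well-defined once iteration starts. A minimal sketch of that contract (assumed shape, not the exact Loki iterator):

```go
package sketch

// sliceIter is a hypothetical implementation of the iterator contract the
// planner logs against: once Next() starts consuming, only Remaining() is
// meaningful, which is why the log switched away from a static Len().
type sliceIter[T any] struct {
	xs []T
	i  int
}

func (s *sliceIter[T]) Next() bool     { s.i++; return s.i <= len(s.xs) }
func (s *sliceIter[T]) At() T          { return s.xs[s.i-1] }
func (s *sliceIter[T]) Remaining() int { return len(s.xs) - s.i }
```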