diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index f59f6501c94e..d30dce2f7775 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -333,6 +333,23 @@ bloom_build: [enabled: | default = false] planner: + # Interval at which to re-run the bloom creation planning. + # CLI flag: -bloom-build.planner.interval + [planning_interval: | default = 8h] + + # Newest day-table offset (from today, inclusive) to build blooms for. + # Increase to lower cost by not re-writing data to object storage too + # frequently since recent data changes more often at the cost of not having + # blooms available as quickly. + # CLI flag: -bloom-build.planner.min-table-offset + [min_table_offset: | default = 1] + + # Oldest day-table offset (from today, inclusive) to compact. This can be + # used to lower cost by not trying to compact older data which doesn't + # change. This can be optimized by aligning it with the maximum + # `reject_old_samples_max_age` setting of any tenant. + # CLI flag: -bloom-build.planner.max-table-offset + [max_table_offset: | default = 2] builder: @@ -3382,6 +3399,16 @@ shard_streams: # CLI flag: -bloom-compactor.max-bloom-size [bloom_compactor_max_bloom_size: | default = 128MB] +# Experimental. Whether to create blooms for the tenant. +# CLI flag: -bloom-build.enable +[bloom_creation_enabled: | default = false] + +# Experimental. Number of splits to create for the series keyspace when building +# blooms. The series keyspace is split into this many parts to parallelize bloom +# creation. +# CLI flag: -bloom-build.split-keyspace-by +[bloom_split_series_keyspace_by: | default = 256] + # Experimental. Length of the n-grams created when computing blooms from log # lines. # CLI flag: -bloom-compactor.ngram-length diff --git a/pkg/bloombuild/planner/config.go b/pkg/bloombuild/planner/config.go index dd8cb315d934..47b01c0b286e 100644 --- a/pkg/bloombuild/planner/config.go +++ b/pkg/bloombuild/planner/config.go @@ -1,21 +1,40 @@ package planner -import "flag" +import ( + "flag" + "fmt" + "time" +) // Config configures the bloom-planner component. type Config struct { - // TODO: Add config + PlanningInterval time.Duration `yaml:"planning_interval"` + MinTableOffset int `yaml:"min_table_offset"` + MaxTableOffset int `yaml:"max_table_offset"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. -func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) { - // TODO: Register flags with flagsPrefix +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.PlanningInterval, prefix+".interval", 8*time.Hour, "Interval at which to re-run the bloom creation planning.") + f.IntVar(&cfg.MinTableOffset, prefix+".min-table-offset", 1, "Newest day-table offset (from today, inclusive) to build blooms for. Increase to lower cost by not re-writing data to object storage too frequently since recent data changes more often at the cost of not having blooms available as quickly.") + // TODO(owen-d): ideally we'd set this per tenant based on their `reject_old_samples_max_age` setting, + // but due to how we need to discover tenants, we can't do that yet. Tenant+Period discovery is done by + // iterating the table periods in object storage and looking for tenants within that period. + // In order to have this done dynamically, we'd need to account for tenant specific overrides, which are also + // dynamically reloaded. + // I'm doing it the simple way for now. + f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.") } func (cfg *Config) Validate() error { + if cfg.MinTableOffset > cfg.MaxTableOffset { + return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset) + } + return nil } type Limits interface { - // TODO: Add limits + BloomCreationEnabled(tenantID string) bool + BloomSplitSeriesKeyspaceBy(tenantID string) int } diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index e9a9035e14df..c0028237d9b1 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -8,10 +8,19 @@ import ( const ( metricsNamespace = "loki" metricsSubsystem = "bloomplanner" + + statusSuccess = "success" + statusFailure = "failure" ) type Metrics struct { running prometheus.Gauge + + buildStarted prometheus.Counter + buildCompleted *prometheus.CounterVec + buildTime *prometheus.HistogramVec + + tenantsDiscovered prometheus.Counter } func NewMetrics(r prometheus.Registerer) *Metrics { @@ -22,5 +31,32 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Name: "running", Help: "Value will be 1 if bloom planner is currently running on this instance", }), + + buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "build_started_total", + Help: "Total number of builds started", + }), + buildCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "build_completed_total", + Help: "Total number of builds completed", + }, []string{"status"}), + buildTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "build_time_seconds", + Help: "Time spent during a builds cycle.", + Buckets: prometheus.DefBuckets, + }, []string{"status"}), + + tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "tenants_discovered_total", + Help: "Number of tenants discovered during the current build iteration", + }), } } diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 7732d180b0bb..0be853a2f604 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -2,33 +2,63 @@ package planner import ( "context" + "fmt" + "sort" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/grafana/loki/v3/pkg/storage" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" utillog "github.com/grafana/loki/v3/pkg/util/log" ) type Planner struct { services.Service - cfg Config + cfg Config + limits Limits + schemaCfg config.SchemaConfig + + tsdbStore TSDBStore + bloomStore bloomshipper.Store + metrics *Metrics logger log.Logger } func New( cfg Config, + limits Limits, + schemaCfg config.SchemaConfig, + storeCfg storage.Config, + storageMetrics storage.ClientMetrics, + bloomStore bloomshipper.Store, logger log.Logger, r prometheus.Registerer, ) (*Planner, error) { utillog.WarnExperimentalUse("Bloom Planner", logger) + tsdbStore, err := NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger) + if err != nil { + return nil, fmt.Errorf("error creating TSDB store: %w", err) + } + p := &Planner{ - cfg: cfg, - metrics: NewMetrics(r), - logger: logger, + cfg: cfg, + limits: limits, + schemaCfg: schemaCfg, + tsdbStore: tsdbStore, + bloomStore: bloomStore, + metrics: NewMetrics(r), + logger: logger, } p.Service = services.NewBasicService(p.starting, p.running, p.stopping) @@ -45,6 +75,373 @@ func (p *Planner) stopping(_ error) error { return nil } -func (p *Planner) running(_ context.Context) error { +func (p *Planner) running(ctx context.Context) error { + // run once at beginning + if err := p.runOne(ctx); err != nil { + level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err) + } + + ticker := time.NewTicker(p.cfg.PlanningInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + err := ctx.Err() + level.Debug(p.logger).Log("msg", "planner context done", "err", err) + return err + + case <-ticker.C: + if err := p.runOne(ctx); err != nil { + level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err) + } + } + } +} + +func (p *Planner) runOne(ctx context.Context) error { + var ( + start = time.Now() + status = statusFailure + ) + defer func() { + p.metrics.buildCompleted.WithLabelValues(status).Inc() + p.metrics.buildTime.WithLabelValues(status).Observe(time.Since(start).Seconds()) + }() + + p.metrics.buildStarted.Inc() + level.Info(p.logger).Log("msg", "running bloom build planning") + + tables := p.tables(time.Now()) + level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays()) + + work, err := p.loadWork(ctx, tables) + if err != nil { + level.Error(p.logger).Log("msg", "error loading work", "err", err) + return fmt.Errorf("error loading work: %w", err) + } + + // TODO: Enqueue instead of buffering here + // This is just a placeholder for now + var tasks []Task + + for _, w := range work { + gaps, err := p.findGapsForBounds(ctx, w.tenant, w.table, w.ownershipRange) + if err != nil { + level.Error(p.logger).Log("msg", "error finding gaps", "err", err, "tenant", w.tenant, "table", w.table, "ownership", w.ownershipRange.String()) + return fmt.Errorf("error finding gaps for tenant (%s) in table (%s) for bounds (%s): %w", w.tenant, w.table, w.ownershipRange, err) + } + + for _, gap := range gaps { + tasks = append(tasks, Task{ + table: w.table.Addr(), + tenant: w.tenant, + OwnershipBounds: w.ownershipRange, + tsdb: gap.tsdb, + gaps: gap.gaps, + }) + } + } + + status = statusSuccess + level.Info(p.logger).Log( + "msg", "bloom build iteration completed", + "duration", time.Since(start).Seconds(), + "tasks", len(tasks), + ) return nil } + +func (p *Planner) tables(ts time.Time) *dayRangeIterator { + // adjust the minimum by one to make it inclusive, which is more intuitive + // for a configuration variable + adjustedMin := p.cfg.MinTableOffset - 1 + minCompactionDelta := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod + maxCompactionDelta := time.Duration(p.cfg.MaxTableOffset) * config.ObjectStorageIndexRequiredPeriod + + from := ts.Add(-maxCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod) + through := ts.Add(-minCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod) + + fromDay := config.NewDayTime(model.TimeFromUnixNano(from)) + throughDay := config.NewDayTime(model.TimeFromUnixNano(through)) + level.Debug(p.logger).Log("msg", "loaded tables for compaction", "from", fromDay, "through", throughDay) + return newDayRangeIterator(fromDay, throughDay, p.schemaCfg) +} + +type tenantTableRange struct { + tenant string + table config.DayTable + ownershipRange v1.FingerprintBounds + + // TODO: Add tracking + //finished bool + //queueTime, startTime, endTime time.Time +} + +func (p *Planner) loadWork( + ctx context.Context, + tables *dayRangeIterator, +) ([]tenantTableRange, error) { + var work []tenantTableRange + + for tables.Next() && tables.Err() == nil && ctx.Err() == nil { + table := tables.At() + level.Debug(p.logger).Log("msg", "loading work for table", "table", table) + + tenants, err := p.tenants(ctx, table) + if err != nil { + return nil, fmt.Errorf("error loading tenants: %w", err) + } + level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Len()) + + for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil { + p.metrics.tenantsDiscovered.Inc() + tenant := tenants.At() + + if !p.limits.BloomCreationEnabled(tenant) { + continue + } + + splitFactor := p.limits.BloomSplitSeriesKeyspaceBy(tenant) + bounds := SplitFingerprintKeyspaceByFactor(splitFactor) + + for _, bounds := range bounds { + work = append(work, tenantTableRange{ + tenant: tenant, + table: table, + ownershipRange: bounds, + }) + } + + level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor) + } + if err := tenants.Err(); err != nil { + level.Error(p.logger).Log("msg", "error iterating tenants", "err", err) + return nil, fmt.Errorf("error iterating tenants: %w", err) + } + + } + if err := tables.Err(); err != nil { + level.Error(p.logger).Log("msg", "error iterating tables", "err", err) + return nil, fmt.Errorf("error iterating tables: %w", err) + } + + return work, ctx.Err() +} + +func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*v1.SliceIter[string], error) { + tenants, err := p.tsdbStore.UsersForPeriod(ctx, table) + if err != nil { + return nil, fmt.Errorf("error loading tenants for table (%s): %w", table, err) + } + + return v1.NewSliceIter(tenants), nil +} + +/* +Planning works as follows, split across many functions for clarity: + 1. Fetch all meta.jsons for the given tenant and table which overlap the ownership range of this compactor. + 2. Load current TSDBs for this tenant/table. + 3. For each live TSDB (there should be only 1, but this works with multiple), find any gaps + (fingerprint ranges) which are not up-to-date, determined by checking other meta.json files and comparing + the TSDBs they were generated from as well as their ownership ranges. +*/ +func (p *Planner) findGapsForBounds( + ctx context.Context, + tenant string, + table config.DayTable, + ownershipRange v1.FingerprintBounds, +) ([]blockPlan, error) { + logger := log.With(p.logger, "org_id", tenant, "table", table.Addr(), "ownership", ownershipRange.String()) + + // Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms + metas, err := p.bloomStore.FetchMetas( + ctx, + bloomshipper.MetaSearchParams{ + TenantID: tenant, + Interval: bloomshipper.NewInterval(table.Bounds()), + Keyspace: ownershipRange, + }, + ) + if err != nil { + level.Error(logger).Log("msg", "failed to get metas", "err", err) + return nil, fmt.Errorf("failed to get metas: %w", err) + } + + level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas)) + + // Find gaps in the TSDBs for this tenant/table + gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger) + if err != nil { + return nil, fmt.Errorf("failed to find outdated gaps: %w", err) + } + + return gaps, nil +} + +// blockPlan is a plan for all the work needed to build a meta.json +// It includes: +// - the tsdb (source of truth) which contains all the series+chunks +// we need to ensure are indexed in bloom blocks +// - a list of gaps that are out of date and need to be checked+built +// - within each gap, a list of block refs which overlap the gap are included +// so we can use them to accelerate bloom generation. They likely contain many +// of the same chunks we need to ensure are indexed, just from previous tsdb iterations. +// This is a performance optimization to avoid expensive re-reindexing +type blockPlan struct { + tsdb tsdb.SingleTenantTSDBIdentifier + gaps []GapWithBlocks +} + +func (p *Planner) findOutdatedGaps( + ctx context.Context, + tenant string, + table config.DayTable, + ownershipRange v1.FingerprintBounds, + metas []bloomshipper.Meta, + logger log.Logger, +) ([]blockPlan, error) { + // Resolve TSDBs + tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant) + if err != nil { + level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err) + return nil, fmt.Errorf("failed to resolve tsdbs: %w", err) + } + + if len(tsdbs) == 0 { + return nil, nil + } + + // Determine which TSDBs have gaps in the ownership range and need to + // be processed. + tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas) + if err != nil { + level.Error(logger).Log("msg", "failed to find gaps", "err", err) + return nil, fmt.Errorf("failed to find gaps: %w", err) + } + + if len(tsdbsWithGaps) == 0 { + level.Debug(logger).Log("msg", "blooms exist for all tsdbs") + return nil, nil + } + + work, err := blockPlansForGaps(tsdbsWithGaps, metas) + if err != nil { + level.Error(logger).Log("msg", "failed to create plan", "err", err) + return nil, fmt.Errorf("failed to create plan: %w", err) + } + + return work, nil +} + +// Used to signal the gaps that need to be populated for a tsdb +type tsdbGaps struct { + tsdb tsdb.SingleTenantTSDBIdentifier + gaps []v1.FingerprintBounds +} + +// gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting +// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB. +func gapsBetweenTSDBsAndMetas( + ownershipRange v1.FingerprintBounds, + tsdbs []tsdb.SingleTenantTSDBIdentifier, + metas []bloomshipper.Meta, +) (res []tsdbGaps, err error) { + for _, db := range tsdbs { + id := db.Name() + + relevantMetas := make([]v1.FingerprintBounds, 0, len(metas)) + for _, meta := range metas { + for _, s := range meta.Sources { + if s.Name() == id { + relevantMetas = append(relevantMetas, meta.Bounds) + } + } + } + + gaps, err := FindGapsInFingerprintBounds(ownershipRange, relevantMetas) + if err != nil { + return nil, err + } + + if len(gaps) > 0 { + res = append(res, tsdbGaps{ + tsdb: db, + gaps: gaps, + }) + } + } + + return res, err +} + +// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks. +// This allows us to expedite bloom generation by using existing blocks to fill in the gaps +// since many will contain the same chunks. +func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) { + plans := make([]blockPlan, 0, len(tsdbs)) + + for _, idx := range tsdbs { + plan := blockPlan{ + tsdb: idx.tsdb, + gaps: make([]GapWithBlocks, 0, len(idx.gaps)), + } + + for _, gap := range idx.gaps { + planGap := GapWithBlocks{ + bounds: gap, + } + + for _, meta := range metas { + + if meta.Bounds.Intersection(gap) == nil { + // this meta doesn't overlap the gap, skip + continue + } + + for _, block := range meta.Blocks { + if block.Bounds.Intersection(gap) == nil { + // this block doesn't overlap the gap, skip + continue + } + // this block overlaps the gap, add it to the plan + // for this gap + planGap.blocks = append(planGap.blocks, block) + } + } + + // ensure we sort blocks so deduping iterator works as expected + sort.Slice(planGap.blocks, func(i, j int) bool { + return planGap.blocks[i].Bounds.Less(planGap.blocks[j].Bounds) + }) + + peekingBlocks := v1.NewPeekingIter[bloomshipper.BlockRef]( + v1.NewSliceIter[bloomshipper.BlockRef]( + planGap.blocks, + ), + ) + // dedupe blocks which could be in multiple metas + itr := v1.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef]( + func(a, b bloomshipper.BlockRef) bool { + return a == b + }, + v1.Identity[bloomshipper.BlockRef], + func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef { + return a + }, + peekingBlocks, + ) + + deduped, err := v1.Collect[bloomshipper.BlockRef](itr) + if err != nil { + return nil, fmt.Errorf("failed to dedupe blocks: %w", err) + } + planGap.blocks = deduped + + plan.gaps = append(plan.gaps, planGap) + } + + plans = append(plans, plan) + } + + return plans, nil +} diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go new file mode 100644 index 000000000000..346bd145ab8d --- /dev/null +++ b/pkg/bloombuild/planner/planner_test.go @@ -0,0 +1,321 @@ +package planner + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" +) + +func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { + return tsdb.SingleTenantTSDBIdentifier{ + TS: time.Unix(int64(n), 0), + } +} + +func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta { + m := bloomshipper.Meta{ + MetaRef: bloomshipper.MetaRef{ + Ref: bloomshipper.Ref{ + Bounds: v1.NewBounds(min, max), + }, + }, + Blocks: blocks, + } + for _, source := range sources { + m.Sources = append(m.Sources, tsdbID(source)) + } + return m +} + +func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { + + for _, tc := range []struct { + desc string + err bool + exp []tsdbGaps + ownershipRange v1.FingerprintBounds + tsdbs []tsdb.SingleTenantTSDBIdentifier + metas []bloomshipper.Meta + }{ + { + desc: "non-overlapping tsdbs and metas", + err: true, + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(11, 20, []int{0}, nil), + }, + }, + { + desc: "single tsdb", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(4, 8, []int{0}, nil), + }, + exp: []tsdbGaps{ + { + tsdb: tsdbID(0), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(0, 3), + v1.NewBounds(9, 10), + }, + }, + }, + }, + { + desc: "multiple tsdbs with separate blocks", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, + metas: []bloomshipper.Meta{ + genMeta(0, 5, []int{0}, nil), + genMeta(6, 10, []int{1}, nil), + }, + exp: []tsdbGaps{ + { + tsdb: tsdbID(0), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(6, 10), + }, + }, + { + tsdb: tsdbID(1), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(0, 5), + }, + }, + }, + }, + { + desc: "multiple tsdbs with the same blocks", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, + metas: []bloomshipper.Meta{ + genMeta(0, 5, []int{0, 1}, nil), + genMeta(6, 8, []int{1}, nil), + }, + exp: []tsdbGaps{ + { + tsdb: tsdbID(0), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(6, 10), + }, + }, + { + tsdb: tsdbID(1), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(9, 10), + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) + if tc.err { + require.Error(t, err) + return + } + require.Equal(t, tc.exp, gaps) + }) + } +} + +func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { + bounds := v1.NewBounds(min, max) + return bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + Bounds: bounds, + }, + } +} + +func Test_blockPlansForGaps(t *testing.T) { + for _, tc := range []struct { + desc string + ownershipRange v1.FingerprintBounds + tsdbs []tsdb.SingleTenantTSDBIdentifier + metas []bloomshipper.Meta + err bool + exp []blockPlan + }{ + { + desc: "single overlapping meta+no overlapping block", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}), + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []GapWithBlocks{ + { + bounds: v1.NewBounds(0, 10), + }, + }, + }, + }, + }, + { + desc: "single overlapping meta+one overlapping block", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []GapWithBlocks{ + { + bounds: v1.NewBounds(0, 10), + blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)}, + }, + }, + }, + }, + }, + { + // the range which needs to be generated doesn't overlap with existing blocks + // from other tsdb versions since theres an up to date tsdb version block, + // but we can trim the range needing generation + desc: "trims up to date area", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb + genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []GapWithBlocks{ + { + bounds: v1.NewBounds(0, 8), + }, + }, + }, + }, + }, + { + desc: "uses old block for overlapping range", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []GapWithBlocks{ + { + bounds: v1.NewBounds(0, 8), + blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)}, + }, + }, + }, + }, + }, + { + desc: "multi case", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs + metas: []bloomshipper.Meta{ + genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{ + genBlockRef(0, 1), + genBlockRef(1, 2), + }), // tsdb_0 + genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0 + + genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1 + genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1 + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []GapWithBlocks{ + // tsdb (id=0) can source chunks from the blocks built from tsdb (id=1) + { + bounds: v1.NewBounds(3, 5), + blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)}, + }, + { + bounds: v1.NewBounds(9, 10), + blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)}, + }, + }, + }, + // tsdb (id=1) can source chunks from the blocks built from tsdb (id=0) + { + tsdb: tsdbID(1), + gaps: []GapWithBlocks{ + { + bounds: v1.NewBounds(0, 2), + blocks: []bloomshipper.BlockRef{ + genBlockRef(0, 1), + genBlockRef(1, 2), + }, + }, + { + bounds: v1.NewBounds(6, 7), + blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)}, + }, + }, + }, + }, + }, + { + desc: "dedupes block refs", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{ + genBlockRef(1, 4), + genBlockRef(9, 20), + }), // blocks for first diff tsdb + genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{ + genBlockRef(5, 10), + genBlockRef(9, 20), // same block references in prior meta (will be deduped) + }), // block for second diff tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []GapWithBlocks{ + { + bounds: v1.NewBounds(0, 10), + blocks: []bloomshipper.BlockRef{ + genBlockRef(1, 4), + genBlockRef(5, 10), + genBlockRef(9, 20), + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested + // separately and it's used to generate input in our regular code path (easier to write tests this way). + gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) + require.NoError(t, err) + + plans, err := blockPlansForGaps(gaps, tc.metas) + if tc.err { + require.Error(t, err) + return + } + require.Equal(t, tc.exp, plans) + + }) + } +} diff --git a/pkg/bloombuild/planner/tableIterator.go b/pkg/bloombuild/planner/tableIterator.go new file mode 100644 index 000000000000..c17458a04806 --- /dev/null +++ b/pkg/bloombuild/planner/tableIterator.go @@ -0,0 +1,50 @@ +package planner + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/storage/config" +) + +type dayRangeIterator struct { + min, max, cur config.DayTime + curPeriod config.PeriodConfig + schemaCfg config.SchemaConfig + err error +} + +func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator { + return &dayRangeIterator{min: min, max: max, cur: min.Dec(), schemaCfg: schemaCfg} +} + +func (r *dayRangeIterator) TotalDays() int { + offset := r.cur + if r.cur.Before(r.min) { + offset = r.min + } + return int(r.max.Sub(offset.Time) / config.ObjectStorageIndexRequiredPeriod) +} + +func (r *dayRangeIterator) Next() bool { + r.cur = r.cur.Inc() + if !r.cur.Before(r.max) { + return false + } + + period, err := r.schemaCfg.SchemaForTime(r.cur.ModelTime()) + if err != nil { + r.err = fmt.Errorf("getting schema for time (%s): %w", r.cur, err) + return false + } + r.curPeriod = period + + return true +} + +func (r *dayRangeIterator) At() config.DayTable { + return config.NewDayTable(r.cur, r.curPeriod.IndexTables.Prefix) +} + +func (r *dayRangeIterator) Err() error { + return nil +} diff --git a/pkg/bloombuild/planner/task.go b/pkg/bloombuild/planner/task.go new file mode 100644 index 000000000000..80f730c4fb6d --- /dev/null +++ b/pkg/bloombuild/planner/task.go @@ -0,0 +1,22 @@ +package planner + +import ( + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" +) + +// TODO: Extract this definiton to a proto file at pkg/bloombuild/protos/protos.proto + +type GapWithBlocks struct { + bounds v1.FingerprintBounds + blocks []bloomshipper.BlockRef +} + +type Task struct { + table string + tenant string + OwnershipBounds v1.FingerprintBounds + tsdb tsdb.SingleTenantTSDBIdentifier + gaps []GapWithBlocks +} diff --git a/pkg/bloombuild/planner/tsdb.go b/pkg/bloombuild/planner/tsdb.go new file mode 100644 index 000000000000..7c15c43306db --- /dev/null +++ b/pkg/bloombuild/planner/tsdb.go @@ -0,0 +1,261 @@ +package planner + +import ( + "context" + "fmt" + "io" + "math" + "path" + "strings" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/chunkenc" + baseStore "github.com/grafana/loki/v3/pkg/storage" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" + "github.com/grafana/loki/v3/pkg/storage/types" +) + +const ( + gzipExtension = ".gz" +) + +type TSDBStore interface { + UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) + ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) + LoadTSDB( + ctx context.Context, + table config.DayTable, + tenant string, + id tsdb.Identifier, + bounds v1.FingerprintBounds, + ) (v1.Iterator[*v1.Series], error) +} + +// BloomTSDBStore is a wrapper around the storage.Client interface which +// implements the TSDBStore interface for this pkg. +type BloomTSDBStore struct { + storage storage.Client + logger log.Logger +} + +func NewBloomTSDBStore(storage storage.Client, logger log.Logger) *BloomTSDBStore { + return &BloomTSDBStore{ + storage: storage, + logger: logger, + } +} + +func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { + _, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing + return users, err +} + +func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { + indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing + if err != nil { + return nil, errors.Wrap(err, "failed to list user files") + } + + ids := make([]tsdb.SingleTenantTSDBIdentifier, 0, len(indices)) + for _, index := range indices { + key := index.Name + if decompress := storage.IsCompressedFile(index.Name); decompress { + key = strings.TrimSuffix(key, gzipExtension) + } + + id, ok := tsdb.ParseSingleTenantTSDBPath(path.Base(key)) + if !ok { + return nil, errors.Errorf("failed to parse single tenant tsdb path: %s", key) + } + + ids = append(ids, id) + + } + return ids, nil +} + +func (b *BloomTSDBStore) LoadTSDB( + ctx context.Context, + table config.DayTable, + tenant string, + id tsdb.Identifier, + bounds v1.FingerprintBounds, +) (v1.Iterator[*v1.Series], error) { + withCompression := id.Name() + gzipExtension + + data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression) + if err != nil { + return nil, errors.Wrap(err, "failed to get file") + } + defer data.Close() + + decompressorPool := chunkenc.GetReaderPool(chunkenc.EncGZIP) + decompressor, err := decompressorPool.GetReader(data) + if err != nil { + return nil, errors.Wrap(err, "failed to get decompressor") + } + defer decompressorPool.PutReader(decompressor) + + buf, err := io.ReadAll(decompressor) + if err != nil { + return nil, errors.Wrap(err, "failed to read file") + } + + reader, err := index.NewReader(index.RealByteSlice(buf)) + if err != nil { + return nil, errors.Wrap(err, "failed to create index reader") + } + + idx := tsdb.NewTSDBIndex(reader) + defer func() { + if err := idx.Close(); err != nil { + level.Error(b.logger).Log("msg", "failed to close index", "err", err) + } + }() + + return NewTSDBSeriesIter(ctx, tenant, idx, bounds) +} + +func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) { + // TODO(salvacorts): Create a pool + series := make([]*v1.Series, 0, 100) + + if err := f.ForSeries( + ctx, + user, + bounds, + 0, math.MaxInt64, + func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { + select { + case <-ctx.Done(): + return true + default: + res := &v1.Series{ + Fingerprint: fp, + Chunks: make(v1.ChunkRefs, 0, len(chks)), + } + for _, chk := range chks { + res.Chunks = append(res.Chunks, v1.ChunkRef{ + From: model.Time(chk.MinTime), + Through: model.Time(chk.MaxTime), + Checksum: chk.Checksum, + }) + } + + series = append(series, res) + return false + } + }, + labels.MustNewMatcher(labels.MatchEqual, "", ""), + ); err != nil { + return nil, err + } + + select { + case <-ctx.Done(): + return v1.NewEmptyIter[*v1.Series](), ctx.Err() + default: + return v1.NewCancelableIter[*v1.Series](ctx, v1.NewSliceIter[*v1.Series](series)), nil + } +} + +type TSDBStores struct { + schemaCfg config.SchemaConfig + stores []TSDBStore +} + +func NewTSDBStores( + schemaCfg config.SchemaConfig, + storeCfg baseStore.Config, + clientMetrics baseStore.ClientMetrics, + logger log.Logger, +) (*TSDBStores, error) { + res := &TSDBStores{ + schemaCfg: schemaCfg, + stores: make([]TSDBStore, len(schemaCfg.Configs)), + } + + for i, cfg := range schemaCfg.Configs { + if cfg.IndexType == types.TSDBType { + + c, err := baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics) + if err != nil { + return nil, errors.Wrap(err, "failed to create object client") + } + res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix), logger) + } + } + + return res, nil +} + +func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) { + for i := len(s.schemaCfg.Configs) - 1; i >= 0; i-- { + period := s.schemaCfg.Configs[i] + + if !table.Before(period.From) { + // we have the desired period config + + if s.stores[i] != nil { + // valid: it's of tsdb type + return s.stores[i], nil + } + + // invalid + return nil, errors.Errorf( + "store for period is not of TSDB type (%s) while looking up store for (%v)", + period.IndexType, + table, + ) + } + + } + + return nil, fmt.Errorf( + "there is no store matching no matching period found for table (%v) -- too early", + table, + ) +} + +func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) { + store, err := s.storeForPeriod(table.DayTime) + if err != nil { + return nil, err + } + + return store.UsersForPeriod(ctx, table) +} + +func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) { + store, err := s.storeForPeriod(table.DayTime) + if err != nil { + return nil, err + } + + return store.ResolveTSDBs(ctx, table, tenant) +} + +func (s *TSDBStores) LoadTSDB( + ctx context.Context, + table config.DayTable, + tenant string, + id tsdb.Identifier, + bounds v1.FingerprintBounds, +) (v1.Iterator[*v1.Series], error) { + store, err := s.storeForPeriod(table.DayTime) + if err != nil { + return nil, err + } + + return store.LoadTSDB(ctx, table, tenant, id, bounds) +} diff --git a/pkg/bloombuild/planner/tsdb_test.go b/pkg/bloombuild/planner/tsdb_test.go new file mode 100644 index 000000000000..f47c193c2cd1 --- /dev/null +++ b/pkg/bloombuild/planner/tsdb_test.go @@ -0,0 +1,105 @@ +package planner + +import ( + "context" + "math" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" +) + +type forSeriesTestImpl []*v1.Series + +func (f forSeriesTestImpl) ForSeries( + _ context.Context, + _ string, + _ index.FingerprintFilter, + _ model.Time, + _ model.Time, + fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) bool, + _ ...*labels.Matcher, +) error { + for i := range f { + unmapped := make([]index.ChunkMeta, 0, len(f[i].Chunks)) + for _, c := range f[i].Chunks { + unmapped = append(unmapped, index.ChunkMeta{ + MinTime: int64(c.From), + MaxTime: int64(c.Through), + Checksum: c.Checksum, + }) + } + + fn(nil, f[i].Fingerprint, unmapped) + } + return nil +} + +func (f forSeriesTestImpl) Close() error { + return nil +} + +func TestTSDBSeriesIter(t *testing.T) { + input := []*v1.Series{ + { + Fingerprint: 1, + Chunks: []v1.ChunkRef{ + { + From: 0, + Through: 1, + Checksum: 2, + }, + { + From: 3, + Through: 4, + Checksum: 5, + }, + }, + }, + } + srcItr := v1.NewSliceIter(input) + itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64)) + require.NoError(t, err) + + v1.EqualIterators[*v1.Series]( + t, + func(a, b *v1.Series) { + require.Equal(t, a, b) + }, + itr, + srcItr, + ) +} + +func TestTSDBSeriesIter_Expiry(t *testing.T) { + t.Run("expires on creation", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{ + {}, // a single entry + }, v1.NewBounds(0, math.MaxUint64)) + require.Error(t, err) + require.False(t, itr.Next()) + }) + + t.Run("expires during consumption", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{ + {}, + {}, + }, v1.NewBounds(0, math.MaxUint64)) + require.NoError(t, err) + + require.True(t, itr.Next()) + require.NoError(t, itr.Err()) + + cancel() + require.False(t, itr.Next()) + require.Error(t, itr.Err()) + }) + +} diff --git a/pkg/bloombuild/planner/util.go b/pkg/bloombuild/planner/util.go new file mode 100644 index 000000000000..f9a97587f802 --- /dev/null +++ b/pkg/bloombuild/planner/util.go @@ -0,0 +1,125 @@ +package planner + +import ( + "fmt" + "math" + + "github.com/prometheus/common/model" + + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" +) + +// SplitFingerprintKeyspaceByFactor splits the keyspace covered by model.Fingerprint into contiguous non-overlapping ranges. +func SplitFingerprintKeyspaceByFactor(factor int) []v1.FingerprintBounds { + if factor <= 0 { + return nil + } + + bounds := make([]v1.FingerprintBounds, 0, factor) + + // The keyspace of a Fingerprint is from 0 to max uint64. + keyspaceSize := uint64(math.MaxUint64) + + // Calculate the size of each range. + rangeSize := keyspaceSize / uint64(factor) + + for i := 0; i < factor; i++ { + // Calculate the start and end of the range. + start := uint64(i) * rangeSize + end := start + rangeSize - 1 + + // For the last range, make sure it ends at the end of the keyspace. + if i == factor-1 { + end = keyspaceSize + } + + // Create a FingerprintBounds for the range and add it to the slice. + bounds = append(bounds, v1.FingerprintBounds{ + Min: model.Fingerprint(start), + Max: model.Fingerprint(end), + }) + } + + return bounds +} + +func FindGapsInFingerprintBounds(ownershipRange v1.FingerprintBounds, metas []v1.FingerprintBounds) (gaps []v1.FingerprintBounds, err error) { + if len(metas) == 0 { + return []v1.FingerprintBounds{ownershipRange}, nil + } + + // turn the available metas into a list of non-overlapping metas + // for easier processing + var nonOverlapping []v1.FingerprintBounds + // First, we reduce the metas into a smaller set by combining overlaps. They must be sorted. + var cur *v1.FingerprintBounds + for i := 0; i < len(metas); i++ { + j := i + 1 + + // first iteration (i == 0), set the current meta + if cur == nil { + cur = &metas[i] + } + + if j >= len(metas) { + // We've reached the end of the list. Add the last meta to the non-overlapping set. + nonOverlapping = append(nonOverlapping, *cur) + break + } + + combined := cur.Union(metas[j]) + if len(combined) == 1 { + // There was an overlap between the two tested ranges. Combine them and keep going. + cur = &combined[0] + continue + } + + // There was no overlap between the two tested ranges. Add the first to the non-overlapping set. + // and keep the second for the next iteration. + nonOverlapping = append(nonOverlapping, combined[0]) + cur = &combined[1] + } + + // Now, detect gaps between the non-overlapping metas and the ownership range. + // The left bound of the ownership range will be adjusted as we go. + leftBound := ownershipRange.Min + for _, meta := range nonOverlapping { + + clippedMeta := meta.Intersection(ownershipRange) + // should never happen as long as we are only combining metas + // that intersect with the ownership range + if clippedMeta == nil { + return nil, fmt.Errorf("meta is not within ownership range: %v", meta) + } + + searchRange := ownershipRange.Slice(leftBound, clippedMeta.Max) + // update the left bound for the next iteration + // We do the max to prevent the max bound to overflow from MaxUInt64 to 0 + leftBound = min( + max(clippedMeta.Max+1, clippedMeta.Max), + max(ownershipRange.Max+1, ownershipRange.Max), + ) + + // since we've already ensured that the meta is within the ownership range, + // we know the xor will be of length zero (when the meta is equal to the ownership range) + // or 1 (when the meta is a subset of the ownership range) + xors := searchRange.Unless(*clippedMeta) + if len(xors) == 0 { + // meta is equal to the ownership range. This means the meta + // covers this entire section of the ownership range. + continue + } + + gaps = append(gaps, xors[0]) + } + + // If the leftBound is less than the ownership range max, and it's smaller than MaxUInt64, + // There is a gap between the last meta and the end of the ownership range. + // Note: we check `leftBound < math.MaxUint64` since in the loop above we clamp the + // leftBound to MaxUint64 to prevent an overflow to 0: `max(clippedMeta.Max+1, clippedMeta.Max)` + if leftBound < math.MaxUint64 && leftBound <= ownershipRange.Max { + gaps = append(gaps, v1.NewBounds(leftBound, ownershipRange.Max)) + } + + return gaps, nil +} diff --git a/pkg/bloombuild/planner/util_test.go b/pkg/bloombuild/planner/util_test.go new file mode 100644 index 000000000000..6755478ef729 --- /dev/null +++ b/pkg/bloombuild/planner/util_test.go @@ -0,0 +1,172 @@ +package planner + +import ( + "math" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" +) + +func TestSplitFingerprintKeyspaceByFactor(t *testing.T) { + for _, tt := range []struct { + name string + factor int + }{ + { + name: "Factor is 0", + factor: 0, + }, + { + name: "Factor is 1", + factor: 1, + }, + { + name: "Factor is 256", + factor: 256, + }, + } { + t.Run(tt.name, func(t *testing.T) { + got := SplitFingerprintKeyspaceByFactor(tt.factor) + + if tt.factor == 0 { + require.Empty(t, got) + return + } + + // Check overall min and max values of the ranges. + require.Equal(t, model.Fingerprint(math.MaxUint64), got[len(got)-1].Max) + require.Equal(t, model.Fingerprint(0), got[0].Min) + + // For each range, check that the max value of the previous range is one less than the min value of the current range. + for i := 1; i < len(got); i++ { + require.Equal(t, got[i-1].Max+1, got[i].Min) + } + }) + } +} + +func Test_FindGapsInFingerprintBounds(t *testing.T) { + for _, tc := range []struct { + desc string + err bool + exp []v1.FingerprintBounds + ownershipRange v1.FingerprintBounds + metas []v1.FingerprintBounds + }{ + { + desc: "error nonoverlapping metas", + err: true, + exp: nil, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{v1.NewBounds(11, 20)}, + }, + { + desc: "one meta with entire ownership range", + err: false, + exp: nil, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{v1.NewBounds(0, 10)}, + }, + { + desc: "two non-overlapping metas with entire ownership range", + err: false, + exp: nil, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{ + v1.NewBounds(0, 5), + v1.NewBounds(6, 10), + }, + }, + { + desc: "two overlapping metas with entire ownership range", + err: false, + exp: nil, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{ + v1.NewBounds(0, 6), + v1.NewBounds(4, 10), + }, + }, + { + desc: "one meta with partial ownership range", + err: false, + exp: []v1.FingerprintBounds{ + v1.NewBounds(6, 10), + }, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{ + v1.NewBounds(0, 5), + }, + }, + { + desc: "smaller subsequent meta with partial ownership range", + err: false, + exp: []v1.FingerprintBounds{ + v1.NewBounds(8, 10), + }, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{ + v1.NewBounds(0, 7), + v1.NewBounds(3, 4), + }, + }, + { + desc: "hole in the middle", + err: false, + exp: []v1.FingerprintBounds{ + v1.NewBounds(4, 5), + }, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{ + v1.NewBounds(0, 3), + v1.NewBounds(6, 10), + }, + }, + { + desc: "holes on either end", + err: false, + exp: []v1.FingerprintBounds{ + v1.NewBounds(0, 2), + v1.NewBounds(8, 10), + }, + ownershipRange: v1.NewBounds(0, 10), + metas: []v1.FingerprintBounds{ + v1.NewBounds(3, 5), + v1.NewBounds(6, 7), + }, + }, + { + desc: "full ownership range with single meta", + err: false, + exp: nil, + ownershipRange: v1.NewBounds(0, math.MaxUint64), + metas: []v1.FingerprintBounds{ + v1.NewBounds(0, math.MaxUint64), + }, + }, + { + desc: "full ownership range with multiple metas", + err: false, + exp: nil, + ownershipRange: v1.NewBounds(0, math.MaxUint64), + // Three metas covering the whole 0 - MaxUint64 + metas: []v1.FingerprintBounds{ + v1.NewBounds(0, math.MaxUint64/3), + v1.NewBounds(math.MaxUint64/3+1, math.MaxUint64/2), + v1.NewBounds(math.MaxUint64/2+1, math.MaxUint64), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gaps, err := FindGapsInFingerprintBounds(tc.ownershipRange, tc.metas) + if tc.err { + require.Error(t, err) + return + } + require.Equal(t, tc.exp, gaps) + }) + } +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a563e80f789f..e73369aca2d7 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1566,6 +1566,11 @@ func (t *Loki) initBloomPlanner() (services.Service, error) { return planner.New( t.Cfg.BloomBuild.Planner, + t.Overrides, + t.Cfg.SchemaConfig, + t.Cfg.StorageConfig, + t.ClientMetrics, + t.BloomStore, logger, prometheus.DefaultRegisterer, ) diff --git a/pkg/util/limiter/combined_limits.go b/pkg/util/limiter/combined_limits.go index 39684c7b43e8..92caf2c19d68 100644 --- a/pkg/util/limiter/combined_limits.go +++ b/pkg/util/limiter/combined_limits.go @@ -1,6 +1,8 @@ package limiter import ( + bloombuilder "github.com/grafana/loki/v3/pkg/bloombuild/builder" + bloomplanner "github.com/grafana/loki/v3/pkg/bloombuild/planner" "github.com/grafana/loki/v3/pkg/bloomcompactor" "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compactor" @@ -26,4 +28,6 @@ type CombinedLimits interface { indexgateway.Limits bloomgateway.Limits bloomcompactor.Limits + bloomplanner.Limits + bloombuilder.Limits } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index ca33d1f4bf42..b0660686f5c1 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -205,6 +205,9 @@ type Limits struct { BloomCompactorMaxBlockSize flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size" category:"experimental"` BloomCompactorMaxBloomSize flagext.ByteSize `yaml:"bloom_compactor_max_bloom_size" json:"bloom_compactor_max_bloom_size" category:"experimental"` + BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"` + BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"` + BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"` BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"` BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate" category:"experimental"` @@ -380,6 +383,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { ), ) + f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.") + f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.") + _ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize) f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size", fmt.Sprintf( @@ -973,6 +979,14 @@ func (o *Overrides) BloomCompactorEnabled(userID string) bool { return o.getOverridesForUser(userID).BloomCompactorEnabled } +func (o *Overrides) BloomCreationEnabled(userID string) bool { + return o.getOverridesForUser(userID).BloomCreationEnabled +} + +func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int { + return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy +} + func (o *Overrides) BloomNGramLength(userID string) int { return o.getOverridesForUser(userID).BloomNGramLength }