diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 44378b2a262a2..db62df97dc1c1 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -18,6 +18,7 @@ import ( "fmt" "math" "sort" + "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -551,13 +552,25 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange { // IndexRangesToKVRanges converts index ranges to "KeyRange". func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { - return IndexRangesToKVRangesForTables(sc, []int64{tid}, idxID, ranges, fb) + return IndexRangesToKVRangesWithInterruptSignal(sc, tid, idxID, ranges, fb, nil, nil) +} + +// IndexRangesToKVRangesWithInterruptSignal converts index ranges to "KeyRange". +// The process can be interrupted by set `interruptSignal` to true. +func IndexRangesToKVRangesWithInterruptSignal(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { + return indexRangesToKVRangesForTablesWithInterruptSignal(sc, []int64{tid}, idxID, ranges, fb, memTracker, interruptSignal) } // IndexRangesToKVRangesForTables converts indexes ranges to "KeyRange". func IndexRangesToKVRangesForTables(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { + return indexRangesToKVRangesForTablesWithInterruptSignal(sc, tids, idxID, ranges, fb, nil, nil) +} + +// IndexRangesToKVRangesForTablesWithInterruptSignal converts indexes ranges to "KeyRange". +// The process can be interrupted by set `interruptSignal` to true. +func indexRangesToKVRangesForTablesWithInterruptSignal(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { if fb == nil || fb.Hist == nil { - return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges) + return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges, memTracker, interruptSignal) } feedbackRanges := make([]*ranger.Range, 0, len(ranges)) for _, ran := range ranges { @@ -642,18 +655,37 @@ func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSc return true } -func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range) ([]kv.KeyRange, error) { +func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { krs := make([]kv.KeyRange, 0, len(ranges)) - for _, ran := range ranges { + const CheckSignalStep = 8 + var estimatedMemUsage int64 + // encodeIndexKey and EncodeIndexSeekKey is time-consuming, thus we need to + // check the interrupt signal periodically. + for i, ran := range ranges { low, high, err := encodeIndexKey(sc, ran) if err != nil { return nil, err } + if i == 0 { + estimatedMemUsage += int64(cap(low) + cap(high)) + } for _, tid := range tids { startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low) endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high) + if i == 0 { + estimatedMemUsage += int64(cap(startKey)) + int64(cap(endKey)) + } krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) } + if i%CheckSignalStep == 0 { + if i == 0 && memTracker != nil { + estimatedMemUsage *= int64(len(ranges)) + memTracker.Consume(estimatedMemUsage) + } + if interruptSignal != nil && interruptSignal.Load().(bool) { + return nil, nil + } + } } return krs, nil } diff --git a/executor/builder.go b/executor/builder.go index e10a2b6b11970..db5c9dfb9302d 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "unsafe" @@ -57,6 +58,7 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/timeutil" @@ -2765,6 +2767,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) indexRanges: v.Ranges, keyOff2IdxOff: v.KeyOff2IdxOff, lastColHelper: v.CompareFilters, + finished: &atomic.Value{}, } childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) e.joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema) @@ -3594,21 +3597,21 @@ type mockPhysicalIndexReader struct { } func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent, - IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { - return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { + return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) } func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent, - IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { + IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { switch v := plan.(type) { case *plannercore.PhysicalTableReader: - return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) case *plannercore.PhysicalIndexReader: - return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) + return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) case *plannercore.PhysicalIndexLookUpReader: - return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) + return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) case *plannercore.PhysicalUnionScan: - return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) // The inner child of IndexJoin might be Projection when a combination of the following conditions is true: // 1. The inner child fetch data using indexLookupReader // 2. PK is not handle @@ -3616,11 +3619,11 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context. // In this case, an extra column tidb_rowid will be appended in the output result of IndexLookupReader(see copTask.doubleReadNeedProj). // Then we need a Projection upon IndexLookupReader to prune the redundant column. case *plannercore.PhysicalProjection: - return builder.buildProjectionForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) + return builder.buildProjectionForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) // Need to support physical selection because after PR 16389, TiDB will push down all the expr supported by TiKV or TiFlash // in predicate push down stage, so if there is an expr which only supported by TiFlash, a physical selection will be added after index read case *plannercore.PhysicalSelection: - childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3638,9 +3641,9 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context. func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, - cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { + cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder} - reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles) + reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3654,7 +3657,7 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, - cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { + cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { return nil, err @@ -3662,7 +3665,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte tbInfo := e.table.Meta() if v.IsCommonHandle { if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { - kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3691,7 +3694,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return nil, err } pid := p.GetPhysicalID() - tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc) + tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal) if err != nil { return nil, err } @@ -3706,7 +3709,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte kvRanges = make([]kv.KeyRange, 0, len(partitions)*len(lookUpContents)) for _, p := range partitions { pid := p.GetPhysicalID() - tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3872,14 +3875,14 @@ func (builder *dataReaderBuilder) buildTableReaderFromKvRanges(ctx context.Conte } func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader, - lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memoryTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { e, err := buildNoRangeIndexReader(builder.executorBuilder, v) if err != nil { return nil, err } tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { - kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) if err != nil { return nil, err } @@ -3918,7 +3921,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader, - lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v) if err != nil { return nil, err @@ -3926,7 +3929,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { - e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3966,18 +3969,18 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } func (builder *dataReaderBuilder) buildProjectionForIndexJoin(ctx context.Context, v *plannercore.PhysicalProjection, - lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { var ( childExec Executor err error ) switch op := v.Children()[0].(type) { case *plannercore.PhysicalIndexLookUpReader: - if childExec, err = builder.buildIndexLookUpReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { + if childExec, err = builder.buildIndexLookUpReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal); err != nil { return nil, err } case *plannercore.PhysicalTableReader: - if childExec, err = builder.buildTableReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc, true); err != nil { + if childExec, err = builder.buildTableReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc, true, memTracker, interruptSignal); err != nil { return nil, err } default: @@ -4046,7 +4049,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*indexJoin // buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent, - ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (_ []kv.KeyRange, err error) { + ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (_ []kv.KeyRange, err error) { kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents)) lastPos := len(ranges[0].LowVal) - 1 sc := ctx.GetSessionVars().StmtCtx @@ -4065,7 +4068,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l if indexID == -1 { tmpKvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{tableID}, ranges) } else { - tmpKvRanges, err = distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil) + tmpKvRanges, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, tableID, indexID, ranges, nil, memTracker, interruptSignal) } if err != nil { return nil, err @@ -4087,7 +4090,12 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l } } } - + if len(kvRanges) != 0 && memTracker != nil { + memTracker.Consume(int64(2 * cap(kvRanges[0].StartKey) * len(kvRanges))) + } + if len(tmpDatumRanges) != 0 && memTracker != nil { + memTracker.Consume(2 * int64(len(tmpDatumRanges)) * types.EstimatedMemUsage(tmpDatumRanges[0].LowVal, len(tmpDatumRanges))) + } if cwc == nil { sort.Slice(kvRanges, func(i, j int) bool { return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0 @@ -4103,7 +4111,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l if indexID == -1 { return distsql.CommonHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tableID}, tmpDatumRanges) } - return distsql.IndexRangesToKVRanges(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil) + return distsql.IndexRangesToKVRangesWithInterruptSignal(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil, memTracker, interruptSignal) } func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor { diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 3fca4be179169..dc3e68acaa4f5 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -195,7 +195,7 @@ func SubTestBuildKvRangesForIndexJoinWithoutCwc(t *testing.T) { keyOff2IdxOff := []int{1, 3} ctx := mock.NewContext() - kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil) + kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, nil, nil) require.NoError(t, err) // Check the kvRanges is in order. for i, kvRange := range kvRanges { diff --git a/executor/executor_test.go b/executor/executor_test.go index 6ed310b3691e3..8a8889cb95d9f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "math" + "math/rand" "net" "os" "path/filepath" @@ -9325,3 +9326,61 @@ func (s *testSuiteP1) TestIssue29412(c *C) { tk.MustExec("insert into t29142_1 value(20);") tk.MustQuery("select sum(distinct a) as x from t29142_1 having x > some ( select a from t29142_2 where x in (a));").Check(nil) } + +func (s *testSerialSuite) TestIssue28650(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int, index(a));") + tk.MustExec("create table t2(a int, c int, b char(50), index(a,c,b));") + tk.MustExec("set tidb_enable_rate_limit_action=off;") + + wg := &sync.WaitGroup{} + sql := `explain analyze + select /*+ stream_agg(@sel_1) stream_agg(@sel_3) %s(@sel_2 t2)*/ count(1) from + ( + SELECT t2.a AS t2_external_user_ext_id, t2.b AS t2_t1_ext_id FROM t2 INNER JOIN (SELECT t1.a AS d_t1_ext_id FROM t1 GROUP BY t1.a) AS anon_1 ON anon_1.d_t1_ext_id = t2.a WHERE t2.c = 123 AND t2.b + IN ("%s") ) tmp` + + wg.Add(1) + sqls := make([]string, 2) + go func() { + defer wg.Done() + inElems := make([]string, 1000) + for i := 0; i < len(inElems); i++ { + inElems[i] = fmt.Sprintf("wm_%dbDgAAwCD-v1QB%dxky-g_dxxQCw", rand.Intn(100), rand.Intn(100)) + } + sqls[0] = fmt.Sprintf(sql, "inl_join", strings.Join(inElems, "\",\"")) + sqls[1] = fmt.Sprintf(sql, "inl_hash_join", strings.Join(inElems, "\",\"")) + }() + + tk.MustExec("insert into t1 select rand()*400;") + for i := 0; i < 10; i++ { + tk.MustExec("insert into t1 select rand()*400 from t1;") + } + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + }() + wg.Wait() + for _, sql := range sqls { + tk.MustExec("set @@tidb_mem_quota_query = 1073741824") // 1GB + c.Assert(tk.QueryToErr(sql), IsNil) + tk.MustExec("set @@tidb_mem_quota_query = 33554432") // 32MB, out of memory during executing + c.Assert(strings.Contains(tk.QueryToErr(sql).Error(), "Out Of Memory Quota!"), IsTrue) + tk.MustExec("set @@tidb_mem_quota_query = 65536") // 64KB, out of memory during building the plan + func() { + defer func() { + r := recover() + c.Assert(r, NotNil) + err := errors.Errorf("%v", r) + c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) + }() + tk.MustExec(sql) + }() + } +} diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index b77e446c62104..4fb2abf4509aa 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" @@ -152,6 +153,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.stats = &indexLookUpJoinRuntimeStats{} e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } + e.finished.Store(false) e.startWorkers(ctx) return nil } @@ -201,6 +203,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { if r != nil { + e.IndexLookUpJoin.finished.Store(true) err := errors.New(fmt.Sprintf("%v", r)) if !e.keepOuterOrder { e.resultCh <- &indexHashJoinResult{err: err} @@ -209,6 +212,7 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { e.taskCh <- task } if e.cancelFunc != nil { + e.IndexLookUpJoin.ctxCancelReason.Store(err) e.cancelFunc() } } @@ -245,6 +249,9 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return result.err } case <-ctx.Done(): + if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } req.SwapColumns(result.chk) @@ -274,6 +281,9 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu return result.err } case <-ctx.Done(): + if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } req.SwapColumns(result.chk) @@ -319,6 +329,7 @@ func (e *IndexNestedLoopHashJoin) Close() error { close(e.joinChkResourceCh[i]) } e.joinChkResourceCh = nil + e.finished.Store(false) return e.baseExecutor.Close() } @@ -326,6 +337,7 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { defer trace.StartRegion(ctx, "IndexHashJoinOuterWorker").End() defer close(ow.innerCh) for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { err = errors.New("mockIndexHashJoinOuterWorkerErr") @@ -432,6 +444,8 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, stats: innerStats, + lookup: &e.IndexLookUpJoin, + memTracker: memory.NewTracker(memory.LabelForIndexJoinInnerWorker, -1), }, taskCh: taskCh, joiner: e.joiners[workerID], @@ -441,6 +455,14 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, joinKeyBuf: make([]byte, 1), outerRowStatus: make([]outerRowStatusFlag, 0, e.maxChunkSize), } + iw.memTracker.AttachTo(e.memTracker) + if len(copiedRanges) != 0 { + // We should not consume this memory usage in `iw.memTracker`. The + // memory usage of inner worker will be reset the end of iw.handleTask. + // While the life cycle of this memory consumption exists throughout the + // whole active period of inner worker. + e.ctx.GetSessionVars().StmtCtx.MemTracker.Consume(2 * types.EstimatedMemUsage(copiedRanges[0].LowVal, len(copiedRanges))) + } if e.lastColHelper != nil { // nextCwf.TmpConstant needs to be reset for every individual // inner worker to avoid data race when the inner workers is running @@ -584,6 +606,9 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} } func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + defer func() { + iw.memTracker.Consume(-iw.memTracker.BytesConsumed()) + }() var joinStartTime time.Time if iw.stats != nil { start := time.Now() @@ -631,6 +656,9 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i select { case resultCh <- joinResult: case <-ctx.Done(): + if err := iw.lookup.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) @@ -779,6 +807,9 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind select { case resultCh <- joinResult: case <-ctx.Done(): + if err := iw.lookup.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 5f4945f3fd55c..aa155b0d4c610 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -27,6 +27,7 @@ import ( "unsafe" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -82,7 +83,9 @@ type IndexLookUpJoin struct { memTracker *memory.Tracker // track memory usage. - stats *indexLookUpJoinRuntimeStats + stats *indexLookUpJoinRuntimeStats + ctxCancelReason atomic.Value + finished *atomic.Value } type outerCtx struct { @@ -145,11 +148,13 @@ type innerWorker struct { outerCtx outerCtx ctx sessionctx.Context executorChk *chunk.Chunk + lookup *IndexLookUpJoin indexRanges []*ranger.Range nextColCompareFilters *plannercore.ColWithCmpFuncManager keyOff2IdxOff []int stats *innerWorkerRuntimeStats + memTracker *memory.Tracker } // Open implements the Executor interface. @@ -161,6 +166,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) + e.finished.Store(false) if e.runtimeStats != nil { e.stats = &indexLookUpJoinRuntimeStats{} e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) @@ -222,6 +228,16 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, stats: innerStats, + lookup: e, + memTracker: memory.NewTracker(memory.LabelForIndexJoinInnerWorker, -1), + } + iw.memTracker.AttachTo(e.memTracker) + if len(copiedRanges) != 0 { + // We should not consume this memory usage in `iw.memTracker`. The + // memory usage of inner worker will be reset the end of iw.handleTask. + // While the life cycle of this memory consumption exists throughout the + // whole active period of inner worker. + e.ctx.GetSessionVars().StmtCtx.MemTracker.Consume(2 * types.EstimatedMemUsage(copiedRanges[0].LowVal, len(copiedRanges))) } if e.lastColHelper != nil { // nextCwf.TmpConstant needs to be reset for every individual @@ -298,6 +314,9 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, select { case task = <-e.resultCh: case <-ctx.Done(): + if err := e.ctxCancelReason.Load(); err != nil { + return nil, err.(error) + } return nil, ctx.Err() } if task == nil { @@ -310,6 +329,9 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, err } case <-ctx.Done(): + if err := e.ctxCancelReason.Load(); err != nil { + return nil, err.(error) + } return nil, ctx.Err() } @@ -333,19 +355,24 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { defer trace.StartRegion(ctx, "IndexLookupJoinOuterWorker").End() defer func() { if r := recover(); r != nil { + ow.lookup.finished.Store(true) buf := make([]byte, 4096) stackSize := runtime.Stack(buf, false) buf = buf[:stackSize] logutil.Logger(ctx).Error("outerWorker panicked", zap.String("stack", string(buf))) task := &lookUpJoinTask{doneCh: make(chan error, 1)} - task.doneCh <- errors.Errorf("%v", r) + err := errors.Errorf("%v", r) + task.doneCh <- err ow.pushToChan(ctx, task, ow.resultCh) + ow.lookup.ctxCancelReason.Store(err) + ow.lookup.cancelFunc() } close(ow.resultCh) close(ow.innerCh) wg.Done() }() for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) if err != nil { task.doneCh <- err @@ -450,12 +477,16 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { var task *lookUpJoinTask defer func() { if r := recover(); r != nil { + iw.lookup.finished.Store(true) buf := make([]byte, 4096) stackSize := runtime.Stack(buf, false) buf = buf[:stackSize] logutil.Logger(ctx).Error("innerWorker panicked", zap.String("stack", string(buf))) + err := errors.Errorf("%v", r) // "task != nil" is guaranteed when panic happened. - task.doneCh <- errors.Errorf("%v", r) + task.doneCh <- err + iw.lookup.ctxCancelReason.Store(err) + iw.lookup.cancelFunc() } wg.Done() }() @@ -489,6 +520,9 @@ func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) err atomic.AddInt64(&iw.stats.totalTime, int64(time.Since(start))) }() } + defer func() { + iw.memTracker.Consume(-iw.memTracker.BytesConsumed()) + }() lookUpContents, err := iw.constructLookupContent(task) if err != nil { return err @@ -527,6 +561,9 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi } return nil, err } + if rowIdx == 0 { + iw.lookup.memTracker.Consume(types.EstimatedMemUsage(dLookUpKey, numRows)) + } if dHashKey == nil { // Append null to make looUpKeys the same length as outer Result. task.encodedLookUpKeys[chkIdx].AppendNull(0) @@ -651,7 +688,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start))) }() } - innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true) + innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true, iw.memTracker, iw.lookup.finished) if innerExec != nil { defer terror.Call(innerExec.Close) } @@ -665,6 +702,9 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa for { select { case <-ctx.Done(): + if err := iw.lookup.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() default: } @@ -733,6 +773,7 @@ func (e *IndexLookUpJoin) Close() error { e.workerWg.Wait() e.memTracker = nil e.task = nil + e.finished.Store(false) return e.baseExecutor.Close() } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 9bbe55537421b..746fc6a5733fc 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -502,7 +502,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo dLookUpKeys[i], dLookUpKeys[lenKeys-i-1] = dLookUpKeys[lenKeys-i-1], dLookUpKeys[i] } } - imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false) + imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false, nil, nil) if imw.innerExec != nil { defer terror.Call(imw.innerExec.Close) } diff --git a/executor/join_test.go b/executor/join_test.go index 35d5a914d6470..e55708bfebd8a 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2610,3 +2610,39 @@ func (s *testSuiteJoinSerial) TestIssue25902(c *C) { tk.MustQuery("select * from tt1 where ts in (select ts from tt2);").Check(testkit.Rows()) tk.MustExec("set @@session.time_zone = @tmp;") } + +func (s *testSuiteJoinSerial) TestIssue30211(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int, index(a));") + tk.MustExec("create table t2(a int, index(a));") + func() { + fpName := "github.com/pingcap/tidb/executor/TestIssue30211" + c.Assert(failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + }() + tk.MustExec("insert into t1 values(1),(2);") + tk.MustExec("insert into t2 values(1),(1),(2),(2);") + tk.MustExec("set @@tidb_mem_quota_query=8000;") + tk.MustExec("set tidb_index_join_batch_size = 1;") + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) +} diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index b6b83e5ce598d..c507f89ef6b66 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1565,6 +1565,9 @@ func (ijHelper *indexJoinBuildHelper) buildTemplateRange(matchedKeyCnt int, eqAn if len(oneColumnRan) == 0 { return nil, true, nil } + if sc.MemTracker != nil { + sc.MemTracker.Consume(2 * types.EstimatedMemUsage(oneColumnRan[0].LowVal, len(oneColumnRan))) + } for _, ran := range ranges { ran.LowVal[i] = oneColumnRan[0].LowVal[0] ran.HighVal[i] = oneColumnRan[0].HighVal[0] @@ -1578,6 +1581,9 @@ func (ijHelper *indexJoinBuildHelper) buildTemplateRange(matchedKeyCnt int, eqAn newRange.HighVal[i] = oneColumnRan[ranIdx].HighVal[0] newRanges = append(newRanges, newRange) } + if sc.MemTracker != nil && len(newRanges) != 0 { + sc.MemTracker.Consume(2 * types.EstimatedMemUsage(newRanges[0].LowVal, len(newRanges))) + } ranges = append(ranges, newRanges...) } j++ diff --git a/util/memory/tracker.go b/util/memory/tracker.go index cda7a67ad278b..470029e309402 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -552,4 +552,8 @@ const ( LabelForSimpleTask int = -18 // LabelForCTEStorage represents the label of CTE storage LabelForCTEStorage int = -19 + // LabelForIndexJoinInnerWorker represents the label of IndexJoin InnerWorker + LabelForIndexJoinInnerWorker int = -20 + // LabelForIndexJoinOuterWorker represents the label of IndexJoin OuterWorker + LabelForIndexJoinOuterWorker int = -21 )