diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 8dc359fa37163..9a7231726400f 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -50,6 +50,14 @@ var ( _ Executor = &IndexMergeReaderExecutor{} ) +const ( + partialIndexWorkerType = "IndexMergePartialIndexWorker" + partialTableWorkerType = "IndexMergePartialTableWorker" + processWorkerType = "IndexMergeProcessWorker" + partTblIntersectionWorkerType = "IndexMergePartTblIntersectionWorker" + tableScanWorkerType = "IndexMergeTableScanWorker" +) + // IndexMergeReaderExecutor accesses a table with multiple index/table scan. // There are three types of workers: // 1. partialTableWorker/partialIndexWorker, which are used to fetch the handles @@ -86,10 +94,10 @@ type IndexMergeReaderExecutor struct { // All fields above are immutable. - tblWorkerWg sync.WaitGroup - idxWorkerWg sync.WaitGroup - processWokerWg sync.WaitGroup - finished chan struct{} + tblWorkerWg sync.WaitGroup + idxWorkerWg sync.WaitGroup + processWorkerWg sync.WaitGroup + finished chan struct{} workerStarted bool keyRanges [][]kv.KeyRange @@ -262,7 +270,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont indexMerge: e, stats: e.stats, } - e.processWokerWg.Add(1) + e.processWorkerWg.Add(1) go func() { defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End() util.WithRecovery( @@ -273,13 +281,19 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished) } }, - idxMergeProcessWorker.handleLoopFetcherPanic(ctx, e.resultCh, "IndexMergeProcessWorker", nil), + handleWorkerPanic(ctx, e.finished, e.resultCh, nil, processWorkerType), ) - e.processWokerWg.Done() + e.processWorkerWg.Done() }() } func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, workID int) error { + failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { + // Wait for processWorker to close resultCh. + time.Sleep(2) + // Should use fetchCh instead of resultCh to send error. + syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly")) + }) if e.runtimeStats != nil { collExec := true e.dagPBs[workID].CollectExecutionSummaries = &collExec @@ -303,6 +317,17 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) worker := &partialIndexWorker{ stats: e.stats, idxID: e.getPartitalPlanID(workID), @@ -310,13 +335,14 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, batchSize: e.maxChunkSize, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { // We got correlated column, so need to refresh Selection operator. 
var err error if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } } @@ -350,12 +376,12 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, }) kvReq, err := builder.SetKeyRanges(keyRange).Build() if err != nil { - syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID)) if err != nil { - syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } worker.batchSize = e.maxChunkSize @@ -368,7 +394,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // fetch all data from this partition ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols, parTblIdx) + _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx) if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedbacks[workID].Invalidate() } @@ -382,7 +408,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, } } }, - e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialIndexWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, nil, "partialIndexWorker"), ) }() @@ -406,6 +432,17 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialTableWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) var err error partialTableReader := &TableReaderExecutor{ baseExecutor: newBaseExecutor(e.ctx, ts.Schema(), e.getPartitalPlanID(workID)), @@ -427,11 +464,12 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, tableReader: partialTableReader, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } partialTableReader.dagPB = e.dagPBs[workID] @@ -449,7 +487,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, partialTableReader.table = tbl if err = partialTableReader.Open(ctx); err != nil { logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err)) - syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) break } worker.batchSize = e.maxChunkSize @@ -462,7 +500,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // fetch all handles from this table ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols, parTblIdx) + _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx) if fetchErr != nil { // this error is synced in fetchHandles, so don't sync it again e.feedbacks[workID].Invalidate() } @@ -478,7 +516,7 @@ func (e *IndexMergeReaderExecutor) 
startPartialTableWorker(ctx context.Context,
 					}
 				}
 			},
-			e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialTableWorker"),
+			handleWorkerPanic(ctx, e.finished, fetchCh, nil, "partialTableWorker"),
 		)
 	}()
 	return nil
@@ -514,9 +552,10 @@ type partialTableWorker struct {
 	maxChunkSize int
 	tableReader  Executor
 	partition    table.PhysicalTable // it indicates if this worker is accessing a particular partition table
+	memTracker   *memory.Tracker
 }
 
-func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask,
+func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask,
 	finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int) (count int64, err error) {
 	chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize, w.maxChunkSize, w.tableReader.base().AllocPool)
 	var basic *execdetails.BasicRuntimeStats
@@ -527,7 +566,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str
 		start := time.Now()
 		handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols)
 		if err != nil {
-			syncErr(resultCh, err)
+			syncErr(ctx, finished, fetchCh, err)
 			return count, err
 		}
 		if len(handles) == 0 {
@@ -556,6 +595,8 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str
 func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) (
 	handles []kv.Handle, retChk *chunk.Chunk, err error) {
 	handles = make([]kv.Handle, 0, w.batchSize)
+	var memUsage int64
+	defer w.memTracker.Consume(-memUsage)
 	for len(handles) < w.batchSize {
 		chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
 		err = errors.Trace(w.tableReader.Next(ctx, chk))
@@ -563,8 +604,14 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 			return handles, nil, err
 		}
 		if chk.NumRows() == 0 {
+			failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) {
+				failpoint.Return(handles, nil, errors.New(v.(string)))
+			})
 			return handles, retChk, nil
 		}
+		memDelta := chk.MemoryUsage()
+		memUsage += memDelta
+		w.memTracker.Consume(memDelta)
 		for i := 0; i < chk.NumRows(); i++ {
 			handle, err := handleCols.BuildHandle(chk.GetRow(i))
 			if err != nil {
@@ -612,8 +659,18 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co
 			defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End()
 			var task *indexMergeTableTask
 			util.WithRecovery(
-				func() { task = worker.pickAndExecTask(ctx1) },
-				worker.handlePickAndExecTaskPanic(ctx1, task),
+				// Note we use the address of `task` as the argument of both `pickAndExecTask` and `handleTableScanWorkerPanic`
+				// because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible
+				// in `handleTableScanWorkerPanic` since it will get `doneCh` from `task`. Go always passes arguments by value,
+				// so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is
+				// not visible in `handleTableScanWorkerPanic`.
+				func() { worker.pickAndExecTask(ctx1, &task) },
+				worker.handleTableScanWorkerPanic(ctx1, e.finished, &task, tableScanWorkerType),
 			)
 			cancel()
 			e.tblWorkerWg.Done()
@@ -694,21 +751,38 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error)
 	return e.resultCurr, nil
 }
 
-func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context, resultCh chan<- *indexMergeTableTask, worker string) func(r interface{}) {
+func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<- *indexMergeTableTask, extraNotifyCh chan bool, worker string) func(r interface{}) {
 	return func(r interface{}) {
+		if worker == processWorkerType {
+			// There is only one processWorker, so it's safe to close here.
+			// No need to worry about "close on closed channel" error.
+			defer close(ch)
+		}
 		if r == nil {
 			return
 		}
-		err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor %s: %v", worker, r)
+		if extraNotifyCh != nil {
+			extraNotifyCh <- true
+		}
+
+		err4Panic := errors.Errorf("%s: %v", worker, r)
 		logutil.Logger(ctx).Error(err4Panic.Error())
 		doneCh := make(chan error, 1)
 		doneCh <- err4Panic
-		resultCh <- &indexMergeTableTask{
+		task := &indexMergeTableTask{
 			lookupTableTask: lookupTableTask{
 				doneCh: doneCh,
 			},
 		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-finished:
+			return
+		case ch <- task:
+			return
+		}
 	}
 }
 
@@ -721,9 +795,9 @@ func (e *IndexMergeReaderExecutor) Close() error {
 		return nil
 	}
 	close(e.finished)
-	e.processWokerWg.Wait()
 	e.tblWorkerWg.Wait()
 	e.idxWorkerWg.Wait()
+	e.processWorkerWg.Wait()
 	e.finished = nil
 	e.workerStarted = false
 	// TODO: how to store e.feedbacks
@@ -737,17 +811,32 @@ type indexMergeProcessWorker struct {
 func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
 	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
-	defer func() {
-		close(workCh)
-		close(resultCh)
-	}()
+	failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
+		failpoint.Return()
+	})
+	memTracker := memory.NewTracker(w.indexMerge.id, -1)
+	memTracker.AttachTo(w.indexMerge.memTracker)
+	defer memTracker.Detach()
+	defer close(workCh)
+	failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)
 
 	distinctHandles := make(map[int64]*kv.HandleMap)
 	for task := range fetchCh {
+		select {
+		case err := <-task.doneCh:
+			// If got error from partialIndexWorker/partialTableWorker, stop processing.
+ if err != nil { + syncErr(ctx, finished, resultCh, err) + return + } + default: + } start := time.Now() handles := task.handles fhs := make([]kv.Handle, 0, 8) + memTracker.Consume(int64(cap(task.handles) * 8)) + var tblID int64 if w.indexMerge.partitionTableMode { tblID = getPhysicalTableID(task.partitionTable) @@ -812,6 +901,7 @@ func (w *intersectionProcessWorker) consumeMemDelta() { } func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Context, workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { + failpoint.Inject("testIndexMergePanicPartitionTableIntersectionWorker", nil) defer w.memTracker.Detach() for task := range w.workerCh { @@ -900,10 +990,7 @@ func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Conte // To avoid too many goroutines, each intersectionProcessWorker can handle multiple partitions. func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fetchCh <-chan *indexMergeTableTask, workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { - defer func() { - close(workCh) - close(resultCh) - }() + defer close(workCh) if w.stats != nil { start := time.Now() @@ -912,6 +999,8 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet }() } + failpoint.Inject("testIndexMergePanicProcessWorkerIntersection", nil) + // One goroutine may handle one or multiple partitions. // Max number of partition number is 8192, we use ExecutorConcurrency to avoid too many goroutines. maxWorkerCnt := w.indexMerge.ctx.GetSessionVars().IndexMergeIntersectionConcurrency() @@ -947,14 +1036,25 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet wg.RunWithRecover(func() { defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End() worker.doIntersectionPerPartition(ctx, workCh, resultCh, finished) - }, w.handleLoopFetcherPanic(ctx, resultCh, "IndexMergeIntersectionProcessWorker", errCh)) + }, handleWorkerPanic(ctx, finished, resultCh, errCh, partTblIntersectionWorkerType)) workers = append(workers, worker) } loop: for task := range fetchCh { + select { + case err := <-task.doneCh: + // If got error from partialIndexWorker/partialTableWorker, stop processing. + if err != nil { + syncErr(ctx, finished, resultCh, err) + break loop + } + default: + } + select { case workers[task.parTblIdx%workerCnt].workerCh <- task: case <-errCh: + // If got error from intersectionProcessWorker, stop processing. 
break loop } } @@ -964,27 +1064,6 @@ loop: wg.Wait() } -func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, resultCh chan<- *indexMergeTableTask, worker string, extraCh chan bool) func(r interface{}) { - return func(r interface{}) { - if r == nil { - return - } - if extraCh != nil { - extraCh <- true - } - - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor %s: %v", worker, r) - logutil.Logger(ctx).Error(err4Panic.Error()) - doneCh := make(chan error, 1) - doneCh <- err4Panic - resultCh <- &indexMergeTableTask{ - lookupTableTask: lookupTableTask{ - doneCh: doneCh, - }, - } - } -} - type partialIndexWorker struct { stats *IndexMergeRuntimeStat sc sessionctx.Context @@ -993,16 +1072,28 @@ type partialIndexWorker struct { maxBatchSize int maxChunkSize int partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table + memTracker *memory.Tracker } -func syncErr(resultCh chan<- *indexMergeTableTask, err error) { +func syncErr(ctx context.Context, finished <-chan struct{}, errCh chan<- *indexMergeTableTask, err error) { + logutil.BgLogger().Error("IndexMergeReaderExecutor.syncErr", zap.Error(err)) doneCh := make(chan error, 1) doneCh <- err - resultCh <- &indexMergeTableTask{ + task := &indexMergeTableTask{ lookupTableTask: lookupTableTask{ doneCh: doneCh, }, } + + // ctx.Done and finished is to avoid write channel is stuck. + select { + case <-ctx.Done(): + return + case <-finished: + return + case errCh <- task: + return + } } func (w *partialIndexWorker) fetchHandles( @@ -1010,7 +1101,6 @@ func (w *partialIndexWorker) fetchHandles( result distsql.SelectResult, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, - resultCh chan<- *indexMergeTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int) (count int64, err error) { @@ -1025,7 +1115,7 @@ func (w *partialIndexWorker) fetchHandles( start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols) if err != nil { - syncErr(resultCh, err) + syncErr(ctx, finished, fetchCh, err) return count, err } if len(handles) == 0 { @@ -1057,6 +1147,8 @@ func (w *partialIndexWorker) fetchHandles( func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { handles = make([]kv.Handle, 0, w.batchSize) + var memUsage int64 + defer w.memTracker.Consume(-memUsage) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(idxResult.Next(ctx, chk)) @@ -1064,8 +1156,14 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, nil, err } if chk.NumRows() == 0 { + failpoint.Inject("testIndexMergeErrorPartialIndexWorker", func(v failpoint.Value) { + failpoint.Return(handles, nil, errors.New(v.(string))) + }) return handles, retChk, nil } + memDelta := chk.MemoryUsage() + memUsage += memDelta + w.memTracker.Consume(memDelta) for i := 0; i < chk.NumRows(); i++ { handle, err := handleCols.BuildHandleFromIndexRow(chk.GetRow(i)) if err != nil { @@ -1119,6 +1217,17 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task * case <-w.finished: return } + // Make sure panic failpoint is after fetch task from workCh. + // Otherwise cannot send error to task.doneCh. 
+ failpoint.Inject("testIndexMergePanicTableScanWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-w.finished: + failpoint.Return() + } + }) execStart := time.Now() err := w.executeTask(ctx, task) if w.stats != nil { @@ -1130,15 +1239,32 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task * } } +<<<<<<< HEAD func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *indexMergeTableTask) func(r interface{}) { +======= +func (w *indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **indexMergeTableTask, worker string) func(r interface{}) { +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) return func(r interface{}) { if r == nil { return } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) + err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) +<<<<<<< HEAD task.doneCh <- err4Panic +======= + if *task != nil { + select { + case <-ctx.Done(): + return + case <-finished: + return + case (*task).doneCh <- err4Panic: + return + } + } +>>>>>>> bc2c1b229df (executor: fix IndexMerge handle panic logic (#41036)) } } diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 79d2d8b895a81..0582433d4f019 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -747,9 +747,9 @@ func TestIntersectionWorkerPanic(t *testing.T) { require.Contains(t, res[1][0], "IndexMerge") // Test panic in intersection. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", "panic")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", `panic("testIndexMergeIntersectionWorkerPanic")`)) err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") - require.Contains(t, err.Error(), "IndexMergeReaderExecutor") + require.Contains(t, err.Error(), "testIndexMergeIntersectionWorkerPanic") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic")) } @@ -778,3 +778,87 @@ func TestIntersectionMemQuota(t *testing.T) { err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024") require.Contains(t, err.Error(), "Out Of Memory Quota!") } + +func TestIndexMergePanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + tk.MustExec("insert into t1 values(1, 1, 1), (100, 100, 100)") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly", "return(true)")) + tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly")) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + insertStr := "insert into t1 values(0, 0, 0)" 
+ for i := 1; i < 1000; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + + minV := 200 + maxV := 1000 + runSQL := func(fp string) { + var sql string + v1 := rand.Intn(maxV-minV) + minV + v2 := rand.Intn(maxV-minV) + minV + if !strings.Contains(fp, "Intersection") { + sql = fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < %d or c2 < %d;", v1, v2) + } else { + sql = fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c3 < %d and c2 < %d", v1, v2) + } + res := tk.MustQuery("explain " + sql).Rows() + require.Contains(t, res[1][0], "IndexMerge") + err := tk.QueryToErr(sql) + require.Contains(t, err.Error(), fp) + } + + packagePath := "github.com/pingcap/tidb/executor/" + panicFPPaths := []string{ + packagePath + "testIndexMergePanicPartialIndexWorker", + packagePath + "testIndexMergePanicPartialTableWorker", + + packagePath + "testIndexMergePanicProcessWorkerUnion", + packagePath + "testIndexMergePanicProcessWorkerIntersection", + packagePath + "testIndexMergePanicPartitionTableIntersectionWorker", + + packagePath + "testIndexMergePanicTableScanWorker", + } + for _, fp := range panicFPPaths { + fmt.Println("handling failpoint: ", fp) + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + // When mockSleepBeforeStartTableReader is enabled, will not read real data. This is to avoid leaking goroutines in coprocessor. + // But should disable mockSleepBeforeStartTableReader for testIndexMergePanicTableScanWorker. + // Because finalTableScanWorker need task.doneCh to pass error, so need partialIndexWorker/partialTableWorker runs normally. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader", "return(1000)")) + } + for i := 0; i < 1000; i++ { + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp))) + runSQL(fp) + require.NoError(t, failpoint.Disable(fp)) + } + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader")) + } + } + + errFPPaths := []string{ + packagePath + "testIndexMergeErrorPartialIndexWorker", + packagePath + "testIndexMergeErrorPartialTableWorker", + } + for _, fp := range errFPPaths { + fmt.Println("handling failpoint: ", fp) + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`return("%s")`, fp))) + for i := 0; i < 100; i++ { + runSQL(fp) + } + require.NoError(t, failpoint.Disable(fp)) + } +}