diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 8fc6f552e60d3..78fcebcc739e9 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -609,7 +609,12 @@ type backfillScheduler struct { copReqSenderPool *copReqSenderPool // for add index in ingest way. } -const backfillTaskChanSize = 1024 +var backfillTaskChanSize = 1024 + +// SetBackfillTaskChanSizeForTest is only used for test. +func SetBackfillTaskChanSizeForTest(n int) { + backfillTaskChanSize = n +} func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool, tp backfillWorkerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column, @@ -830,8 +835,11 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if err != nil { return errors.Trace(err) } - scheduler.setMaxWorkerSize(len(kvRanges)) + if len(kvRanges) == 0 { + break + } + scheduler.setMaxWorkerSize(len(kvRanges)) err = scheduler.adjustWorkerSize() if err != nil { return errors.Trace(err) @@ -854,14 +862,17 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if err != nil { return errors.Trace(err) } - - if len(remains) == 0 { - if ingestBeCtx != nil { - ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID) - } + if len(remains) > 0 { + startKey = remains[0].StartKey + } else { + startKey = kvRanges[len(kvRanges)-1].EndKey + } + if startKey.Cmp(endKey) >= 0 { break } - startKey = remains[0].StartKey + } + if ingestBeCtx != nil { + ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID) } return nil } diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 9b79704d3e7b4..df9c9baa05931 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -334,3 +334,26 @@ func TestAddIndexIngestUniqueKey(t *testing.T) { tk.MustExec("split table t by ('m');") tk.MustGetErrMsg("alter table t add unique index idx(b, c);", "[kv:1062]Duplicate entry '1-c1' for key 't.idx'") } + +func TestAddIndexSplitTableRanges(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 8; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) + } + tk.MustQuery("split table t between (0) and (80000) regions 7;").Check(testkit.Rows("6 1")) + + ddl.SetBackfillTaskChanSizeForTest(4) + tk.MustExec("alter table t add index idx(b);") + tk.MustExec("admin check table t;") + ddl.SetBackfillTaskChanSizeForTest(7) + tk.MustExec("alter table t add index idx_2(b);") + tk.MustExec("admin check table t;") + ddl.SetBackfillTaskChanSizeForTest(1024) +}