ddl, model: support for dist-reorg on partitioned tables #41145

Merged · 18 commits · Feb 9, 2023
Changes from 13 commits
ddl/backfilling.go (140 changes: 76 additions & 64 deletions)
@@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
@@ -61,12 +62,14 @@ const (
typeAddIndexMergeTmpWorker backfillerType = 3

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
updateInstanceLease = 25 * time.Second
genTaskBatch = 4096
minGenTaskBatch = 1024
minDistTaskCnt = 32
retrySQLTimes = 10
InstanceLease = 1 * time.Minute
updateInstanceLease = 25 * time.Second
genTaskBatch = 4096
genPhysicalTableTaskBatch = 256
minGenTaskBatch = 1024
minGenPhysicalTableTaskBatch = 64
minDistTaskCnt = 64
retrySQLTimes = 10
)

// RetrySQLInterval is exported for test.
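
The split between genTaskBatch/minGenTaskBatch and the new genPhysicalTableTaskBatch/minGenPhysicalTableTaskBatch suggests a two-tier batch size: a reorg that fans out over many partitions generates fewer tasks per physical table, so the total backfill-job count stays bounded. A minimal sketch of the selection, assuming it keys off the multi-partition flag used later in splitTableToBackfillJobs (the helper name is illustrative, not from this PR; the constants are real):

```go
// Hypothetical helper; the constants exist in this diff, the function does not.
func pickBatchSizes(isMultiPhyTbl bool) (batchSize, minBatchSize int) {
	if isMultiPhyTbl {
		// Many partitions share the backfill-job table, so each gets smaller batches.
		return genPhysicalTableTaskBatch, minGenPhysicalTableTaskBatch // 256, 64
	}
	return genTaskBatch, minGenTaskBatch // 4096, 1024
}
```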
@@ -89,15 +92,15 @@ func (bT backfillerType) String() string {

// BackfillJob is for a tidb_ddl_backfill table's record.
type BackfillJob struct {
ID int64
JobID int64
EleID int64
EleKey []byte
Tp backfillerType
State model.JobState
StoreID int64
InstanceID string
InstanceLease types.Time
ID int64
JobID int64
EleID int64
EleKey []byte
PhysicalTableID int64
Tp backfillerType
State model.JobState
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
@@ -252,14 +255,13 @@ type reorgBackfillTask struct {
physicalTable table.PhysicalTable

// TODO: Remove the following fields after the run function is removed.
id int
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
jobID int64
sqlQuery string
priority int
id int
startKey kv.Key
endKey kv.Key
endInclude bool
jobID int64
sqlQuery string
priority int
}

func (r *reorgBackfillTask) getJobID() int64 {
@@ -278,7 +280,7 @@ func (r *reorgBackfillTask) excludedEndKey() kv.Key {
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
physicalID := strconv.FormatInt(r.physicalTable.GetPhysicalID(), 10)
startKey := hex.EncodeToString(r.startKey)
endKey := hex.EncodeToString(r.endKey)
rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
@@ -366,6 +368,9 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
rc := d.getReorgCtx(jobID)

isDistReorg := task.bfJob != nil
if isDistReorg {
w.initPartitionIndexInfo(task)
}
for {
// Give the job a chance to be canceled; if we don't check it here and
// bf.BackfillDataInTxn panics, we will never cancel the job.
@@ -437,6 +442,15 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
return result
}

func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
if addIdxWorker, ok := w.backfiller.(*addIndexWorker); ok {
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
addIdxWorker.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
}
}

func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResult) {
logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w), zap.String("task", task.String()))
defer util.Recover(metrics.LabelDDL, "backfillWorker.runTask", func() {
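
The initPartitionIndexInfo hook added above is central to dist-reorg on partitioned tables: a backfill worker can now receive jobs targeting different partitions, and index keys are encoded under a physical table ID, so the worker's index handle has to be re-created per task. A sketch of the problem the rebinding avoids, with illustrative partition IDs (tblInfo/idxInfo stand for pt.Meta() and the looked-up index info):

```go
// Partitions p0 (ID 100) and p1 (ID 101) of the same partitioned table.
idxP0 := tables.NewIndex(100, tblInfo, idxInfo) // encodes keys under physical ID 100
idxP1 := tables.NewIndex(101, tblInfo, idxInfo) // encodes keys under physical ID 101
// Backfilling p1's rows through idxP0 would write p1's index entries into
// p0's key range; rebinding from task.bfJob.PhysicalTableID prevents that.
```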
@@ -668,7 +682,6 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, batch)
physicalTableID := reorgInfo.PhysicalTableID
var prefix kv.Key
if reorgInfo.mergingTmpIdx {
prefix = t.IndexPrefix()
@@ -679,15 +692,15 @@
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID)
jobCtx := reorgInfo.d.jobContext(job.ID)
for i, keyRange := range kvRanges {
startKey := keyRange.StartKey
endKey := keyRange.EndKey
endK, err := getRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, keyRange.StartKey, endKey)
if err != nil {
logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] get backfill range task, change end key",
logutil.BgLogger().Info("[ddl] get backfill range task, change end key", zap.Int64("pTbl", phyTbl.GetPhysicalID()),
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}
@@ -699,13 +712,12 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
}

task := &reorgBackfillTask{
id: i,
jobID: reorgInfo.Job.ID,
physicalTableID: physicalTableID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
id: i,
jobID: job.ID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
// If the boundaries overlap, we should ignore the preceding endKey.
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1}
batchTasks = append(batchTasks, task)
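
The hunks below replace loose parameters (bfWorkerType, isUnique, and the backfill-job ID pointer) with a single splitJobContext. Its definition is not part of this diff; inferring only from the sJobCtx fields dereferenced below, it presumably looks roughly like:

```go
// Inferred from usages in this diff; the real definition may differ.
type splitJobContext struct {
	bfWorkerType      backfillerType // stamped onto each generated BackfillJob
	isUnique          bool           // copied into BackfillMeta.IsUnique
	isMultiPhyTbl     bool           // reorg spans more than one physical table
	batchSize         int            // genTaskBatch or genPhysicalTableTaskBatch
	minBatchSize      int            // minGenTaskBatch or minGenPhysicalTableTaskBatch
	currBackfillJobID atomic.Int64   // inferred from the .Add(1) call; some atomic int64
}
```

Sharing one atomic ID allocator across partitions, instead of the old `id *int64` incremented with `*id++`, lets jobs for multiple partitions be generated concurrently without colliding IDs.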
@@ -1105,8 +1117,8 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
return nil
}

func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool,
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error {
func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool,
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error {
bJobs = bJobs[:0]
instanceID := ""
if notDistTask {
@@ -1116,12 +1128,11 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
// TODO: Adjust the number of ranges(region) for each task.
for _, task := range batchTasks {
bm := &model.BackfillMeta{
PhysicalTableID: reorgInfo.PhysicalTableID,
IsUnique: isUnique,
EndInclude: task.endInclude,
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
SQLMode: reorgInfo.ReorgMeta.SQLMode,
Location: reorgInfo.ReorgMeta.Location,
IsUnique: sJobCtx.isUnique,
EndInclude: task.endInclude,
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
SQLMode: reorgInfo.ReorgMeta.SQLMode,
Location: reorgInfo.ReorgMeta.Location,
JobMeta: &model.JobMeta{
SchemaID: reorgInfo.Job.SchemaID,
TableID: reorgInfo.Job.TableID,
@@ -1130,19 +1141,19 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
},
}
bj := &BackfillJob{
ID: *id,
JobID: reorgInfo.Job.ID,
EleID: reorgInfo.currElement.ID,
EleKey: reorgInfo.currElement.TypeKey,
Tp: bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
ID: sJobCtx.currBackfillJobID.Add(1),
JobID: reorgInfo.Job.ID,
EleID: reorgInfo.currElement.ID,
EleKey: reorgInfo.currElement.TypeKey,
PhysicalTableID: phyTblID,
Tp: sJobCtx.bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
}
*id++
bJobs = append(bJobs, bj)
}
if err := AddBackfillJobs(sess, bJobs); err != nil {
@@ -1151,29 +1162,30 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
return nil
}

func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool,
bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error {
endKey := reorgInfo.EndKey
isFirstOps := true
bJobs := make([]*BackfillJob, 0, genTaskBatch)
func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error {
isFirstOps := !sJobCtx.isMultiPhyTbl
batchSize := sJobCtx.batchSize
startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey
bJobs := make([]*BackfillJob, 0, batchSize)
for {
kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch)
kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize)
if err != nil {
return errors.Trace(err)
}
batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch)
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize)
if len(batchTasks) == 0 {
break
}
notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt)
if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil {
if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil {
return errors.Trace(err)
}
isFirstOps = false

remains := kvRanges[len(batchTasks):]
dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job")
logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table",
zap.Int64("physicalID", pTblMeta.PhyTblID),
zap.Int("batchTasksCnt", len(batchTasks)),
zap.Int("totalRegionCnt", len(kvRanges)),
zap.Int("remainRegionCnt", len(remains)),
@@ -1185,11 +1197,11 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo,
}

for {
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, pTblMeta.PhyTblID)
if err != nil {
return errors.Trace(err)
}
if bJobCnt < minGenTaskBatch {
if bJobCnt < sJobCtx.minBatchSize {
break
}
time.Sleep(RetrySQLInterval)
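
Similarly, splitTableToBackfillJobs now takes a per-partition BackfillJobRangeMeta instead of a bare physical table plus start key. Judging from the fields read above (PhyTbl, PhyTblID, StartKey, EndKey), a rough sketch of its shape:

```go
// Inferred from pTblMeta usages above; the actual type may carry more fields.
type BackfillJobRangeMeta struct {
	PhyTblID int64               // physical table (partition) ID
	PhyTbl   table.PhysicalTable // handle passed to splitTableRanges/getBatchTasks
	StartKey kv.Key              // backfill range start for this partition
	EndKey   kv.Key              // backfill range end for this partition
}
```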
ddl/constant.go (4 changes: 2 additions & 2 deletions)
@@ -55,7 +55,7 @@ const (
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
@@ -74,7 +74,7 @@ const (
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
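
With store_id replaced by ddl_physical_tid in both backfill tables, bookkeeping can be scoped to a single partition; the checkBackfillJobCount call in backfilling.go now passes pTblMeta.PhyTblID for exactly that. A hypothetical sketch of the filter (the real SQL and schema name are not shown in this diff):

```go
// Assumption: the count query now restricts on the renamed column, e.g.:
sql := fmt.Sprintf(
	"select count(1) from mysql.tidb_ddl_backfill where ddl_job_id = %d"+
		" and ele_id = %d and ddl_physical_tid = %d",
	jobID, eleID, physicalTableID)
```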
ddl/ddl_test.go (3 changes: 3 additions & 0 deletions)
@@ -58,6 +58,9 @@ var JobNeedGCForTest = jobNeedGC
// NewSession is only used for test.
var NewSession = newSession

// GetJobWithoutPartition is only used for test.
const GetJobWithoutPartition = getJobWithoutPartition

// GetDDLCtx returns ddlCtx for test.
func GetDDLCtx(d DDL) *ddlCtx {
return d.(*ddl).ddlCtx