Skip to content

Commit

Permalink
ddl: check the key existence on original index (#40749)
Browse files Browse the repository at this point in the history
close #40730
  • Loading branch information
tangenta committed Feb 10, 2023
1 parent 893cdb4 commit c6e6d62
Show file tree
Hide file tree
Showing 11 changed files with 850 additions and 276 deletions.
15 changes: 15 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,15 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m
func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}

failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && indexInfo.BackfillState == model.BackfillStateMerging &&
MockDMLExecutionStateMerging != nil {
MockDMLExecutionStateMerging()
}
})

sctx, err1 := w.sessPool.get()
if err1 != nil {
err = err1
Expand Down Expand Up @@ -1789,6 +1798,12 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
// MockDMLExecution is only used for test.
var MockDMLExecution func()

// MockDMLExecutionMerging is only used for test.
var MockDMLExecutionMerging func()

// MockDMLExecutionStateMerging is only used for test.
var MockDMLExecutionStateMerging func()

func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
Expand Down
72 changes: 44 additions & 28 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -198,6 +199,12 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
return nil
})

failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecutionMerging != nil {
MockDMLExecutionMerging()
}
})
logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000)
return
}
Expand Down Expand Up @@ -252,40 +259,49 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
return false, nil
}

originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
if err != nil {
return false, err
}
tempIdxVal = tempIdxVal.FilterOverwritten()

// Extract the operations on the original index and replay them later.
for _, elem := range tempIdxVal {
if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
continue
}

if handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns))
if err != nil {
return false, err
if elem.Handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns))
if err != nil {
return false, err
}
}
}

originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)
originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)

idxRecord := &temporaryIndexRecord{
handle: handle,
delete: isDelete,
unique: unique,
skip: false,
}
if !isDelete {
idxRecord.vals = originVal
idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal)
idxRecord := &temporaryIndexRecord{
handle: elem.Handle,
delete: elem.Delete,
unique: elem.Distinct,
skip: false,
}
if !elem.Delete {
idxRecord.vals = elem.Value
idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey)

lastKey = indexKey
return true, nil
})
Expand Down
1 change: 1 addition & 0 deletions ddl/indexmergetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_test(
"//ddl/internal/callback",
"//ddl/testutil",
"//domain",
"//errno",
"//kv",
"//meta/autoid",
"//parser/model",
Expand Down
Loading

0 comments on commit c6e6d62

Please sign in to comment.