Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: check the key existence on original index #40749

Merged
merged 37 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3b95add
ddl: check the key existence on original index
tangenta Jan 20, 2023
c5468ad
Merge branch 'master' into add-index-check-exist
hawkingrei Jan 20, 2023
df2fb49
add a test for insert on duplicate update
tangenta Jan 28, 2023
576c374
update comment
tangenta Jan 28, 2023
bd84f66
Merge remote-tracking branch 'upstream/master' into add-index-check-e…
tangenta Jan 29, 2023
6a9b48e
fix replace and insert on duplicate update issues
tangenta Jan 30, 2023
a224037
fix replace and insert on duplicate update issues
tangenta Jan 31, 2023
c0548a4
make context.Context as the first arg of a function
tangenta Jan 31, 2023
f220a48
Merge remote-tracking branch 'upstream/master' into add-index-check-e…
tangenta Jan 31, 2023
549e14b
fix admin check
tangenta Jan 31, 2023
5f1f47a
Merge branch 'master' into add-index-check-exist
tangenta Jan 31, 2023
fc12610
fix integration test
tangenta Feb 1, 2023
9303b0b
Merge branch 'master' into add-index-check-exist
tangenta Feb 1, 2023
1874a4e
add some comments
tangenta Feb 1, 2023
dda1225
address comment
tangenta Feb 2, 2023
0e36c80
fix merging case
tangenta Feb 2, 2023
ee5cbd7
fix linter
tangenta Feb 2, 2023
098b834
Merge remote-tracking branch 'upstream/master' into add-index-check-e…
tangenta Feb 7, 2023
b9c995b
record all the operations to the temp index value
tangenta Feb 7, 2023
c2040b1
fix linter
tangenta Feb 7, 2023
495466b
Merge branch 'master' into add-index-check-exist
tangenta Feb 7, 2023
c3984c4
update bazel
tangenta Feb 7, 2023
1c6e9d6
fix panic on clustered index table
tangenta Feb 8, 2023
bd7d17a
address comment
tangenta Feb 8, 2023
5323e89
Merge branch 'master' into add-index-check-exist
tangenta Feb 8, 2023
7f4a3c4
print value to log when inconsistency detected
tangenta Feb 8, 2023
9dfe8af
fix insert ignore may introduce invalid temp index value
tangenta Feb 8, 2023
2865dd8
Merge branch 'master' into add-index-check-exist
tangenta Feb 8, 2023
86d5a9c
move failpoint to a common place for distributed reorg
tangenta Feb 8, 2023
c365c3e
remove debug info and refine
tangenta Feb 8, 2023
bfe6db6
Merge remote-tracking branch 'upstream/master' into add-index-check-e…
tangenta Feb 8, 2023
f51ce7f
fix build
tangenta Feb 8, 2023
0e75f01
should not return during iterating temp index value elem
tangenta Feb 8, 2023
8d1ed64
add more test for TempIndexValue encoding
tangenta Feb 8, 2023
5d739db
Merge remote-tracking branch 'upstream/master' into add-index-check-e…
tangenta Feb 8, 2023
29aaa12
Merge branch 'master' into add-index-check-exist
tangenta Feb 9, 2023
5945d0e
Merge branch 'master' into add-index-check-exist
purelind Feb 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,12 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
return false, ver, errors.Trace(err)
case model.BackfillStateMerging:
failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecutionMerging != nil {
MockDMLExecutionMerging()
}
})
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true)
if !done {
return false, ver, err
Expand Down Expand Up @@ -1713,6 +1719,9 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
// MockDMLExecution is only used for test.
var MockDMLExecution func()

// MockDMLExecutionMerging is only used for test.
var MockDMLExecutionMerging func()

func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
Expand Down
119 changes: 119 additions & 0 deletions ddl/indexmergetest/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,122 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2"))
}

func TestAddIndexMergeInsertOnMerging(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int default 0, b int default 0)")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &callback.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if t.Failed() {
return
}
var err error
switch job.SchemaState {
case model.StateDeleteOnly:
_, err = tk1.Exec("insert into t values (5, 5)")
assert.NoError(t, err)
case model.StateWriteOnly:
_, err = tk1.Exec("insert into t values (5, 7)")
assert.NoError(t, err)
_, err = tk1.Exec("delete from t where b = 7")
assert.NoError(t, err)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)

ddl.MockDMLExecutionMerging = func() {
_, err := tk1.Exec("insert into t values (5, 8);")
assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx'
_, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;")
assert.NoError(t, err) // The row should be normally updated to (6, 5).
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)"))
tk.MustExec("alter table t add unique index idx(a);")
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("6 5"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging"))
}

func TestAddIndexMergeReplaceOnMerging(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int default 0, b int default 0);")
tk.MustExec("insert into t values (5, 5);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

ddl.MockDMLExecution = func() {
_, err := tk1.Exec("delete from t where b = 5;")
assert.NoError(t, err)
}

ddl.MockDMLExecutionMerging = func() {
_, err := tk1.Exec("replace into t values (5, 8);")
assert.NoError(t, err)
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)"))
tk.MustExec("alter table t add unique index idx(a);")
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("5 8"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging"))
}

func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (5, 5);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &callback.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if t.Failed() {
return
}
var err error
switch job.SchemaState {
case model.StateWriteOnly:
_, err = tk1.Exec("delete from t where b = 5")
assert.NoError(t, err)
_, err := tk1.Exec("set @@tidb_constraint_check_in_place = true;")
assert.NoError(t, err)
_, err = tk1.Exec("insert into t values (5, 8);")
assert.NoError(t, err)
_, err = tk1.Exec("insert into t values (5, 8);")
assert.Error(t, err)
_, err = tk1.Exec("set @@tidb_constraint_check_in_place = false;")
assert.NoError(t, err)
_, err = tk1.Exec("insert into t values (5, 8);")
assert.Error(t, err)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)

tk.MustExec("alter table t add unique index idx(a);")
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("5 8"))
}
27 changes: 6 additions & 21 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -158,11 +159,8 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKey)]; found {
if tablecodec.IsTempIndexKey(uk.newKey) {
if tablecodec.CheckTempIndexValueIsDelete(val) {
continue
}
val = tablecodec.DecodeTempIndexOriginValue(val)
if isTemp, _ := tablecodec.IsTempIndexKey(uk.newKey); isTemp {
continue
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
Expand Down Expand Up @@ -261,26 +259,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKey)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return err
}
// Since the temp index stores deleted key with marked 'deleteu' for unique key at the end
// of value, So if return a key we check and skip deleted key.
if tablecodec.IsTempIndexKey(uk.newKey) {
if tablecodec.CheckTempIndexValueIsDelete(val) {
continue
}
val = tablecodec.DecodeTempIndexOriginValue(val)
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
return err
if handle == nil {
continue
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
Expand Down
17 changes: 4 additions & 13 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -176,22 +177,12 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKey)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return false, false, err
}
if tablecodec.IsTempIndexKey(uk.newKey) {
if tablecodec.CheckTempIndexValueIsDelete(val) {
continue
}
val = tablecodec.DecodeTempIndexOriginValue(val)
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
return false, true, err
if handle == nil {
continue
}
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
if err != nil {
Expand Down
Loading