diff --git a/ddl/index.go b/ddl/index.go index ebcffe21ea916..b391dab5fc6e2 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx // Lock the row key to notify us that someone delete or update the row, // then we should not backfill the index of it, otherwise the adding index is redundant. - err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key) + err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key) if err != nil { return errors.Trace(err) } diff --git a/executor/adapter.go b/executor/adapter.go index a3e3929be4927..0568c1cd8bf81 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -24,6 +24,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" @@ -535,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { return nil } forUpdateTS := txnCtx.GetForUpdateTS() - err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...) + err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...) if err == nil { return nil } @@ -546,6 +547,32 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { } } +// GetTimestampWithRetry tries to get timestamp using retry and backoff mechanism +func (a *ExecStmt) GetTimestampWithRetry(ctx context.Context) (uint64, error) { + tsoMaxBackoff := 15000 + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("ExecStmt.GetTimestampWithRetry", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + bo := tikv.NewBackoffer(ctx, tsoMaxBackoff) + for { + ts, err := a.Ctx.GetStore().GetOracle().GetTimestamp(ctx) + // mock get ts fail + failpoint.Inject("ExecStmtGetTsError", func() (uint64, error) { + return 0, errors.New("ExecStmtGetTsError") + }) + + if err == nil { + return ts, nil + } + err = bo.Backoff(tikv.BoPDRPC, errors.Errorf("ExecStmt get timestamp failed: %v", err)) + if err != nil { + return 0, errors.Trace(err) + } + } +} + // handlePessimisticLockError updates TS and rebuild executor if the err is write conflict. func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) { txnCtx := a.Ctx.GetSessionVars().TxnCtx @@ -573,6 +600,19 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E if conflictCommitTS > forUpdateTS { newForUpdateTS = conflictCommitTS } + } else if terror.ErrorEqual(err, tikv.ErrLockAcquireFailAndNoWaitSet) { + // for nowait, when ErrLock happened, ErrLockAcquireFailAndNoWaitSet will be returned, and in the same txn + // the select for updateTs must be updated, otherwise there maybe rollback problem. 
+		// For example: "begin; select ... for update key1" fails to acquire the lock (ErrLocked, or the
+		// statement is killed by something like max_execution_time), so an async rollback of key1 is issued;
+		// a later "select ... for update key1" in the same txn then succeeds (the lock may have been released),
+		// and the async rollback issued earlier would roll back the lock that was just acquired.
+		newForUpdateTS, tsErr := a.GetTimestampWithRetry(ctx)
+		if tsErr != nil {
+			return nil, tsErr
+		}
+		txnCtx.SetForUpdateTS(newForUpdateTS)
+		return nil, err
 	} else {
 		return nil, err
 	}
diff --git a/executor/admin.go b/executor/admin.go
index dde2395a23518..8c1e88b8d62c7 100644
--- a/executor/admin.go
+++ b/executor/admin.go
@@ -432,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
 		}
 		recordKey := e.table.RecordKey(row.handle)
-		err := txn.LockKeys(ctx, nil, 0, recordKey)
+		err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey)
 		if err != nil {
 			return result, err
 		}
diff --git a/executor/executor.go b/executor/executor.go
index 051da26a805e9..ed9f9ed9a4282 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -801,7 +801,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		return err
 	}
 	// If there's no handle or it's not a `SELECT FOR UPDATE` statement.
-	if len(e.tblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate {
+	if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
 		return nil
 	}
 	if req.NumRows() != 0 {
@@ -815,10 +815,18 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		}
 		return nil
 	}
-	return doLockKeys(ctx, e.ctx, e.keys...)
+	var lockWaitTime = kv.LockAlwaysWait
+	if e.Lock == ast.SelectLockForUpdateNoWait {
+		lockWaitTime = kv.LockNoWait
+	}
+	return doLockKeys(ctx, e.ctx, lockWaitTime, e.keys...)
 }
 
-func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error {
+// doLockKeys is the main entry point for locking keys pessimistically.
+// waitTime is how long, in milliseconds, the operation waits when a target key is already
+// locked by another transaction; it is used for the SELECT ... FOR UPDATE NOWAIT case.
+// Two special values: kv.LockAlwaysWait (0) means always wait, kv.LockNoWait (-1) means never wait.
+func doLockKeys(ctx context.Context, se sessionctx.Context, waitTime int64, keys ...kv.Key) error {
 	se.GetSessionVars().TxnCtx.ForUpdate = true
 	// Lock keys only once when finished fetching all results.
 	txn, err := se.Txn(true)
@@ -826,7 +834,7 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) erro
 		return err
 	}
 	forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
-	return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...)
+	return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime, keys...)
} // LimitExec represents limit executor diff --git a/executor/point_get.go b/executor/point_get.go index 71efac8e0ecfb..f055d2937c474 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -50,14 +50,15 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { type PointGetExecutor struct { baseExecutor - tblInfo *model.TableInfo - handle int64 - idxInfo *model.IndexInfo - idxVals []types.Datum - startTS uint64 - snapshot kv.Snapshot - done bool - lock bool + tblInfo *model.TableInfo + handle int64 + idxInfo *model.IndexInfo + idxVals []types.Datum + startTS uint64 + snapshot kv.Snapshot + done bool + lock bool + lockWaitTime int64 } // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field @@ -69,6 +70,7 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) { e.startTS = startTs e.done = false e.lock = p.Lock + e.lockWaitTime = p.LockWaitTime } // Open implements the Executor interface. @@ -154,7 +156,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error { if e.lock { - return doLockKeys(ctx, e.ctx, key) + return doLockKeys(ctx, e.ctx, e.lockWaitTime, key) } return nil } diff --git a/go.mod b/go.mod index c2d577a44c153..e9d399724ee9d 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 + github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 diff --git a/go.sum b/go.sum index b3cfcf6079010..f50772ec7f607 100644 --- a/go.sum +++ b/go.sum @@ -165,8 +165,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk= -github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk= +github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/kv/kv.go b/kv/kv.go index 0b79161532277..60e9639c85767 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -169,7 +169,7 @@ type Transaction interface { // String implements fmt.Stringer interface. String() string // LockKeys tries to lock the entries with the keys in KV store. 
- LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error + LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt Option, val interface{}) @@ -374,3 +374,11 @@ type SplitableStore interface { WaitScatterRegionFinish(regionID uint64, backOff int) error CheckRegionInScattering(regionID uint64) (bool, error) } + +// Used for pessimistic lock wait time +// these two constants are special for lock protocol with tikv +// 0 means always wait, -1 means nowait, others meaning lock wait in milliseconds +var ( + LockAlwaysWait = int64(0) + LockNoWait = int64(-1) +) diff --git a/kv/mock.go b/kv/mock.go index bd5f6bcc00168..2e0332b6a4fa3 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -39,7 +39,7 @@ func (t *mockTxn) String() string { return "" } -func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error { +func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error { return nil } diff --git a/kv/mock_test.go b/kv/mock_test.go index 18ce2db821d82..7446a47ab7af2 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -38,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) { transaction, err := storage.Begin() c.Check(err, IsNil) - err = transaction.LockKeys(context.Background(), nil, 0, Key("lock")) + err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock")) c.Check(err, IsNil) transaction.SetOption(Option(23), struct{}{}) if mock, ok := transaction.(*mockTxn); ok { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index fd61be7c9fc2b..a7270e97d6904 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/parser/opcode" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" @@ -54,6 +55,7 @@ type PointGetPlan struct { Lock bool IsForUpdate bool outputNames []*types.FieldName + LockWaitTime int64 } type nameValuePair struct { @@ -251,7 +253,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan { tableDual.SetSchema(fp.Schema()) return tableDual.Init(ctx, &property.StatsInfo{}, 0) } - if x.LockTp == ast.SelectLockForUpdate { + if x.LockTp == ast.SelectLockForUpdate || x.LockTp == ast.SelectLockForUpdateNoWait { // Locking of rows for update using SELECT FOR UPDATE only applies when autocommit // is disabled (either by beginning transaction with START TRANSACTION or by setting // autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked. 
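The wait-time constants added to kv/kv.go above are threaded through every LockKeys call site in this patch. Below is a minimal sketch of how a caller is expected to pick between them, assuming the new Transaction.LockKeys signature; the lockWithNoWait helper and its arguments are illustrative only and are not part of the patch.

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/kv"
)

// lockWithNoWait is an illustrative helper (not part of the patch) showing how the
// new lockWaitTime parameter of Transaction.LockKeys is meant to be used: 0 blocks
// until the lock is acquired, -1 fails fast when another transaction holds the lock.
func lockWithNoWait(ctx context.Context, txn kv.Transaction, forUpdateTS uint64, noWait bool, keys ...kv.Key) error {
	lockWaitTime := kv.LockAlwaysWait // 0: wait until the lock can be taken
	if noWait {
		lockWaitTime = kv.LockNoWait // -1: give up immediately if the key is locked
	}
	// killed is nil here because this sketch does not watch for a KILL signal.
	return txn.LockKeys(ctx, nil, forUpdateTS, lockWaitTime, keys...)
}
```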
@@ -260,6 +262,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan { if !sessVars.IsAutocommit() || sessVars.InTxn() { fp.Lock = true fp.IsForUpdate = true + fp.LockWaitTime = kv.LockAlwaysWait + if x.LockTp == ast.SelectLockForUpdateNoWait { + fp.LockWaitTime = kv.LockNoWait + } } } return fp @@ -610,11 +616,12 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.Schema, tbl *model.TableInfo, names []*types.FieldName) *PointGetPlan { p := &PointGetPlan{ - basePlan: newBasePlan(ctx, plancodec.TypePointGet, 0), - dbName: dbName, - schema: schema, - TblInfo: tbl, - outputNames: names, + basePlan: newBasePlan(ctx, plancodec.TypePointGet, 0), + dbName: dbName, + schema: schema, + TblInfo: tbl, + outputNames: names, + LockWaitTime: kv.LockAlwaysWait, } ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}} return p diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 08aa8f453a628..d99322c9a6600 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -353,7 +353,7 @@ func (la *LogicalApply) PruneColumns(parentUsedCols []*expression.Column) error // PruneColumns implements LogicalPlan interface. func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error { - if p.Lock != ast.SelectLockForUpdate { + if p.Lock != ast.SelectLockForUpdate && p.Lock != ast.SelectLockForUpdateNoWait { return p.baseLogicalPlan.PruneColumns(parentUsedCols) } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index ee5c5fb5acd01..d54a9dd659e22 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -21,6 +21,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" @@ -395,6 +396,133 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) { tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3")) } +func (s *testPessimisticSuite) TestSelectForUpdateNoWait(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk3 := testkit.NewTestKitWithInit(c, s.store) + + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)") + + tk.MustExec("set @@autocommit = 0") + tk2.MustExec("set @@autocommit = 0") + tk3.MustExec("set @@autocommit = 0") + + // point get with no autocommit + tk.MustExec("begin pessimistic") + tk.MustExec("select * from tk where c1 = 2 for update") // lock succ + + tk2.MustExec("begin pessimistic") + _, err := tk2.Exec("select * from tk where c1 = 2 for update nowait") + c.Check(err, NotNil) + tk.MustExec("commit") + tk2.MustExec("select * from tk where c1 = 2 for update nowait") // lock succ + + tk3.MustExec("begin pessimistic") + _, err = tk3.Exec("select * from tk where c1 = 2 for update nowait") + c.Check(err, NotNil) + + tk2.MustExec("commit") + tk3.MustExec("select * from tk where c1 = 2 for update") + tk3.MustExec("commit") + tk.MustExec("commit") + + tk3.MustExec("begin pessimistic") + tk3.MustExec("update tk set c2 = c2 + 1 where c1 = 3") + tk2.MustExec("begin pessimistic") + _, err = tk2.Exec("select * from tk where c1 = 3 for update nowait") + c.Check(err, NotNil) + tk3.MustExec("commit") + tk2.MustExec("select * from tk where c1 = 3 for update nowait") + tk2.MustExec("commit") + + tk.MustExec("commit") + tk2.MustExec("commit") + tk3.MustExec("commit") + + // scan with no autocommit + tk.MustExec("begin pessimistic") + tk.MustExec("select * from tk where c1 >= 2 for update") + tk2.MustExec("begin pessimistic") + _, err = tk2.Exec("select * from tk where c1 = 2 for update nowait") + c.Check(err, NotNil) + _, err = tk2.Exec("select * from tk where c1 > 3 for update nowait") + c.Check(err, NotNil) + tk2.MustExec("select * from tk where c1 = 1 for update nowait") + tk2.MustExec("commit") + tk.MustQuery("select * from tk where c1 >= 2 for update").Check(testkit.Rows("2 2", "3 4", "4 4", "5 5")) + tk.MustExec("commit") + tk.MustExec("begin pessimistic") + tk.MustExec("update tk set c2 = c2 + 10 where c1 > 3") + tk3.MustExec("begin pessimistic") + _, err = tk3.Exec("select * from tk where c1 = 5 for update nowait") + c.Check(err, NotNil) + tk3.MustExec("select * from tk where c1 = 1 for update nowait") + tk.MustExec("commit") + tk3.MustQuery("select * from tk where c1 > 3 for update nowait").Check(testkit.Rows("4 14", "5 15")) + tk3.MustExec("commit") + + //delete + tk3.MustExec("begin pessimistic") + tk3.MustExec("delete from tk where c1 <= 2") + tk.MustExec("begin pessimistic") + _, err = tk.Exec("select * from tk where c1 = 1 for update nowait") + c.Check(err, NotNil) + tk3.MustExec("commit") + tk.MustQuery("select * from tk where c1 > 1 for update nowait").Check(testkit.Rows("3 4", "4 14", "5 15")) + tk.MustExec("update tk set c2 = c2 + 1 where c1 = 5") + tk2.MustExec("begin pessimistic") + _, err = tk2.Exec("select * from tk where c1 = 5 for update nowait") + c.Check(err, NotNil) + tk.MustExec("commit") + tk2.MustQuery("select * from tk where c1 = 5 for update 
nowait").Check(testkit.Rows("5 16")) + tk2.MustExec("update tk set c2 = c2 + 1 where c1 = 5") + tk2.MustQuery("select * from tk where c1 = 5 for update nowait").Check(testkit.Rows("5 17")) + tk2.MustExec("commit") +} + +func (s *testPessimisticSuite) TestAsyncRollBackNoWait(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk3 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,17)") + + tk.MustExec("set @@autocommit = 0") + tk2.MustExec("set @@autocommit = 0") + tk3.MustExec("set @@autocommit = 0") + + // test get ts failed for handlePessimisticLockError when using nowait + // even though async rollback for pessimistic lock may rollback later locked key if get ts failed from pd + // the txn correctness should be ensured + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/ExecStmtGetTsError", "return"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return"), IsNil) + tk.MustExec("begin pessimistic") + tk.MustExec("select * from tk where c1 > 0 for update nowait") + tk2.MustExec("begin pessimistic") + _, err := tk2.Exec("select * from tk where c1 > 0 for update nowait") + c.Check(err, NotNil) + tk.MustExec("commit") + tk2.MustQuery("select * from tk where c1 > 0 for update nowait") + tk2.MustQuery("select * from tk where c1 = 5 for update nowait").Check(testkit.Rows("5 17")) + tk3.MustExec("begin pessimistic") + tk3.MustExec("update tk set c2 = 1 where c1 = 5") // here lock succ happen in getTs failed from pd + tk2.MustExec("update tk set c2 = c2 + 100 where c1 > 0") // this will not get effect + time.Sleep(3 * time.Second) + _, err = tk2.Exec("commit") + c.Check(err, NotNil) // txn abort because pessimistic lock not found + tk3.MustExec("commit") + tk3.MustExec("begin pessimistic") + tk3.MustQuery("select * from tk where c1 = 5 for update nowait").Check(testkit.Rows("5 1")) + tk3.MustQuery("select * from tk where c1 = 4 for update nowait").Check(testkit.Rows("4 4")) + tk3.MustQuery("select * from tk where c1 = 3 for update nowait").Check(testkit.Rows("3 3")) + tk3.MustExec("commit") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/ExecStmtGetTsError"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep"), IsNil) +} + func (s *testPessimisticSuite) TestWaitLockKill(c *C) { // Test kill command works on waiting pessimistic lock. tk := testkit.NewTestKitWithInit(c, s.store) diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index 99bdca22a3121..ea26645d04a84 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -22,16 +22,18 @@ import ( // ErrLocked is returned when trying to Read/Write on a locked key. Client should // backoff or cleanup the lock then retry. type ErrLocked struct { - Key MvccKey - Primary []byte - StartTS uint64 - TTL uint64 - TxnSize uint64 + Key MvccKey + Primary []byte + StartTS uint64 + TTL uint64 + TxnSize uint64 + LockType kvrpcpb.Op } // Error formats the lock to a string. 
func (e *ErrLocked) Error() string { - return fmt.Sprintf("key is locked, key: %q, primary: %q, txnStartTS: %v", e.Key, e.Primary, e.StartTS) + return fmt.Sprintf("key is locked, key: %q, primary: %q, txnStartTS: %v, LockType: %v", + e.Key, e.Primary, e.StartTS, e.LockType) } // ErrKeyAlreadyExist is returned when key exists but this key has a constraint that diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index be85563479903..ffed3c0b22e14 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -198,11 +198,12 @@ func newEntry(key MvccKey) *mvccEntry { // Note that parameter key is raw key, while key in ErrLocked is mvcc key. func (l *mvccLock) lockErr(key []byte) error { return &ErrLocked{ - Key: mvccEncode(key, lockVer), - Primary: l.primary, - StartTS: l.startTS, - TTL: l.ttl, - TxnSize: l.txnSize, + Key: mvccEncode(key, lockVer), + Primary: l.primary, + StartTS: l.startTS, + TTL: l.ttl, + TxnSize: l.txnSize, + LockType: l.op, } } @@ -254,7 +255,8 @@ type MVCCStore interface { Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair - PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error + PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, + forUpdateTS uint64, ttl uint64, lockWaitTime int64) []error PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error Prewrite(req *kvrpcpb.PrewriteRequest) []error Commit(keys [][]byte, startTS, commitTS uint64) error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index b83622347e891..74c67228fb287 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/goleveldb/leveldb/util" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/deadlock" @@ -464,7 +465,8 @@ func reverse(values []mvccValue) { } // PessimisticLock writes the pessimistic lock. 
-func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error { +func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, + forUpdateTS uint64, ttl uint64, lockWaitTime int64) []error { mvcc.mu.Lock() defer mvcc.mu.Unlock() @@ -477,6 +479,11 @@ func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary if err != nil { anyError = true } + if lockWaitTime == kv.LockNoWait { + if _, ok := err.(*ErrLocked); ok { + break + } + } } if anyError { return errs diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 4d860b9db722e..ad2f485d1ef85 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -59,6 +59,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { LockVersion: locked.StartTS, LockTtl: locked.TTL, TxnSize: locked.TxnSize, + LockType: locked.LockType, }, } } @@ -314,10 +315,12 @@ func (h *rpcHandler) handleKvPessimisticLock(req *kvrpcpb.PessimisticLockRequest startTS := req.StartVersion regionID := req.Context.RegionId h.cluster.handleDelay(startTS, regionID) - errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), req.GetLockTtl()) - - // TODO: remove this when implement sever side wait. - h.simulateServerSideWaitLock(errs) + errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), + req.GetLockTtl(), req.WaitTimeout) + if req.WaitTimeout == kv.LockAlwaysWait { + // TODO: remove this when implement sever side wait. + h.simulateServerSideWaitLock(errs) + } return &kvrpcpb.PessimisticLockResponse{ Errors: convertToKeyErrors(errs), } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 4a9a8d0948f4a..d5d94a5c7cdaf 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -46,7 +46,10 @@ type twoPhaseCommitAction interface { type actionPrewrite struct{} type actionCommit struct{} type actionCleanup struct{} -type actionPessimisticLock struct{ killed *uint32 } +type actionPessimisticLock struct { + killed *uint32 + lockWaitTime int64 +} type actionPessimisticRollback struct{} var ( @@ -671,6 +674,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * ForUpdateTs: c.forUpdateTS, LockTtl: elapsed + PessimisticLockTTL, IsFirstLock: c.isFirstLock, + WaitTimeout: action.lockWaitTime, }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) for { resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) @@ -686,7 +690,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err != nil { return errors.Trace(err) } - err = c.pessimisticLockKeys(bo, action.killed, batch.keys) + err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, batch.keys) return errors.Trace(err) } if resp.Resp == nil { @@ -717,6 +721,17 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err1 != nil { return errors.Trace(err1) } + // Check lock conflict error for nowait, if nowait set and key locked by others, + // report error immediately and do no more resolve locks. 
+ // if the lock left behind whose related txn is already committed or rollbacked, + // (eg secondary locks not committed or rollbacked yet) + // we cant return "nowait conflict" directly + if action.lockWaitTime == kv.LockNoWait && lock.LockType == pb.Op_PessimisticLock { + // the pessimistic lock found could be lock left behind(timeout but not recycled yet) + if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { + return ErrLockAcquireFailAndNoWaitSet + } + } locks = append(locks, lock) } // Because we already waited on tikv, no need to Backoff here. @@ -957,8 +972,9 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { return c.doActionOnKeys(bo, actionCleanup{}, keys) } -func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock{killed}, keys) +func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64, + keys [][]byte) error { + return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index cb47a8bf057c8..04b375af886a8 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -556,7 +556,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { c.Assert(txn.Set(key, key), IsNil) txn.DelOption(kv.PresumeKeyNotExistsError) txn.DelOption(kv.PresumeKeyNotExists) - err := txn.LockKeys(context.Background(), nil, txn.startTS, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) c.Assert(err, NotNil) c.Assert(txn.Delete(key), IsNil) key2 := kv.Key("key2") @@ -568,9 +568,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) - err := txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def")) + err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def")) + err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) c.Assert(txn.lockKeys, HasLen, 2) } @@ -580,11 +580,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) time.Sleep(time.Millisecond * 100) - err := txn.LockKeys(context.Background(), nil, txn.startTS, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) c.Assert(err, IsNil) time.Sleep(time.Millisecond * 100) key2 := kv.Key("key2") - err = txn.LockKeys(context.Background(), nil, txn.startTS, key2) + err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2) c.Assert(err, IsNil) lockInfo := s.getLockInfo(c, key) msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl) diff --git a/store/tikv/error.go b/store/tikv/error.go index 574e460454912..30879aa85b9e7 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -32,13 +32,14 @@ const mismatchClusterID = "mismatch cluster id" // MySQL error instances. 
var ( - ErrTiKVServerTimeout = terror.ClassTiKV.New(mysql.ErrTiKVServerTimeout, mysql.MySQLErrName[mysql.ErrTiKVServerTimeout]) - ErrResolveLockTimeout = terror.ClassTiKV.New(mysql.ErrResolveLockTimeout, mysql.MySQLErrName[mysql.ErrResolveLockTimeout]) - ErrPDServerTimeout = terror.ClassTiKV.New(mysql.ErrPDServerTimeout, mysql.MySQLErrName[mysql.ErrPDServerTimeout]) - ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]) - ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]) - ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) - ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted]) + ErrTiKVServerTimeout = terror.ClassTiKV.New(mysql.ErrTiKVServerTimeout, mysql.MySQLErrName[mysql.ErrTiKVServerTimeout]) + ErrResolveLockTimeout = terror.ClassTiKV.New(mysql.ErrResolveLockTimeout, mysql.MySQLErrName[mysql.ErrResolveLockTimeout]) + ErrPDServerTimeout = terror.ClassTiKV.New(mysql.ErrPDServerTimeout, mysql.MySQLErrName[mysql.ErrPDServerTimeout]) + ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]) + ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]) + ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) + ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted]) + ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet]) ) // ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface. @@ -54,14 +55,15 @@ func (d *ErrDeadlock) Error() string { func init() { tikvMySQLErrCodes := map[terror.ErrCode]uint16{ - mysql.ErrTiKVServerTimeout: mysql.ErrTiKVServerTimeout, - mysql.ErrResolveLockTimeout: mysql.ErrResolveLockTimeout, - mysql.ErrPDServerTimeout: mysql.ErrPDServerTimeout, - mysql.ErrRegionUnavailable: mysql.ErrRegionUnavailable, - mysql.ErrTiKVServerBusy: mysql.ErrTiKVServerBusy, - mysql.ErrGCTooEarly: mysql.ErrGCTooEarly, - mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue, - mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted, + mysql.ErrTiKVServerTimeout: mysql.ErrTiKVServerTimeout, + mysql.ErrResolveLockTimeout: mysql.ErrResolveLockTimeout, + mysql.ErrPDServerTimeout: mysql.ErrPDServerTimeout, + mysql.ErrRegionUnavailable: mysql.ErrRegionUnavailable, + mysql.ErrTiKVServerBusy: mysql.ErrTiKVServerBusy, + mysql.ErrGCTooEarly: mysql.ErrGCTooEarly, + mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue, + mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted, + mysql.ErrLockAcquireFailAndNoWaitSet: mysql.ErrLockAcquireFailAndNoWaitSet, } terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 42a17c095477e..7262095a2ce83 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -129,11 +129,12 @@ var ttlFactor = 6000 // Lock represents a lock from tikv server. 
 type Lock struct {
-	Key     []byte
-	Primary []byte
-	TxnID   uint64
-	TTL     uint64
-	TxnSize uint64
+	Key      []byte
+	Primary  []byte
+	TxnID    uint64
+	TTL      uint64
+	TxnSize  uint64
+	LockType kvrpcpb.Op
 }
 
 func (l *Lock) String() string {
@@ -143,11 +144,12 @@ func (l *Lock) String() string {
 // NewLock creates a new *Lock.
 func NewLock(l *kvrpcpb.LockInfo) *Lock {
 	return &Lock{
-		Key:     l.GetKey(),
-		Primary: l.GetPrimaryLock(),
-		TxnID:   l.GetLockVersion(),
-		TTL:     l.GetLockTtl(),
-		TxnSize: l.GetTxnSize(),
+		Key:      l.GetKey(),
+		Primary:  l.GetPrimaryLock(),
+		TxnID:    l.GetLockVersion(),
+		TTL:      l.GetLockTtl(),
+		TxnSize:  l.GetTxnSize(),
+		LockType: l.LockType,
 	}
 }
 
diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go
index 49759035ec990..86cccc95d6e70 100644
--- a/store/tikv/ticlient_test.go
+++ b/store/tikv/ticlient_test.go
@@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) {
 	txn := s.beginTxn(c)
 	err := txn.Set(encodeKey(s.prefix, "key"), []byte("value"))
 	c.Assert(err, IsNil)
-	err = txn.LockKeys(context.Background(), nil, 0, encodeKey(s.prefix, "key"))
+	err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, encodeKey(s.prefix, "key"))
 	c.Assert(err, IsNil)
 	err = txn.Commit(context.Background())
 	c.Assert(err, IsNil)
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index 83f5da9f4276f..0d26ac5dfbfdb 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -381,7 +381,8 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error {
 	return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys)
 }
 
-func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keysInput ...kv.Key) error {
+// lockWaitTime is in milliseconds: kv.LockAlwaysWait (0) means always wait for the lock, kv.LockNoWait (-1) means never wait.
+func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keysInput ...kv.Key) error {
 	// Exclude keys that are already locked.
 	keys := make([][]byte, 0, len(keysInput))
 	txn.mu.Lock()
@@ -420,7 +421,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui
 	// If the number of keys greater than 1, it can be on different region,
 	// concurrently execute on multiple regions may lead to deadlock.
 	txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1
-	err := txn.committer.pessimisticLockKeys(bo, killed, keys)
+	err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, keys)
 	if killed != nil {
 		// If the kill signal is received during waiting for pessimisticLock,
 		// pessimisticLockKeys would handle the error but it doesn't reset the flag.
@@ -475,6 +476,9 @@ func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte)
 	wg := new(sync.WaitGroup)
 	wg.Add(1)
 	go func() {
+		failpoint.Inject("AsyncRollBackSleep", func() {
+			time.Sleep(2 * time.Second)
+		})
 		err := committer.pessimisticRollbackKeys(NewBackoffer(ctx, pessimisticRollbackMaxBackoff), keys)
 		if err != nil {
 			logutil.Logger(ctx).Warn("[kv] pessimisticRollback failed.", zap.Error(err))
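To summarize the nowait conflict handling added in store/tikv/2pc.go, the following sketch mirrors the decision made there; shouldFailFast and the isExpired callback are illustrative stand-ins for the inlined check and for c.store.oracle.IsExpired, not code from the patch.

```go
package example

import (
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/tikv"
)

// shouldFailFast mirrors the check in actionPessimisticLock.handleSingleBatch:
// with kv.LockNoWait, the statement fails immediately only when the blocking lock
// is a pessimistic lock that has not expired yet; expired locks (left behind by a
// dead transaction) still go through the normal resolve-lock path instead.
func shouldFailFast(lock *tikv.Lock, lockWaitTime int64, isExpired func(txnID, ttl uint64) bool) error {
	if lockWaitTime == kv.LockNoWait && lock.LockType == kvrpcpb.Op_PessimisticLock {
		if !isExpired(lock.TxnID, lock.TTL) {
			return tikv.ErrLockAcquireFailAndNoWaitSet
		}
	}
	return nil
}
```

With kv.LockAlwaysWait the behavior is unchanged: the client keeps resolving locks and backing off as before, so existing pessimistic statements are unaffected by this patch.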