Skip to content

Commit

Permalink
executor: support select for update no wait (#12775)
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored and sre-bot committed Nov 4, 2019
1 parent 3a1fb29 commit f12403e
Show file tree
Hide file tree
Showing 23 changed files with 314 additions and 83 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key)
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
42 changes: 41 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -535,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...)
if err == nil {
return nil
}
Expand All @@ -546,6 +547,32 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
}

// GetTimestampWithRetry tries to get timestamp using retry and backoff mechanism
func (a *ExecStmt) GetTimestampWithRetry(ctx context.Context) (uint64, error) {
tsoMaxBackoff := 15000
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("ExecStmt.GetTimestampWithRetry", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
bo := tikv.NewBackoffer(ctx, tsoMaxBackoff)
for {
ts, err := a.Ctx.GetStore().GetOracle().GetTimestamp(ctx)
// mock get ts fail
failpoint.Inject("ExecStmtGetTsError", func() (uint64, error) {
return 0, errors.New("ExecStmtGetTsError")
})

if err == nil {
return ts, nil
}
err = bo.Backoff(tikv.BoPDRPC, errors.Errorf("ExecStmt get timestamp failed: %v", err))
if err != nil {
return 0, errors.Trace(err)
}
}
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
txnCtx := a.Ctx.GetSessionVars().TxnCtx
Expand Down Expand Up @@ -573,6 +600,19 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
if conflictCommitTS > forUpdateTS {
newForUpdateTS = conflictCommitTS
}
} else if terror.ErrorEqual(err, tikv.ErrLockAcquireFailAndNoWaitSet) {
// for nowait, when ErrLock happened, ErrLockAcquireFailAndNoWaitSet will be returned, and in the same txn
// the select for updateTs must be updated, otherwise there maybe rollback problem.
// begin; select for update key1(here ErrLocked or other errors(or max_execution_time like util),
// key1 lock not get and async rollback key1 is raised)
// select for update key1 again(this time lock succ(maybe lock released by others))
// the async rollback operation rollbacked the lock just acquired
newForUpdateTS, tsErr := a.GetTimestampWithRetry(ctx)
if tsErr != nil {
return nil, tsErr
}
txnCtx.SetForUpdateTS(newForUpdateTS)
return nil, err
} else {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, nil, 0, recordKey)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey)
if err != nil {
return result, err
}
Expand Down
16 changes: 12 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
// If there's no handle or it's not a `SELECT FOR UPDATE` statement.
if len(e.tblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate {
if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
return nil
}
if req.NumRows() != 0 {
Expand All @@ -815,18 +815,26 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return doLockKeys(ctx, e.ctx, e.keys...)
var lockWaitTime = kv.LockAlwaysWait
if e.Lock == ast.SelectLockForUpdateNoWait {
lockWaitTime = kv.LockNoWait
}
return doLockKeys(ctx, e.ctx, lockWaitTime, e.keys...)
}

func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error {
// doLockKeys is the main entry for pessimistic lock keys
// waitTime means the lock operation will wait in milliseconds if target key is already
// locked by others. used for (select for update nowait) situation
// except 0 means alwaysWait 1 means nowait
func doLockKeys(ctx context.Context, se sessionctx.Context, waitTime int64, keys ...kv.Key) error {
se.GetSessionVars().TxnCtx.ForUpdate = true
// Lock keys only once when finished fetching all results.
txn, err := se.Txn(true)
if err != nil {
return err
}
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...)
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime, keys...)
}

// LimitExec represents limit executor
Expand Down
20 changes: 11 additions & 9 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
type PointGetExecutor struct {
baseExecutor

tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
lockWaitTime int64
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
Expand All @@ -69,6 +70,7 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
e.startTS = startTs
e.done = false
e.lock = p.Lock
e.lockWaitTime = p.LockWaitTime
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -154,7 +156,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
return doLockKeys(ctx, e.ctx, key)
return doLockKeys(ctx, e.ctx, e.lockWaitTime, key)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
10 changes: 9 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
Expand Down Expand Up @@ -374,3 +374,11 @@ type SplitableStore interface {
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}

// Used for pessimistic lock wait time
// these two constants are special for lock protocol with tikv
// 0 means always wait, -1 means nowait, others meaning lock wait in milliseconds
var (
LockAlwaysWait = int64(0)
LockNoWait = int64(-1)
)
2 changes: 1 addition & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), nil, 0, Key("lock"))
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
Expand Down
19 changes: 13 additions & 6 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/opcode"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -54,6 +55,7 @@ type PointGetPlan struct {
Lock bool
IsForUpdate bool
outputNames []*types.FieldName
LockWaitTime int64
}

type nameValuePair struct {
Expand Down Expand Up @@ -251,7 +253,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
tableDual.SetSchema(fp.Schema())
return tableDual.Init(ctx, &property.StatsInfo{}, 0)
}
if x.LockTp == ast.SelectLockForUpdate {
if x.LockTp == ast.SelectLockForUpdate || x.LockTp == ast.SelectLockForUpdateNoWait {
// Locking of rows for update using SELECT FOR UPDATE only applies when autocommit
// is disabled (either by beginning transaction with START TRANSACTION or by setting
// autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked.
Expand All @@ -260,6 +262,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
if !sessVars.IsAutocommit() || sessVars.InTxn() {
fp.Lock = true
fp.IsForUpdate = true
fp.LockWaitTime = kv.LockAlwaysWait
if x.LockTp == ast.SelectLockForUpdateNoWait {
fp.LockWaitTime = kv.LockNoWait
}
}
}
return fp
Expand Down Expand Up @@ -610,11 +616,12 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP

func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.Schema, tbl *model.TableInfo, names []*types.FieldName) *PointGetPlan {
p := &PointGetPlan{
basePlan: newBasePlan(ctx, plancodec.TypePointGet, 0),
dbName: dbName,
schema: schema,
TblInfo: tbl,
outputNames: names,
basePlan: newBasePlan(ctx, plancodec.TypePointGet, 0),
dbName: dbName,
schema: schema,
TblInfo: tbl,
outputNames: names,
LockWaitTime: kv.LockAlwaysWait,
}
ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}}
return p
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (la *LogicalApply) PruneColumns(parentUsedCols []*expression.Column) error

// PruneColumns implements LogicalPlan interface.
func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error {
if p.Lock != ast.SelectLockForUpdate {
if p.Lock != ast.SelectLockForUpdate && p.Lock != ast.SelectLockForUpdateNoWait {
return p.baseLogicalPlan.PruneColumns(parentUsedCols)
}

Expand Down
Loading

0 comments on commit f12403e

Please sign in to comment.