
executor: support select for update no wait #12775

Merged (29 commits) on Nov 4, 2019

Commits
f242af2
support select for update no wait
cfzjywxk Oct 16, 2019
9e8d6cf
refactor, handle select for update error will update "updateTS"
cfzjywxk Oct 17, 2019
70dc1eb
move const1
cfzjywxk Oct 17, 2019
d2bc394
add LockType in LockInfo
cfzjywxk Oct 17, 2019
c103a77
lockType use kvrpc.Op
cfzjywxk Oct 18, 2019
b69efa6
gomod, refactor some logic
cfzjywxk Oct 18, 2019
3f4205b
typo
cfzjywxk Oct 18, 2019
d5ef7bf
Merge branch 'master' into nowait
cfzjywxk Oct 18, 2019
38c5caf
format
cfzjywxk Oct 18, 2019
16616e0
Merge branch 'master' into nowait
cfzjywxk Oct 24, 2019
99935c3
go sum
cfzjywxk Oct 24, 2019
34dccb9
go sum
cfzjywxk Oct 24, 2019
a70b11f
Merge branch 'master' into nowait
cfzjywxk Oct 24, 2019
5c52dbd
format
cfzjywxk Oct 24, 2019
1a4d0b3
tidy
cfzjywxk Oct 24, 2019
4afc621
tidy sum
cfzjywxk Oct 24, 2019
de469ca
tidy
cfzjywxk Oct 24, 2019
31f2c67
Merge branch 'master' into nowait
cfzjywxk Oct 25, 2019
fddc2e8
refactoration, add case
cfzjywxk Oct 25, 2019
119a307
add missing
cfzjywxk Oct 25, 2019
09bdcb2
import
cfzjywxk Oct 25, 2019
4565b05
Merge branch 'master' into nowait
cfzjywxk Oct 25, 2019
7f73591
Merge branch 'master' into nowait
crazycs520 Oct 25, 2019
99bada6
Merge branch 'master' into nowait
cfzjywxk Nov 1, 2019
afa7bea
format
cfzjywxk Nov 1, 2019
4b3dfce
change nowait signal to int64
cfzjywxk Nov 1, 2019
edcaf66
Revert "change nowait signal to int64"
cfzjywxk Nov 4, 2019
16b26c4
change nowait signal to int64
cfzjywxk Nov 1, 2019
cd7151a
Merge branch 'master' into nowait
cfzjywxk Nov 4, 2019
2 changes: 1 addition & 1 deletion ddl/index.go
@@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key so we are notified if someone deletes or updates the row;
// in that case we should not backfill the index for it, otherwise the added index entry would be redundant.
err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key)
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
42 changes: 41 additions & 1 deletion executor/adapter.go
@@ -24,6 +24,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
@@ -535,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...)
if err == nil {
return nil
}
@@ -546,6 +547,32 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
}

// GetTimestampWithRetry tries to get a timestamp using a retry-and-backoff mechanism
func (a *ExecStmt) GetTimestampWithRetry(ctx context.Context) (uint64, error) {
tsoMaxBackoff := 15000
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("ExecStmt.GetTimestampWithRetry", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
bo := tikv.NewBackoffer(ctx, tsoMaxBackoff)
for {
ts, err := a.Ctx.GetStore().GetOracle().GetTimestamp(ctx)
// failpoint: mock a failure when getting the timestamp
failpoint.Inject("ExecStmtGetTsError", func() (uint64, error) {
return 0, errors.New("ExecStmtGetTsError")
})

if err == nil {
return ts, nil
}
err = bo.Backoff(tikv.BoPDRPC, errors.Errorf("ExecStmt get timestamp failed: %v", err))
if err != nil {
return 0, errors.Trace(err)
}
}
}

// handlePessimisticLockError updates the forUpdateTS and rebuilds the executor if the error is a write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
txnCtx := a.Ctx.GetSessionVars().TxnCtx
@@ -573,6 +600,19 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
if conflictCommitTS > forUpdateTS {
newForUpdateTS = conflictCommitTS
}
} else if terror.ErrorEqual(err, tikv.ErrLockAcquireFailAndNoWaitSet) {
// For nowait, when ErrLocked happens, ErrLockAcquireFailAndNoWaitSet is returned, and within the same txn
// the SELECT FOR UPDATE's forUpdateTS must be updated, otherwise there may be a rollback problem:
//   begin; select ... for update key1 (here ErrLocked or another error such as a max_execution_time timeout occurs,
//   the lock on key1 is not acquired and an async rollback of key1 is triggered)
//   select ... for update key1 again (this time the lock succeeds, e.g. the lock was released by others)
//   the async rollback operation then rolls back the lock that was just acquired.
newForUpdateTS, tsErr := a.GetTimestampWithRetry(ctx)
if tsErr != nil {
return nil, tsErr
Review thread on this change:

Contributor (author):
If the timestamp fetch fails here, the forUpdateTS is not updated and the previous async rollback may still be in flight; is there still a risk that a later SELECT FOR UPDATE gets rolled back? @coocood

Member:
This error rarely happens, so even if it does and the async rollback causes the transaction to fail, it is acceptable.

cfzjywxk (Contributor, author), Oct 18, 2019:
@tiancaiamao @coocood @jackysp
I'll add and use getWithRetry, but I think there is still a risk with nowait: the user gets an error for this statement, and the next pessimistic-lock statement (SELECT FOR UPDATE, UPDATE, or DELETE) does not refresh its forUpdateTS, so the "async rollback" risk still exists. How should we handle the error here? Or should we make the rollback synchronous for the "nowait" lock error?

Member:
A sync rollback may still send duplicated requests later.
}
txnCtx.SetForUpdateTS(newForUpdateTS)
return nil, err
tiancaiamao marked this conversation as resolved.
} else {
return nil, err
}
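The nowait branch above refreshes the forUpdateTS before surfacing the error, so that a later SELECT FOR UPDATE in the same transaction does not acquire a lock that the pending async rollback would then roll back. The following is a minimal sketch of that control flow, not the actual TiDB implementation: handleLockError, fetchTSWithRetry, txnContext and errLockNoWait are hypothetical stand-ins for handlePessimisticLockError, GetTimestampWithRetry, the transaction context and tikv.ErrLockAcquireFailAndNoWaitSet.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errLockNoWait stands in for tikv.ErrLockAcquireFailAndNoWaitSet.
var errLockNoWait = errors.New("lock acquire failed and nowait is set")

// txnContext stands in for the session's transaction context.
type txnContext struct{ forUpdateTS uint64 }

// fetchTSWithRetry stands in for a timestamp fetch that retries with backoff.
func fetchTSWithRetry(ctx context.Context) (uint64, error) { return 42, nil }

// handleLockError mirrors the shape of the nowait branch: refresh the forUpdateTS
// so later pessimistic locks in the same transaction use a newer timestamp, then
// return the original lock error to the caller.
func handleLockError(ctx context.Context, tc *txnContext, err error) error {
	if errors.Is(err, errLockNoWait) {
		ts, tsErr := fetchTSWithRetry(ctx)
		if tsErr != nil {
			return tsErr
		}
		tc.forUpdateTS = ts
	}
	return err
}

func main() {
	tc := &txnContext{forUpdateTS: 1}
	err := handleLockError(context.Background(), tc, errLockNoWait)
	fmt.Println(tc.forUpdateTS, err) // 42 lock acquire failed and nowait is set
}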
2 changes: 1 addition & 1 deletion executor/admin.go
@@ -432,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, nil, 0, recordKey)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey)
if err != nil {
return result, err
}
16 changes: 12 additions & 4 deletions executor/executor.go
@@ -801,7 +801,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
// If there's no handle or it's not a `SELECT FOR UPDATE` (or `SELECT FOR UPDATE NOWAIT`) statement.
if len(e.tblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate {
if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
return nil
}
if req.NumRows() != 0 {
@@ -815,18 +815,26 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return doLockKeys(ctx, e.ctx, e.keys...)
var lockWaitTime = kv.LockAlwaysWait
if e.Lock == ast.SelectLockForUpdateNoWait {
lockWaitTime = kv.LockNoWait
}
return doLockKeys(ctx, e.ctx, lockWaitTime, e.keys...)
}

func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error {
// doLockKeys is the main entry point for pessimistically locking keys.
// waitTime is how long the lock operation will wait, in milliseconds, if the target key is already
// locked by others; it is used for the SELECT FOR UPDATE NOWAIT case.
// Two values are special: 0 means always wait and -1 means nowait.
func doLockKeys(ctx context.Context, se sessionctx.Context, waitTime int64, keys ...kv.Key) error {
se.GetSessionVars().TxnCtx.ForUpdate = true
// Lock keys only once when finished fetching all results.
txn, err := se.Txn(true)
if err != nil {
return err
}
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...)
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, waitTime, keys...)
}

// LimitExec represents limit executor
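From the client side, the feature added here is the NOWAIT modifier on SELECT ... FOR UPDATE, which fails immediately instead of waiting when the target row is already locked. A rough usage sketch follows, assuming a local TiDB listening on port 4000, a table t(id INT PRIMARY KEY, v INT), and the go-sql-driver/mysql driver; the DSN and names are illustrative assumptions, not part of this PR.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The lock (and therefore NOWAIT) only takes effect inside an explicit or pessimistic
	// transaction; with autocommit enabled and no open transaction, the matching rows are not locked.
	tx, err := db.Begin()
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback()

	var v int
	// If another transaction already holds the lock on id = 1, this returns an error
	// immediately instead of waiting for the lock to be released.
	err = tx.QueryRow("SELECT v FROM t WHERE id = 1 FOR UPDATE NOWAIT").Scan(&v)
	if err != nil {
		fmt.Println("could not lock the row without waiting:", err)
		return
	}
	fmt.Println("locked row, v =", v)
}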
20 changes: 11 additions & 9 deletions executor/point_get.go
@@ -50,14 +50,15 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
type PointGetExecutor struct {
baseExecutor

tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
lockWaitTime int64
}

// Init sets the fields needed for PointGetExecutor reuse; this does NOT change the baseExecutor field
@@ -69,6 +70,7 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
e.startTS = startTs
e.done = false
e.lock = p.Lock
e.lockWaitTime = p.LockWaitTime
}

// Open implements the Executor interface.
@@ -154,7 +156,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
return doLockKeys(ctx, e.ctx, key)
return doLockKeys(ctx, e.ctx, e.lockWaitTime, key)
}
return nil
}
2 changes: 1 addition & 1 deletion go.mod
@@ -41,7 +41,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
4 changes: 2 additions & 2 deletions go.sum
@@ -165,8 +165,8 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1 h1:qqGSXCFr9Uc5VIDBEt4zlmMcI8e4GlkWfDVzQ+dexRk=
github.com/pingcap/kvproto v0.0.0-20191025022903-62abb760d9b1/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466 h1:C5nV9osqA+R/R2fxYxVfqAUlCi3Oo5yJ/JSKDeHSAOk=
github.com/pingcap/kvproto v0.0.0-20191101062931-76b56d6eb466/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
10 changes: 9 additions & 1 deletion kv/kv.go
@@ -169,7 +169,7 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
@@ -374,3 +374,11 @@ type SplitableStore interface {
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}

// Used for the pessimistic lock wait time.
// These two constants are special values in the lock protocol with TiKV:
// 0 means always wait, -1 means nowait, and any other value is the lock wait time in milliseconds.
var (
Review thread on this change:

Contributor:
A gentle reminder: a better code style here would be to define a dedicated type, for example:

type LockWaitType int

const (
	LockAlwaysWait LockWaitType = iota
	LockNoWait
)

cfzjywxk (Contributor, author), Oct 25, 2019:
Roger. The int64 type is used as the input parameter type in doLockKeys; these two constants only define the special cases of that parameter, so they are not defined as a separate type like the above.
LockAlwaysWait = int64(0)
LockNoWait = int64(-1)
)
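As the author's reply notes, the wait time is kept as a plain int64 because it doubles as a duration in milliseconds, with only 0 and -1 carrying special meaning. Below is a small sketch of how a statement's lock clause could be mapped onto these values; the lockClause type and waitTimeForLock helper are hypothetical illustrations, not part of the TiDB code base.

package main

import "fmt"

const (
	lockAlwaysWait = int64(0)  // wait for the lock indefinitely
	lockNoWait     = int64(-1) // fail immediately if the lock is held by someone else
)

type lockClause int

const (
	forUpdate lockClause = iota
	forUpdateNoWait
)

// waitTimeForLock returns the lock wait time to pass down to LockKeys:
// 0 and -1 are the special protocol values; any positive value would be a wait in milliseconds.
func waitTimeForLock(c lockClause) int64 {
	if c == forUpdateNoWait {
		return lockNoWait
	}
	return lockAlwaysWait
}

func main() {
	fmt.Println(waitTimeForLock(forUpdate), waitTimeForLock(forUpdateNoWait)) // 0 -1
}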
2 changes: 1 addition & 1 deletion kv/mock.go
@@ -39,7 +39,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error {
return nil
}

2 changes: 1 addition & 1 deletion kv/mock_test.go
@@ -38,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), nil, 0, Key("lock"))
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
19 changes: 13 additions & 6 deletions planner/core/point_get_plan.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/opcode"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
@@ -54,6 +55,7 @@ type PointGetPlan struct {
Lock bool
IsForUpdate bool
outputNames []*types.FieldName
LockWaitTime int64
}

type nameValuePair struct {
@@ -251,7 +253,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
tableDual.SetSchema(fp.Schema())
return tableDual.Init(ctx, &property.StatsInfo{}, 0)
}
if x.LockTp == ast.SelectLockForUpdate {
if x.LockTp == ast.SelectLockForUpdate || x.LockTp == ast.SelectLockForUpdateNoWait {
// Locking of rows for update using SELECT FOR UPDATE only applies when autocommit
// is disabled (either by beginning a transaction with START TRANSACTION or by setting
// autocommit to 0). If autocommit is enabled, the rows matching the specification are not locked.
@@ -260,6 +262,10 @@
if !sessVars.IsAutocommit() || sessVars.InTxn() {
fp.Lock = true
fp.IsForUpdate = true
fp.LockWaitTime = kv.LockAlwaysWait
if x.LockTp == ast.SelectLockForUpdateNoWait {
fp.LockWaitTime = kv.LockNoWait
}
}
}
return fp
@@ -610,11 +616,12 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP

func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.Schema, tbl *model.TableInfo, names []*types.FieldName) *PointGetPlan {
p := &PointGetPlan{
basePlan: newBasePlan(ctx, plancodec.TypePointGet, 0),
dbName: dbName,
schema: schema,
TblInfo: tbl,
outputNames: names,
basePlan: newBasePlan(ctx, plancodec.TypePointGet, 0),
dbName: dbName,
schema: schema,
TblInfo: tbl,
outputNames: names,
LockWaitTime: kv.LockAlwaysWait,
}
ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}}
return p
2 changes: 1 addition & 1 deletion planner/core/rule_column_pruning.go
@@ -353,7 +353,7 @@ func (la *LogicalApply) PruneColumns(parentUsedCols []*expression.Column) error

// PruneColumns implements LogicalPlan interface.
func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error {
if p.Lock != ast.SelectLockForUpdate {
if p.Lock != ast.SelectLockForUpdate && p.Lock != ast.SelectLockForUpdateNoWait {
return p.baseLogicalPlan.PruneColumns(parentUsedCols)
}
