Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: check max_execution_time when get tso #56087

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2065,7 +2065,11 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex

// Uncorrelated subqueries will execute once when building plan, so we reset process info before building plan.
s.currentPlan = nil // reset current plan
s.SetProcessInfo(stmtNode.Text(), time.Now(), cmdByte, 0)
var maxExecutionTime uint64
if intest.InTest {
maxExecutionTime = sessVars.MaxExecutionTime
}
s.SetProcessInfo(stmtNode.Text(), time.Now(), cmdByte, maxExecutionTime)
s.txn.onStmtStart(digest.String())
defer sessiontxn.GetTxnManager(s).OnStmtEnd()
defer s.txn.onStmtEnd()
Expand Down
30 changes: 30 additions & 0 deletions pkg/session/test/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -509,6 +511,34 @@ func TestInTrans(t *testing.T) {
require.False(t, txn.Valid())
}

func TestGetTsoSlow(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
sv.SetDomain(dom)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)

go dom.ExpensiveQueryHandle().SetSessionManager(sv).Run()

tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int)")
tk.MustExec("insert t values (1), (2)")

tk.MustExec("set @@max_execution_time = 500")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/session/mock_get_tso_slow", "return(3)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/session/mock_get_tso_slow"))
}()

err := tk.ExecToErr("select * from t")
require.NotNil(t, err)
require.Contains(t, err.Error(), "Query execution was interrupted, maximum statement execution time exceeded")
}

func TestMemBufferSnapshotRead(t *testing.T) {
store := testkit.CreateMockStore(t)

Expand Down
44 changes: 40 additions & 4 deletions pkg/session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,20 +608,56 @@ func (txn *LazyTxn) Wait(ctx context.Context, sctx sessionctx.Context) (kv.Trans
defer func(begin time.Time) {
sctx.GetSessionVars().DurationWaitTS = time.Since(begin)
}(time.Now())
if err := txn.waitWithSQLKiller(ctx, sctx); err != nil {
return txn, err
}
txn.lazyUniquenessCheckEnabled = !sctx.GetSessionVars().ConstraintCheckInPlacePessimistic
}
return txn, nil
}

func (txn *LazyTxn) waitWithSQLKiller(ctx context.Context, sctx sessionctx.Context) error {
finishCh := make(chan struct{})
defer close(finishCh)

tsoCh := make(chan error)
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be some regression with the extra goroutine I think.

Test with sysbench random points:

10 threads
nightly: 8538 qps
this pr: 8308 qps

200 threads
nightly: 73670 qps
this pr: 71189 qps

// Transaction is lazy initialized.
// PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn,
// If Txn() is called later, wait for the future to get a valid txn.
if err := txn.changePendingToValid(ctx, sctx); err != nil {
var err error
failpoint.Inject("mock_get_tso_slow", func(v failpoint.Value) {
t := v.(int)
time.Sleep(time.Duration(t) * time.Second)
})
if err = txn.changePendingToValid(ctx, sctx); err != nil {
logutil.BgLogger().Error("active transaction fail",
zap.Error(err))
txn.cleanup()
sctx.GetSessionVars().TxnCtx.StartTS = 0
return txn, err
}
txn.lazyUniquenessCheckEnabled = !sctx.GetSessionVars().ConstraintCheckInPlacePessimistic
select {
case tsoCh <- err:
case <-finishCh:
}
close(tsoCh)
}()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := sctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return err
}
case err := <-tsoCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}
return txn, nil
}

// KeyNeedToLock returns true if the key need to lock.
Expand Down