From 8d982a09eebca3708b3418dab58015629ce5fd45 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 4 Dec 2018 17:50:11 +0800 Subject: [PATCH] *: Txn() function signature refactor and remove ActivePendingTxn() (#8327) * change Txn() function signature to Txn(active bool) * ActivePendingTxn() is not used any more because Txn() does the work * change executor builder getStartTS() uint64 to getStartTS() (uint64, error) --- ddl/column_change_test.go | 6 ++--- ddl/column_test.go | 12 +++++----- ddl/db_test.go | 16 ++++++------- ddl/ddl_test.go | 4 ++-- ddl/ddl_worker_test.go | 2 +- ddl/fail_test.go | 2 +- ddl/foreign_key_test.go | 6 ++--- ddl/index_change_test.go | 12 +++++----- ddl/reorg_test.go | 18 +++++++-------- executor/adapter.go | 4 ++-- executor/admin.go | 2 +- executor/batch_checker.go | 4 ++-- executor/builder.go | 30 +++++++++++++++---------- executor/ddl.go | 2 +- executor/executor.go | 12 +++++----- executor/executor_test.go | 10 ++++----- executor/insert.go | 2 +- executor/insert_common.go | 6 ++--- executor/point_get.go | 9 ++++++-- executor/prepared.go | 2 -- executor/simple.go | 10 ++++----- executor/write_test.go | 4 ++-- planner/core/physical_plan_test.go | 2 +- server/conn.go | 4 ++-- session/session.go | 18 ++------------- session/session_test.go | 36 +++++++++++++----------------- session/txn.go | 2 +- sessionctx/context.go | 10 ++++----- statistics/ddl.go | 12 +++++----- statistics/gc.go | 4 ++-- statistics/histogram.go | 4 ++-- statistics/update.go | 4 ++-- table/tables/tables.go | 34 ++++++++++++---------------- table/tables/tables_test.go | 10 ++++----- util/admin/admin_test.go | 6 ++--- util/kvencoder/kv_encoder.go | 2 +- util/kvencoder/kv_encoder_test.go | 2 +- util/mock/context.go | 17 +------------- 38 files changed, 155 insertions(+), 187 deletions(-) diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index fc0acf34b87d5..31b0478260c49 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -75,7 +75,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) { row := types.MakeDatums(1, 2) h, err := originTable.AddRecord(ctx, row, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) var mu sync.Mutex @@ -127,7 +127,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) { } mu.Unlock() } - err = hookCtx.Txn().Commit(context.Background()) + err = hookCtx.Txn(true).Commit(context.Background()) if err != nil { checkErr = errors.Trace(err) } @@ -179,7 +179,7 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx sessionctx.Cont checkErr = errors.Trace(err) } } - err = hookCtx.Txn().Commit(context.TODO()) + err = hookCtx.Txn(true).Commit(context.TODO()) if err != nil { checkErr = errors.Trace(err) } diff --git a/ddl/column_test.go b/ddl/column_test.go index 91a2af5062cf9..fd9f38f51f2ac 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -271,9 +271,9 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab if err != nil { return errors.Trace(err) } - defer ctx.Txn().Commit(context.Background()) + defer ctx.Txn(true).Commit(context.Background()) key := t.RecordKey(handle) - data, err := ctx.Txn().Get(key) + data, err := ctx.Txn(true).Get(key) if !isExist { if terror.ErrorEqual(err, kv.ErrNotExist) { return nil @@ -760,7 +760,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) { handle, err := t.AddRecord(ctx, oldRow, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) newColName := "c4" @@ -823,7 +823,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) d.Stop() @@ -847,7 +847,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) { _, err = t.AddRecord(ctx, append(row, types.NewDatum(defaultColValue)), false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) checkOK := false @@ -896,7 +896,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) d.Stop() diff --git a/ddl/db_test.go b/ddl/db_test.go index 28630600ee072..dc979efaef3bd 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -747,12 +747,12 @@ LOOP: // Make sure there is index with name c3_index. c.Assert(nidx, NotNil) c.Assert(nidx.Meta().ID, Greater, int64(0)) - ctx.Txn().Rollback() + ctx.Txn(true).Rollback() c.Assert(ctx.NewTxn(), IsNil) - defer ctx.Txn().Rollback() + defer ctx.Txn(true).Rollback() - it, err := nidx.SeekFirst(ctx.Txn()) + it, err := nidx.SeekFirst(ctx.Txn(true)) c.Assert(err, IsNil) defer it.Close() @@ -848,9 +848,9 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) { handles := make(map[int64]struct{}) c.Assert(ctx.NewTxn(), IsNil) - defer ctx.Txn().Rollback() + defer ctx.Txn(true).Rollback() - it, err := idx.SeekFirst(ctx.Txn()) + it, err := idx.SeekFirst(ctx.Txn(true)) c.Assert(err, IsNil) defer it.Close() @@ -1090,7 +1090,7 @@ LOOP: i := 0 j := 0 ctx.NewTxn() - defer ctx.Txn().Rollback() + defer ctx.Txn(true).Rollback() err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []types.Datum, cols []*table.Column) (bool, error) { i++ @@ -3502,7 +3502,7 @@ func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLC return } jobIDs := []int64{job.ID} - errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs) + errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs) if err != nil { checkErr = errors.Trace(err) return @@ -3512,7 +3512,7 @@ func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLC checkErr = errors.Trace(errs[0]) return } - err = hookCtx.Txn().Commit(context.Background()) + err = hookCtx.Txn(true).Commit(context.Background()) if err != nil { checkErr = errors.Trace(err) } diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 195a859c290f8..09f4d415e69db 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -128,7 +128,7 @@ func testNewDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, func getSchemaVer(c *C, ctx sessionctx.Context) int64 { err := ctx.NewTxn() c.Assert(err, IsNil) - m := meta.NewMeta(ctx.Txn()) + m := meta.NewMeta(ctx.Txn(true)) ver, err := m.GetSchemaVersion() c.Assert(err, IsNil) return ver @@ -156,7 +156,7 @@ func checkHistoryJob(c *C, job *model.Job) { } func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJobArgs) { - t := meta.NewMeta(ctx.Txn()) + t := meta.NewMeta(ctx.Txn(true)) historyJob, err := t.GetHistoryDDLJob(id) c.Assert(err, IsNil) c.Assert(historyJob.BinlogInfo.FinishedTS, Greater, uint64(0)) diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 4703b51733650..cc48c01bb6a34 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -399,7 +399,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) { row := types.MakeDatums(1, 2) _, err = originTable.AddRecord(ctx, row, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} diff --git a/ddl/fail_test.go b/ddl/fail_test.go index f182dbe388c5f..9b8771672793b 100644 --- a/ddl/fail_test.go +++ b/ddl/fail_test.go @@ -36,7 +36,7 @@ func (s *testColumnChangeSuite) TestFailBeforeDecodeArgs(c *C) { row := types.MakeDatums(1, 2) _, err = originTable.AddRecord(ctx, row, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} diff --git a/ddl/foreign_key_test.go b/ddl/foreign_key_test.go index 2bb7eaa2c7cf0..582e59e3621fe 100644 --- a/ddl/foreign_key_test.go +++ b/ddl/foreign_key_test.go @@ -128,7 +128,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) // fix data race @@ -162,7 +162,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { job := s.testCreateForeignKey(c, tblInfo, "c1_fk", []string{"c1"}, "t2", []string{"c1"}, ast.ReferOptionCascade, ast.ReferOptionSetNull) testCheckJobDone(c, d, job, true) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) mu.Lock() hErr := hookErr @@ -220,6 +220,6 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) } diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index 0681c54248f11..78831fdc4a7e8 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -74,7 +74,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { _, err = originTable.AddRecord(ctx, types.MakeDatums(3, 3), false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} @@ -125,7 +125,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { d.SetHook(tc) testCreateIndex(c, ctx, d, s.dbInfo, originTable.Meta(), false, "c2", "c2") c.Check(errors.ErrorStack(checkErr), Equals, "") - c.Assert(ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil) d.Stop() prevState = model.StateNone var noneTable table.Table @@ -172,7 +172,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { func checkIndexExists(ctx sessionctx.Context, tbl table.Table, indexValue interface{}, handle int64, exists bool) error { idx := tbl.Indices()[0] - doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, ctx.Txn(), types.MakeDatums(indexValue), handle) + doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, ctx.Txn(true), types.MakeDatums(indexValue), handle) if err != nil { return errors.Trace(err) } @@ -322,7 +322,7 @@ func (s *testIndexChangeSuite) checkAddPublic(d *ddl, ctx sessionctx.Context, wr return errors.Trace(err) } } - return ctx.Txn().Commit(context.Background()) + return ctx.Txn(true).Commit(context.Background()) } func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx sessionctx.Context, publicTbl, writeTbl table.Table) error { @@ -362,7 +362,7 @@ func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx sessionctx.Context if err != nil { return errors.Trace(err) } - return ctx.Txn().Commit(context.Background()) + return ctx.Txn(true).Commit(context.Background()) } func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx sessionctx.Context, writeTbl, delTbl table.Table) error { @@ -407,5 +407,5 @@ func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx sessionctx.Contex if err != nil { return errors.Trace(err) } - return ctx.Txn().Commit(context.Background()) + return ctx.Txn(true).Commit(context.Background()) } diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 1bcfe9d21c402..55517deab5f18 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -49,14 +49,14 @@ func (s *testDDLSuite) TestReorg(c *C) { err := ctx.NewTxn() c.Assert(err, IsNil) - ctx.Txn().Set([]byte("a"), []byte("b")) - err = ctx.Txn().Rollback() + ctx.Txn(true).Set([]byte("a"), []byte("b")) + err = ctx.Txn(true).Rollback() c.Assert(err, IsNil) err = ctx.NewTxn() c.Assert(err, IsNil) - ctx.Txn().Set([]byte("a"), []byte("b")) - err = ctx.Txn().Commit(context.Background()) + ctx.Txn(true).Set([]byte("a"), []byte("b")) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) rowCount := int64(10) @@ -73,7 +73,7 @@ func (s *testDDLSuite) TestReorg(c *C) { } err = ctx.NewTxn() c.Assert(err, IsNil) - m := meta.NewMeta(ctx.Txn()) + m := meta.NewMeta(ctx.Txn(true)) rInfo := &reorgInfo{ Job: job, } @@ -89,12 +89,12 @@ func (s *testDDLSuite) TestReorg(c *C) { c.Assert(d.generalWorker().reorgCtx.rowCount, Equals, int64(0)) // Test whether reorgInfo's Handle is update. - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) err = ctx.NewTxn() c.Assert(err, IsNil) - m = meta.NewMeta(ctx.Txn()) + m = meta.NewMeta(ctx.Txn(true)) info, err1 := getReorgInfo(d.ddlCtx, m, job, nil) c.Assert(err1, IsNil) c.Assert(info.StartHandle, Equals, handle) @@ -110,7 +110,7 @@ func (s *testDDLSuite) TestReorg(c *C) { return nil }) c.Assert(err, NotNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) d.start(context.Background(), nil) @@ -172,7 +172,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) { c.Assert(err, IsNil) } - err := ctx.Txn().Commit(context.Background()) + err := ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} diff --git a/executor/adapter.go b/executor/adapter.go index 8d2e4813635c5..fca075094b311 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -243,7 +243,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { var txnStartTS uint64 if sctx.Txn(false).Valid() { - txnStartTS = sctx.Txn().StartTS() + txnStartTS = sctx.Txn(true).StartTS() } return &recordSet{ executor: e, @@ -273,7 +273,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co txnTS := uint64(0) // Don't active pending txn here. if sctx.Txn(false).Valid() { - txnTS = sctx.Txn().StartTS() + txnTS = sctx.Txn(true).StartTS() } a.LogSlowQuery(txnTS, err == nil) }() diff --git a/executor/admin.go b/executor/admin.go index e62ec605996d1..eeb74a98bd2be 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -125,7 +125,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error { func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} - dagReq.StartTs = e.ctx.Txn().StartTS() + dagReq.StartTs = e.ctx.Txn(true).StartTS() dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location()) sc := e.ctx.GetSessionVars().StmtCtx dagReq.Flags = statementContextToFlags(sc) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index eeb4970ff0bbf..739ac009b4683 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -52,7 +52,7 @@ type batchChecker struct { // batchGetOldValues gets the values of storage in batch. func (b *batchChecker) batchGetOldValues(ctx sessionctx.Context, batchKeys []kv.Key) error { - values, err := kv.BatchGetValues(ctx.Txn(), batchKeys) + values, err := kv.BatchGetValues(ctx.Txn(true), batchKeys) if err != nil { return errors.Trace(err) } @@ -205,7 +205,7 @@ func (b *batchChecker) batchGetInsertKeys(ctx sessionctx.Context, t table.Table, batchKeys = append(batchKeys, k.newKV.key) } } - b.dupKVs, err = kv.BatchGetValues(ctx.Txn(), batchKeys) + b.dupKVs, err = kv.BatchGetValues(ctx.Txn(true), batchKeys) return errors.Trace(err) } diff --git a/executor/builder.go b/executor/builder.go index 2f5178d517b1f..52976a0065aa3 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -48,7 +48,6 @@ import ( "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-tipb" - log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -180,7 +179,7 @@ func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Execu baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), jobIDs: v.JobIDs, } - e.errs, b.err = admin.CancelJobs(e.ctx.Txn(), e.jobIDs) + e.errs, b.err = admin.CancelJobs(e.ctx.Txn(true), e.jobIDs) if b.err != nil { b.err = errors.Trace(b.err) return nil @@ -214,7 +213,7 @@ func (b *executorBuilder) buildShowDDL(v *plannercore.ShowDDL) Executor { return nil } - ddlInfo, err := admin.GetDDLInfo(e.ctx.Txn()) + ddlInfo, err := admin.GetDDLInfo(e.ctx.Txn(true)) if err != nil { b.err = errors.Trace(err) return nil @@ -403,7 +402,11 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu tables: make(map[int64]*checksumContext), done: false, } - startTs := b.getStartTS() + startTs, err := b.getStartTS() + if err != nil { + b.err = err + return nil + } for _, t := range v.Tables { e.tables[t.TableInfo.ID] = newChecksumContext(t.DBInfo, t.TableInfo, startTs) } @@ -1158,22 +1161,22 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu return e } -func (b *executorBuilder) getStartTS() uint64 { +func (b *executorBuilder) getStartTS() (uint64, error) { if b.startTS != 0 { // Return the cached value. - return b.startTS + return b.startTS, nil } startTS := b.ctx.GetSessionVars().SnapshotTS - if startTS == 0 && b.ctx.Txn().Valid() { - startTS = b.ctx.Txn().StartTS() + if startTS == 0 && b.ctx.Txn(true).Valid() { + startTS = b.ctx.Txn(true).StartTS() } b.startTS = startTS if b.startTS == 0 { - // The the code should never run here if there is no bug. - log.Error(errors.ErrorStack(errors.Trace(ErrGetStartTS))) + // It may happen when getting start ts from PD fail, and Txn() is not valid. + return 0, errors.Trace(ErrGetStartTS) } - return startTS + return startTS, nil } func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor { @@ -1507,7 +1510,10 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { dagReq = &tipb.DAGRequest{} - dagReq.StartTs = b.getStartTS() + dagReq.StartTs, err = b.getStartTS() + if err != nil { + return nil, false, errors.Trace(err) + } dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location()) sc := b.ctx.GetSessionVars().StmtCtx dagReq.Flags = statementContextToFlags(sc) diff --git a/executor/ddl.go b/executor/ddl.go index d70b73ad9a7e6..63be6cb7a14e8 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -54,7 +54,7 @@ func (e *DDLExec) toErr(err error) error { // Here we distinguish the ErrInfoSchemaChanged error from other errors. dom := domain.GetDomain(e.ctx) checker := domain.NewSchemaChecker(dom, e.is.SchemaMetaVersion(), nil) - schemaInfoErr := checker.Check(e.ctx.Txn().StartTS()) + schemaInfoErr := checker.Check(e.ctx.Txn(true).StartTS()) if schemaInfoErr != nil { return errors.Trace(schemaInfoErr) } diff --git a/executor/executor.go b/executor/executor.go index b4cd2a805b55d..17384f070c6bc 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -297,11 +297,11 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - jobs, err := admin.GetDDLJobs(e.ctx.Txn()) + jobs, err := admin.GetDDLJobs(e.ctx.Txn(true)) if err != nil { return errors.Trace(err) } - historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(), admin.DefNumHistoryJobs) + historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(true), admin.DefNumHistoryJobs) if err != nil { return errors.Trace(err) } @@ -338,14 +338,14 @@ func (e *ShowDDLJobsExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - jobs, err := admin.GetDDLJobs(e.ctx.Txn()) + jobs, err := admin.GetDDLJobs(e.ctx.Txn(true)) if err != nil { return errors.Trace(err) } if e.jobNumber == 0 { e.jobNumber = admin.DefNumHistoryJobs } - historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(), int(e.jobNumber)) + historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(true), int(e.jobNumber)) if err != nil { return errors.Trace(err) } @@ -465,7 +465,7 @@ func (e *CheckTableExec) doCheckPartitionedTable(tbl table.PartitionedTable) err func (e *CheckTableExec) doCheckTable(tbl table.Table) error { for _, idx := range tbl.Indices() { - txn := e.ctx.Txn() + txn := e.ctx.Txn(true) err := admin.CompareIndexData(e.ctx, txn, tbl, idx, e.genExprs) if err != nil { return errors.Trace(err) @@ -628,7 +628,7 @@ func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error { if len(e.Schema().TblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate { return nil } - txn := e.ctx.Txn() + txn := e.ctx.Txn(true) keys := make([]kv.Key, 0, chk.NumRows()) iter := chunk.NewIterator4Chunk(chk) for id, cols := range e.Schema().TblID2Handle { diff --git a/executor/executor_test.go b/executor/executor_test.go index 43a5293921ef9..cbfd1ee6e1bb0 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -312,7 +312,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data))) } ctx.StmtCommit() - err1 = ctx.Txn().Commit(context.Background()) + err1 = ctx.Txn(true).Commit(context.Background()) c.Assert(err1, IsNil) r := tk.MustQuery(selectSQL) r.Check(testutil.RowsWithSep("|", tt.expected...)) @@ -2103,11 +2103,11 @@ func (s *testSuite) TestTiDBCurrentTS(c *C) { tk.MustExec("begin") rows := tk.MustQuery("select @@tidb_current_ts").Rows() tsStr := rows[0][0].(string) - c.Assert(tsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn().StartTS())) + c.Assert(tsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn(true).StartTS())) tk.MustExec("begin") rows = tk.MustQuery("select @@tidb_current_ts").Rows() newTsStr := rows[0][0].(string) - c.Assert(newTsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn().StartTS())) + c.Assert(newTsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn(true).StartTS())) c.Assert(newTsStr, Not(Equals), tsStr) tk.MustExec("commit") tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) @@ -2126,7 +2126,7 @@ func (s *testSuite) TestSelectForUpdate(c *C) { tk.MustExec("drop table if exists t, t1") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") tk.MustExec("insert t values (12, 2, 3)") @@ -2829,7 +2829,7 @@ func (s *testSuite) TestCheckIndex(c *C) { c.Assert(err, IsNil) _, err = tb.AddRecord(s.ctx, recordVal2, false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) mockCtx := mock.NewContext() idx := tb.Indices()[0] diff --git a/executor/insert.go b/executor/insert.go index 4bf2fb5fa9bd1..ba78cfb939e30 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -40,7 +40,7 @@ func (e *InsertExec) exec(rows [][]types.Datum) error { ignoreErr := sessVars.StmtCtx.DupKeyAsWarning if !sessVars.LightningMode { - sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(), kv.TempTxnMemBufCap) + sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(true), kv.TempTxnMemBufCap) } // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. diff --git a/executor/insert_common.go b/executor/insert_common.go index 8122eec52136a..224df2b1c3bc9 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -317,7 +317,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows return ErrBatchInsertFail.GenWithStack("BatchInsert failed with error: %v", err) } if !sessVars.LightningMode { - sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(), kv.TempTxnMemBufCap) + sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(true), kv.TempTxnMemBufCap) } } } @@ -542,9 +542,9 @@ func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func( } func (e *InsertValues) addRecord(row []types.Datum) (int64, error) { - e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil) + e.ctx.Txn(true).SetOption(kv.PresumeKeyNotExists, nil) h, err := e.Table.AddRecord(e.ctx, row, false) - e.ctx.Txn().DelOption(kv.PresumeKeyNotExists) + e.ctx.Txn(true).DelOption(kv.PresumeKeyNotExists) if err != nil { return 0, errors.Trace(err) } diff --git a/executor/point_get.go b/executor/point_get.go index cd32718f19e6f..d2955f1922ea8 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -31,6 +31,11 @@ import ( ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { + startTS, err := b.getStartTS() + if err != nil { + b.err = errors.Trace(err) + return nil + } return &PointGetExecutor{ ctx: b.ctx, schema: p.Schema(), @@ -38,7 +43,7 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { idxInfo: p.IndexInfo, idxVals: p.IndexValues, handle: p.Handle, - startTS: b.getStartTS(), + startTS: startTS, } } @@ -127,7 +132,7 @@ func (e *PointGetExecutor) encodeIndexKey() ([]byte, error) { } func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) { - txn := e.ctx.Txn() + txn := e.ctx.Txn(true) if txn != nil && txn.Valid() && !txn.IsReadOnly() { return txn.Get(key) } diff --git a/executor/prepared.go b/executor/prepared.go index 3c73898053c26..dc63db3a0fe1e 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -202,8 +202,6 @@ func (e *ExecuteExec) Build() error { var err error if IsPointGetWithPKOrUniqueKeyByAutoCommit(e.ctx, e.plan) { err = e.ctx.InitTxnWithStartTS(math.MaxUint64) - } else { - err = e.ctx.ActivePendingTxn() } if err != nil { return errors.Trace(err) diff --git a/executor/simple.go b/executor/simple.go index 880a79d3b0df1..b0432d9551de8 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -113,8 +113,8 @@ func (e *SimpleExec) executeBegin(s *ast.BeginStmt) error { // the transaction with COMMIT or ROLLBACK. The autocommit mode then // reverts to its previous state. e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) - // Call ctx.Txn() to active pending txn. - e.ctx.Txn() + // Call ctx.Txn(true) to active pending txn. + e.ctx.Txn(true) return nil } @@ -126,9 +126,9 @@ func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error { sessVars := e.ctx.GetSessionVars() log.Debugf("con:%d execute rollback statement", sessVars.ConnectionID) sessVars.SetStatusFlag(mysql.ServerStatusInTrans, false) - if e.ctx.Txn().Valid() { + if e.ctx.Txn(true).Valid() { e.ctx.GetSessionVars().TxnCtx.ClearDelta() - return e.ctx.Txn().Rollback() + return e.ctx.Txn(true).Rollback() } return nil } @@ -208,7 +208,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx)) + err := e.ctx.Txn(true).Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx)) if err != nil { return errors.Trace(err) } diff --git a/executor/write_test.go b/executor/write_test.go index 31324a5b92b1a..a7da7256b7418 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1846,7 +1846,7 @@ func (s *testBypassSuite) TestBypassLatch(c *C) { tk1.MustExec("truncate table t") fn() - txn := tk1.Se.Txn() + txn := tk1.Se.Txn(true) txn.SetOption(kv.BypassLatch, true) // Bypass latch, there will be no conflicts. tk1.MustExec("commit") @@ -2092,7 +2092,7 @@ func (s *testSuite) TestRebaseIfNeeded(c *C) { // which could simulate another TiDB adds a large auto ID. _, err = tbl.AddRecord(s.ctx, types.MakeDatums(30001, 2), false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) tk.MustExec(`update t set b = 3 where a = 30001;`) tk.MustExec(`insert into t (b) values (4);`) diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index d503791f8481a..fc59e709f64bd 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -771,7 +771,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderUnionScan(c *C) { err = se.NewTxn() c.Assert(err, IsNil) // Make txn not read only. - se.Txn().Set(kv.Key("AAA"), []byte("BBB")) + se.Txn(true).Set(kv.Key("AAA"), []byte("BBB")) se.StmtCommit() p, err := core.Optimize(se, stmt, s.is) c.Assert(err, IsNil) diff --git a/server/conn.go b/server/conn.go index 8754b0d11b603..b7b0699a7083f 100644 --- a/server/conn.go +++ b/server/conn.go @@ -756,7 +756,7 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat // 1. latches may result in false positive transaction conflicts. // 2. load data is not retryable when it meets conflicts. // 3. load data will abort abnormally under condition 1 + 2. - loadDataInfo.Ctx.Txn().SetOption(kv.BypassLatch, true) + loadDataInfo.Ctx.Txn(true).SetOption(kv.BypassLatch, true) // Make sure that there are no retries when committing. if err = loadDataInfo.Ctx.RefreshTxnCtx(ctx); err != nil { return nil, errors.Trace(err) @@ -814,7 +814,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor } } - txn := loadDataInfo.Ctx.Txn() + txn := loadDataInfo.Ctx.Txn(true) loadDataInfo.Ctx.StmtCommit() if err != nil { if txn != nil && txn.Valid() { diff --git a/session/session.go b/session/session.go index 628f4632dace9..afa423136cb31 100644 --- a/session/session.go +++ b/session/session.go @@ -948,8 +948,8 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { return nil } -func (s *session) Txn(opt ...bool) kv.Transaction { - if s.txn.pending() && len(opt) == 0 { +func (s *session) Txn(active bool) kv.Transaction { + if s.txn.pending() && active { // Transaction is lazy intialized. // PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn, // If Txn() is called later, wait for the future to get a valid txn. @@ -1386,20 +1386,6 @@ func (s *session) RefreshTxnCtx(ctx context.Context) error { return errors.Trace(s.NewTxn()) } -// ActivePendingTxn implements Context.ActivePendingTxn interface. -func (s *session) ActivePendingTxn() error { - if s.txn.Valid() { - return nil - } - txnCap := s.getMembufCap() - // The transaction status should be pending. - if err := s.txn.changePendingToValid(txnCap); err != nil { - return errors.Trace(err) - } - s.sessionVars.TxnCtx.StartTS = s.txn.StartTS() - return nil -} - // InitTxnWithStartTS create a transaction with startTS. func (s *session) InitTxnWithStartTS(startTS uint64) error { if s.txn.Valid() { diff --git a/session/session_test.go b/session/session_test.go index 4c54f5559ad48..993b1ee3bd730 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -137,10 +137,6 @@ func (s *testSessionSuite) TestForCoverage(c *C) { tk.MustExec("show processlist") _, err := tk.Se.FieldList("t") c.Check(err, IsNil) - - // Cover the error branch, althrough this never happen. - err = tk.Se.ActivePendingTxn() - c.Assert(err, NotNil) } func (s *testSessionSuite) TestErrorRollback(c *C) { @@ -243,7 +239,7 @@ func (s *testSessionSuite) TestRowLock(c *C) { tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists t") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") tk.MustExec("insert t values (12, 2, 3)") @@ -483,7 +479,7 @@ func (s *testSessionSuite) TestRetryCleanTxn(c *C) { history.Add(0, stmt, tk.Se.GetSessionVars().StmtCtx) _, err = tk.Exec("commit") c.Assert(err, NotNil) - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) c.Assert(tk.Se.GetSessionVars().InTxn(), IsFalse) } @@ -558,39 +554,39 @@ func (s *testSessionSuite) TestInTrans(c *C) { tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") tk.MustExec("insert t values ()") tk.MustExec("begin") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("drop table if exists t;") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("commit") tk.MustExec("insert t values ()") tk.MustExec("set autocommit=0") tk.MustExec("begin") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("commit") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("commit") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("set autocommit=1") tk.MustExec("drop table if exists t") tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") tk.MustExec("begin") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("rollback") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) } func (s *testSessionSuite) TestRetryPreparedStmt(c *C) { @@ -599,7 +595,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) { tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists t") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") diff --git a/session/txn.go b/session/txn.go index fd4f3cf6227c6..0d68b888374eb 100644 --- a/session/txn.go +++ b/session/txn.go @@ -33,7 +33,7 @@ import ( // TxnState wraps kv.Transaction to provide a new kv.Transaction. // 1. It holds all statement related modification in the buffer before flush to the txn, // so if execute statement meets error, the txn won't be made dirty. -// 2. It's a lazy transaction, that means it's a txnFuture befort StartTS() is really need. +// 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need. type TxnState struct { // States of a TxnState should be one of the followings: // Invalid: kv.Transaction == nil && txnFuture == nil diff --git a/sessionctx/context.go b/sessionctx/context.go index 1daf9383e6eb3..8ea5a36f15efb 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -34,8 +34,10 @@ type Context interface { NewTxn() error // Txn returns the current transaction which is created before executing a statement. - // The returned kv.Transaction is not nil, but maybe pending or invalid. - Txn(...bool) kv.Transaction + // The returned kv.Transaction is not nil, but it maybe pending or invalid. + // If the active parameter is true, call this function will wait for the pending txn + // to become valid. + Txn(active bool) kv.Transaction // GetClient gets a kv.Client. GetClient() kv.Client @@ -58,10 +60,6 @@ type Context interface { // now just for load data and batch insert. RefreshTxnCtx(context.Context) error - // ActivePendingTxn receives the pending transaction from the transaction channel. - // It should be called right before we builds an executor. - ActivePendingTxn() error - // InitTxnWithStartTS initializes a transaction with startTS. // It should be called right before we builds an executor. InitTxnWithStartTS(startTS uint64) error diff --git a/statistics/ddl.go b/statistics/ddl.go index ac0dda56dbd36..c907760a49f85 100644 --- a/statistics/ddl.go +++ b/statistics/ddl.go @@ -55,18 +55,18 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo) (err error) { defer func() { err = finishTransaction(context.Background(), exec, err) }() - _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.mu.ctx.Txn().StartTS(), info.ID)) + _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.mu.ctx.Txn(true).StartTS(), info.ID)) if err != nil { return } for _, col := range info.Columns { - _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", info.ID, col.ID, h.mu.ctx.Txn().StartTS())) + _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", info.ID, col.ID, h.mu.ctx.Txn(true).StartTS())) if err != nil { return } } for _, idx := range info.Indices { - _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", info.ID, idx.ID, h.mu.ctx.Txn().StartTS())) + _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", info.ID, idx.ID, h.mu.ctx.Txn(true).StartTS())) if err != nil { return } @@ -88,7 +88,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er err = finishTransaction(context.Background(), exec, err) }() // First of all, we update the version. - _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn().StartTS(), tableID)) + _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn(true).StartTS(), tableID)) if err != nil { return } @@ -117,13 +117,13 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er } if value.IsNull() { // If the adding column has default value null, all the existing rows have null value on the newly added column. - _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.mu.ctx.Txn().StartTS(), tableID, colInfo.ID, count)) + _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.mu.ctx.Txn(true).StartTS(), tableID, colInfo.ID, count)) if err != nil { return } } else { // If this stats exists, we insert histogram meta first, the distinct_count will always be one. - _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.mu.ctx.Txn().StartTS(), tableID, colInfo.ID, int64(len(value.GetBytes()))*count)) + _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.mu.ctx.Txn(true).StartTS(), tableID, colInfo.ID, int64(len(value.GetBytes()))*count)) if err != nil { return } diff --git a/statistics/gc.go b/statistics/gc.go index 54b38f171b586..57a47eb40917f 100644 --- a/statistics/gc.go +++ b/statistics/gc.go @@ -105,7 +105,7 @@ func (h *Handle) deleteHistStatsFromKV(tableID int64, histID int64, isIndex int) err = finishTransaction(context.Background(), exec, err) }() // First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything. - _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn().StartTS(), tableID)) + _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn(true).StartTS(), tableID)) if err != nil { return } @@ -132,7 +132,7 @@ func (h *Handle) DeleteTableStatsFromKV(id int64) (err error) { err = finishTransaction(context.Background(), exec, err) }() // We only update the version so that other tidb will know that this table is deleted. - sql := fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn().StartTS(), id) + sql := fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn(true).StartTS(), id) _, err = exec.Execute(context.Background(), sql) if err != nil { return diff --git a/statistics/histogram.go b/statistics/histogram.go index 67ef41640031d..4695cbdd45d90 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -210,7 +210,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg defer func() { err = finishTransaction(context.Background(), exec, err) }() - txn := h.mu.ctx.Txn() + txn := h.mu.ctx.Txn(true) version := txn.StartTS() var sql string // If the count is less than 0, then we do not want to update the modify count and count. @@ -281,7 +281,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error err = finishTransaction(ctx, exec, err) }() var sql string - version := h.mu.ctx.Txn().StartTS() + version := h.mu.ctx.Txn(true).StartTS() sql = fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count, modify_count) values (%d, %d, %d, %d)", version, tableID, count, modifyCount) _, err = exec.Execute(ctx, sql) return diff --git a/statistics/update.go b/statistics/update.go index 47cc79a68d1b5..a2771815e92ed 100644 --- a/statistics/update.go +++ b/statistics/update.go @@ -327,9 +327,9 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up }() var sql string if delta.Delta < 0 { - sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", h.mu.ctx.Txn().StartTS(), -delta.Delta, delta.Count, id, -delta.Delta) + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", h.mu.ctx.Txn(true).StartTS(), -delta.Delta, delta.Count, id, -delta.Delta) } else { - sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", h.mu.ctx.Txn().StartTS(), delta.Delta, delta.Count, id) + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", h.mu.ctx.Txn(true).StartTS(), delta.Delta, delta.Count, id) } _, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(ctx, sql) if err != nil { diff --git a/table/tables/tables.go b/table/tables/tables.go index 5115a35bc0260..75189bc8a2189 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -271,7 +271,7 @@ func (t *tableCommon) FirstKey() kv.Key { // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. func (t *tableCommon) UpdateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datum, touched []bool) error { - txn := ctx.Txn() + txn := ctx.Txn(true) // TODO: reuse bs, like AddRecord does. bs := kv.NewBufferStore(txn, kv.DefaultTxnMembufCap) @@ -390,12 +390,12 @@ func adjustRowValuesBuf(writeBufs *variable.WriteStmtBufs, rowLen int) { // Just add the kv to transaction's membuf directly. func (t *tableCommon) getRollbackableMemStore(ctx sessionctx.Context) kv.RetrieverMutator { if ctx.GetSessionVars().LightningMode { - return ctx.Txn() + return ctx.Txn(true) } bs := ctx.GetSessionVars().GetWriteStmtBufs().BufStore if bs == nil { - bs = kv.NewBufferStore(ctx.Txn(), kv.DefaultTxnMembufCap) + bs = kv.NewBufferStore(ctx.Txn(true), kv.DefaultTxnMembufCap) } else { bs.Reset() } @@ -426,7 +426,7 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHan } } - txn := ctx.Txn() + txn := ctx.Txn(true) sessVars := ctx.GetSessionVars() // when LightningMode or BatchCheck is true, // no needs to check the key constrains, so we names the variable skipCheck. @@ -524,7 +524,7 @@ func (t *tableCommon) genIndexKeyStr(colVals []types.Datum) (string, error) { // addIndices adds data into indices. If any key is duplicated, returns the original handle. func (t *tableCommon) addIndices(ctx sessionctx.Context, recordID int64, r []types.Datum, rm kv.RetrieverMutator, skipHandleCheck bool) (int64, error) { - txn := ctx.Txn() + txn := ctx.Txn(true) // Clean up lazy check error environment defer txn.DelOption(kv.PresumeKeyNotExistsError) skipCheck := ctx.GetSessionVars().LightningMode || ctx.GetSessionVars().StmtCtx.BatchCheck @@ -568,7 +568,7 @@ func (t *tableCommon) addIndices(ctx sessionctx.Context, recordID int64, r []typ func (t *tableCommon) RowWithCols(ctx sessionctx.Context, h int64, cols []*table.Column) ([]types.Datum, error) { // Get raw row data from kv. key := t.RecordKey(h) - value, err := ctx.Txn().Get(key) + value, err := ctx.Txn(true).Get(key) if err != nil { return nil, errors.Trace(err) } @@ -709,7 +709,7 @@ func (t *tableCommon) addDeleteBinlog(ctx sessionctx.Context, r []types.Datum, c func (t *tableCommon) removeRowData(ctx sessionctx.Context, h int64) error { // Remove row data. - err := ctx.Txn().Delete([]byte(t.RecordKey(h))) + err := ctx.Txn(true).Delete([]byte(t.RecordKey(h))) if err != nil { return errors.Trace(err) } @@ -721,14 +721,14 @@ func (t *tableCommon) removeRowIndices(ctx sessionctx.Context, h int64, rec []ty for _, v := range t.DeletableIndices() { vals, err := v.FetchValues(rec, nil) if err != nil { - log.Infof("remove row index %v failed %v, txn %d, handle %d, data %v", v.Meta(), err, ctx.Txn().StartTS(), h, rec) + log.Infof("remove row index %v failed %v, txn %d, handle %d, data %v", v.Meta(), err, ctx.Txn(true).StartTS(), h, rec) return errors.Trace(err) } - if err = v.Delete(ctx.GetSessionVars().StmtCtx, ctx.Txn(), vals, h); err != nil { + if err = v.Delete(ctx.GetSessionVars().StmtCtx, ctx.Txn(true), vals, h); err != nil { if v.Meta().State != model.StatePublic && kv.ErrNotExist.Equal(err) { // If the index is not in public state, we may have not created the index, // or already deleted the index, so skip ErrNotExist error. - log.Debugf("remove row index %v doesn't exist, txn %d, handle %d", v.Meta(), ctx.Txn().StartTS(), h) + log.Debugf("remove row index %v doesn't exist, txn %d, handle %d", v.Meta(), ctx.Txn(true).StartTS(), h) continue } return errors.Trace(err) @@ -767,7 +767,7 @@ func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMu func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc) error { prefix := t.RecordPrefix() - it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext()) + it, err := ctx.Txn(true).Iter(startKey, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -896,7 +896,7 @@ func (t *tableCommon) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetS // Seek implements table.Table Seek interface. func (t *tableCommon) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) { seekKey := tablecodec.EncodeRowKeyWithHandle(t.physicalTableID, h) - iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext()) + iter, err := ctx.Txn(true).Iter(seekKey, t.RecordPrefix().PrefixNext()) if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) { // No more records in the table, skip to the end. return 0, false, nil @@ -983,7 +983,7 @@ func CheckHandleExists(ctx sessionctx.Context, t table.Table, recordID int64, da } t = pt.GetPartition(pid) } - txn := ctx.Txn() + txn := ctx.Txn(true) // Check key exists. recordKey := t.RecordKey(recordID) e := kv.ErrKeyExists.FastGen("Duplicate entry '%d' for key 'PRIMARY'", recordID) @@ -1026,7 +1026,7 @@ func (ctx *ctxForPartitionExpr) NewTxn() error { } // Txn returns the current transaction which is created before executing a statement. -func (ctx *ctxForPartitionExpr) Txn(...bool) kv.Transaction { +func (ctx *ctxForPartitionExpr) Txn(bool) kv.Transaction { panic("not support") } @@ -1066,12 +1066,6 @@ func (ctx *ctxForPartitionExpr) RefreshTxnCtx(context.Context) error { panic("not support") } -// ActivePendingTxn receives the pending transaction from the transaction channel. -// It should be called right before we builds an executor. -func (ctx *ctxForPartitionExpr) ActivePendingTxn() error { - panic("not support") -} - // InitTxnWithStartTS initializes a transaction with startTS. // It should be called right before we builds an executor. func (ctx *ctxForPartitionExpr) InitTxnWithStartTS(startTS uint64) error { diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index 54e9919f6409a..41f686e3166b3 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -147,7 +147,7 @@ func (ts *testSuite) TestBasic(c *C) { func countEntriesWithPrefix(ctx sessionctx.Context, prefix []byte) (int, error) { cnt := 0 - err := util.ScanMetaWithPrefix(ctx.Txn(), prefix, func(k kv.Key, v []byte) bool { + err := util.ScanMetaWithPrefix(ctx.Txn(true), prefix, func(k kv.Key, v []byte) bool { cnt++ return true }) @@ -230,7 +230,7 @@ func (ts *testSuite) TestUniqueIndexMultipleNullEntries(c *C) { c.Assert(err, IsNil) _, err = tb.AddRecord(sctx, types.MakeDatums(2, nil), false) c.Assert(err, IsNil) - c.Assert(sctx.Txn().Rollback(), IsNil) + c.Assert(sctx.Txn(true).Rollback(), IsNil) _, err = ts.se.Execute(context.Background(), "drop table test.t") c.Assert(err, IsNil) } @@ -289,7 +289,7 @@ func (ts *testSuite) TestUnsignedPK(c *C) { c.Assert(err, IsNil) c.Assert(len(row), Equals, 2) c.Assert(row[0].Kind(), Equals, types.KindUint64) - c.Assert(ts.se.Txn().Commit(context.Background()), IsNil) + c.Assert(ts.se.Txn(true).Commit(context.Background()), IsNil) } func (ts *testSuite) TestIterRecords(c *C) { @@ -309,7 +309,7 @@ func (ts *testSuite) TestIterRecords(c *C) { }) c.Assert(err, IsNil) c.Assert(totalCount, Equals, 2) - c.Assert(ts.se.Txn().Commit(context.Background()), IsNil) + c.Assert(ts.se.Txn(true).Commit(context.Background()), IsNil) } func (ts *testSuite) TestTableFromMeta(c *C) { @@ -354,7 +354,7 @@ PARTITION BY RANGE ( id ) ( c.Assert(err, IsNil) // Check that add record writes to the partition, rather than the table. - txn := ts.se.Txn() + txn := ts.se.Txn(true) val, err := txn.Get(tables.PartitionRecordKey(p0.ID, rid)) c.Assert(err, IsNil) c.Assert(len(val), Greater, 0) diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index d678f7d4543fc..448eec584638d 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -294,7 +294,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(s.ctx.NewTxn(), IsNil) _, err = tb.AddRecord(s.ctx, types.MakeDatums(1, 10, 11), false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) record1 := &RecordData{Handle: int64(1), Values: types.MakeDatums(int64(1), int64(10), int64(11))} record2 := &RecordData{Handle: int64(2), Values: types.MakeDatums(int64(2), int64(20), int64(21))} @@ -307,7 +307,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(s.ctx.NewTxn(), IsNil) _, err = tb.AddRecord(s.ctx, record2.Values, false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) txn, err := s.store.Begin() c.Assert(err, IsNil) @@ -351,7 +351,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(err, IsNil) err = tb.RemoveRecord(s.ctx, 2, record2.Values) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) } func newDiffRetError(prefix string, ra, rb *RecordData) string { diff --git a/util/kvencoder/kv_encoder.go b/util/kvencoder/kv_encoder.go index c5ab7cc32f947..bc6f1f54b5667 100644 --- a/util/kvencoder/kv_encoder.go +++ b/util/kvencoder/kv_encoder.go @@ -142,7 +142,7 @@ func (e *kvEncoder) Encode(sql string, tableID int64) (kvPairs []KvPair, affecte } func (e *kvEncoder) getKvPairsInMemBuffer(tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) { - txnMemBuffer := e.se.Txn().GetMemBuffer() + txnMemBuffer := e.se.Txn(true).GetMemBuffer() kvPairs = make([]KvPair, 0, txnMemBuffer.Len()) err = kv.WalkMemBuffer(txnMemBuffer, func(k kv.Key, v []byte) error { if bytes.HasPrefix(k, tablecodec.TablePrefix()) { diff --git a/util/kvencoder/kv_encoder_test.go b/util/kvencoder/kv_encoder_test.go index 27c9c40ea408e..74a0f2d5338b1 100644 --- a/util/kvencoder/kv_encoder_test.go +++ b/util/kvencoder/kv_encoder_test.go @@ -87,7 +87,7 @@ func getExpectKvPairs(tkExpect *testkit.TestKit, sql string) []KvPair { tkExpect.MustExec("begin") tkExpect.MustExec(sql) kvPairsExpect := make([]KvPair, 0) - kv.WalkMemBuffer(tkExpect.Se.Txn().GetMemBuffer(), func(k kv.Key, v []byte) error { + kv.WalkMemBuffer(tkExpect.Se.Txn(true).GetMemBuffer(), func(k kv.Key, v []byte) error { kvPairsExpect = append(kvPairsExpect, KvPair{Key: k, Val: v}) return nil }) diff --git a/util/mock/context.go b/util/mock/context.go index af73669720bab..692a4a787a550 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -88,7 +88,7 @@ func (c *Context) GetSessionVars() *variable.SessionVars { } // Txn implements sessionctx.Context Txn interface. -func (c *Context) Txn(...bool) kv.Transaction { +func (c *Context) Txn(bool) kv.Transaction { return &c.txn } @@ -149,21 +149,6 @@ func (c *Context) RefreshTxnCtx(ctx context.Context) error { return errors.Trace(c.NewTxn()) } -// ActivePendingTxn implements the sessionctx.Context interface. -func (c *Context) ActivePendingTxn() error { - if c.txn.Valid() { - return nil - } - if c.Store != nil { - txn, err := c.Store.Begin() - if err != nil { - return errors.Trace(err) - } - c.txn.Transaction = txn - } - return nil -} - // InitTxnWithStartTS implements the sessionctx.Context interface with startTS. func (c *Context) InitTxnWithStartTS(startTS uint64) error { if c.txn.Valid() {