From 52b57ddfdbf29e93b5b20e33190a6e09b9d7a886 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 10 May 2023 16:28:09 +0800 Subject: [PATCH] planner: recalculate as-of ts of staleread when plan is cached (#43204) (#43550) close pingcap/tidb#43044 --- executor/stale_txn_test.go | 26 ++++++++++++++++++++++++++ sessiontxn/staleread/processor.go | 21 +++++++++++++++++---- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index a6e814b68c172..dc1ab4ff962a8 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -1370,3 +1370,29 @@ func TestIssue35686(t *testing.T) { // This query should not panic tk.MustQuery("select * from information_schema.ddl_jobs as of timestamp now()") } + +func TestStalePrepare(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + defer tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int)") + + stmtID, _, _, err := tk.Session().PrepareStmt("select * from t as of timestamp now(3) - interval 1000 microsecond order by id asc") + require.Nil(t, err) + tk.MustExec("prepare stmt from \"select * from t as of timestamp now(3) - interval 1000 microsecond order by id asc\"") + + var expected [][]interface{} + for i := 0; i < 20; i++ { + tk.MustExec("insert into t values(?)", i) + time.Sleep(2 * time.Millisecond) // sleep 2ms to ensure staleread_ts > commit_ts. + + expected = append(expected, testkit.Rows(fmt.Sprintf("%d", i))...) + rs, err := tk.Session().ExecutePreparedStmt(context.Background(), stmtID, nil) + require.Nil(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(expected) + rs.Close() + tk.MustQuery("execute stmt").Check(expected) + } +} diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 2fc7f3806632b..17df59c2873e3 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -167,7 +167,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } // If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...' - stmtAsOfTS, err := parseAndValidateAsOf(p.ctx, p.sctx, tn.AsOf) + evaluateTS := func(sctx sessionctx.Context) (uint64, error) { + return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf) + } + stmtAsOfTS, err := evaluateTS(p.sctx) if err != nil { return err } @@ -179,7 +182,7 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } return nil } - return p.evaluateFromStmtTSOrSysVariable(stmtAsOfTS) + return p.evaluateFromStmtTSOrSysVariable(stmtAsOfTS, evaluateTS) } func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator StalenessTSEvaluator) (err error) { @@ -201,7 +204,10 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness return err } } - return p.evaluateFromStmtTSOrSysVariable(stmtTS) + // When executing a prepared stmt, the stmtTS is calculated once and reused to avoid eval overhead, + // note it only takes PlanCacheStmt.SnapshotTSEvaluator without overwriting it. + // the evaluator will be re-calculated in next execution. + return p.evaluateFromStmtTSOrSysVariable(stmtTS, nil) } func (p *staleReadProcessor) evaluateFromTxn() error { @@ -223,7 +229,7 @@ func (p *staleReadProcessor) evaluateFromTxn() error { return p.setAsNonStaleRead() } -func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64) error { +func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64, evaluator StalenessTSEvaluator) error { // If `txnReadTS` is not 0, it means we meet following situation: // set transaction read only as of timestamp ... // select from table or execute prepared statement @@ -235,6 +241,13 @@ func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64) erro if stmtTS > 0 { p.stmtTS = stmtTS + if evaluator != nil { + is, err := GetSessionSnapshotInfoSchema(p.sctx, stmtTS) + if err != nil { + return err + } + return p.setEvaluatedValues(stmtTS, is, evaluator) + } return p.setEvaluatedTS(stmtTS) }