Skip to content

Commit

Permalink
planner: recalculate as-of ts of staleread when plan is cached (#43204)…
Browse files Browse the repository at this point in the history
… (#43550)

close #43044
  • Loading branch information
ti-chi-bot committed May 10, 2023
1 parent 3fa4fb9 commit 52b57dd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
26 changes: 26 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,3 +1370,29 @@ func TestIssue35686(t *testing.T) {
// This query should not panic
tk.MustQuery("select * from information_schema.ddl_jobs as of timestamp now()")
}

func TestStalePrepare(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

stmtID, _, _, err := tk.Session().PrepareStmt("select * from t as of timestamp now(3) - interval 1000 microsecond order by id asc")
require.Nil(t, err)
tk.MustExec("prepare stmt from \"select * from t as of timestamp now(3) - interval 1000 microsecond order by id asc\"")

var expected [][]interface{}
for i := 0; i < 20; i++ {
tk.MustExec("insert into t values(?)", i)
time.Sleep(2 * time.Millisecond) // sleep 2ms to ensure staleread_ts > commit_ts.

expected = append(expected, testkit.Rows(fmt.Sprintf("%d", i))...)
rs, err := tk.Session().ExecutePreparedStmt(context.Background(), stmtID, nil)
require.Nil(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(expected)
rs.Close()
tk.MustQuery("execute stmt").Check(expected)
}
}
21 changes: 17 additions & 4 deletions sessiontxn/staleread/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error {
}

// If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...'
stmtAsOfTS, err := parseAndValidateAsOf(p.ctx, p.sctx, tn.AsOf)
evaluateTS := func(sctx sessionctx.Context) (uint64, error) {
return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf)
}
stmtAsOfTS, err := evaluateTS(p.sctx)
if err != nil {
return err
}
Expand All @@ -179,7 +182,7 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error {
}
return nil
}
return p.evaluateFromStmtTSOrSysVariable(stmtAsOfTS)
return p.evaluateFromStmtTSOrSysVariable(stmtAsOfTS, evaluateTS)
}

func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator StalenessTSEvaluator) (err error) {
Expand All @@ -201,7 +204,10 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness
return err
}
}
return p.evaluateFromStmtTSOrSysVariable(stmtTS)
// When executing a prepared stmt, the stmtTS is calculated once and reused to avoid eval overhead,
// note it only takes PlanCacheStmt.SnapshotTSEvaluator without overwriting it.
// the evaluator will be re-calculated in next execution.
return p.evaluateFromStmtTSOrSysVariable(stmtTS, nil)
}

func (p *staleReadProcessor) evaluateFromTxn() error {
Expand All @@ -223,7 +229,7 @@ func (p *staleReadProcessor) evaluateFromTxn() error {
return p.setAsNonStaleRead()
}

func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64) error {
func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64, evaluator StalenessTSEvaluator) error {
// If `txnReadTS` is not 0, it means we meet following situation:
// set transaction read only as of timestamp ...
// select from table or execute prepared statement
Expand All @@ -235,6 +241,13 @@ func (p *staleReadProcessor) evaluateFromStmtTSOrSysVariable(stmtTS uint64) erro

if stmtTS > 0 {
p.stmtTS = stmtTS
if evaluator != nil {
is, err := GetSessionSnapshotInfoSchema(p.sctx, stmtTS)
if err != nil {
return err
}
return p.setEvaluatedValues(stmtTS, is, evaluator)
}
return p.setEvaluatedTS(stmtTS)
}

Expand Down

0 comments on commit 52b57dd

Please sign in to comment.