Skip to content

Commit

Permalink
parser, executor: implement plan replayer capture statement (#39181)
Browse files Browse the repository at this point in the history
ref #38779
  • Loading branch information
Yisaer committed Nov 17, 2022
1 parent 0722251 commit 9c48480
Show file tree
Hide file tree
Showing 14 changed files with 3,773 additions and 3,643 deletions.
22 changes: 22 additions & 0 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,28 @@ func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, ta
return true, nil
}

// CheckPlanReplayerTaskExists checks whether plan replayer capture task exists already
func CheckPlanReplayerTaskExists(ctx context.Context, sctx sessionctx.Context, sqlDigest, planDigest string) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_task where sql_digest = '%v' and plan_digest = '%v'",
sqlDigest, planDigest))
if err != nil {
return false, err
}
if rs == nil {
return false, nil
}
var rows []chunk.Row
defer terror.Call(rs.Close)
if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
return false, errors.Trace(err)
}
if len(rows) > 0 {
return true, nil
}
return false, nil
}

// PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status
type PlanReplayerStatusRecord struct {
SQLDigest string
Expand Down
3 changes: 3 additions & 0 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
zw := zip.NewWriter(zf)
records := generateRecords(task)
defer func() {
if err != nil {
logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err))
}
err = zw.Close()
if err != nil {
logutil.BgLogger().Error("Closing zip writer failed", zap.Error(err), zap.String("filename", fileName))
Expand Down
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,17 @@ func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executo
}
return e
}
if v.Capture {
e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
CaptureInfo: &PlanReplayerCaptureInfo{
SQLDigest: v.SQLDigest,
PlanDigest: v.PlanDigest,
},
}
return e
}

e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
DumpInfo: &PlanReplayerDumpInfo{
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
}
if c.Ctx.GetSessionVars().EnablePlanReplayerCapture {
if c.Ctx.GetSessionVars().EnablePlanReplayerCapture && !c.Ctx.GetSessionVars().InRestrictedSQL {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode)
}

Expand Down
9 changes: 9 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ func TestPlanReplayer(t *testing.T) {
require.Len(t, rows, 1)
}

func TestPlanReplayerCapture(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("plan replayer capture '123' '123';")
tk.MustQuery("select sql_digest, plan_digest from mysql.plan_replayer_task;").Check(testkit.Rows("123 123"))
tk.MustGetErrMsg("plan replayer capture '123' '123';", "plan replayer capture task already exists")
}

func TestShow(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
36 changes: 34 additions & 2 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -43,8 +44,15 @@ var _ Executor = &PlanReplayerLoadExec{}
// PlanReplayerExec represents a plan replayer executor.
type PlanReplayerExec struct {
baseExecutor
DumpInfo *PlanReplayerDumpInfo
endFlag bool
CaptureInfo *PlanReplayerCaptureInfo
DumpInfo *PlanReplayerDumpInfo
endFlag bool
}

// PlanReplayerCaptureInfo indicates capture info
type PlanReplayerCaptureInfo struct {
SQLDigest string
PlanDigest string
}

// PlanReplayerDumpInfo indicates dump info
Expand All @@ -63,6 +71,9 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.endFlag {
return nil
}
if e.CaptureInfo != nil {
return e.registerCaptureTask(ctx)
}
err := e.createFile()
if err != nil {
return err
Expand All @@ -89,6 +100,27 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
exists, err := domain.CheckPlanReplayerTaskExists(ctx1, e.ctx, e.CaptureInfo.SQLDigest, e.CaptureInfo.PlanDigest)
if err != nil {
return err
}
if exists {
return errors.New("plan replayer capture task already exists")
}
exec := e.ctx.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx1, fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%s','%s')",
e.CaptureInfo.SQLDigest, e.CaptureInfo.PlanDigest))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
return err
}
e.endFlag = true
return nil
}

func (e *PlanReplayerExec) createFile() error {
var err error
e.DumpInfo.File, e.DumpInfo.FileName, err = domain.GeneratePlanReplayerFile()
Expand Down
17 changes: 8 additions & 9 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1553,15 +1553,14 @@ func TestVariablesInfo(t *testing.T) {
// See session/bootstrap.go:doDMLWorks() for where the exceptions are defined.
stmt := tk.MustQuery(`SELECT variable_name, default_value, current_value FROM information_schema.variables_info WHERE current_value != default_value and default_value != '' ORDER BY variable_name`)
stmt.Check(testkit.Rows(
"last_sql_use_alloc OFF ON", // for test stability
"tidb_enable_auto_analyze ON OFF", // always changed for tests
"tidb_enable_collect_execution_info ON OFF", // for test stability
"tidb_enable_mutation_checker OFF ON", // for new installs
"tidb_enable_plan_replayer_capture OFF false", // for enable plan replayer capture
"tidb_mem_oom_action CANCEL LOG", // always changed for tests
"tidb_row_format_version 1 2", // for new installs
"tidb_txn_assertion_level OFF FAST", // for new installs
"timestamp 0 123456789", // always dynamic
"last_sql_use_alloc OFF ON", // for test stability
"tidb_enable_auto_analyze ON OFF", // always changed for tests
"tidb_enable_collect_execution_info ON OFF", // for test stability
"tidb_enable_mutation_checker OFF ON", // for new installs
"tidb_mem_oom_action CANCEL LOG", // always changed for tests
"tidb_row_format_version 1 2", // for new installs
"tidb_txn_assertion_level OFF FAST", // for new installs
"timestamp 0 123456789", // always dynamic
))
}

Expand Down
13 changes: 13 additions & 0 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ type PlanReplayerStmt struct {
Stmt StmtNode
Analyze bool
Load bool

// Capture indicates 'plan replayer capture <sql_digest> <plan_digest>'
Capture bool
SQLDigest string
PlanDigest string

// File is used to store 2 cases:
// 1. plan replayer load 'file';
// 2. plan replayer dump explain <analyze> 'file'
Expand All @@ -284,6 +290,13 @@ func (n *PlanReplayerStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteString(n.File)
return nil
}
if n.Capture {
ctx.WriteKeyWord("PLAN REPLAYER CAPTURE ")
ctx.WriteString(n.SQLDigest)
ctx.WriteKeyWord(" ")
ctx.WriteString(n.PlanDigest)
return nil
}
ctx.WriteKeyWord("PLAN REPLAYER DUMP EXPLAIN ")
if n.Analyze {
ctx.WriteKeyWord("ANALYZE ")
Expand Down
Loading

0 comments on commit 9c48480

Please sign in to comment.