Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parser, executor: implement plan replayer capture statement #39181

Merged
merged 12 commits into from
Nov 17, 2022
22 changes: 22 additions & 0 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,28 @@ func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, ta
return true, nil
}

// CheckPlanReplayerTaskExists checks whether plan replayer capture task exists already
func CheckPlanReplayerTaskExists(ctx context.Context, sctx sessionctx.Context, sqlDigest, planDigest string) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_task where sql_digest = '%v' and plan_digest = '%v'",
sqlDigest, planDigest))
if err != nil {
return false, err
}
if rs == nil {
return false, nil
}
var rows []chunk.Row
defer terror.Call(rs.Close)
if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
return false, errors.Trace(err)
}
if len(rows) > 0 {
return true, nil
}
return false, nil
}

// PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status
type PlanReplayerStatusRecord struct {
SQLDigest string
Expand Down
3 changes: 3 additions & 0 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
zw := zip.NewWriter(zf)
records := generateRecords(task)
defer func() {
if err != nil {
logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err))
}
err = zw.Close()
if err != nil {
logutil.BgLogger().Error("Closing zip writer failed", zap.Error(err), zap.String("filename", fileName))
Expand Down
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,17 @@ func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executo
}
return e
}
if v.Capture {
e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
CaptureInfo: &PlanReplayerCaptureInfo{
SQLDigest: v.SQLDigest,
PlanDigest: v.PlanDigest,
},
}
return e
}

e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
DumpInfo: &PlanReplayerDumpInfo{
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
}
if c.Ctx.GetSessionVars().EnablePlanReplayerCapture {
if c.Ctx.GetSessionVars().EnablePlanReplayerCapture && !c.Ctx.GetSessionVars().InRestrictedSQL {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode)
}

Expand Down
9 changes: 9 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ func TestPlanReplayer(t *testing.T) {
require.Len(t, rows, 1)
}

func TestPlanReplayerCapture(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("plan replayer capture '123' '123';")
tk.MustQuery("select sql_digest, plan_digest from mysql.plan_replayer_task;").Check(testkit.Rows("123 123"))
tk.MustGetErrMsg("plan replayer capture '123' '123';", "plan replayer capture task already exists")
}

func TestShow(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
36 changes: 34 additions & 2 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -43,8 +44,15 @@ var _ Executor = &PlanReplayerLoadExec{}
// PlanReplayerExec represents a plan replayer executor.
type PlanReplayerExec struct {
baseExecutor
DumpInfo *PlanReplayerDumpInfo
endFlag bool
CaptureInfo *PlanReplayerCaptureInfo
DumpInfo *PlanReplayerDumpInfo
endFlag bool
}

// PlanReplayerCaptureInfo indicates capture info
type PlanReplayerCaptureInfo struct {
SQLDigest string
PlanDigest string
}

// PlanReplayerDumpInfo indicates dump info
Expand All @@ -63,6 +71,9 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.endFlag {
return nil
}
if e.CaptureInfo != nil {
return e.registerCaptureTask(ctx)
}
err := e.createFile()
if err != nil {
return err
Expand All @@ -89,6 +100,27 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
exists, err := domain.CheckPlanReplayerTaskExists(ctx1, e.ctx, e.CaptureInfo.SQLDigest, e.CaptureInfo.PlanDigest)
if err != nil {
return err
}
if exists {
return errors.New("plan replayer capture task already exists")
}
exec := e.ctx.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx1, fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%s','%s')",
e.CaptureInfo.SQLDigest, e.CaptureInfo.PlanDigest))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
return err
}
e.endFlag = true
return nil
}

func (e *PlanReplayerExec) createFile() error {
var err error
e.DumpInfo.File, e.DumpInfo.FileName, err = domain.GeneratePlanReplayerFile()
Expand Down
13 changes: 13 additions & 0 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ type PlanReplayerStmt struct {
Stmt StmtNode
Analyze bool
Load bool

// Capture indicates 'plan replayer capture <sql_digest> <plan_digest>'
Capture bool
SQLDigest string
PlanDigest string

// File is used to store 2 cases:
// 1. plan replayer load 'file';
// 2. plan replayer dump explain <analyze> 'file'
Expand All @@ -284,6 +290,13 @@ func (n *PlanReplayerStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteString(n.File)
return nil
}
if n.Capture {
ctx.WriteKeyWord("PLAN REPLAYER CAPTURE ")
ctx.WriteString(n.SQLDigest)
ctx.WriteKeyWord(" ")
ctx.WriteString(n.PlanDigest)
return nil
}
ctx.WriteKeyWord("PLAN REPLAYER DUMP EXPLAIN ")
if n.Analyze {
ctx.WriteKeyWord("ANALYZE ")
Expand Down
Loading