Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: let plan replayer record table tiflash replica #37336

Merged
merged 12 commits into from
Aug 24, 2022
7 changes: 7 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -140,12 +141,18 @@ func TestLoadStats(t *testing.T) {
}

func TestPlanReplayer(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount"))
}()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx_a(a))")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("plan replayer dump explain select * from t where a=10")
tk.MustExec("plan replayer dump explain select /*+ read_from_storage(tiflash[t]) */ * from t")

tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
Expand Down
118 changes: 107 additions & 11 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"

Expand All @@ -48,6 +49,17 @@ import (
var _ Executor = &PlanReplayerSingleExec{}
var _ Executor = &PlanReplayerLoadExec{}

const (
configFile = "config.toml"
metaFile = "meta.txt"
variablesFile = "variables.toml"
sqlFile = "sqls.sql"
tiFlashReplicasFile = "table_tiflash_replica.txt"
sessionBindingFile = "session_bindings.sql"
globalBindingFile = "global_bindings.sql"
explainFile = "explain.txt"
)

// PlanReplayerSingleExec represents a plan replayer executor.
type PlanReplayerSingleExec struct {
baseExecutor
Expand Down Expand Up @@ -165,6 +177,7 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
| |-stats2.json
| |-....
|-config.toml
|-table_tiflash_replica.txt
|-variables.toml
|-bindings.sql
|-sqls.sql
Expand Down Expand Up @@ -232,6 +245,11 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
return "", err
}

// Dump tables tiflash replicas
if err = dumpTiFlashReplica(e.ctx, zw, pairs); err != nil {
return "", err
}

// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return "", err
Expand All @@ -243,7 +261,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

// Dump sql
sql, err := zw.Create("sqls.sql")
sql, err := zw.Create(sqlFile)
if err != nil {
return "", nil
}
Expand Down Expand Up @@ -271,7 +289,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

func dumpConfig(zw *zip.Writer) error {
cf, err := zw.Create("config.toml")
cf, err := zw.Create(configFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -282,7 +300,7 @@ func dumpConfig(zw *zip.Writer) error {
}

func dumpMeta(zw *zip.Writer) error {
mt, err := zw.Create("meta.txt")
mt, err := zw.Create(metaFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -293,6 +311,31 @@ func dumpMeta(zw *zip.Writer) error {
return nil
}

func dumpTiFlashReplica(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
bf, err := zw.Create(tiFlashReplicasFile)
if err != nil {
return errors.AddStack(err)
}
is := domain.GetDomain(ctx).InfoSchema()
for pair := range pairs {
dbName := model.NewCIStr(pair.DBName)
tableName := model.NewCIStr(pair.TableName)
t, err := is.TableByName(dbName, tableName)
if err != nil {
logutil.BgLogger().Warn("failed to find table info", zap.Error(err),
zap.String("dbName", dbName.L), zap.String("tableName", tableName.L))
continue
}
if t.Meta().TiFlashReplica != nil && t.Meta().TiFlashReplica.Count > 0 {
row := []string{
pair.DBName, pair.TableName, strconv.FormatUint(t.Meta().TiFlashReplica.Count, 10),
}
fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
}
}
return nil
}

func dumpSchemas(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
for pair := range pairs {
err := getShowCreateTable(pair, zw, ctx)
Expand Down Expand Up @@ -338,7 +381,7 @@ func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
if err != nil {
return err
}
vf, err := zw.Create("variables.toml")
vf, err := zw.Create(variablesFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -365,7 +408,7 @@ func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
if err != nil {
return err
}
bf, err := zw.Create("session_bindings.sql")
bf, err := zw.Create(sessionBindingFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -389,7 +432,7 @@ func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
if err != nil {
return err
}
bf, err := zw.Create("global_bindings.sql")
bf, err := zw.Create(globalBindingFile)
if err != nil {
return errors.AddStack(err)
}
Expand Down Expand Up @@ -424,7 +467,7 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze b
if err != nil {
return err
}
fw, err := zw.Create("explain.txt")
fw, err := zw.Create(explainFile)
if err != nil {
return errors.AddStack(err)
}
Expand Down Expand Up @@ -604,9 +647,48 @@ func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error
return nil
}

func loadSetTiFlashReplica(ctx sessionctx.Context, z *zip.Reader) error {
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, tiFlashReplicasFile) == 0 {
v, err := zipFile.Open()
if err != nil {
return errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(v)
if err != nil {
return errors.AddStack(err)
}
rows := strings.Split(buf.String(), "\n")
for _, row := range rows {
if len(row) < 1 {
continue
}
r := strings.Split(row, "\t")
if len(r) < 3 {
logutil.BgLogger().Debug("plan replayer: skip error",
zap.Error(errors.New("setting tiflash replicas failed")))
continue
}
dbName := r[0]
tableName := r[1]
c := context.Background()
// Though we record tiflash replica in txt, we only set 1 tiflash replica as it's enough for reproduce the plan
sql := fmt.Sprintf("alter table %s.%s set tiflash replica 1", dbName, tableName)
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sql)
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
}
}
}
return nil
}

func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
unLoadVars := make([]string, 0)
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, "variables.toml") == 0 {
if strings.Compare(zipFile.Name, variablesFile) == 0 {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
Expand All @@ -622,20 +704,28 @@ func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
for name, value := range varMap {
sysVar := variable.GetSysVar(name)
if sysVar == nil {
return variable.ErrUnknownSystemVar.GenWithStackByArgs(name)
unLoadVars = append(unLoadVars, name)
logutil.BgLogger().Warn(fmt.Sprintf("skip set variable %s:%s", name, value), zap.Error(err))
continue
}
sVal, err := sysVar.Validate(vars, value, variable.ScopeSession)
if err != nil {
logutil.BgLogger().Debug(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err))
unLoadVars = append(unLoadVars, name)
logutil.BgLogger().Warn(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err))
continue
}
err = vars.SetSystemVar(name, sVal)
if err != nil {
return err
unLoadVars = append(unLoadVars, name)
logutil.BgLogger().Warn(fmt.Sprintf("skip set variable %s:%s", name, value), zap.Error(err))
continue
}
}
}
}
if len(unLoadVars) > 0 {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("variables set failed:%s", strings.Join(unLoadVars, ",")))
}
return nil
}

Expand Down Expand Up @@ -722,6 +812,12 @@ func (e *PlanReplayerLoadInfo) Update(data []byte) error {
}
}

// set tiflash replica if exists
err = loadSetTiFlashReplica(e.Ctx, z)
if err != nil {
return err
}

// build view next
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
Expand Down