From 74e49085cfee82a81f7b83e49fa88554993439e7 Mon Sep 17 00:00:00 2001 From: "Zhuomin(Charming) Liu" Date: Mon, 10 Feb 2020 20:02:22 +0800 Subject: [PATCH] planner: make the read storage hint not force plan (#14644) --- cmd/explaintest/r/access_tiflash.result | 57 -------- cmd/explaintest/t/access_tiflash.test | 25 ---- planner/core/integration_test.go | 80 ++++++++++ planner/core/logical_plan_builder.go | 29 ++-- .../core/testdata/integration_suite_in.json | 23 +++ .../core/testdata/integration_suite_out.json | 137 ++++++++++++++++++ util/testutil/testutil.go | 8 + 7 files changed, 268 insertions(+), 91 deletions(-) delete mode 100644 cmd/explaintest/r/access_tiflash.result delete mode 100644 cmd/explaintest/t/access_tiflash.test diff --git a/cmd/explaintest/r/access_tiflash.result b/cmd/explaintest/r/access_tiflash.result deleted file mode 100644 index d44565ca9e6fd..0000000000000 --- a/cmd/explaintest/r/access_tiflash.result +++ /dev/null @@ -1,57 +0,0 @@ -drop table if exists t, tt; -create table t(a int, b int, index ia(a)); -desc select avg(a) from t; -id count task operator info -StreamAgg_20 1.00 root funcs:avg(Column#7, Column#8)->Column#4 -└─IndexReader_21 1.00 root index:StreamAgg_8 - └─StreamAgg_8 1.00 cop[tikv] funcs:avg(test.t.a)->Column#7 - └─IndexScan_19 10000.00 cop[tikv] table:t, index:a, range:[NULL,+inf], keep order:false, stats:pseudo -desc select /*+ read_from_storage(tiflash[t]) */ avg(a) from t; -id count task operator info -StreamAgg_16 1.00 root funcs:avg(Column#7, Column#8)->Column#4 -└─TableReader_17 1.00 root data:StreamAgg_8 - └─StreamAgg_8 1.00 cop[tiflash] funcs:count(test.t.a)->Column#7, funcs:sum(test.t.a)->Column#8 - └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo -desc select /*+ read_from_storage(tiflash[t]) */ sum(a) from t; -id count task operator info -StreamAgg_16 1.00 root funcs:sum(Column#6)->Column#4 -└─TableReader_17 1.00 root data:StreamAgg_8 - └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(test.t.a)->Column#6 - └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo -desc select /*+ read_from_storage(tiflash[t]) */ sum(a+1) from t; -id count task operator info -StreamAgg_16 1.00 root funcs:sum(Column#6)->Column#4 -└─TableReader_17 1.00 root data:StreamAgg_8 - └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(plus(test.t.a, 1))->Column#6 - └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo -desc select /*+ read_from_storage(tiflash[t]) */ sum(isnull(a)) from t; -id count task operator info -StreamAgg_16 1.00 root funcs:sum(Column#6)->Column#4 -└─TableReader_17 1.00 root data:StreamAgg_8 - └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(isnull(test.t.a))->Column#6 - └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo -create table tt(a int, b int, primary key(a)); -desc select * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55); -id count task operator info -TableReader_6 44.00 root data:TableScan_5 -└─TableScan_5 44.00 cop[tikv] table:tt, range:(1,20), [30,55), keep order:false, stats:pseudo -desc select /*+ read_from_storage(tiflash[tt]) */ * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55); -id count task operator info -TableReader_7 44.00 root data:Selection_6 -└─Selection_6 44.00 cop[tiflash] or(and(gt(test.tt.a, 1), lt(test.tt.a, 20)), and(ge(test.tt.a, 30), lt(test.tt.a, 55))) - └─TableScan_5 44.00 cop[tiflash] table:tt, range:[-inf,+inf], keep order:false, stats:pseudo -drop table if exists ttt; -create table ttt (a int, primary key (a desc)); -desc select * from ttt order by ttt.a desc; -id count task operator info -TableReader_11 10000.00 root data:TableScan_10 -└─TableScan_10 10000.00 cop[tikv] table:ttt, range:[-inf,+inf], keep order:true, desc, stats:pseudo -desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a desc; -id count task operator info -Sort_4 10000.00 root test.ttt.a:desc -└─TableReader_8 10000.00 root data:TableScan_7 - └─TableScan_7 10000.00 cop[tiflash] table:ttt, range:[-inf,+inf], keep order:false, stats:pseudo -desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a; -id count task operator info -TableReader_11 10000.00 root data:TableScan_10 -└─TableScan_10 10000.00 cop[tiflash] table:ttt, range:[-inf,+inf], keep order:true, stats:pseudo diff --git a/cmd/explaintest/t/access_tiflash.test b/cmd/explaintest/t/access_tiflash.test deleted file mode 100644 index 76604cdf0bfc4..0000000000000 --- a/cmd/explaintest/t/access_tiflash.test +++ /dev/null @@ -1,25 +0,0 @@ -drop table if exists t, tt; - -create table t(a int, b int, index ia(a)); - -desc select avg(a) from t; - -desc select /*+ read_from_storage(tiflash[t]) */ avg(a) from t; - -desc select /*+ read_from_storage(tiflash[t]) */ sum(a) from t; - -desc select /*+ read_from_storage(tiflash[t]) */ sum(a+1) from t; - -desc select /*+ read_from_storage(tiflash[t]) */ sum(isnull(a)) from t; - -create table tt(a int, b int, primary key(a)); - -desc select * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55); -desc select /*+ read_from_storage(tiflash[tt]) */ * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55); - -drop table if exists ttt; -create table ttt (a int, primary key (a desc)); - -desc select * from ttt order by ttt.a desc; -desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a desc; -desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a; diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index cded25fea83f3..e9c96ad0dabd3 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -311,6 +311,86 @@ func (s *testIntegrationSuite) TestSelPushDownTiFlash(c *C) { )) } +func (s *testIntegrationSuite) TestReadFromStorageHint(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t, tt, ttt") + tk.MustExec("create table t(a int, b int, index ia(a))") + tk.MustExec("create table tt(a int, b int, primary key(a))") + tk.MustExec("create table ttt(a int, primary key (a desc))") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + c.Assert(s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()), DeepEquals, output[i].Warn) + } +} + +func (s *testIntegrationSuite) TestReadFromStorageHintAndIsolationRead(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t, tt, ttt") + tk.MustExec("create table t(a int, b int, index ia(a))") + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tikv\"") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + tk.Se.GetSessionVars().StmtCtx.SetWarnings(nil) + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + c.Assert(s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()), DeepEquals, output[i].Warn) + } +} + func (s *testIntegrationSuite) TestPartitionTableStats(c *C) { tk := testkit.NewTestKit(c, s.store) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 970406f0ae6f1..5d03df3c85f7c 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/planner/property" - "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" @@ -440,7 +439,19 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { alias = &hintTableInfo{dbName: ds.DBName, tblName: ds.tableInfo.Name, selectOffset: ds.SelectBlockOffset()} } if hintInfo.ifPreferTiKV(alias) { - ds.preferStoreType |= preferTiKV + for _, path := range ds.possibleAccessPaths { + if path.StoreType == kv.TiKV { + ds.preferStoreType |= preferTiKV + break + } + } + if ds.preferStoreType == 0 { + errMsg := fmt.Sprintf("No available path for table %s.%s with the store type %s of the hint /*+ read_from_storage */, "+ + "please check the status of the table replica and variable value of tidb_isolation_read_engines(%v)", + ds.DBName.O, ds.table.Meta().Name.O, kv.TiKV.Name(), ds.ctx.GetSessionVars().GetIsolationReadEngines()) + warning := ErrInternal.GenWithStack(errMsg) + ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) + } } if hintInfo.ifPreferTiFlash(alias) { if ds.preferStoreType != 0 { @@ -451,18 +462,18 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { ds.preferStoreType = 0 return } - ds.preferStoreType |= preferTiFlash - hasTiFlashPath := false for _, path := range ds.possibleAccessPaths { if path.StoreType == kv.TiFlash { - hasTiFlashPath = true + ds.preferStoreType |= preferTiFlash break } } - // TODO: For now, if there is a TiFlash hint for a table, we enforce a TiFlash path. But hint is just a suggestion - // for the planner. We can keep it since we need it to debug with PD and TiFlash. In future, this should be removed. - if !hasTiFlashPath { - ds.possibleAccessPaths = append(ds.possibleAccessPaths, &util.AccessPath{IsTablePath: true, StoreType: kv.TiFlash}) + if ds.preferStoreType == 0 { + errMsg := fmt.Sprintf("No available path for table %s.%s with the store type %s of the hint /*+ read_from_storage */, "+ + "please check the status of the table replica and variable value of tidb_isolation_read_engines(%v)", + ds.DBName.O, ds.table.Meta().Name.O, kv.TiFlash.Name(), ds.ctx.GetSessionVars().GetIsolationReadEngines()) + warning := ErrInternal.GenWithStack(errMsg) + ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) } } } diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index 45ae693ab8be2..11c0c1a5c268b 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -57,5 +57,28 @@ "cases": [ "explain select /*+ USE_INDEX_MERGE(t, a, b) */ * from t where a = 1 or b = 2" ] + }, + { + "name": "TestReadFromStorageHint", + "cases": [ + "desc select avg(a) from t", + "desc select /*+ read_from_storage(tiflash[t]) */ avg(a) from t", + "desc select /*+ read_from_storage(tiflash[t]) */ sum(a) from t", + "desc select /*+ read_from_storage(tiflash[t]) */ sum(a+1) from t", + "desc select /*+ read_from_storage(tiflash[t]) */ sum(isnull(a)) from t", + "desc select * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55)", + "desc select /*+ read_from_storage(tiflash[tt]) */ * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55)", + "desc select * from ttt order by ttt.a desc", + "desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a desc", + "desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a" + ] + }, + { + "name": "TestReadFromStorageHintAndIsolationRead", + "cases": [ + "desc select /*+ read_from_storage(tikv[t], tiflash[t]) */ avg(a) from t", + "desc select /*+ read_from_storage(tikv[t]) */ avg(a) from t", + "desc select /*+ read_from_storage(tiflash[t]) */ avg(a) from t" + ] } ] diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index 0ee991147b4f0..b86ec218f81a2 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -205,5 +205,142 @@ ] } ] + }, + { + "Name": "TestReadFromStorageHint", + "Cases": [ + { + "SQL": "desc select avg(a) from t", + "Plan": [ + "StreamAgg_24 1.00 root funcs:avg(Column#7, Column#8)->Column#4", + "└─TableReader_25 1.00 root data:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tiflash] funcs:count(test.t.a)->Column#7, funcs:sum(test.t.a)->Column#8", + " └─TableScan_22 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[t]) */ avg(a) from t", + "Plan": [ + "StreamAgg_16 1.00 root funcs:avg(Column#7, Column#8)->Column#4", + "└─TableReader_17 1.00 root data:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tiflash] funcs:count(test.t.a)->Column#7, funcs:sum(test.t.a)->Column#8", + " └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[t]) */ sum(a) from t", + "Plan": [ + "StreamAgg_16 1.00 root funcs:sum(Column#6)->Column#4", + "└─TableReader_17 1.00 root data:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(test.t.a)->Column#6", + " └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[t]) */ sum(a+1) from t", + "Plan": [ + "StreamAgg_16 1.00 root funcs:sum(Column#6)->Column#4", + "└─TableReader_17 1.00 root data:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(plus(test.t.a, 1))->Column#6", + " └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[t]) */ sum(isnull(a)) from t", + "Plan": [ + "StreamAgg_16 1.00 root funcs:sum(Column#6)->Column#4", + "└─TableReader_17 1.00 root data:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tiflash] funcs:sum(isnull(test.t.a))->Column#6", + " └─TableScan_15 10000.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55)", + "Plan": [ + "TableReader_9 44.00 root data:Selection_8", + "└─Selection_8 44.00 cop[tiflash] or(and(gt(test.tt.a, 1), lt(test.tt.a, 20)), and(ge(test.tt.a, 30), lt(test.tt.a, 55)))", + " └─TableScan_7 44.00 cop[tiflash] table:tt, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[tt]) */ * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55)", + "Plan": [ + "TableReader_7 44.00 root data:Selection_6", + "└─Selection_6 44.00 cop[tiflash] or(and(gt(test.tt.a, 1), lt(test.tt.a, 20)), and(ge(test.tt.a, 30), lt(test.tt.a, 55)))", + " └─TableScan_5 44.00 cop[tiflash] table:tt, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select * from ttt order by ttt.a desc", + "Plan": [ + "TableReader_13 10000.00 root data:TableScan_12", + "└─TableScan_12 10000.00 cop[tikv] table:ttt, range:[-inf,+inf], keep order:true, desc, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a desc", + "Plan": [ + "Sort_4 10000.00 root test.ttt.a:desc", + "└─TableReader_8 10000.00 root data:TableScan_7", + " └─TableScan_7 10000.00 cop[tiflash] table:ttt, range:[-inf,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[ttt]) */ * from ttt order by ttt.a", + "Plan": [ + "TableReader_11 10000.00 root data:TableScan_10", + "└─TableScan_10 10000.00 cop[tiflash] table:ttt, range:[-inf,+inf], keep order:true, stats:pseudo" + ], + "Warn": null + } + ] + }, + { + "Name": "TestReadFromStorageHintAndIsolationRead", + "Cases": [ + { + "SQL": "desc select /*+ read_from_storage(tikv[t], tiflash[t]) */ avg(a) from t", + "Plan": [ + "StreamAgg_20 1.00 root funcs:avg(Column#7, Column#8)->Column#4", + "└─IndexReader_21 1.00 root index:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tikv] funcs:avg(test.t.a)->Column#7", + " └─IndexScan_19 10000.00 cop[tikv] table:t, index:a, range:[NULL,+inf], keep order:false, stats:pseudo" + ], + "Warn": [ + "[planner:1815]Storage hints are conflict, you can only specify one storage type of table test.t" + ] + }, + { + "SQL": "desc select /*+ read_from_storage(tikv[t]) */ avg(a) from t", + "Plan": [ + "StreamAgg_20 1.00 root funcs:avg(Column#7, Column#8)->Column#4", + "└─IndexReader_21 1.00 root index:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tikv] funcs:avg(test.t.a)->Column#7", + " └─IndexScan_19 10000.00 cop[tikv] table:t, index:a, range:[NULL,+inf], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "desc select /*+ read_from_storage(tiflash[t]) */ avg(a) from t", + "Plan": [ + "StreamAgg_20 1.00 root funcs:avg(Column#7, Column#8)->Column#4", + "└─IndexReader_21 1.00 root index:StreamAgg_8", + " └─StreamAgg_8 1.00 cop[tikv] funcs:avg(test.t.a)->Column#7", + " └─IndexScan_19 10000.00 cop[tikv] table:t, index:a, range:[NULL,+inf], keep order:false, stats:pseudo" + ], + "Warn": [ + "[planner:1815]No available path for table test.t with the store type tiflash of the hint /*+ read_from_storage */, please check the status of the table replica and variable value of tidb_isolation_read_engines(map[0:{}])" + ] + } + ] } ] diff --git a/util/testutil/testutil.go b/util/testutil/testutil.go index 6b3129a0d9f86..df29da5ea7467 100644 --- a/util/testutil/testutil.go +++ b/util/testutil/testutil.go @@ -259,6 +259,14 @@ func (t *TestData) ConvertRowsToStrings(rows [][]interface{}) (rs []string) { return rs } +// ConvertSQLWarnToStrings converts []SQLWarn to []string. +func (t *TestData) ConvertSQLWarnToStrings(warns []stmtctx.SQLWarn) (rs []string) { + for _, warn := range warns { + rs = append(rs, fmt.Sprintf(warn.Err.Error())) + } + return rs +} + // GenerateOutputIfNeeded generate the output file. func (t *TestData) GenerateOutputIfNeeded() error { if !record {