Skip to content

Commit

Permalink
planner: cherry pick two CTE related commit (#44306)
Browse files Browse the repository at this point in the history
ref #43759, ref #44054
  • Loading branch information
winoros committed May 31, 2023
1 parent 129de9f commit 064a4e5
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 89 deletions.
27 changes: 13 additions & 14 deletions cmd/explaintest/r/cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -562,20 +562,19 @@ create table tpk1(c1 int primary key);
insert into tpk1 values(1), (2), (3);
explain with cte1 as (select c1 from tpk) select /*+ merge_join(dt1, dt2) */ * from tpk1 dt1 inner join cte1 dt2 inner join cte1 dt3 on dt1.c1 = dt2.c1 and dt2.c1 = dt3.c1;
id estRows task access object operator info
Projection_20 10000.00 root test.tpk1.c1, test.tpk.c1, test.tpk.c1
└─HashJoin_22 10000.00 root inner join, equal:[eq(test.tpk.c1, test.tpk.c1)]
├─Selection_23(Build) 6400.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_24 8000.00 root CTE:cte1 AS dt3 data:CTE_0
└─MergeJoin_25(Probe) 8000.00 root inner join, left key:test.tpk1.c1, right key:test.tpk.c1
├─Sort_31(Build) 6400.00 root test.tpk.c1
│ └─Selection_29 6400.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_30 8000.00 root CTE:cte1 AS dt2 data:CTE_0
└─TableReader_27(Probe) 10000.00 root data:TableFullScan_26
└─TableFullScan_26 10000.00 cop[tikv] table:dt1 keep order:true, stats:pseudo
CTE_0 8000.00 root Non-Recursive CTE
└─Selection_13(Seed Part) 8000.00 root or(not(isnull(test.tpk.c1)), not(isnull(test.tpk.c1)))
└─TableReader_16 10000.00 root data:TableFullScan_15
└─TableFullScan_15 10000.00 cop[tikv] table:tpk keep order:false, stats:pseudo
Projection_19 12500.00 root test.tpk1.c1, test.tpk.c1, test.tpk.c1
└─HashJoin_21 12500.00 root inner join, equal:[eq(test.tpk.c1, test.tpk.c1)]
├─Selection_22(Build) 8000.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_23 10000.00 root CTE:cte1 AS dt3 data:CTE_0
└─MergeJoin_24(Probe) 10000.00 root inner join, left key:test.tpk1.c1, right key:test.tpk.c1
├─Sort_30(Build) 8000.00 root test.tpk.c1
│ └─Selection_28 8000.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_29 10000.00 root CTE:cte1 AS dt2 data:CTE_0
└─TableReader_26(Probe) 10000.00 root data:TableFullScan_25
└─TableFullScan_25 10000.00 cop[tikv] table:dt1 keep order:true, stats:pseudo
CTE_0 10000.00 root Non-Recursive CTE
└─TableReader_15(Seed Part) 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:tpk keep order:false, stats:pseudo
with cte1 as (select c1 from tpk) select /*+ merge_join(dt1, dt2) */ * from tpk1 dt1 inner join cte1 dt2 inner join cte1 dt3 on dt1.c1 = dt2.c1 and dt2.c1 = dt3.c1;
c1 c1 c1
1 1 1
Expand Down
43 changes: 20 additions & 23 deletions cmd/explaintest/r/explain_cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ CTE_0 2.00 root Recursive CTE
└─CTETable_17 1.00 root Scan on CTE_0
explain with cte(a) as (with recursive cte1(a) as (select 1 union select a + 1 from cte1 where a < 10) select * from cte1) select * from cte t1, cte t2;
id estRows task access object operator info
HashJoin_26 2.56 root CARTESIAN inner join
├─CTEFullScan_29(Build) 1.60 root CTE:cte AS t2 data:CTE_0
└─CTEFullScan_28(Probe) 1.60 root CTE:cte AS t1 data:CTE_0
CTE_0 1.60 root Non-Recursive CTE
└─Selection_21(Seed Part) 1.60 root 1
└─CTEFullScan_23 2.00 root CTE:cte1 data:CTE_1
HashJoin_25 4.00 root CARTESIAN inner join
├─CTEFullScan_28(Build) 2.00 root CTE:cte AS t2 data:CTE_0
└─CTEFullScan_27(Probe) 2.00 root CTE:cte AS t1 data:CTE_0
CTE_0 2.00 root Non-Recursive CTE
└─CTEFullScan_22(Seed Part) 2.00 root CTE:cte1 data:CTE_1
CTE_1 2.00 root Recursive CTE
├─Projection_16(Seed Part) 1.00 root 1->Column#2
│ └─TableDual_17 1.00 root rows:1
Expand All @@ -91,15 +90,13 @@ CTE_1 8001.00 root Recursive CTE
└─CTETable_33 10000.00 root Scan on CTE_1
explain with q(a,b) as (select * from t1) select /*+ merge(q) no_merge(q1) */ * from q, q q1 where q.a=1 and q1.a=2;
id estRows task access object operator info
HashJoin_19 40960000.00 root CARTESIAN inner join
├─Selection_23(Build) 6400.00 root eq(test.t1.c1, 2)
│ └─CTEFullScan_24 8000.00 root CTE:q AS q1 data:CTE_0
└─Selection_21(Probe) 6400.00 root eq(test.t1.c1, 1)
└─CTEFullScan_22 8000.00 root CTE:q data:CTE_0
CTE_0 8000.00 root Non-Recursive CTE
└─Selection_11(Seed Part) 8000.00 root or(eq(test.t1.c1, 1), eq(test.t1.c1, 2))
└─TableReader_14 10000.00 root data:TableFullScan_13
└─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
HashJoin_15 2.56 root CARTESIAN inner join
├─Selection_19(Build) 1.60 root eq(test.t1.c1, 2)
│ └─CTEFullScan_20 2.00 root CTE:q AS q1 data:CTE_0
└─Selection_17(Probe) 1.60 root eq(test.t1.c1, 1)
└─CTEFullScan_18 2.00 root CTE:q data:CTE_0
CTE_0 2.00 root Non-Recursive CTE
└─Batch_Point_Get_12(Seed Part) 2.00 root table:t1 handle:[1 2], keep order:false, desc:false
explain with recursive cte(a,b) as (select 1, concat('a', 1) union select a+1, concat(b, 1) from cte where a < 5) select * from cte;
id estRows task access object operator info
CTEFullScan_17 2.00 root CTE:cte data:CTE_0
Expand Down Expand Up @@ -458,14 +455,14 @@ select v1.tps v1_tps,v2.tps v2_tps
from version1 v1, version2 v2
where v1.bench_type =v2.bench_type;
id estRows task access object operator info
HashJoin 8000.00 root inner join, equal:[eq(test.t1.bench_type, test.t1.bench_type)]
├─Selection(Build) 6400.00 root eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))
│ └─CTEFullScan 8000.00 root CTE:all_data data:CTE_0
└─Selection(Probe) 6400.00 root eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))
└─CTEFullScan 8000.00 root CTE:all_data data:CTE_0
CTE_0 8000.00 root Non-Recursive CTE
└─Selection(Seed Part) 8000.00 root or(and(eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))), and(eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))))
└─TableReader 10000.00 root data:TableFullScan
HashJoin 19.97 root inner join, equal:[eq(test.t1.bench_type, test.t1.bench_type)]
├─Selection(Build) 15.98 root eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))
│ └─CTEFullScan 19.97 root CTE:all_data data:CTE_0
└─Selection(Probe) 15.98 root eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))
└─CTEFullScan 19.97 root CTE:all_data data:CTE_0
CTE_0 19.97 root Non-Recursive CTE
└─TableReader(Seed Part) 19.97 root data:Selection
└─Selection 19.97 cop[tikv] or(and(eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))), and(eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))))
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
drop table if exists tbl;
create table tbl (id int);
Expand Down
2 changes: 1 addition & 1 deletion executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func TestCheckActRowsWithUnistore(t *testing.T) {
},
{
sql: "with cte(a) as (select a from t_unistore_act_rows) select (select 1 from cte limit 1) from cte;",
expected: []string{"4", "4", "4", "4", "4"},
expected: []string{"4", "1", "1", "1", "4", "4", "4", "4", "4"},
},
{
sql: "select a, row_number() over (partition by b) from t_unistore_act_rows;",
Expand Down
1 change: 1 addition & 0 deletions executor/oomtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_test(
srcs = ["oom_test.go"],
flaky = True,
race = "on",
shard_count = 3,
deps = [
"//testkit",
"//testkit/testsetup",
Expand Down
16 changes: 14 additions & 2 deletions planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ func (er *expressionRewriter) handleExistSubquery(ctx context.Context, v *ast.Ex
semiJoinRewrite = false
}

if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 {
if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 || hasCTEConsumerInSubPlan(np) {
er.p, er.err = er.b.buildSemiApply(er.p, np, nil, er.asScalar, v.Not, semiJoinRewrite, noDecorrelate)
if er.err != nil || !er.asScalar {
return v, true
Expand Down Expand Up @@ -1052,7 +1052,7 @@ func (er *expressionRewriter) handleScalarSubquery(ctx context.Context, v *ast.S
noDecorrelate = false
}

if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 {
if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 || hasCTEConsumerInSubPlan(np) {
er.p = er.b.buildApplyWithJoinType(er.p, np, LeftOuterJoin, noDecorrelate)
if np.Schema().Len() > 1 {
newCols := make([]expression.Expression, 0, np.Schema().Len())
Expand Down Expand Up @@ -1110,6 +1110,18 @@ func (er *expressionRewriter) handleScalarSubquery(ctx context.Context, v *ast.S
return v, true
}

func hasCTEConsumerInSubPlan(p LogicalPlan) bool {
if _, ok := p.(*LogicalCTE); ok {
return true
}
for _, child := range p.Children() {
if hasCTEConsumerInSubPlan(child) {
return true
}
}
return false
}

// Leave implements Visitor interface.
func (er *expressionRewriter) Leave(originInNode ast.Node) (retNode ast.Node, ok bool) {
if er.err != nil {
Expand Down
14 changes: 14 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8272,3 +8272,17 @@ func TestIssue41458(t *testing.T) {
require.Equalf(t, expectedRes[i], op, fmt.Sprintf("Mismatch at index %d.", i))
}
}

func TestIssue43645(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t1(id int,col1 varchar(10),col2 varchar(10),col3 varchar(10));")
tk.MustExec("CREATE TABLE t2(id int,col1 varchar(10),col2 varchar(10),col3 varchar(10));")
tk.MustExec("INSERT INTO t1 values(1,NULL,NULL,null),(2,NULL,NULL,null),(3,NULL,NULL,null);")
tk.MustExec("INSERT INTO t2 values(1,'a','aa','aaa'),(2,'b','bb','bbb'),(3,'c','cc','ccc');")

rs := tk.MustQuery("WITH tmp AS (SELECT t2.* FROM t2) select (SELECT tmp.col1 FROM tmp WHERE tmp.id=t1.id ) col1, (SELECT tmp.col2 FROM tmp WHERE tmp.id=t1.id ) col2, (SELECT tmp.col3 FROM tmp WHERE tmp.id=t1.id ) col3 from t1;")
rs.Sort().Check(testkit.Rows("a aa aaa", "b bb bbb", "c cc ccc"))
}
7 changes: 7 additions & 0 deletions planner/core/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "issuetest_test",
srcs = ["planner_issue_test.go"],
deps = ["//testkit"],
)
35 changes: 35 additions & 0 deletions planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package issuetest

import (
"testing"

"github.com/pingcap/tidb/testkit"
)

func TestIssue44051(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t1(id int,col1 varchar(10),col2 varchar(10),col3 varchar(10));")
tk.MustExec("CREATE TABLE t2(id int,col1 varchar(10),col2 varchar(10),col3 varchar(10));")
tk.MustExec("INSERT INTO t1 values(1,NULL,NULL,null),(2,NULL,NULL,null),(3,NULL,NULL,null);")
tk.MustExec("INSERT INTO t2 values(1,'a','aa','aaa'),(2,'b','bb','bbb'),(3,'c','cc','ccc');")

rs := tk.MustQuery("WITH tmp AS (SELECT t2.* FROM t2) SELECT * FROM t1 WHERE t1.id = (select id from tmp where id = 1) or t1.id = (select id from tmp where id = 2) or t1.id = (select id from tmp where id = 3)")
rs.Sort().Check(testkit.Rows("1 <nil> <nil> <nil>", "2 <nil> <nil> <nil>", "3 <nil> <nil> <nil>"))
}
23 changes: 21 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,12 +959,31 @@ func (b *PlanBuilder) buildSet(ctx context.Context, v *ast.SetStmt) (Plan, error
char, col := b.ctx.GetSessionVars().GetCharsetInfo()
vars.Value = ast.NewValueExpr(cn.Name.Name.O, char, col)
}
mockTablePlan := LogicalTableDual{}.Init(b.ctx, b.getSelectOffset())
// The mocked plan need one output for the complex cases.
// See the following IF branch.
mockTablePlan := LogicalTableDual{RowCount: 1}.Init(b.ctx, b.getSelectOffset())
var err error
assign.Expr, _, err = b.rewrite(ctx, vars.Value, mockTablePlan, nil, true)
var possiblePlan LogicalPlan
assign.Expr, possiblePlan, err = b.rewrite(ctx, vars.Value, mockTablePlan, nil, true)
if err != nil {
return nil, err
}
// It's possible that the subquery of the SET_VAR is a complex one so we need to get the result by evaluating the plan.
if _, ok := possiblePlan.(*LogicalTableDual); !ok {
physicalPlan, _, err := DoOptimize(ctx, b.ctx, b.optFlag, possiblePlan)
if err != nil {
return nil, err
}
row, err := EvalSubqueryFirstRow(ctx, physicalPlan, b.is, b.ctx)
if err != nil {
return nil, err
}
constant := &expression.Constant{
Value: row[0],
RetType: assign.Expr.GetType(),
}
assign.Expr = constant
}
} else {
assign.IsDefault = true
}
Expand Down
19 changes: 16 additions & 3 deletions planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,13 +993,26 @@ func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *lo
if !p.isOuterMostCTE {
return predicates, p.self
}
if len(predicates) == 0 {
pushedPredicates := make([]expression.Expression, len(predicates))
copy(pushedPredicates, predicates)
// The filter might change the correlated status of the cte.
// We forbid the push down that makes the change for now.
// Will support it later.
if !p.cte.IsInApply {
for i := len(pushedPredicates) - 1; i >= 0; i-- {
if len(expression.ExtractCorColumns(pushedPredicates[i])) == 0 {
continue
}
pushedPredicates = append(pushedPredicates[0:i], pushedPredicates[i+1:]...)
}
}
if len(pushedPredicates) == 0 {
p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.NewOne())
return predicates, p.self
}
newPred := make([]expression.Expression, 0, len(predicates))
for i := range predicates {
newPred = append(newPred, predicates[i].Clone())
for i := range pushedPredicates {
newPred = append(newPred, pushedPredicates[i].Clone())
ResolveExprAndReplace(newPred[i], p.cte.ColumnMap)
}
p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.ComposeCNFCondition(p.ctx, newPred...))
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,7 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression
newSel.SetChildren(p.cte.seedPartLogicalPlan)
p.cte.seedPartLogicalPlan = newSel
}
p.cte.seedPartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.ctx, p.cte.optFlag, p.cte.seedPartLogicalPlan)
p.cte.seedPartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.ctx, p.cte.optFlag|flagPredicatePushDown, p.cte.seedPartLogicalPlan)
if err != nil {
return nil, err
}
Expand Down
24 changes: 2 additions & 22 deletions planner/core/testdata/flat_plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -265,21 +265,11 @@
{
"Depth": 2,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 3,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": " ",
"TextTreeIndent": "",
"IsLastChild": true
}
],
Expand Down Expand Up @@ -307,21 +297,11 @@
{
"Depth": 2,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 3,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": " ",
"TextTreeIndent": "",
"IsLastChild": true
}
]
Expand Down
39 changes: 18 additions & 21 deletions planner/core/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -2181,26 +2181,24 @@
{
"SQL": "with cte as (with cte as (select 1) select * from cte) select * from cte a, cte b; -- inline inner cte, cannot be inlined outer cte",
"Plan": [
"HashJoin 0.64 root CARTESIAN inner join",
"├─CTEFullScan(Build) 0.80 root CTE:cte AS b data:CTE_0",
"└─CTEFullScan(Probe) 0.80 root CTE:cte AS a data:CTE_0",
"CTE_0 0.80 root Non-Recursive CTE",
"└─Selection(Seed Part) 0.80 root 1",
" └─Projection 1.00 root 1->Column#3",
" └─TableDual 1.00 root rows:1"
"HashJoin 1.00 root CARTESIAN inner join",
"├─CTEFullScan(Build) 1.00 root CTE:cte AS b data:CTE_0",
"└─CTEFullScan(Probe) 1.00 root CTE:cte AS a data:CTE_0",
"CTE_0 1.00 root Non-Recursive CTE",
"└─Projection(Seed Part) 1.00 root 1->Column#3",
" └─TableDual 1.00 root rows:1"
],
"Warning": null
},
{
"SQL": "with cte1 as (select 1), cte2 as (with cte3 as (select * from cte1) select * from cte3) select * from cte1, cte2; -- inline cte2, cte3, cannot be inlined cte1",
"Plan": [
"HashJoin 0.64 root CARTESIAN inner join",
"├─CTEFullScan(Build) 0.80 root CTE:cte1 data:CTE_0",
"└─CTEFullScan(Probe) 0.80 root CTE:cte1 data:CTE_0",
"CTE_0 0.80 root Non-Recursive CTE",
"└─Selection(Seed Part) 0.80 root 1",
" └─Projection 1.00 root 1->Column#1",
" └─TableDual 1.00 root rows:1"
"HashJoin 1.00 root CARTESIAN inner join",
"├─CTEFullScan(Build) 1.00 root CTE:cte1 data:CTE_0",
"└─CTEFullScan(Probe) 1.00 root CTE:cte1 data:CTE_0",
"CTE_0 1.00 root Non-Recursive CTE",
"└─Projection(Seed Part) 1.00 root 1->Column#1",
" └─TableDual 1.00 root rows:1"
],
"Warning": null
},
Expand All @@ -2215,13 +2213,12 @@
{
"SQL": "with cte1 as (select 1), cte2 as (select * from cte1) select * from cte2 a, cte2 b; -- inline cte1, cannot be inlined cte2",
"Plan": [
"HashJoin 0.64 root CARTESIAN inner join",
"├─CTEFullScan(Build) 0.80 root CTE:cte2 AS b data:CTE_1",
"└─CTEFullScan(Probe) 0.80 root CTE:cte2 AS a data:CTE_1",
"CTE_1 0.80 root Non-Recursive CTE",
"└─Selection(Seed Part) 0.80 root 1",
" └─Projection 1.00 root 1->Column#3",
" └─TableDual 1.00 root rows:1"
"HashJoin 1.00 root CARTESIAN inner join",
"├─CTEFullScan(Build) 1.00 root CTE:cte2 AS b data:CTE_1",
"└─CTEFullScan(Probe) 1.00 root CTE:cte2 AS a data:CTE_1",
"CTE_1 1.00 root Non-Recursive CTE",
"└─Projection(Seed Part) 1.00 root 1->Column#3",
" └─TableDual 1.00 root rows:1"
],
"Warning": null
},
Expand Down

0 comments on commit 064a4e5

Please sign in to comment.