Skip to content

Commit

Permalink
planner: Support normal left/right outer MPPTask join with other cond…
Browse files Browse the repository at this point in the history
…ition to use either side as build side (#43355)

close #43347
  • Loading branch information
yibin87 committed Apr 27, 2023
1 parent 50dd8b4 commit 822767b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 9 deletions.
56 changes: 56 additions & 0 deletions planner/core/casetest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,62 @@ func TestMPPRightSemiJoin(t *testing.T) {
}
}

func TestMPPRightOuterJoin(t *testing.T) {
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(3))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, c int)")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2 (b int, d int)")

tk.MustExec("insert into t1 values (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);")
tk.MustExec("insert into t2 values (1, 12), (2, 18), (7, 66);")

{
tk.MustExec("alter table t1 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
}
{
tk.MustExec("alter table t2 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t2")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
}
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;")
{
var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}
planSuiteData := GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "insert") {
tk.MustExec(tt)
continue
}
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}
}

func TestHintScope(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
14 changes: 7 additions & 7 deletions planner/core/casetest/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -3832,13 +3832,13 @@
" └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 mpp[tiflash] funcs:count(1)->Column#12",
" └─HashJoin 32.00 mpp[tiflash] right outer join, equal:[eq(test.fact_t.d1_k, test.d1_t.d1_k)], right cond:gt(test.d1_t.value, 10), other cond:gt(test.fact_t.col1, test.d1_t.value)",
" ├─ExchangeReceiver(Build) 16.00 mpp[tiflash] ",
" │ └─ExchangeSender 16.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.fact_t.d1_k, collate: binary]",
" │ └─Selection 16.00 mpp[tiflash] not(isnull(test.fact_t.col1)), not(isnull(test.fact_t.d1_k))",
" │ └─TableFullScan 16.00 mpp[tiflash] table:fact_t pushed down filter:empty, keep order:false",
" └─ExchangeReceiver(Probe) 4.00 mpp[tiflash] ",
" └─ExchangeSender 4.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.d1_t.d1_k, collate: binary]",
" └─TableFullScan 4.00 mpp[tiflash] table:d1_t keep order:false"
" ├─ExchangeReceiver(Build) 4.00 mpp[tiflash] ",
" │ └─ExchangeSender 4.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.d1_t.d1_k, collate: binary]",
" │ └─TableFullScan 4.00 mpp[tiflash] table:d1_t keep order:false",
" └─ExchangeReceiver(Probe) 16.00 mpp[tiflash] ",
" └─ExchangeSender 16.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.fact_t.d1_k, collate: binary]",
" └─Selection 16.00 mpp[tiflash] not(isnull(test.fact_t.col1)), not(isnull(test.fact_t.d1_k))",
" └─TableFullScan 16.00 mpp[tiflash] table:fact_t pushed down filter:empty, keep order:false"
]
},
{
Expand Down
11 changes: 11 additions & 0 deletions planner/core/casetest/testdata/plan_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@
"explain select * from t1 where exists (select * from t2 where t1.a=t2.b)"
]
},
{
"name": "TestMPPRightOuterJoin",
"cases": [
"set @@session.tidb_allow_mpp=true",
"explain select * from t1 right join t2 on t1.a=t2.b and t1.c < t2.d",
"set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=0",
"set @@session.tidb_broadcast_join_threshold_size=0",
"set @@session.tidb_broadcast_join_threshold_count=0",
"explain select * from t1 right join t2 on t1.a=t2.b and t1.c < t2.d"
]
},
{
"name": "TestIssue37520",
"cases": [
Expand Down
55 changes: 55 additions & 0 deletions planner/core/casetest/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,61 @@
}
]
},
{
"Name": "TestMPPRightOuterJoin",
"Cases": [
{
"SQL": "set @@session.tidb_allow_mpp=true",
"Plan": null,
"Warn": null
},
{
"SQL": "explain select * from t1 right join t2 on t1.a=t2.b and t1.c < t2.d",
"Plan": [
"TableReader_30 3.00 root MppVersion: 1, data:ExchangeSender_29",
"└─ExchangeSender_29 3.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_28 3.00 mpp[tiflash] right outer join, equal:[eq(test.t1.a, test.t2.b)], other cond:lt(test.t1.c, test.t2.d)",
" ├─ExchangeReceiver_14(Build) 5.00 mpp[tiflash] ",
" │ └─ExchangeSender_13 5.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST",
" │ └─Selection_12 5.00 mpp[tiflash] not(isnull(test.t1.a)), not(isnull(test.t1.c))",
" │ └─TableFullScan_11 5.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false",
" └─TableFullScan_15(Probe) 3.00 mpp[tiflash] table:t2 keep order:false"
],
"Warn": null
},
{
"SQL": "set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=0",
"Plan": null,
"Warn": null
},
{
"SQL": "set @@session.tidb_broadcast_join_threshold_size=0",
"Plan": null,
"Warn": null
},
{
"SQL": "set @@session.tidb_broadcast_join_threshold_count=0",
"Plan": null,
"Warn": null
},
{
"SQL": "explain select * from t1 right join t2 on t1.a=t2.b and t1.c < t2.d",
"Plan": [
"TableReader_32 3.00 root MppVersion: 1, data:ExchangeSender_31",
"└─ExchangeSender_31 3.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_30 3.00 mpp[tiflash] right outer join, equal:[eq(test.t1.a, test.t2.b)], other cond:lt(test.t1.c, test.t2.d)",
" ├─ExchangeReceiver_17(Build) 3.00 mpp[tiflash] ",
" │ └─ExchangeSender_16 3.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t2.b, collate: binary]",
" │ └─TableFullScan_15 3.00 mpp[tiflash] table:t2 keep order:false",
" └─ExchangeReceiver_14(Probe) 5.00 mpp[tiflash] ",
" └─ExchangeSender_13 5.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t1.a, collate: binary]",
" └─Selection_12 5.00 mpp[tiflash] not(isnull(test.t1.a)), not(isnull(test.t1.c))",
" └─TableFullScan_11 5.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false"
],
"Warn": null
}
]
},
{
"Name": "TestIssue37520",
"Cases": [
Expand Down
4 changes: 2 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2416,8 +2416,8 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
// so we can choose the build side based on the row count, except that:
// 1. it is a broadcast join(for broadcast join, it makes sense to use the broadcast side as the build side)
// 2. or session variable MPPOuterJoinFixedBuildSide is set to true
// 3. or there are otherConditions for this join
if useBCJ || p.ctx.GetSessionVars().MPPOuterJoinFixedBuildSide || len(p.OtherConditions) > 0 {
// 3. or nullAware/cross joins
if useBCJ || p.isNAAJ() || len(p.EqualConditions) == 0 || p.ctx.GetSessionVars().MPPOuterJoinFixedBuildSide {
if !p.ctx.GetSessionVars().MPPOuterJoinFixedBuildSide {
// The hint has higher priority than variable.
fixedBuildSide = true
Expand Down

0 comments on commit 822767b

Please sign in to comment.