Skip to content

Commit

Permalink
tiny fix
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Nov 27, 2018
1 parent 5bdb46f commit 029c6fd
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
11 changes: 6 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,10 +1623,11 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
readerBuilder: &dataReaderBuilder{innerPlan, b},
rowTypes: innerTypes,
},
workerWg: new(sync.WaitGroup),
joiner: newJoiner(b.ctx, v.JoinType, v.OuterIndex == 1, defaultValues, v.OtherConditions, leftTypes, rightTypes),
indexRanges: v.Ranges,
keyOff2IdxOff: v.KeyOff2IdxOff,
workerWg: new(sync.WaitGroup),
joiner: newJoiner(b.ctx, v.JoinType, v.OuterIndex == 1, defaultValues, v.OtherConditions, leftTypes, rightTypes),
indexRanges: v.Ranges,
keyOff2IdxOff: v.KeyOff2IdxOff,
nextColCompareFilters: v.CompareFilters,
}
outerKeyCols := make([]int, len(v.OuterJoinKeys))
for i := 0; i < len(v.OuterJoinKeys); i++ {
Expand Down Expand Up @@ -1966,7 +1967,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
for _, nextColRan := range nextColRanges {
for _, ran := range ranges {
ran.LowVal[lastPos] = nextColRan.LowVal[0]
ran.LowVal[lastPos] = nextColRan.HighVal[0]
ran.HighVal[lastPos] = nextColRan.HighVal[0]
ran.LowExclude = nextColRan.LowExclude
ran.HighExclude = nextColRan.HighExclude
}
Expand Down
15 changes: 8 additions & 7 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,14 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
copiedRanges = append(copiedRanges, ran.Clone())
}
iw := &innerWorker{
innerCtx: e.innerCtx,
outerCtx: e.outerCtx,
taskCh: taskCh,
ctx: e.ctx,
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
innerCtx: e.innerCtx,
outerCtx: e.outerCtx,
taskCh: taskCh,
ctx: e.ctx,
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
nextColCompareFilters: e.nextColCompareFilters,
}
return iw
}
Expand Down
7 changes: 5 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (p *LogicalJoin) constructIndexJoin(prop *property.PhysicalProperty, innerJ
innerPlan: innerPlan,
KeyOff2IdxOff: newKeyOff,
Ranges: ranges,
compareFilters: compareFilters,
CompareFilters: compareFilters,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), chReqProps...)
join.SetSchema(p.schema)
return []PhysicalPlan{join}
Expand Down Expand Up @@ -606,6 +606,7 @@ func (cwc *ColWithCompareOps) resolveIndices(schema *expression.Schema) {

func (p *LogicalJoin) analyzeLookUpFilters(indexInfo *model.IndexInfo, innerPlan *DataSource, innerJoinKeys []*expression.Column) ([]*ranger.Range, []int, []expression.Expression, *ColWithCompareOps, error) {
idxCols, colLengths := expression.IndexInfo2Cols(innerPlan.schema.Columns, indexInfo)
log.Warnf("index cols: %v", idxCols)
if len(idxCols) == 0 {
return nil, nil, nil, nil, nil
}
Expand Down Expand Up @@ -661,12 +662,13 @@ func (p *LogicalJoin) analyzeLookUpFilters(indexInfo *model.IndexInfo, innerPlan
return ranges, idxOff2keyOff, remained, nil, nil
}
nextCol := idxCols[nextColPos]
log.Warnf("next col: %v", nextCol)
nextColCmpFilterManager := &ColWithCompareOps{
targetCol: nextCol,
affectedColSchema: expression.NewSchema(),
}
loopCandidates:
for _, filter := range rangeFilterCandidates {
for _, filter := range p.OtherConditions {
sf, ok := filter.(*expression.ScalarFunction)
if !ok || !(sf.FuncName.L == ast.LE || sf.FuncName.L == ast.LT || sf.FuncName.L == ast.GE || sf.FuncName.L == ast.GT) {
continue
Expand Down Expand Up @@ -695,6 +697,7 @@ loopCandidates:
nextColCmpFilterManager.appendNewExpr(symmetricOp[sf.FuncName.L], sf.GetArgs()[0], affectedCols)
}
}
log.Warnf("next col filters: %v", nextColCmpFilterManager.opArg)
if len(nextColCmpFilterManager.OpType) == 0 {
colAccesses, colRemained := ranger.DetachCondsForTableRange(p.ctx, rangeFilterCandidates, nextCol)
remained = append(remained, colRemained...)
Expand Down
5 changes: 3 additions & 2 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ type PhysicalIndexJoin struct {
// Ranges stores the IndexRanges when the inner plan is index scan.
Ranges []*ranger.Range
// KeyOff2IdxOff maps the offsets in join key to the offsets in the index.
KeyOff2IdxOff []int
compareFilters *ColWithCompareOps
KeyOff2IdxOff []int
// CompareFilters ... (will finish in later commit)
CompareFilters *ColWithCompareOps
}

// PhysicalMergeJoin represents merge join for inner/ outer join.
Expand Down
8 changes: 4 additions & 4 deletions planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ func (p *PhysicalIndexJoin) ResolveIndices() {
for i, expr := range p.OtherConditions {
p.OtherConditions[i] = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema))
}
if p.compareFilters != nil {
p.compareFilters.resolveIndices(p.children[p.OuterIndex].Schema())
for i := range p.compareFilters.affectedColSchema.Columns {
p.compareFilters.affectedColSchema.Columns[i] = p.compareFilters.affectedColSchema.Columns[i].ResolveIndices(p.children[p.OuterIndex].Schema()).(*expression.Column)
if p.CompareFilters != nil {
p.CompareFilters.resolveIndices(p.children[p.OuterIndex].Schema())
for i := range p.CompareFilters.affectedColSchema.Columns {
p.CompareFilters.affectedColSchema.Columns[i] = p.CompareFilters.affectedColSchema.Columns[i].ResolveIndices(p.children[p.OuterIndex].Schema()).(*expression.Column)
}
}
}
Expand Down

0 comments on commit 029c6fd

Please sign in to comment.