planner: prune cop task for tiflash #13131

Merged · 17 commits · Nov 7, 2019
Changes from 7 commits
18 changes: 12 additions & 6 deletions planner/core/explain.go
@@ -16,10 +16,10 @@ package core
import (
"bytes"
"fmt"

"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/statistics"
)

@@ -110,11 +110,17 @@ func (p *PhysicalTableScan) ExplainInfo() string {
} else if haveCorCol {
fmt.Fprintf(buffer, ", range: decided by %v", p.AccessCondition)
} else if len(p.Ranges) > 0 {
fmt.Fprint(buffer, ", range:")
for i, idxRange := range p.Ranges {
fmt.Fprint(buffer, idxRange.String())
if i+1 < len(p.Ranges) {
fmt.Fprint(buffer, ", ")
if p.StoreType == kv.TiFlash {
// TiFlash always uses a full range scan for each region;
// the Ranges here are used to prune cop tasks.
fmt.Fprintf(buffer, ", range:[-inf,+inf]")
} else {
fmt.Fprint(buffer, ", range:")
for i, idxRange := range p.Ranges {
fmt.Fprint(buffer, idxRange.String())
if i+1 < len(p.Ranges) {
fmt.Fprint(buffer, ", ")
}
}
}
}
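
The explain.go hunk above only changes how the access range is printed. Below is a rough, self-contained sketch of the two branches; it is not the TiDB source, and `explainRanges` plus the plain string ranges are hypothetical stand-ins for `ExplainInfo`, `kv.TiFlash`, and `ranger.Range`.

```go
package main

import (
	"bytes"
	"fmt"
)

// explainRanges mimics the branch added above: TiFlash plans always print a
// full range, while TiKV plans print the actual access ranges.
func explainRanges(isTiFlash bool, ranges []string) string {
	var buffer bytes.Buffer
	if isTiFlash {
		// TiFlash scans the full range of each region, so the plan shows a
		// fixed range; the real ranges are kept only to prune cop tasks.
		fmt.Fprint(&buffer, ", range:[-inf,+inf]")
	} else {
		fmt.Fprint(&buffer, ", range:")
		for i, r := range ranges {
			fmt.Fprint(&buffer, r)
			if i+1 < len(ranges) {
				fmt.Fprint(&buffer, ", ")
			}
		}
	}
	return buffer.String()
}

func main() {
	fmt.Println(explainRanges(true, []string{"[100,+inf]"}))  // ", range:[-inf,+inf]"
	fmt.Println(explainRanges(false, []string{"[100,+inf]"})) // ", range:[100,+inf]"
}
```

Running it prints ", range:[-inf,+inf]" for the TiFlash case and the concrete range otherwise, which matches the new EXPLAIN output.
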
3 changes: 2 additions & 1 deletion planner/core/find_best_task.go
@@ -1035,9 +1035,10 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper
ts.StoreType = kv.TiKV
}
if ts.StoreType == kv.TiFlash {
// Append the AccessCondition to filterCondition because TiFlash only supports full range scans for each
// region; do not reset ts.Ranges, as it helps prune regions during `buildCopTasks`.
Member:

Since it's a full range scan, which task is pruned, then?

Contributor Author:

For example, take a table t whose primary key id is the handle column. If a user fires a query like select count(*) from t where id >= 100, then obviously only some of the regions hold useful data. For TiKV, TiDB sends cop requests only to those regions; this "task prune" is done by passing the range [100, +inf) to the buildCopTasks function. For TiFlash, although a full range scan is required on every region, we can still prune the tasks that would send cop requests to regions holding no data with id >= 100.

ts.filterCondition = append(ts.filterCondition, ts.AccessCondition...)
ts.AccessCondition = nil
ts.Ranges = ranger.FullIntRange(false)
}
ts.SetSchema(ds.schema)
if ts.Table.PKIsHandle {
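
To make the review discussion above concrete, here is a minimal sketch of the pruning idea under stated assumptions: the `region` type, the integer handle boundaries, and the split points are all hypothetical, not the real RegionCache or handle encoding.

```go
package main

import "fmt"

// region is a hypothetical stand-in for a TiKV/TiFlash region keyed by
// integer handles; it covers [start, end), with end == -1 meaning +inf.
type region struct {
	id    int
	start int64
	end   int64 // -1 means unbounded
}

// pruneRegions keeps only the regions that overlap the access range
// [lo, +inf). For TiKV each kept region would be scanned with the clipped
// range; for TiFlash each kept region is scanned in full, but regions that
// cannot contain matching handles are skipped entirely.
func pruneRegions(regions []region, lo int64) []region {
	var kept []region
	for _, r := range regions {
		if r.end == -1 || r.end > lo {
			kept = append(kept, r)
		}
	}
	return kept
}

func main() {
	regions := []region{
		{id: 1, start: 0, end: 50},
		{id: 2, start: 50, end: 150},
		{id: 3, start: 150, end: -1},
	}
	// SELECT count(*) FROM t WHERE id >= 100: only regions 2 and 3 get a cop task.
	for _, r := range pruneRegions(regions, 100) {
		fmt.Printf("send cop task to region %d\n", r.id)
	}
}
```

Only regions 2 and 3 can contain rows with id >= 100, so no cop task is created for region 1, even though TiFlash scans regions 2 and 3 over their full key ranges.
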
44 changes: 30 additions & 14 deletions store/tikv/coprocessor.go
@@ -220,22 +220,38 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
}

var tasks []*copTask
appendTask := func(region RegionVerID, ranges *copRanges) {
// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
// to make sure the message can be sent successfully.
rLen := ranges.len()
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
if req.StoreType == kv.TiKV {
// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
// to make sure the message can be sent successfully.
rLen := ranges.len()
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
tasks = append(tasks, &copTask{
region: regionWithRangeInfo.Region,
ranges: ranges.slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
})
i = nextI
}
} else if req.StoreType == kv.TiFlash {
fullRange := kv.KeyRange{StartKey: regionWithRangeInfo.StartKey, EndKey: regionWithRangeInfo.EndKey}
tasks = append(tasks, &copTask{
region: region,
ranges: ranges.slice(i, nextI),
region: regionWithRangeInfo.Region,
// TiFlash only supports full range scans for a region; ignoring the real ranges
// does not affect correctness because the access range conditions were already merged
// into the filter conditions in `getOriginalPhysicalTableScan`.
ranges: &copRanges{mid: []kv.KeyRange{fullRange}},
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
})
i = nextI
}
}

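Here is a rough standalone sketch of the two appendTask strategies in the hunk above, assuming simplified `task` and `keyRange` types (hypothetical, not the real copTask): TiKV splits a long range list into chunks of at most rangesPerTask, while TiFlash builds a single task covering the whole region.

```go
package main

import "fmt"

// keyRange and task are simplified stand-ins for kv.KeyRange and copTask.
type keyRange struct{ start, end string }

type task struct {
	regionID int
	ranges   []keyRange
}

const rangesPerTask = 4 // the real constant limits the gRPC message size

// appendTiKVTasks chunks the ranges so no single task carries too many of them.
func appendTiKVTasks(tasks []task, regionID int, ranges []keyRange) []task {
	for i := 0; i < len(ranges); {
		next := i + rangesPerTask
		if next > len(ranges) {
			next = len(ranges)
		}
		tasks = append(tasks, task{regionID: regionID, ranges: ranges[i:next]})
		i = next
	}
	return tasks
}

// appendTiFlashTask ignores the precise ranges and scans the whole region;
// correctness is preserved because the pruned conditions stay in the filters.
func appendTiFlashTask(tasks []task, regionID int, regionStart, regionEnd string) []task {
	full := keyRange{start: regionStart, end: regionEnd}
	return append(tasks, task{regionID: regionID, ranges: []keyRange{full}})
}

func main() {
	ranges := []keyRange{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"h", "i"}, {"j", "k"}}
	fmt.Println(len(appendTiKVTasks(nil, 1, ranges)))             // 2 tasks (4 + 1 ranges)
	fmt.Println(appendTiFlashTask(nil, 1, "a", "g")[0].ranges[0]) // {a g}
}
```
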
@@ -257,7 +273,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
return tasks, nil
}

func splitRanges(bo *Backoffer, cache *RegionCache, ranges *copRanges, fn func(region RegionVerID, ranges *copRanges)) error {
func splitRanges(bo *Backoffer, cache *RegionCache, ranges *copRanges, fn func(regionWithRangeInfo *KeyLocation, ranges *copRanges)) error {
for ranges.len() > 0 {
loc, err := cache.LocateKey(bo, ranges.at(0).StartKey)
if err != nil {
@@ -274,7 +290,7 @@ func splitRanges(bo *Backoffer, cache *RegionCache, ranges *copRanges, fn func(r
}
// All rest ranges belong to the same region.
if i == ranges.len() {
fn(loc.Region, ranges)
fn(loc, ranges)
break
}

@@ -286,7 +302,7 @@ func splitRanges(bo *Backoffer, cache *RegionCache, ranges *copRanges, fn func(r
StartKey: r.StartKey,
EndKey: loc.EndKey,
}
fn(loc.Region, taskRanges)
fn(loc, taskRanges)

ranges = ranges.slice(i+1, ranges.len())
ranges.first = &kv.KeyRange{
@@ -296,7 +312,7 @@ func splitRanges(bo *Backoffer, cache *RegionCache, ranges *copRanges, fn func(r
} else {
// rs[i] is not in the region.
taskRanges := ranges.slice(0, i)
fn(loc.Region, taskRanges)
fn(loc, taskRanges)
ranges = ranges.slice(i, ranges.len())
}
}
@@ -309,7 +325,7 @@ func SplitRegionRanges(bo *Backoffer, cache *RegionCache, keyRanges []kv.KeyRang
ranges := copRanges{mid: keyRanges}

var ret []kv.KeyRange
appendRange := func(region RegionVerID, ranges *copRanges) {
appendRange := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
for i := 0; i < ranges.len(); i++ {
ret = append(ret, ranges.at(i))
}
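
For reference, here is a simplified model of what the new fn(loc, ...) callback enables: because splitRanges now passes the whole KeyLocation rather than just the RegionVerID, the TiFlash branch can read the region's own start and end keys. The `locate` helper and the hard-coded region boundaries below are assumptions made for the sketch, not the real RegionCache.

```go
package main

import "fmt"

// loc is a hypothetical stand-in for KeyLocation: a region id plus its
// start and end keys ("" on either side means unbounded).
type loc struct {
	id         int
	start, end string
}

// locate returns the region containing key, assuming regions sorted by start key.
func locate(regions []loc, key string) loc {
	for _, r := range regions {
		if (r.start == "" || key >= r.start) && (r.end == "" || key < r.end) {
			return r
		}
	}
	return regions[len(regions)-1]
}

// splitRange walks a [start, end) query range across regions and calls fn once
// per region with the region location, mirroring the new fn(loc, ...) callback.
func splitRange(regions []loc, start, end string, fn func(l loc, start, end string)) {
	for start < end {
		l := locate(regions, start)
		sliceEnd := end
		if l.end != "" && l.end < end {
			sliceEnd = l.end
		}
		fn(l, start, sliceEnd)
		if l.end == "" || l.end >= end {
			return
		}
		start = l.end
	}
}

func main() {
	regions := []loc{{1, "", "g"}, {2, "g", "n"}, {3, "n", "t"}, {4, "t", ""}}
	splitRange(regions, "a", "x", func(l loc, s, e string) {
		// A TiKV task would use [s, e); a TiFlash task would use [l.start, l.end).
		fmt.Printf("region %d: tikv range [%q,%q), tiflash range [%q,%q)\n", l.id, s, e, l.start, l.end)
	})
}
```

With these assumed boundaries, which appear to match the split points implied by the test expectations below, a query range ["a", "x") yields TiKV sub-ranges clipped per region and TiFlash ranges equal to each region's full span.
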
53 changes: 53 additions & 0 deletions store/tikv/coprocessor_test.go
@@ -40,27 +40,50 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
bo := NewBackoffer(context.Background(), 3000)

req := &kv.Request{}
flashReq := &kv.Request{}
flashReq.StoreType = kv.TiFlash
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
@@ -69,27 +92,57 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "t")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "t")
}

func (s *testCoprocessorSuite) TestSplitRegionRanges(c *C) {
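
The flashReq assertions above expand every queried range to full region boundaries. Assuming, purely from the expected task ranges rather than from the suite's cluster-mock code, that the regions split at "g", "n", and "t", a tiny helper reproduces those boundaries:

```go
package main

import "fmt"

// splitKeys models the region split points the test expectations suggest
// (regions ["", "g"), ["g", "n"), ["n", "t"), ["t", "")); this is an
// assumption read off the asserted ranges, not taken from the suite code.
var splitKeys = []string{"g", "n", "t"}

// fullRegionRange expands one key to the boundaries of its region, which is
// what the flashReq assertions above check for TiFlash cop tasks.
func fullRegionRange(key string) (start, end string) {
	start = ""
	for _, s := range splitKeys {
		if key < s {
			return start, s
		}
		start = s
	}
	return start, ""
}

func main() {
	fmt.Println(fullRegionRange("a")) // "" g -> task over ("", "g")
	fmt.Println(fullRegionRange("m")) // g n  -> task over ("g", "n")
	fmt.Println(fullRegionRange("t")) // t "" -> task over ("t", "")
}
```
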