Skip to content

Commit

Permalink
store/copr: move row hint into key range (#40105)
Browse files Browse the repository at this point in the history
ref #39361
  • Loading branch information
you06 committed Jan 19, 2023
1 parent 280b773 commit de856d9
Show file tree
Hide file tree
Showing 12 changed files with 353 additions and 133 deletions.
30 changes: 19 additions & 11 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,16 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon
// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
// "handles" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
var keyRanges []kv.KeyRange
keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges.
// handles in slice must be kv.PartitionHandle.
func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder {
keyRanges := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

Expand Down Expand Up @@ -194,6 +193,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
return builder
}

// SetKeyRangesWithHints sets "KeyRanges" for "kv.Request" with row count hints.
func (builder *RequestBuilder) SetKeyRangesWithHints(keyRanges []kv.KeyRange, hints []int) *RequestBuilder {
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetWrappedKeyRanges sets "KeyRanges" for "kv.Request".
func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder {
builder.Request.KeyRanges = keyRanges
Expand Down Expand Up @@ -551,7 +556,7 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc
// For continuous handles, we should merge them to a single key range.
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hint := make([]int, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
Expand All @@ -560,7 +565,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
EndKey: tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()),
}
krs = append(krs, ran)
hint = append(hint, 1)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -576,16 +581,17 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hint = append(hint, j-i)
hints = append(hints, j-i)
i = j
}
return krs, hint
return krs, hints
}

// PartitionHandlesToKVRanges convert ParitionHandles to kv ranges.
// Handle in slices must be kv.PartitionHandle
func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
func PartitionHandlesToKVRanges(handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
ph := handles[i].(kv.PartitionHandle)
Expand All @@ -597,6 +603,7 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
EndKey: tablecodec.EncodeRowKey(pid, append(commonHandle.Encoded(), 0)),
}
krs = append(krs, ran)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -615,9 +622,10 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
startKey := tablecodec.EncodeRowKey(pid, low)
endKey := tablecodec.EncodeRowKey(pid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hints = append(hints, j-i)
i = j
}
return krs
return krs, hints
}

// IndexRangesToKVRanges converts index ranges to "KeyRange".
Expand Down
26 changes: 13 additions & 13 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func TestTableHandlesToKVRanges(t *testing.T) {

// Build key ranges.
expect := getExpectedRanges(1, hrs)
actual, _ := TableHandlesToKVRanges(1, handles)
actual, hints := TableHandlesToKVRanges(1, handles)

// Compare key ranges and expected key ranges.
require.Equal(t, len(expect), len(actual))
require.Equal(t, hints, []int{1, 4, 2, 1, 2})
for i := range actual {
require.Equal(t, expect[i].StartKey, actual[i].StartKey)
require.Equal(t, expect[i].EndKey, actual[i].EndKey)
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestRequestBuilder3(t *testing.T) {
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{
KeyRanges: kv.NewNonParitionedKeyRangesWithHint([]kv.KeyRange{
{
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Expand All @@ -395,17 +396,16 @@ func TestRequestBuilder3(t *testing.T) {
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
},
}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
FixedRowCountHint: []int{1, 4, 2, 1},
}, []int{1, 4, 2, 1}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
Expand Down
8 changes: 4 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4220,13 +4220,13 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
continue
}
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, ranges...)
}
} else {
for _, p := range usedPartitionList {
tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, ranges...)
}
}

Expand Down
58 changes: 58 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,3 +633,61 @@ func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
tk.MustExec(`set @a=0x61219F79C90D3541F70E, @b=5501707547099269248, @c=0xEC43EFD30131DEA2CB8B, @d="呣丼蒢咿卻鹻铴础湜僂頃dž縍套衞陀碵碼幓9", @e="鹹楞睕堚尛鉌翡佾搁紟精廬姆燵藝潐楻翇慸嵊";`)
tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`)
}

func TestCoprocessorBatchByStore(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(id int primary key, c1 int, c2 int, key i(c1))")
tk.MustExec(`create table t1(id int primary key, c1 int, c2 int, key i(c1)) partition by range(id) (
partition p0 values less than(10000),
partition p1 values less than (50000),
partition p2 values less than (100000))`)
for i := 0; i < 10; i++ {
tk.MustExec("insert into t values(?, ?, ?)", i*10000, i*10000, i%2)
tk.MustExec("insert into t1 values(?, ?, ?)", i*10000, i*10000, i%2)
}
tk.MustQuery("split table t between (0) and (100000) regions 20").Check(testkit.Rows("20 1"))
tk.MustQuery("split table t1 between (0) and (100000) regions 20").Check(testkit.Rows("60 1"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/setRangesPerTask", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/setRangesPerTask"))
}()
ranges := []string{
"(c1 >= 0 and c1 < 5000)",
"(c1 >= 10000 and c1 < 15000)",
"(c1 >= 20000 and c1 < 25000)",
"(c1 >= 30000 and c1 < 35000)",
"(c1 >= 40000 and c1 < 45000)",
"(c1 >= 50000 and c1 < 55000)",
"(c1 >= 60000 and c1 < 65000)",
"(c1 >= 70000 and c1 < 75000)",
"(c1 >= 80000 and c1 < 85000)",
"(c1 >= 90000 and c1 < 95000)",
}
evenRows := testkit.Rows("0 0 0", "20000 20000 0", "40000 40000 0", "60000 60000 0", "80000 80000 0")
oddRows := testkit.Rows("10000 10000 1", "30000 30000 1", "50000 50000 1", "70000 70000 1", "90000 90000 1")
reverseOddRows := testkit.Rows("90000 90000 1", "70000 70000 1", "50000 50000 1", "30000 30000 1", "10000 10000 1")
for _, table := range []string{"t", "t1"} {
baseSQL := fmt.Sprintf("select * from %s force index(i) where id < 100000 and (%s)", table, strings.Join(ranges, " or "))
for _, paging := range []string{"on", "off"} {
tk.MustExec("set session tidb_enable_paging=?", paging)
for size := 0; size < 10; size++ {
tk.MustExec("set session tidb_store_batch_size=?", size)
tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
// every batched task will get region error and fallback.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/batchCopRegionError", "return"))
tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/batchCopRegionError"))
}
}
}
}
34 changes: 26 additions & 8 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,25 +354,41 @@ func (t StoreType) Name() string {
// KeyRanges wrap the ranges for partitioned table cases.
// We might send ranges from different in the one request.
type KeyRanges struct {
ranges [][]KeyRange
ranges [][]KeyRange
rowCountHints [][]int

isPartitioned bool
}

// NewPartitionedKeyRanges constructs a new RequestRange for partitioned table.
func NewPartitionedKeyRanges(ranges [][]KeyRange) *KeyRanges {
return NewPartitionedKeyRangesWithHints(ranges, nil)
}

// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
return NewNonParitionedKeyRangesWithHint(ranges, nil)
}

// NewPartitionedKeyRangesWithHints constructs a new RequestRange for partitioned table with row count hint.
func NewPartitionedKeyRangesWithHints(ranges [][]KeyRange, hints [][]int) *KeyRanges {
return &KeyRanges{
ranges: ranges,
rowCountHints: hints,
isPartitioned: true,
}
}

// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
return &KeyRanges{
// NewNonParitionedKeyRangesWithHint constructs a new RequestRange for a non partitioned table with rou count hint.
func NewNonParitionedKeyRangesWithHint(ranges []KeyRange, hints []int) *KeyRanges {
rr := &KeyRanges{
ranges: [][]KeyRange{ranges},
isPartitioned: false,
}
if hints != nil {
rr.rowCountHints = [][]int{hints}
}
return rr
}

// FirstPartitionRange returns the the result of first range.
Expand Down Expand Up @@ -430,9 +446,13 @@ func (rr *KeyRanges) SortByFunc(sortFunc func(i, j KeyRange) bool) {
}

// ForEachPartitionWithErr runs the func for each partition with an error check.
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange) error) (err error) {
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange, []int) error) (err error) {
for i := range rr.ranges {
err = theFunc(rr.ranges[i])
var hints []int
if len(rr.rowCountHints) > i {
hints = rr.rowCountHints[i]
}
err = theFunc(rr.ranges[i], hints)
if err != nil {
return err
}
Expand Down Expand Up @@ -549,8 +569,6 @@ type Request struct {
}
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
FixedRowCountHint []int
// StoreBatchSize indicates the batch size of coprocessor in the same store.
StoreBatchSize int
// ResourceGroupName is the name of the bind resource group.
Expand Down
2 changes: 2 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
Expand Down Expand Up @@ -77,6 +78,7 @@ go_test(
"//store/driver/backoff",
"//testkit/testsetup",
"//util/paging",
"//util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/mpp",
Expand Down
Loading

0 comments on commit de856d9

Please sign in to comment.