Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: split regions before pitr restore #39941

Merged
merged 22 commits into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,11 @@ func (rc *Client) SplitRanges(ctx context.Context,
return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv)
}

// WrapLogFilesIterWithSplitHelper wraps a log-file iterator so that regions
// are pre-split according to the key ranges of the files it yields, using a
// split client built from the restore client's PD client and TLS config.
func (rc *Client) WrapLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules) LogIter {
	splitClient := split.NewSplitClient(rc.GetPDClient(), rc.GetTLSConfig(), false)
	return NewLogFilesIterWithSplitHelper(iter, rules, splitClient)
}

// RestoreSSTFiles tries to restore the files.
func (rc *Client) RestoreSSTFiles(
ctx context.Context,
Expand Down
339 changes: 339 additions & 0 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand All @@ -19,9 +20,12 @@ import (
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -428,3 +432,338 @@ func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *sst.RewriteRu

return s, nil
}

// SplitThreShold is the accumulated file-length threshold that triggers
// inserting a new region split point.
const SplitThreShold = 128 * 1024 * 1024 // 128 MB

// LogSplitHelper collects the key ranges of log backup files per table and
// splits/scatters regions accordingly before PiTR restore.
type LogSplitHelper struct {
	tableSplitter map[int64]*split.SplitHelper // per-table collected key ranges, keyed by upstream table ID
	rules         map[int64]*RewriteRules      // rewrite rules, keyed by upstream table ID
	client        split.SplitClient            // client used to scan/split/scatter regions
	pool          *utils.WorkerPool            // bounds the concurrency of the fallback range splits
	eg            *errgroup.Group              // tracks fallback split goroutines; set by Split
}

// NewLogSplitHelper creates a LogSplitHelper that records split points from
// log files and performs the splits through the given split client.
func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) *LogSplitHelper {
	helper := &LogSplitHelper{
		tableSplitter: make(map[int64]*split.SplitHelper),
		rules:         rules,
		client:        client,
		// 16 workers bound the concurrency of the fallback range splits
		pool: utils.NewWorkerPool(16, "split region"),
		// eg stays nil until Split creates the errgroup
	}
	return helper
}

const splitFileThreshold = 1024 * 1024 // 1 MB

// skipFile reports whether a file should be ignored for split-point
// collection: it is skipped when it is smaller than splitFileThreshold,
// is a meta file, or its table has no rewrite rule.
func (helper *LogSplitHelper) skipFile(file *backuppb.DataFileInfo) bool {
	if file.Length < splitFileThreshold || file.IsMeta {
		return true
	}
	_, matched := helper.rules[file.TableId]
	return !matched
}

// Merge records the key range and length of one log file into the per-table
// split helper, creating the table's helper on first sight. Files rejected by
// skipFile contribute nothing.
func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) {
	if helper.skipFile(file) {
		return
	}
	sh, ok := helper.tableSplitter[file.TableId]
	if !ok {
		sh = split.NewSplitHelper()
		helper.tableSplitter[file.TableId] = sh
	}

	span := split.Span{StartKey: file.StartKey, EndKey: file.EndKey}
	sh.Merge(split.Valued{Key: span, Value: file.Length})
}

// splitFunc is the signature of the callback SplitPoint uses to split a single
// region by the ranges overlapping it.
type splitFunc = func(context.Context, *RegionSplitter, uint64, *split.RegionInfo, []split.Valued) ([]*split.RegionInfo, error)

// splitRegionByPoints accumulates the lengths of the given ranges and inserts
// a split point whenever the accumulated length would exceed SplitThreShold.
// On split failure it falls back to an asynchronous range-based split running
// in the helper's worker pool (tracked by helper.eg).
//
// initialLength is the length of the part of the first range that already lies
// inside this region; valueds are the ranges overlapping the region in key order.
// It returns the newly created regions (to be scattered later), or nil when no
// split point was produced or the fallback path was taken.
func (helper *LogSplitHelper) splitRegionByPoints(
	ctx context.Context,
	regionSplitter *RegionSplitter,
	initialLength uint64,
	region *split.RegionInfo,
	valueds []split.Valued,
) ([]*split.RegionInfo, error) {
	var (
		splitPoints [][]byte = make([][]byte, 0)
		lastKey     []byte   = nil
		length      uint64   = initialLength
	)
	for _, v := range valueds {
		// decode will discard ts behind the key, which results in the same key for consecutive ranges
		if v.Value+length > SplitThreShold && !bytes.Equal(lastKey, v.GetStartKey()) {
			_, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil)
			splitPoints = append(splitPoints, rawKey)
			length = 0
		}
		lastKey = v.GetStartKey()
		length += v.Value
	}

	if len(splitPoints) == 0 {
		return nil, nil
	}

	newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints)
	if errSplit != nil {
		// record the cause instead of swallowing it, then retry asynchronously
		// with a range-based split in the worker pool
		log.Warn("failed to split regions by the scanned region, retry...", zap.Error(errSplit))
		_, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil)
		ranges := make([]rtree.Range, 0, len(splitPoints))
		for _, point := range splitPoints {
			ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point})
			startKey = point
		}
		helper.pool.ApplyOnErrorGroup(helper.eg, func() error {
			return regionSplitter.Split(ctx, ranges, nil, false, func([][]byte) {})
		})
		return nil, nil
	}
	log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints)))
	return newRegions, nil
}

// GetRewriteTableID gets rewrite table id by the rewrite rule and original table id
func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 {
tableKey := tablecodec.EncodeTablePrefix(tableID)
rule := matchOldPrefix(tableKey, rewriteRules)
tableKey = bytes.Replace(tableKey, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1)
if rule == nil {
return 0
}
return tablecodec.DecodeTableID(tableKey)
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
}

// SplitPoint selects ranges overlapped with each region, and calls `splitF` to split the region
func SplitPoint(
ctx context.Context,
tableID int64,
splitHelper *split.SplitHelper,
client split.SplitClient,
rewriteRules *RewriteRules,
splitF splitFunc,
) ([]*split.RegionInfo, error) {
// common status
var (
err error = nil
vStartKey []byte = nil
vEndKey []byte = nil
endKey []byte = codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1))

// scatterRegions is the region array that will be scattered
scatterRegions []*split.RegionInfo = make([]*split.RegionInfo, 0)
regionSplitter *RegionSplitter = NewRegionSplitter(client)
)
// region traverse status
var (
// the region buffer of each scan
regions []*split.RegionInfo = nil
regionIndex int = 0
)
// region split status
var (
// range span +----------------+------+---+-------------+
// region span +------------------------------------+
// +initial length+ +end valued+
// regionValueds is the ranges array overlapped with `regionInfo`
regionValueds []split.Valued = nil
// regionInfo is the region to be split
regionInfo *split.RegionInfo = nil
// intialLength is the length of the part of the first range overlapped with the region
initialLength uint64 = 0
)
// range status
var (
// regionOverCount is the number of regions overlapped with the range
regionOverCount uint64 = 0
)

splitHelper.Traverse(func(v split.Valued) bool {
if v.Value == 0 {
return true
}
// use `vStartKey` and `vEndKey` to compare with region's key
vStartKey, vEndKey, err = GetRewriteEncodedKeys(v, rewriteRules)
if err != nil {
return false
}
// traverse to the first region overlapped with the range
for ; regionIndex < len(regions); regionIndex++ {
if bytes.Compare(vStartKey, regions[regionIndex].Region.EndKey) < 0 {
break
}
}
// cannot find any regions overlapped with the range
// need to scan regions again
if regionIndex == len(regions) {
regions = nil
}
regionOverCount = 0
for {
if regionIndex >= len(regions) {
var startKey []byte
if len(regions) > 0 {
// has traversed over the region buffer, should scan from the last region's end-key of the region buffer
startKey = regions[len(regions)-1].Region.EndKey
} else {
// scan from the range's start-key
startKey = vStartKey
}
// scan at most 128 regions into the region buffer
regions, err = split.ScanRegionsWithRetry(ctx, client, startKey, endKey, 128)
if err != nil {
return false
}
regionIndex = 0
}

region := regions[regionIndex]
// this region must be overlapped with the range
regionOverCount++
// the region is the last one overlapped with the range,
// should split the last recorded region,
// and then record this region as the region to be split
if bytes.Compare(vEndKey, region.Region.EndKey) < 0 {
endLength := v.Value / regionOverCount
if len(regionValueds) > 0 && regionInfo != region {
// add a part of the range as the end part
if bytes.Compare(vStartKey, regionInfo.Region.EndKey) < 0 {
regionValueds = append(regionValueds, split.NewValued(vStartKey, regionInfo.Region.EndKey, endLength))
}
// try to split the region
newRegions, errSplit := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds)
if errSplit != nil {
err = errSplit
return false
}
scatterRegions = append(scatterRegions, newRegions...)
regionValueds = make([]split.Valued, 0)
}
if regionOverCount == 1 {
// the region completely contains the range
regionValueds = append(regionValueds, split.Valued{
Key: split.Span{
StartKey: vStartKey,
EndKey: vEndKey,
},
Value: v.Value,
})
} else {
// the region is overlapped with the last part of the range
initialLength = endLength
}
regionInfo = region
// try the next range
return true
}

// try the next region
regionIndex++
}
})

if err != nil {
return nil, errors.Trace(err)
}
if len(regionValueds) > 0 {
// try to split the region
newRegions, err := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds)
if err != nil {
return nil, errors.Trace(err)
}
scatterRegions = append(scatterRegions, newRegions...)
}

return scatterRegions, nil
}

// Split splits regions by the split points collected for every table, then
// waits for the newly created regions to be scattered, bounding the total
// scatter waiting time by split.ScatterWaitUpperInterval.
func (helper *LogSplitHelper) Split(ctx context.Context) error {
	var ectx context.Context
	helper.eg, ectx = errgroup.WithContext(ctx)
	scatterRegions := make([]*split.RegionInfo, 0)
	for tableID, splitter := range helper.tableSplitter {
		// deleting during range is safe in Go and releases the splitter eagerly
		delete(helper.tableSplitter, tableID)
		rewriteRule, exists := helper.rules[tableID]
		if !exists {
			log.Info("skip splitting due to no table id matched", zap.Int64("tableID", tableID))
			continue
		}
		newTableID := GetRewriteTableID(tableID, rewriteRule)
		if newTableID == 0 {
			log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID))
			continue
		}
		newRegions, err := SplitPoint(ectx, newTableID, splitter, helper.client, rewriteRule, helper.splitRegionByPoints)
		if err != nil {
			// wait for the fallback-split workers already spawned via the pool,
			// otherwise their goroutines would be leaked on this early return
			_ = helper.eg.Wait()
			return errors.Trace(err)
		}
		scatterRegions = append(scatterRegions, newRegions...)
	}

	// wait for all the fallback range splits spawned by splitRegionByPoints
	if err := helper.eg.Wait(); err != nil {
		return errors.Trace(err)
	}

	// best-effort wait for scattering; give up once the upper interval elapses
	startTime := time.Now()
	regionSplitter := NewRegionSplitter(helper.client)
	for _, region := range scatterRegions {
		regionSplitter.waitForScatterRegion(ctx, region)
		if time.Since(startTime) > split.ScatterWaitUpperInterval {
			break
		}
	}

	return nil
}

// LogFilesIterWithSplitHelper wraps a LogIter and splits regions in batches:
// it buffers up to SplitFilesBufferSize files, performs the region split once
// per batch, then replays the buffered files to the caller one by one.
type LogFilesIterWithSplitHelper struct {
	iter   LogIter                  // the underlying log-file iterator
	helper *LogSplitHelper          // accumulates split points from buffered files
	buffer []*backuppb.DataFileInfo // the current batch of files
	next   int                      // index of the next buffered file to emit
}

// SplitFilesBufferSize is the max number of files buffered (and split for) at once.
const SplitFilesBufferSize = 2048

// NewLogFilesIterWithSplitHelper wraps iter so that regions are split in
// batches according to the key ranges of the files it yields.
func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient) LogIter {
	// buffer and next start at their zero values: an empty, unread batch
	return &LogFilesIterWithSplitHelper{
		iter:   iter,
		helper: NewLogSplitHelper(rules, client),
	}
}

// TryNext refills the internal buffer from the wrapped iterator when it is
// exhausted — feeding every buffered file into the split helper and then
// splitting the regions for the whole batch — and emits the buffered files
// one at a time. Errors from the inner iterator or the split are propagated.
func (splitIter *LogFilesIterWithSplitHelper) TryNext(ctx context.Context) iter.IterResult[*backuppb.DataFileInfo] {
	if splitIter.next >= len(splitIter.buffer) {
		// buffer exhausted: collect the next batch of files
		splitIter.buffer = make([]*backuppb.DataFileInfo, 0, SplitFilesBufferSize)
		for r := splitIter.iter.TryNext(ctx); !r.Finished; r = splitIter.iter.TryNext(ctx) {
			if r.Err != nil {
				return r
			}
			f := r.Item
			// record the file's key range for split-point calculation
			splitIter.helper.Merge(f)
			splitIter.buffer = append(splitIter.buffer, f)
			if len(splitIter.buffer) >= SplitFilesBufferSize {
				break
			}
		}
		splitIter.next = 0
		if len(splitIter.buffer) == 0 {
			// inner iterator is drained and nothing was buffered
			return iter.Done[*backuppb.DataFileInfo]()
		}
		log.Info("start to split the regions")
		startTime := time.Now()
		// split once for the whole batch before emitting any file from it
		if err := splitIter.helper.Split(ctx); err != nil {
			return iter.Throw[*backuppb.DataFileInfo](errors.Trace(err))
		}
		log.Info("end to split the regions", zap.Duration("takes", time.Since(startTime)))
	}

	res := iter.Emit(splitIter.buffer[splitIter.next])
	splitIter.next += 1
	return res
}
Loading