From fc86974403328c8195f1234e656786135fdbdeb5 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 13 Dec 2022 17:49:28 +0800 Subject: [PATCH 01/20] draft Signed-off-by: Leavrth --- br/pkg/restore/client.go | 8 + br/pkg/restore/log_client.go | 6 +- br/pkg/restore/split.go | 246 ++++++++++++++++++++++++ br/pkg/restore/split/split.go | 59 ++++++ br/pkg/restore/split/sum_sorted.go | 179 +++++++++++++++++ br/pkg/restore/split/sum_sorted_test.go | 160 +++++++++++++++ br/pkg/restore/split_test.go | 191 ++++++++++++++++++ br/pkg/task/stream.go | 10 +- 8 files changed, 855 insertions(+), 4 deletions(-) create mode 100644 br/pkg/restore/split/sum_sorted.go create mode 100644 br/pkg/restore/split/sum_sorted_test.go diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 49b9b7bb7f58e..8c1482ed9d8ca 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1094,6 +1094,14 @@ func (rc *Client) SplitRanges(ctx context.Context, return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv) } +func (rc *Client) SplitByLogFiles( + ctx context.Context, + splitHelper *LogSplitHelper, + rewriteRules map[int64]*RewriteRules, +) error { + return splitHelper.Split(ctx, split.NewSplitClient(rc.GetPDClient(), rc.GetTLSConfig(), false), rewriteRules) +} + // RestoreSSTFiles tries to restore the files. func (rc *Client) RestoreSSTFiles( ctx context.Context, diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go index cce295090ba02..21aa3d0c6e1cf 100644 --- a/br/pkg/restore/log_client.go +++ b/br/pkg/restore/log_client.go @@ -212,17 +212,17 @@ func (rc *logFileManager) collectDDLFilesAndPrepareCache( // LoadDDLFilesAndCountDMLFiles loads all DDL files needs to be restored in the restoration. // At the same time, if the `counter` isn't nil, counting the DML file needs to be restored into `counter`. // This function returns all DDL files needing directly because we need sort all of them. 
-func (rc *logFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, counter *int) ([]Log, error) { +func (rc *logFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, fn func(*backuppb.DataFileInfo)) ([]Log, error) { m, err := rc.streamingMeta(ctx) if err != nil { return nil, err } - if counter != nil { + if fn != nil { m = iter.Tap(m, func(m Meta) { for _, fg := range m.FileGroups { for _, f := range fg.DataFilesInfo { if !f.IsMeta && !rc.ShouldFilterOut(f) { - *counter += 1 + fn(f) } } } diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index a707d0f086ce9..abdb7f1e41cfb 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -11,6 +11,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" sst "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -19,6 +20,7 @@ import ( "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/multierr" "go.uber.org/zap" @@ -428,3 +430,247 @@ func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *sst.RewriteRu return s, nil } + +const SplitThreShold = 128 * 1024 * 1024 // 128 MB + +type LogSplitHelper struct { + tableSplitter map[int64]*split.SplitHelper +} + +func NewLogSplitHelper() *LogSplitHelper { + return &LogSplitHelper{ + tableSplitter: make(map[int64]*split.SplitHelper), + } +} + +const splitFileThreshold = 1024 * 1024 // 1 MB + +func checkFile(file *backuppb.DataFileInfo) bool { + return !(file.Length < splitFileThreshold || file.IsMeta) +} + +func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { + if !checkFile(file) { + return + } + splitHelper, exist := helper.tableSplitter[file.TableId] + if !exist { + splitHelper = split.NewSplitHelper() + 
helper.tableSplitter[file.TableId] = splitHelper + } + + splitHelper.Merge(split.Valued{ + Key: split.Span{ + StartKey: file.StartKey, + EndKey: file.EndKey, + }, + Value: file.Length, + }) +} + +type splitFunc = func(context.Context, *RegionSplitter, uint64, *split.RegionInfo, []split.Valued) ([]*split.RegionInfo, error) + +func splitRegionByPoints( + ctx context.Context, + regionSplitter *RegionSplitter, + initialLength uint64, + region *split.RegionInfo, + valueds []split.Valued, +) ([]*split.RegionInfo, error) { + var ( + splitPoints [][]byte = make([][]byte, 0) + length uint64 = initialLength + ) + for _, v := range valueds { + if length > SplitThreShold { + splitPoints = append(splitPoints, v.GetStartKey()) + length = 0 + } + length += v.Value + } + + newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints) + if errSplit != nil { + log.Warn("failed split regions") + startKey := region.Region.StartKey + ranges := make([]rtree.Range, 0) + for _, point := range splitPoints { + ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point}) + startKey = point + } + return nil, regionSplitter.Split(ctx, ranges, nil, false, func([][]byte) {}) + } + + return newRegions, nil +} + +func getRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { + endKey := tablecodec.EncodeTablePrefix(tableID) + endKey, rule := rewriteEncodedKey(endKey, rewriteRules) + if rule == nil { + return 0 + } + return tablecodec.DecodeTableID(endKey) +} + +func SplitPoint( + ctx context.Context, + tableID int64, + splitHelper *split.SplitHelper, + client split.SplitClient, + rewriteRules *RewriteRules, + splitF splitFunc, +) error { + // common status + var ( + err error = nil + vStartKey []byte = nil + vEndKey []byte = nil + endKey []byte = codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1)) + + scatterRegions []*split.RegionInfo = make([]*split.RegionInfo, 0) + regionSplitter *RegionSplitter = NewRegionSplitter(client) + ) + 
// region traverse status + var ( + regions []*split.RegionInfo = nil + regionIndex int = 0 + ) + // region split status + var ( + // range span +----------------+------+---+-------------+ + // region span +------------------------------------+ + // +initial length+ +end length+ + // initialLength is the length of the part of the first range overlapped with the region + regionValueds []split.Valued = nil + regionInfo *split.RegionInfo = nil + initialLength uint64 = 0 + ) + // range status + var ( + regionOverCount uint64 = 0 + ) + + splitHelper.Traverse(func(v split.Valued) bool { + if v.Value == 0 { + return true + } + vStartKey, vEndKey, err = GetRewriteEncodedKeys(v, rewriteRules) + if err != nil { + return false + } + // traverse to the first region overlapped with the range + for ; regionIndex < len(regions); regionIndex++ { + if bytes.Compare(vStartKey, regions[regionIndex].Region.EndKey) < 0 { + break + } + } + // cannot find any regions overlapped with the range + // need to scan regions again + if regionIndex == len(regions) { + regions = nil + } + regionOverCount = 0 + for { + if regionIndex >= len(regions) { + var startKey []byte + if len(regions) > 0 { + startKey = regions[len(regions)-1].Region.EndKey + } else { + startKey = vStartKey + } + regions, err = split.ScanRegionsWithRetry(ctx, client, startKey, endKey, 128) + if err != nil { + return false + } + regionIndex = 0 + } + + region := regions[regionIndex] + // this region must be overlapped with the range + regionOverCount++ + // 1. 
over the value key + if bytes.Compare(vEndKey, region.Region.EndKey) < 0 { + endLength := v.Value / regionOverCount + if len(regionValueds) > 0 && regionInfo != region { + // add a part of the range as the end part + if bytes.Compare(vStartKey, regionInfo.Region.EndKey) < 0 { + regionValueds = append(regionValueds, split.NewValued(vStartKey, regionInfo.Region.EndKey, endLength)) + } + // try to split the region + newRegions, err := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) + if err != nil { + return false + } + scatterRegions = append(scatterRegions, newRegions...) + regionValueds = make([]split.Valued, 0) + } + if regionOverCount == 1 { + regionValueds = append(regionValueds, split.Valued{ + Key: split.Span{ + StartKey: vStartKey, + EndKey: vEndKey, + }, + Value: v.Value, + }) + } else { + initialLength = endLength + } + regionInfo = region + // try the next range + return true + } + + // try the next region + regionIndex++ + } + }) + + if err != nil { + return errors.Trace(err) + } + if len(regionValueds) > 0 { + // try to split the region + newRegions, err := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) + if err != nil { + return errors.Trace(err) + } + scatterRegions = append(scatterRegions, newRegions...) + } + + startTime := time.Now() + for _, region := range scatterRegions { + regionSplitter.waitForScatterRegion(ctx, region) + if time.Since(startTime) > split.ScatterWaitUpperInterval { + break + } + } + + return nil +} + +func (helper *LogSplitHelper) Split( + ctx context.Context, + client split.SplitClient, + rewriteRulesMap map[int64]*RewriteRules, +) error { + for tableID, splitter := range helper.tableSplitter { + delete(helper.tableSplitter, tableID) + rewriteRule, exists := rewriteRulesMap[tableID] + if !exists { + log.Info("no rule. 
pass.", zap.Int64("tableID", tableID)) + continue + } + newTableID := getRewriteTableID(tableID, rewriteRule) + if newTableID == 0 { + log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) + continue + } + if err := SplitPoint(ctx, newTableID, splitter, client, rewriteRule, splitRegionByPoints); err != nil { + return errors.Trace(err) + } + + } + + return nil +} diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index bd00c445e1184..6a5a62e11dce4 100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -121,6 +121,65 @@ func PaginateScanRegion( return regions, err } +// CheckPartRegionConsistency allows only the first half of regions +func CheckPartRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { + // current pd can't guarantee the consistency of returned regions + if len(regions) == 0 { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s", + redact.Key(startKey), redact.Key(endKey)) + } + + if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s", + redact.Key(startKey), redact.Key(regions[0].Region.StartKey)) + } + + cur := regions[0] + for _, r := range regions[1:] { + if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) { + return errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s", + redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey)) + } + cur = r + } + + return nil +} + +func ScanRegionsWithRetry( + ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, +) ([]*RegionInfo, error) { + if len(endKey) != 0 && bytes.Compare(startKey, endKey) > 0 { + return nil, errors.Annotatef(berrors.ErrRestoreInvalidRange, "startKey > endKey, startKey: %s, endkey: %s", + 
hex.EncodeToString(startKey), hex.EncodeToString(endKey)) + } + + var regions []*RegionInfo + var err error + // we don't need to return multierr. since there only 3 times retry. + // in most case 3 times retry have the same error. so we just return the last error. + // actually we'd better remove all multierr in br/lightning. + // because it's not easy to check multierr equals normal error. + // see https://github.com/pingcap/tidb/issues/33419. + _ = utils.WithRetry(ctx, func() error { + regions, err = client.ScanRegions(ctx, startKey, endKey, limit) + if err != nil { + err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s", + redact.Key(startKey), err.Error()) + return err + } + + if err = CheckPartRegionConsistency(startKey, endKey, regions); err != nil { + log.Warn("failed to scan region, retrying", logutil.ShortError(err)) + return err + } + + return nil + }, newScanRegionBackoffer()) + + return regions, err +} + type scanRegionBackoffer struct { attempt int } diff --git a/br/pkg/restore/split/sum_sorted.go b/br/pkg/restore/split/sum_sorted.go new file mode 100644 index 0000000000000..9a472d4269f0f --- /dev/null +++ b/br/pkg/restore/split/sum_sorted.go @@ -0,0 +1,179 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. +package split + +import ( + "bytes" + "fmt" + + "github.com/google/btree" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/kv" +) + +// Value is the value type of stored in the span tree. +type Value = uint64 + +// join finds the upper bound of two values. +func join(a, b Value) Value { + return a + b +} + +// Span is the type of an adjacent sub key space. +type Span = kv.KeyRange + +// Valued is span binding to a value, which is the entry type of span tree. 
+type Valued struct { + Key Span + Value Value +} + +func NewValued(startKey, endKey []byte, value Value) Valued { + return Valued{ + Key: Span{ + StartKey: startKey, + EndKey: endKey, + }, + Value: value, + } +} + +func (r Valued) String() string { + return fmt.Sprintf("(%s, %.2f MB)", logutil.StringifyRange(r.Key), float64(r.Value)/1024/1024) +} + +func (r Valued) Less(other btree.Item) bool { + return bytes.Compare(r.Key.StartKey, other.(Valued).Key.StartKey) < 0 +} + +func (v Valued) GetStartKey() []byte { + return v.Key.StartKey +} + +func (v Valued) GetEndKey() []byte { + return v.Key.EndKey +} + +// SplitHelper represents a set of valued ranges, which doesn't overlap and union of them all is the full key space. +type SplitHelper struct { + inner *btree.BTree +} + +// NewFullWith creates a set of a subset of spans. +func NewSplitHelper() *SplitHelper { + t := btree.New(16) + t.ReplaceOrInsert(Valued{Value: 0, Key: Span{StartKey: []byte(""), EndKey: []byte("")}}) + return &SplitHelper{inner: t} +} + +func (f *SplitHelper) Len() int { + return f.inner.Len() +} + +func (f *SplitHelper) Merge(val Valued) { + if len(val.Key.StartKey) == 0 || len(val.Key.EndKey) == 0 { + return + } + overlaps := make([]Valued, 0, 16) + f.overlapped(val.Key, &overlaps) + f.mergeWithOverlap(val, overlaps) +} + +func (f *SplitHelper) Traverse(m func(Valued) bool) { + f.inner.Ascend(func(item btree.Item) bool { + return m(item.(Valued)) + }) +} + +func (f *SplitHelper) mergeWithOverlap(val Valued, overlapped []Valued) { + // There isn't any range overlaps with the input range, perhaps the input range is empty. + // do nothing for this case. + if len(overlapped) == 0 { + return + } + + for _, r := range overlapped { + f.inner.Delete(r) + // Assert All overlapped ranges are deleted. 
+ } + + appendSize := val.Value / uint64(len(overlapped)) + var ( + rightTrail *Valued + leftTrail *Valued + emitToCollected = func(rng Valued, standalone bool, split bool) { + merged := rng.Value + if split { + merged /= 2 + } + if !standalone { + merged = join(appendSize, merged) + } + rng.Value = merged + f.inner.ReplaceOrInsert(rng) + } + ) + + leftmost := overlapped[0] + if bytes.Compare(leftmost.Key.StartKey, val.Key.StartKey) < 0 { + leftTrail = &Valued{ + Key: Span{StartKey: leftmost.Key.StartKey, EndKey: val.Key.StartKey}, + Value: leftmost.Value, + } + overlapped[0].Key.StartKey = val.Key.StartKey + } + + rightmost := overlapped[len(overlapped)-1] + if utils.CompareBytesExt(rightmost.Key.EndKey, true, val.Key.EndKey, true) > 0 { + rightTrail = &Valued{ + Key: Span{StartKey: val.Key.EndKey, EndKey: rightmost.Key.EndKey}, + Value: rightmost.Value, + } + overlapped[len(overlapped)-1].Key.EndKey = val.Key.EndKey + if len(overlapped) == 1 && leftTrail != nil { + val := rightTrail.Value * 2 / 3 + leftTrail.Value = val + overlapped[0].Value = val + rightTrail.Value = val + } + } + + if leftTrail != nil { + emitToCollected(*leftTrail, true, true) + } + + for i, rng := range overlapped { + split := (i == 0 && leftTrail != nil) || (i == len(overlapped)-1 && rightTrail != nil) + emitToCollected(rng, false, split) + } + + if rightTrail != nil { + emitToCollected(*rightTrail, true, true) + } +} + +// overlapped inserts the overlapped ranges of the span into the `result` slice. +func (f *SplitHelper) overlapped(k Span, result *[]Valued) { + var first Span + f.inner.DescendLessOrEqual(Valued{Key: k}, func(item btree.Item) bool { + first = item.(Valued).Key + return false + }) + + f.inner.AscendGreaterOrEqual(Valued{Key: first}, func(item btree.Item) bool { + r := item.(Valued) + if !overlaps(r.Key, k) { + return false + } + *result = append(*result, r) + return true + }) +} + +// Overlaps checks whether two spans have overlapped part. 
+func overlaps(a, append Span) bool { + if len(a.EndKey) == 0 { + return bytes.Compare(append.EndKey, a.StartKey) > 0 + } + return bytes.Compare(a.StartKey, append.EndKey) < 0 && bytes.Compare(append.StartKey, a.EndKey) < 0 +} diff --git a/br/pkg/restore/split/sum_sorted_test.go b/br/pkg/restore/split/sum_sorted_test.go new file mode 100644 index 0000000000000..9252cb491aa00 --- /dev/null +++ b/br/pkg/restore/split/sum_sorted_test.go @@ -0,0 +1,160 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. +package split_test + +import ( + "testing" + + "github.com/pingcap/tidb/br/pkg/restore/split" + "github.com/stretchr/testify/require" +) + +func v(s, e string, val split.Value) split.Valued { + return split.Valued{ + Key: split.Span{ + StartKey: []byte(s), + EndKey: []byte(e), + }, + Value: val, + } +} + +func mb(b uint64) uint64 { + return b * 1024 * 1024 +} + +func TestSumSorted(t *testing.T) { + cases := []struct { + values []split.Valued + result []uint64 + }{ + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("d", "g", mb(100)), + }, + result: []uint64{0, 250, 25, 75, 50, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("d", "f", mb(100)), + }, + result: []uint64{0, 250, 25, 125, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + }, + result: []uint64{0, 250, 150, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + v("da", "db", mb(100)), + }, + result: []uint64{0, 250, 50, 150, 50, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + v("da", "db", mb(100)), + v("cb", "db", mb(100)), + }, + result: []uint64{0, 250, 25, 75, 200, 50, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + v("da", "db", mb(100)), + v("cb", "f", mb(150)), + }, + 
result: []uint64{0, 250, 25, 75, 200, 100, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + v("da", "db", mb(100)), + v("cb", "df", mb(150)), + }, + result: []uint64{0, 250, 25, 75, 200, 75, 25, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + v("da", "db", mb(100)), + v("cb", "df", mb(150)), + }, + result: []uint64{0, 250, 25, 75, 200, 75, 25, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + v("da", "db", mb(100)), + v("c", "df", mb(150)), + }, + result: []uint64{0, 250, 100, 200, 75, 25, 0}, + }, + { + values: []split.Valued{ + v("a", "f", mb(100)), + v("a", "c", mb(200)), + v("c", "f", mb(100)), + v("da", "db", mb(100)), + v("c", "f", mb(150)), + }, + result: []uint64{0, 250, 100, 200, 100, 0}, + }, + } + + for _, ca := range cases { + full := split.NewSplitHelper() + for _, v := range ca.values { + full.Merge(v) + } + + i := 0 + full.Traverse(func(v split.Valued) bool { + require.Equal(t, mb(ca.result[i]), v.Value) + i++ + return true + }) + } +} + +func TestSumSorted1(t *testing.T) { + full := split.NewSplitHelper() + + // 250 50 150 50 + // . a c . 
da db f + full.Merge(v("a", "f", mb(100))) + full.Merge(v("a", "c", mb(200))) + full.Merge(v("c", "f", mb(100))) + full.Merge(v("da", "db", mb(100))) + full.Merge(v("c", "f", mb(150))) + + //full.Merge(v("d", "e", mb(50))) + //full.Merge(v("c", "e", mb(170))) + + full.Traverse(func(v split.Valued) bool { + t.Log(v.String()) + return true + }) + + require.True(t, false) +} diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index b726a5ec78729..a12a775d6a298 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -5,6 +5,7 @@ package restore_test import ( "bytes" "context" + "fmt" "sync" "testing" "time" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/store/pdtypes" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/stretchr/testify/require" "go.uber.org/multierr" @@ -729,3 +731,192 @@ func TestSplitFailed(t *testing.T) { require.GreaterOrEqual(t, len(r.splitRanges), 2) require.Len(t, r.restoredFiles, 0) } + +func keyWithTablePrefix(tableID int64, key string) []byte { + rawKey := append(tablecodec.GenTableRecordPrefix(tableID), []byte(key)...) 
+ return codec.EncodeBytes([]byte{}, rawKey) +} + +func TestSplitPoint(t *testing.T) { + ctx := context.Background() + var oldTableID int64 = 50 + var tableID int64 = 100 + rewriteRules := &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + }, + }, + } + + // range: b c d e g i + // +---+ +---+ +---------+ + // +-------------+----------+---------+ + // region: a f h j + splitHelper := split.NewSplitHelper() + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: 100}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: 200}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: 300}) + client := NewFakeSplitClient() + client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f")) + client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h")) + client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j")) + client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) + + err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { + require.Equal(t, u, uint64(0)) + require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) + require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "f")) + require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) + require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "c")) + require.EqualValues(t, v[1].Key.StartKey, 
keyWithTablePrefix(tableID, "d")) + require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) + require.Equal(t, len(v), 2) + return nil, nil + }) + require.NoError(t, err) +} + +func getCharFromNumber(prefix string, i int) string { + c := '1' + (i % 10) + b := '1' + (i%100)/10 + a := '1' + i/100 + return fmt.Sprintf("%s%c%c%c", prefix, a, b, c) +} + +func TestSplitPoint2(t *testing.T) { + ctx := context.Background() + var oldTableID int64 = 50 + var tableID int64 = 100 + rewriteRules := &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + }, + }, + } + + // range: b c d e f i j k l n + // +---+ +---+ +-----------------+ +----+ +--------+ + // +---------------+--+.....+----+------------+---------+ + // region: a g >128 h m o + splitHelper := split.NewSplitHelper() + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: 100}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: 200}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: 300}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: 200}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: 200}) + client := NewFakeSplitClient() + client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g")) + client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0))) + for i := 0; i < 256; i++ { + 
client.AppendRegion(keyWithTablePrefix(tableID, getCharFromNumber("g", i)), keyWithTablePrefix(tableID, getCharFromNumber("g", i+1))) + } + client.AppendRegion(keyWithTablePrefix(tableID, getCharFromNumber("g", 256)), keyWithTablePrefix(tableID, "h")) + client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "m")) + client.AppendRegion(keyWithTablePrefix(tableID, "m"), keyWithTablePrefix(tableID, "o")) + client.AppendRegion(keyWithTablePrefix(tableID, "o"), keyWithTablePrefix(tableID+1, "a")) + + firstSplit := true + err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { + if firstSplit { + require.Equal(t, u, uint64(0)) + require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) + require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "g")) + require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) + require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "c")) + require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "d")) + require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) + require.EqualValues(t, v[2].Key.StartKey, keyWithTablePrefix(tableID, "f")) + require.EqualValues(t, v[2].Key.EndKey, keyWithTablePrefix(tableID, "g")) + require.Equal(t, v[2].Value, uint64(1)) + require.Equal(t, len(v), 3) + firstSplit = false + } else { + require.Equal(t, u, uint64(1)) + require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "h")) + require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "m")) + require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "j")) + require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "k")) + require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "l")) + require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, 
"m")) + require.Equal(t, v[1].Value, uint64(100)) + require.Equal(t, len(v), 2) + } + return nil, nil + }) + require.NoError(t, err) +} + +type fakeSplitClient struct { + regions []*split.RegionInfo +} + +func NewFakeSplitClient() *fakeSplitClient { + return &fakeSplitClient{ + regions: make([]*split.RegionInfo, 0), + } +} + +func (f *fakeSplitClient) AppendRegion(startKey, endKey []byte) { + f.regions = append(f.regions, &split.RegionInfo{ + Region: &metapb.Region{ + StartKey: startKey, + EndKey: endKey, + }, + }) +} + +func (*fakeSplitClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { + return nil, nil +} +func (*fakeSplitClient) GetRegion(ctx context.Context, key []byte) (*split.RegionInfo, error) { + return nil, nil +} +func (*fakeSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) { + return nil, nil +} +func (*fakeSplitClient) SplitRegion(ctx context.Context, regionInfo *split.RegionInfo, key []byte) (*split.RegionInfo, error) { + return nil, nil +} +func (*fakeSplitClient) BatchSplitRegions(ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) { + return nil, nil +} +func (*fakeSplitClient) BatchSplitRegionsWithOrigin(ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte) (*split.RegionInfo, []*split.RegionInfo, error) { + return nil, nil, nil +} +func (*fakeSplitClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error { + return nil +} +func (*fakeSplitClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error { + return nil +} +func (*fakeSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { + return nil, nil +} +func (f *fakeSplitClient) ScanRegions(ctx context.Context, startKey, endKey []byte, limit int) ([]*split.RegionInfo, error) { + result := make([]*split.RegionInfo, 0) + count := 0 + for _, rng := range f.regions { + if 
bytes.Compare(rng.Region.StartKey, endKey) <= 0 && bytes.Compare(rng.Region.EndKey, startKey) > 0 { + result = append(result, rng) + count++ + } + if count >= limit { + break + } + } + return result, nil +} +func (*fakeSplitClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (pdtypes.Rule, error) { + return pdtypes.Rule{}, nil +} +func (*fakeSplitClient) SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error { return nil } +func (*fakeSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { + return nil +} +func (*fakeSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error { + return nil +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index fdcc728a9ce5f..366a7553fa9c8 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1177,8 +1177,12 @@ func restoreStream( totalKVCount += kvCount totalSize += size } + splitHelper := restore.NewLogSplitHelper() dataFileCount := 0 - ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx, &dataFileCount) + ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx, func(f *backuppb.DataFileInfo) { + dataFileCount += 1 + splitHelper.Merge(f) + }) if err != nil { return err } @@ -1196,6 +1200,10 @@ func restoreStream( return errors.Trace(err) } updateRewriteRules(rewriteRules, schemasReplace) + err = client.SplitByLogFiles(ctx, splitHelper, rewriteRules) + if err != nil { + return errors.Trace(err) + } logFilesIter, err := client.LoadDMLFiles(ctx) pd := g.StartProgress(ctx, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress) From 575afd54f43d38d8deb873f176ea649915a85a20 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 14 Dec 2022 15:59:21 +0800 Subject: [PATCH 02/20] draft --- br/pkg/restore/log_client_test.go | 4 +++- br/pkg/restore/split.go | 23 ++++++++++++++--------- br/pkg/restore/split/sum_sorted_test.go | 22 ---------------------- br/pkg/restore/split_test.go | 16 ++++++++++++++++ 4 files 
changed, 33 insertions(+), 32 deletions(-) diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go index 71db52cf7678f..6fdf4ae52a609 100644 --- a/br/pkg/restore/log_client_test.go +++ b/br/pkg/restore/log_client_test.go @@ -471,7 +471,9 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) { if c.DMLFileCount != nil { counter = new(int) } - data, err := fm.LoadDDLFilesAndCountDMLFiles(ctx, counter) + data, err := fm.LoadDDLFilesAndCountDMLFiles(ctx, func(*backuppb.DataFileInfo) { + *counter += 1 + }) req.NoError(err) if counter != nil { req.Equal(*c.DMLFileCount, *counter) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index abdb7f1e41cfb..96a8b7f0bca7f 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -479,13 +479,16 @@ func splitRegionByPoints( ) ([]*split.RegionInfo, error) { var ( splitPoints [][]byte = make([][]byte, 0) + lastKey []byte = nil length uint64 = initialLength ) for _, v := range valueds { - if length > SplitThreShold { - splitPoints = append(splitPoints, v.GetStartKey()) + if length > SplitThreShold && !bytes.Equal(lastKey, v.GetStartKey()) { + _, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil) + splitPoints = append(splitPoints, rawKey) length = 0 } + lastKey = v.GetStartKey() length += v.Value } @@ -504,13 +507,14 @@ func splitRegionByPoints( return newRegions, nil } -func getRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { - endKey := tablecodec.EncodeTablePrefix(tableID) - endKey, rule := rewriteEncodedKey(endKey, rewriteRules) +func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { + tableKey := tablecodec.EncodeTablePrefix(tableID) + rule := matchOldPrefix(tableKey, rewriteRules) + tableKey = bytes.Replace(tableKey, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1) if rule == nil { return 0 } - return tablecodec.DecodeTableID(endKey) + return tablecodec.DecodeTableID(tableKey) } func SplitPoint( @@ -598,8 +602,9 @@ func SplitPoint( 
regionValueds = append(regionValueds, split.NewValued(vStartKey, regionInfo.Region.EndKey, endLength)) } // try to split the region - newRegions, err := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) - if err != nil { + newRegions, errSplit := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) + if errSplit != nil { + err = errSplit return false } scatterRegions = append(scatterRegions, newRegions...) @@ -661,7 +666,7 @@ func (helper *LogSplitHelper) Split( log.Info("no rule. pass.", zap.Int64("tableID", tableID)) continue } - newTableID := getRewriteTableID(tableID, rewriteRule) + newTableID := GetRewriteTableID(tableID, rewriteRule) if newTableID == 0 { log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) continue diff --git a/br/pkg/restore/split/sum_sorted_test.go b/br/pkg/restore/split/sum_sorted_test.go index 9252cb491aa00..d85774e966cc7 100644 --- a/br/pkg/restore/split/sum_sorted_test.go +++ b/br/pkg/restore/split/sum_sorted_test.go @@ -136,25 +136,3 @@ func TestSumSorted(t *testing.T) { }) } } - -func TestSumSorted1(t *testing.T) { - full := split.NewSplitHelper() - - // 250 50 150 50 - // . a c . 
da db f - full.Merge(v("a", "f", mb(100))) - full.Merge(v("a", "c", mb(200))) - full.Merge(v("c", "f", mb(100))) - full.Merge(v("da", "db", mb(100))) - full.Merge(v("c", "f", mb(150))) - - //full.Merge(v("d", "e", mb(50))) - //full.Merge(v("c", "e", mb(170))) - - full.Traverse(func(v split.Valued) bool { - t.Log(v.String()) - return true - }) - - require.True(t, false) -} diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index a12a775d6a298..00315a2a8cb7c 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -920,3 +920,19 @@ func (*fakeSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID func (*fakeSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error { return nil } + +func TestGetRewriteTableID(t *testing.T) { + var tableID int64 = 76 + var oldTableID int64 = 80 + rewriteRules := &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + }, + }, + } + + newTableID := restore.GetRewriteTableID(oldTableID, rewriteRules) + require.Equal(t, tableID, newTableID) +} From e83213d6de2c1d196295c3f850360066926badf5 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 15 Dec 2022 13:52:29 +0800 Subject: [PATCH 03/20] draft Signed-off-by: Leavrth --- br/pkg/restore/client.go | 9 ++--- br/pkg/restore/log_client.go | 6 ++-- br/pkg/restore/log_client_test.go | 4 +-- br/pkg/restore/split.go | 60 ++++++++++++++++++++++++++++--- br/pkg/task/stream.go | 14 +++----- 5 files changed, 67 insertions(+), 26 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 381b2725fd170..adaebd5f13819 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1126,12 +1126,9 @@ func (rc *Client) SplitRanges(ctx context.Context, return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv) } -func (rc *Client) 
SplitByLogFiles( - ctx context.Context, - splitHelper *LogSplitHelper, - rewriteRules map[int64]*RewriteRules, -) error { - return splitHelper.Split(ctx, split.NewSplitClient(rc.GetPDClient(), rc.GetTLSConfig(), false), rewriteRules) +func (rc *Client) WrapLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules) LogIter { + client := split.NewSplitClient(rc.GetPDClient(), rc.GetTLSConfig(), false) + return NewLogFilesIterWithSplitHelper(iter, rules, client) } // RestoreSSTFiles tries to restore the files. diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go index 21aa3d0c6e1cf..cce295090ba02 100644 --- a/br/pkg/restore/log_client.go +++ b/br/pkg/restore/log_client.go @@ -212,17 +212,17 @@ func (rc *logFileManager) collectDDLFilesAndPrepareCache( // LoadDDLFilesAndCountDMLFiles loads all DDL files needs to be restored in the restoration. // At the same time, if the `counter` isn't nil, counting the DML file needs to be restored into `counter`. // This function returns all DDL files needing directly because we need sort all of them. 
-func (rc *logFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, fn func(*backuppb.DataFileInfo)) ([]Log, error) { +func (rc *logFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, counter *int) ([]Log, error) { m, err := rc.streamingMeta(ctx) if err != nil { return nil, err } - if fn != nil { + if counter != nil { m = iter.Tap(m, func(m Meta) { for _, fg := range m.FileGroups { for _, f := range fg.DataFilesInfo { if !f.IsMeta && !rc.ShouldFilterOut(f) { - fn(f) + *counter += 1 } } } diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go index 6fdf4ae52a609..71db52cf7678f 100644 --- a/br/pkg/restore/log_client_test.go +++ b/br/pkg/restore/log_client_test.go @@ -471,9 +471,7 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) { if c.DMLFileCount != nil { counter = new(int) } - data, err := fm.LoadDDLFilesAndCountDMLFiles(ctx, func(*backuppb.DataFileInfo) { - *counter += 1 - }) + data, err := fm.LoadDDLFilesAndCountDMLFiles(ctx, counter) req.NoError(err) if counter != nil { req.Equal(*c.DMLFileCount, *counter) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 96a8b7f0bca7f..b6d6b9830c040 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/multierr" @@ -435,11 +436,15 @@ const SplitThreShold = 128 * 1024 * 1024 // 128 MB type LogSplitHelper struct { tableSplitter map[int64]*split.SplitHelper + rules map[int64]*RewriteRules + client split.SplitClient } -func NewLogSplitHelper() *LogSplitHelper { +func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) *LogSplitHelper { return &LogSplitHelper{ tableSplitter: make(map[int64]*split.SplitHelper), + rules: rules, + client: 
client, } } @@ -656,12 +661,10 @@ func SplitPoint( func (helper *LogSplitHelper) Split( ctx context.Context, - client split.SplitClient, - rewriteRulesMap map[int64]*RewriteRules, ) error { for tableID, splitter := range helper.tableSplitter { delete(helper.tableSplitter, tableID) - rewriteRule, exists := rewriteRulesMap[tableID] + rewriteRule, exists := helper.rules[tableID] if !exists { log.Info("no rule. pass.", zap.Int64("tableID", tableID)) continue @@ -671,7 +674,7 @@ func (helper *LogSplitHelper) Split( log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) continue } - if err := SplitPoint(ctx, newTableID, splitter, client, rewriteRule, splitRegionByPoints); err != nil { + if err := SplitPoint(ctx, newTableID, splitter, helper.client, rewriteRule, splitRegionByPoints); err != nil { return errors.Trace(err) } @@ -679,3 +682,50 @@ func (helper *LogSplitHelper) Split( return nil } + +type LogFilesIterWithSplitHelper struct { + iter LogIter + helper *LogSplitHelper + buffer []*backuppb.DataFileInfo + next int +} + +const SplitFilesBufferSize = 10240 + +func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient) LogIter { + return &LogFilesIterWithSplitHelper{ + iter: iter, + helper: NewLogSplitHelper(rules, client), + buffer: nil, + next: 0, + } +} + +func (splitIter *LogFilesIterWithSplitHelper) TryNext(ctx context.Context) iter.IterResult[*backuppb.DataFileInfo] { + if splitIter.next >= len(splitIter.buffer) { + splitIter.buffer = make([]*backuppb.DataFileInfo, 0, SplitFilesBufferSize) + for r := splitIter.iter.TryNext(ctx); !r.Finished; r = splitIter.iter.TryNext(ctx) { + if r.Err != nil { + return r + } + f := r.Item + splitIter.helper.Merge(f) + splitIter.buffer = append(splitIter.buffer, f) + if len(splitIter.buffer) >= SplitFilesBufferSize { + break + } + } + splitIter.next = 0 + if err := splitIter.helper.Split(ctx); err != nil { + return 
iter.Throw[*backuppb.DataFileInfo](errors.Trace(err)) + } + } + + if splitIter.next >= len(splitIter.buffer) { + return iter.Done[*backuppb.DataFileInfo]() + } + + res := iter.Emit(splitIter.buffer[splitIter.next]) + splitIter.next += 1 + return res +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 27a45445b2f0b..199494e30cbf5 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1216,12 +1216,8 @@ func restoreStream( totalKVCount += kvCount totalSize += size } - splitHelper := restore.NewLogSplitHelper() dataFileCount := 0 - ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx, func(f *backuppb.DataFileInfo) { - dataFileCount += 1 - splitHelper.Merge(f) - }) + ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx, &dataFileCount) if err != nil { return err } @@ -1239,15 +1235,15 @@ func restoreStream( return errors.Trace(err) } updateRewriteRules(rewriteRules, schemasReplace) - err = client.SplitByLogFiles(ctx, splitHelper, rewriteRules) + + logFilesIter, err := client.LoadDMLFiles(ctx) if err != nil { return errors.Trace(err) } - - logFilesIter, err := client.LoadDMLFiles(ctx) + logFilesIterWithSplit := client.WrapLogFilesIterWithSplitHelper(logFilesIter, rewriteRules) pd := g.StartProgress(ctx, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress) err = withProgress(pd, func(p glue.Progress) error { - return client.RestoreKVFiles(ctx, rewriteRules, logFilesIter, cfg.PitrBatchCount, cfg.PitrBatchSize, updateStats, p.IncBy) + return client.RestoreKVFiles(ctx, rewriteRules, logFilesIterWithSplit, cfg.PitrBatchCount, cfg.PitrBatchSize, updateStats, p.IncBy) }) if err != nil { return errors.Annotate(err, "failed to restore kv files") From 9fd60f8e4a6e235d4023a306a314bef644ed6fee Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 15 Dec 2022 20:12:39 +0800 Subject: [PATCH 04/20] fix some bugs and build error Signed-off-by: Leavrth --- br/pkg/restore/split.go | 14 ++++++++------ br/pkg/restore/split/sum_sorted.go | 12 
++++++------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index b6d6b9830c040..30cd0c63aad3a 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -500,7 +500,7 @@ func splitRegionByPoints( newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints) if errSplit != nil { log.Warn("failed split regions") - startKey := region.Region.StartKey + _, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil) ranges := make([]rtree.Range, 0) for _, point := range splitPoints { ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point}) @@ -690,7 +690,7 @@ type LogFilesIterWithSplitHelper struct { next int } -const SplitFilesBufferSize = 10240 +const SplitFilesBufferSize = 2048 func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient) LogIter { return &LogFilesIterWithSplitHelper{ @@ -716,13 +716,15 @@ func (splitIter *LogFilesIterWithSplitHelper) TryNext(ctx context.Context) iter. 
} } splitIter.next = 0 + if len(splitIter.buffer) == 0 { + return iter.Done[*backuppb.DataFileInfo]() + } + log.Info("start to split the regions") + startTime := time.Now() if err := splitIter.helper.Split(ctx); err != nil { return iter.Throw[*backuppb.DataFileInfo](errors.Trace(err)) } - } - - if splitIter.next >= len(splitIter.buffer) { - return iter.Done[*backuppb.DataFileInfo]() + log.Info("end to split the regions", zap.Duration("takes", time.Since(startTime))) } res := iter.Emit(splitIter.buffer[splitIter.next]) diff --git a/br/pkg/restore/split/sum_sorted.go b/br/pkg/restore/split/sum_sorted.go index 9a472d4269f0f..726ffc209a824 100644 --- a/br/pkg/restore/split/sum_sorted.go +++ b/br/pkg/restore/split/sum_sorted.go @@ -38,12 +38,12 @@ func NewValued(startKey, endKey []byte, value Value) Valued { } } -func (r Valued) String() string { - return fmt.Sprintf("(%s, %.2f MB)", logutil.StringifyRange(r.Key), float64(r.Value)/1024/1024) +func (v Valued) String() string { + return fmt.Sprintf("(%s, %.2f MB)", logutil.StringifyRange(v.Key), float64(v.Value)/1024/1024) } -func (r Valued) Less(other btree.Item) bool { - return bytes.Compare(r.Key.StartKey, other.(Valued).Key.StartKey) < 0 +func (v Valued) Less(other btree.Item) bool { + return bytes.Compare(v.Key.StartKey, other.(Valued).Key.StartKey) < 0 } func (v Valued) GetStartKey() []byte { @@ -162,7 +162,7 @@ func (f *SplitHelper) overlapped(k Span, result *[]Valued) { f.inner.AscendGreaterOrEqual(Valued{Key: first}, func(item btree.Item) bool { r := item.(Valued) - if !overlaps(r.Key, k) { + if !Overlaps(r.Key, k) { return false } *result = append(*result, r) @@ -171,7 +171,7 @@ func (f *SplitHelper) overlapped(k Span, result *[]Valued) { } // Overlaps checks whether two spans have overlapped part. 
-func overlaps(a, append Span) bool { +func Overlaps(a, append Span) bool { if len(a.EndKey) == 0 { return bytes.Compare(append.EndKey, a.StartKey) > 0 } From cf69bd754121d1138bc39902af8aa4ec74ed30c1 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 16 Dec 2022 15:43:35 +0800 Subject: [PATCH 05/20] draft Signed-off-by: Leavrth --- br/pkg/restore/split.go | 67 +++++++++++++++++++++--------- br/pkg/restore/split/BUILD.bazel | 14 ++++++- br/pkg/restore/split/sum_sorted.go | 42 ++++++++++++------- 3 files changed, 88 insertions(+), 35 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 30cd0c63aad3a..165165c48e3be 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/util/codec" "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -438,6 +439,8 @@ type LogSplitHelper struct { tableSplitter map[int64]*split.SplitHelper rules map[int64]*RewriteRules client split.SplitClient + pool *utils.WorkerPool + eg *errgroup.Group } func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) *LogSplitHelper { @@ -445,17 +448,20 @@ func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) tableSplitter: make(map[int64]*split.SplitHelper), rules: rules, client: client, + pool: utils.NewWorkerPool(16, "split region"), + eg: nil, } } const splitFileThreshold = 1024 * 1024 // 1 MB -func checkFile(file *backuppb.DataFileInfo) bool { - return !(file.Length < splitFileThreshold || file.IsMeta) +func (helper *LogSplitHelper) skipFile(file *backuppb.DataFileInfo) bool { + _, exist := helper.rules[file.TableId] + return file.Length < splitFileThreshold || file.IsMeta || !exist } func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { - if !checkFile(file) { + if helper.skipFile(file) { return } splitHelper, exist := helper.tableSplitter[file.TableId] @@ -475,7 
+481,7 @@ func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { type splitFunc = func(context.Context, *RegionSplitter, uint64, *split.RegionInfo, []split.Valued) ([]*split.RegionInfo, error) -func splitRegionByPoints( +func (helper *LogSplitHelper) splitRegionByPoints( ctx context.Context, regionSplitter *RegionSplitter, initialLength uint64, @@ -497,21 +503,28 @@ func splitRegionByPoints( length += v.Value } + if len(splitPoints) == 0 { + return nil, nil + } + newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints) if errSplit != nil { - log.Warn("failed split regions") + log.Warn("failed to split regions by the scaned region, retry...") _, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil) - ranges := make([]rtree.Range, 0) + ranges := make([]rtree.Range, 0, len(splitPoints)) for _, point := range splitPoints { ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point}) startKey = point } - return nil, regionSplitter.Split(ctx, ranges, nil, false, func([][]byte) {}) + helper.pool.ApplyOnErrorGroup(helper.eg, func() error { + return regionSplitter.Split(ctx, ranges, nil, false, func([][]byte) {}) + }) } - + log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) return newRegions, nil } +// GetRewriteTableID gets rewrite table id by the rewrite rule and original table id func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { tableKey := tablecodec.EncodeTablePrefix(tableID) rule := matchOldPrefix(tableKey, rewriteRules) @@ -522,6 +535,7 @@ func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { return tablecodec.DecodeTableID(tableKey) } +// SplitPoint selects ranges overlapped with each region, and calls `splitF` to split the region func SplitPoint( ctx context.Context, tableID int64, @@ -537,11 +551,13 @@ func SplitPoint( vEndKey []byte = nil endKey []byte = codec.EncodeBytes([]byte{}, 
tablecodec.EncodeTablePrefix(tableID+1)) + // scatterRegions is the region array that will be scattered scatterRegions []*split.RegionInfo = make([]*split.RegionInfo, 0) regionSplitter *RegionSplitter = NewRegionSplitter(client) ) // region traverse status var ( + // the region buffer of each scan regions []*split.RegionInfo = nil regionIndex int = 0 ) @@ -549,14 +565,17 @@ func SplitPoint( var ( // range span +----------------+------+---+-------------+ // region span +------------------------------------+ - // +initial length+ +end length+ + // +initial length+ +end valued+ + // regionValueds is the ranges array overlapped with `regionInfo` + regionValueds []split.Valued = nil + // regionInfo is the region to be split + regionInfo *split.RegionInfo = nil // intialLength is the length of the part of the first range overlapped with the region - regionValueds []split.Valued = nil - regionInfo *split.RegionInfo = nil - initialLength uint64 = 0 + initialLength uint64 = 0 ) // range status var ( + // regionOverCount is the number of regions overlapped with the range regionOverCount uint64 = 0 ) @@ -564,6 +583,7 @@ func SplitPoint( if v.Value == 0 { return true } + // use `vStartKey` and `vEndKey` to compare with region's key vStartKey, vEndKey, err = GetRewriteEncodedKeys(v, rewriteRules) if err != nil { return false @@ -584,10 +604,13 @@ func SplitPoint( if regionIndex >= len(regions) { var startKey []byte if len(regions) > 0 { + // has traversed over the region buffer, should scan from the last region's end-key of the region buffer startKey = regions[len(regions)-1].Region.EndKey } else { + // scan from the range's start-key startKey = vStartKey } + // scan at most 128 regions into the region buffer regions, err = split.ScanRegionsWithRetry(ctx, client, startKey, endKey, 128) if err != nil { return false @@ -598,7 +621,9 @@ func SplitPoint( region := regions[regionIndex] // this region must be overlapped with the range regionOverCount++ - // 1. 
over the value key + // the region is the last one overlapped with the range, + // should split the last recorded region, + // and then record this region as the region to be split if bytes.Compare(vEndKey, region.Region.EndKey) < 0 { endLength := v.Value / regionOverCount if len(regionValueds) > 0 && regionInfo != region { @@ -616,6 +641,7 @@ func SplitPoint( regionValueds = make([]split.Valued, 0) } if regionOverCount == 1 { + // the region completely contains the range regionValueds = append(regionValueds, split.Valued{ Key: split.Span{ StartKey: vStartKey, @@ -624,6 +650,7 @@ func SplitPoint( Value: v.Value, }) } else { + // the region is overlapped with the last part of the range initialLength = endLength } regionInfo = region @@ -659,14 +686,14 @@ func SplitPoint( return nil } -func (helper *LogSplitHelper) Split( - ctx context.Context, -) error { +func (helper *LogSplitHelper) Split(ctx context.Context) error { + var ectx context.Context + helper.eg, ectx = errgroup.WithContext(ctx) for tableID, splitter := range helper.tableSplitter { delete(helper.tableSplitter, tableID) rewriteRule, exists := helper.rules[tableID] if !exists { - log.Info("no rule. 
pass.", zap.Int64("tableID", tableID)) + log.Info("skip splitting due to no table id matched", zap.Int64("tableID", tableID)) continue } newTableID := GetRewriteTableID(tableID, rewriteRule) @@ -674,10 +701,12 @@ func (helper *LogSplitHelper) Split( log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) continue } - if err := SplitPoint(ctx, newTableID, splitter, helper.client, rewriteRule, splitRegionByPoints); err != nil { + if err := SplitPoint(ectx, newTableID, splitter, helper.client, rewriteRule, helper.splitRegionByPoints); err != nil { return errors.Trace(err) } - + } + if err := helper.eg.Wait(); err != nil { + return errors.Trace(err) } return nil diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 49fbec82c543c..ac9eb50eb4d20 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "split", @@ -6,6 +6,7 @@ go_library( "client.go", "region.go", "split.go", + "sum_sorted.go", ], importpath = "github.com/pingcap/tidb/br/pkg/restore/split", visibility = ["//visibility:public"], @@ -16,7 +17,9 @@ go_library( "//br/pkg/logutil", "//br/pkg/redact", "//br/pkg/utils", + "//kv", "//store/pdtypes", + "@com_github_google_btree//:btree", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/errorpb", @@ -34,3 +37,12 @@ go_library( "@org_uber_go_zap//:zap", ], ) + +go_test( + name = "split_test", + srcs = ["sum_sorted_test.go"], + deps = [ + ":split", + "@com_github_stretchr_testify//require", + ], +) diff --git a/br/pkg/restore/split/sum_sorted.go b/br/pkg/restore/split/sum_sorted.go index 726ffc209a824..ad2865d0c096c 100644 --- a/br/pkg/restore/split/sum_sorted.go +++ b/br/pkg/restore/split/sum_sorted.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. 
Licensed under Apache-2.0. +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. package split import ( @@ -46,10 +46,12 @@ func (v Valued) Less(other btree.Item) bool { return bytes.Compare(v.Key.StartKey, other.(Valued).Key.StartKey) < 0 } +// implement for `AppliedFile` func (v Valued) GetStartKey() []byte { return v.Key.StartKey } +// implement for `AppliedFile` func (v Valued) GetEndKey() []byte { return v.Key.EndKey } @@ -59,26 +61,23 @@ type SplitHelper struct { inner *btree.BTree } -// NewFullWith creates a set of a subset of spans. +// NewSplitHelper creates a set of a subset of spans, with the full key space as initial status func NewSplitHelper() *SplitHelper { t := btree.New(16) t.ReplaceOrInsert(Valued{Value: 0, Key: Span{StartKey: []byte(""), EndKey: []byte("")}}) return &SplitHelper{inner: t} } -func (f *SplitHelper) Len() int { - return f.inner.Len() -} - func (f *SplitHelper) Merge(val Valued) { if len(val.Key.StartKey) == 0 || len(val.Key.EndKey) == 0 { return } - overlaps := make([]Valued, 0, 16) + overlaps := make([]Valued, 0, 8) f.overlapped(val.Key, &overlaps) f.mergeWithOverlap(val, overlaps) } +// traverse the items in ascend order func (f *SplitHelper) Traverse(m func(Valued) bool) { f.inner.Ascend(func(item btree.Item) bool { return m(item.(Valued)) @@ -94,13 +93,19 @@ func (f *SplitHelper) mergeWithOverlap(val Valued, overlapped []Valued) { for _, r := range overlapped { f.inner.Delete(r) - // Assert All overlapped ranges are deleted. } + // Assert All overlapped ranges are deleted. 
+ // the new valued item's Value is equally dividedd into `len(overlapped)` shares appendSize := val.Value / uint64(len(overlapped)) var ( - rightTrail *Valued - leftTrail *Valued + rightTrail *Valued + leftTrail *Valued + // overlapped ranges +-------------+----------+ + // new valued item +-------------+ + // a b c d e + // the part [a,b] is `standalone` because it is not overlapped with the new valued item + // the part [a,b] and [b,c] are `split` because they are from range [a,c] emitToCollected = func(rng Valued, standalone bool, split bool) { merged := rng.Value if split { @@ -131,6 +136,12 @@ func (f *SplitHelper) mergeWithOverlap(val Valued, overlapped []Valued) { } overlapped[len(overlapped)-1].Key.EndKey = val.Key.EndKey if len(overlapped) == 1 && leftTrail != nil { + // (split) (split) (split) + // overlapped ranges +-----------------------------+ + // new valued item +-------------+ + // a b c d + // now the overlapped range should be divided into 3 equal parts + // so modify the value to the 2/3x to be compatible with function `emitToCollected` val := rightTrail.Value * 2 / 3 leftTrail.Value = val overlapped[0].Value = val @@ -162,7 +173,7 @@ func (f *SplitHelper) overlapped(k Span, result *[]Valued) { f.inner.AscendGreaterOrEqual(Valued{Key: first}, func(item btree.Item) bool { r := item.(Valued) - if !Overlaps(r.Key, k) { + if !checkOverlaps(r.Key, k) { return false } *result = append(*result, r) @@ -170,10 +181,11 @@ func (f *SplitHelper) overlapped(k Span, result *[]Valued) { }) } -// Overlaps checks whether two spans have overlapped part. -func Overlaps(a, append Span) bool { +// checkOverlaps checks whether two spans have overlapped part. 
+// `ap` should be a finite range +func checkOverlaps(a, ap Span) bool { if len(a.EndKey) == 0 { - return bytes.Compare(append.EndKey, a.StartKey) > 0 + return bytes.Compare(ap.EndKey, a.StartKey) > 0 } - return bytes.Compare(a.StartKey, append.EndKey) < 0 && bytes.Compare(append.StartKey, a.EndKey) < 0 + return bytes.Compare(a.StartKey, ap.EndKey) < 0 && bytes.Compare(ap.StartKey, a.EndKey) < 0 } From d50c0f9da4a043999b8144cd5743a59c91f3b8cb Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 16 Dec 2022 15:45:49 +0800 Subject: [PATCH 06/20] fix return Signed-off-by: Leavrth --- br/pkg/restore/split.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 165165c48e3be..a440b1879ac5e 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -519,6 +519,7 @@ func (helper *LogSplitHelper) splitRegionByPoints( helper.pool.ApplyOnErrorGroup(helper.eg, func() error { return regionSplitter.Split(ctx, ranges, nil, false, func([][]byte) {}) }) + return nil, nil } log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) return newRegions, nil From 9b949f4240e83eebeb5a444990efddf62b861079 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 16 Dec 2022 17:10:31 +0800 Subject: [PATCH 07/20] scatter at the end Signed-off-by: Leavrth --- br/pkg/restore/split.go | 33 ++++++++++++++----------- br/pkg/restore/split/sum_sorted_test.go | 2 +- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index a440b1879ac5e..295a84cdecb35 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -544,7 +544,7 @@ func SplitPoint( client split.SplitClient, rewriteRules *RewriteRules, splitF splitFunc, -) error { +) ([]*split.RegionInfo, error) { // common status var ( err error = nil @@ -554,7 +554,7 @@ func SplitPoint( // scatterRegions is the region array that will be scattered scatterRegions 
[]*split.RegionInfo = make([]*split.RegionInfo, 0) - regionSplitter *RegionSplitter = NewRegionSplitter(client) + regionSplitter = NewRegionSplitter(client) ) // region traverse status var ( @@ -665,31 +665,24 @@ func SplitPoint( }) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } if len(regionValueds) > 0 { // try to split the region newRegions, err := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } scatterRegions = append(scatterRegions, newRegions...) } - startTime := time.Now() - for _, region := range scatterRegions { - regionSplitter.waitForScatterRegion(ctx, region) - if time.Since(startTime) > split.ScatterWaitUpperInterval { - break - } - } - - return nil + return scatterRegions, nil } func (helper *LogSplitHelper) Split(ctx context.Context) error { var ectx context.Context helper.eg, ectx = errgroup.WithContext(ctx) + scatterRegions := make([]*split.RegionInfo, 0) for tableID, splitter := range helper.tableSplitter { delete(helper.tableSplitter, tableID) rewriteRule, exists := helper.rules[tableID] @@ -702,14 +695,26 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error { log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) continue } - if err := SplitPoint(ectx, newTableID, splitter, helper.client, rewriteRule, helper.splitRegionByPoints); err != nil { + newRegions, err := SplitPoint(ectx, newTableID, splitter, helper.client, rewriteRule, helper.splitRegionByPoints) + if err != nil { return errors.Trace(err) } + scatterRegions = append(scatterRegions, newRegions...) 
} + if err := helper.eg.Wait(); err != nil { return errors.Trace(err) } + startTime := time.Now() + regionSplitter := NewRegionSplitter(helper.client) + for _, region := range scatterRegions { + regionSplitter.waitForScatterRegion(ctx, region) + if time.Since(startTime) > split.ScatterWaitUpperInterval { + break + } + } + return nil } diff --git a/br/pkg/restore/split/sum_sorted_test.go b/br/pkg/restore/split/sum_sorted_test.go index d85774e966cc7..183af608677f1 100644 --- a/br/pkg/restore/split/sum_sorted_test.go +++ b/br/pkg/restore/split/sum_sorted_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. package split_test import ( From 6d16afd01f0664dd69ad27ca2c84c364f57504d9 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 16 Dec 2022 19:00:10 +0800 Subject: [PATCH 08/20] fix unit test Signed-off-by: Leavrth --- br/pkg/restore/split.go | 2 +- br/pkg/restore/split_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 295a84cdecb35..85fdae7f2f2e7 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -554,7 +554,7 @@ func SplitPoint( // scatterRegions is the region array that will be scattered scatterRegions []*split.RegionInfo = make([]*split.RegionInfo, 0) - regionSplitter = NewRegionSplitter(client) + regionSplitter *RegionSplitter = NewRegionSplitter(client) ) // region traverse status var ( diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 00315a2a8cb7c..cc5b760fedc27 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -764,7 +764,7 @@ func TestSplitPoint(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j")) client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) - err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, 
func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { + _, err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { require.Equal(t, u, uint64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "f")) @@ -820,7 +820,7 @@ func TestSplitPoint2(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "o"), keyWithTablePrefix(tableID+1, "a")) firstSplit := true - err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { + _, err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { if firstSplit { require.Equal(t, u, uint64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) From 3bb5831adcf52a3526f947ccb600d22514c5b5a6 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 19 Dec 2022 16:32:42 +0800 Subject: [PATCH 09/20] split more accurately Signed-off-by: Leavrth --- br/pkg/restore/split.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 85fdae7f2f2e7..a3e2ef92d2f9d 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -494,7 +494,8 @@ func (helper *LogSplitHelper) splitRegionByPoints( length uint64 = initialLength ) for _, v := range valueds { - if length > SplitThreShold && !bytes.Equal(lastKey, v.GetStartKey()) { + // decode will discard ts behind the key, which results in the same key for consecutive ranges + if v.Value+length > 
SplitThreShold && !bytes.Equal(lastKey, v.GetStartKey()) { _, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil) splitPoints = append(splitPoints, rawKey) length = 0 From 39b15453631997698f88e05220cd564c9f6a7440 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 22 Dec 2022 13:38:13 +0800 Subject: [PATCH 10/20] add unit test Signed-off-by: Leavrth --- br/pkg/restore/split_test.go | 42 ++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index cc5b760fedc27..c9a42254c3a55 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/store/pdtypes" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" @@ -936,3 +937,44 @@ func TestGetRewriteTableID(t *testing.T) { newTableID := restore.GetRewriteTableID(oldTableID, rewriteRules) require.Equal(t, tableID, newTableID) } + +type mockLogIter struct { + next int +} + +func (m *mockLogIter) TryNext(ctx context.Context) iter.IterResult[*backuppb.DataFileInfo] { + if m.next > 10000 { + return iter.Done[*backuppb.DataFileInfo]() + } + m.next += 1 + return iter.Emit(&backuppb.DataFileInfo{ + StartKey: []byte(fmt.Sprintf("a%d", m.next)), + EndKey: []byte("b"), + Length: 1024, // 1 KB + }) +} + +func TestLogFilesIterWithSplitHelper(t *testing.T) { + var tableID int64 = 76 + var oldTableID int64 = 80 + rewriteRules := &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + }, + }, + } + rewriteRulesMap := map[int64]*restore.RewriteRules{ + oldTableID: rewriteRules, + } + mockIter := &mockLogIter{} + ctx := context.Background() + logIter := 
restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, NewFakeSplitClient()) + next := 0 + for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) { + require.NoError(t, r.Err) + next += 1 + require.Equal(t, []byte(fmt.Sprintf("a%d", next)), r.Item.StartKey) + } +} From 67e1f7b8ab36af3c8d4feaed542fb7633c0ff33b Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 27 Dec 2022 13:43:21 +0800 Subject: [PATCH 11/20] fix the boundary problem Signed-off-by: Leavrth --- br/pkg/restore/split.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index a3e2ef92d2f9d..1164c6e961092 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -490,7 +490,7 @@ func (helper *LogSplitHelper) splitRegionByPoints( ) ([]*split.RegionInfo, error) { var ( splitPoints [][]byte = make([][]byte, 0) - lastKey []byte = nil + lastKey []byte = region.Region.StartKey length uint64 = initialLength ) for _, v := range valueds { From 0416eeccfd59d6af5f8b8cf7e87efea8a087fab0 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 28 Dec 2022 10:04:18 +0800 Subject: [PATCH 12/20] commit some suggestions Signed-off-by: Leavrth --- br/pkg/restore/split.go | 6 +++--- br/pkg/restore/split/split.go | 2 +- br/pkg/restore/split_test.go | 32 ++++++++++++++++++++++++-------- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 1164c6e961092..80926a36f8113 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -528,13 +528,13 @@ func (helper *LogSplitHelper) splitRegionByPoints( // GetRewriteTableID gets rewrite table id by the rewrite rule and original table id func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { - tableKey := tablecodec.EncodeTablePrefix(tableID) + tableKey := tablecodec.GenTableRecordPrefix(tableID) rule := matchOldPrefix(tableKey, rewriteRules) - tableKey = bytes.Replace(tableKey, 
rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1) if rule == nil { return 0 } - return tablecodec.DecodeTableID(tableKey) + + return tablecodec.DecodeTableID(rule.GetNewKeyPrefix()) } // SplitPoint selects ranges overlapped with each region, and calls `splitF` to split the region diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index 6a5a62e11dce4..e06c8ab1c93d5 100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -121,7 +121,7 @@ func PaginateScanRegion( return regions, err } -// CheckPartRegionConsistency allows only the first half of regions +// CheckPartRegionConsistency only checks the continuity of regions and the first region consistency. func CheckPartRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { // current pd can't guarantee the consistency of returned regions if len(regions) == 0 { diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index c9a42254c3a55..2f3b74debff5c 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -925,17 +925,33 @@ func (*fakeSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, lab func TestGetRewriteTableID(t *testing.T) { var tableID int64 = 76 var oldTableID int64 = 80 - rewriteRules := &restore.RewriteRules{ - Data: []*import_sstpb.RewriteRule{ - { - OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), - NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + { + rewriteRules := &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + }, }, - }, + } + + newTableID := restore.GetRewriteTableID(oldTableID, rewriteRules) + require.Equal(t, tableID, newTableID) } - newTableID := restore.GetRewriteTableID(oldTableID, rewriteRules) - require.Equal(t, tableID, newTableID) + { + rewriteRules := &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: 
tablecodec.GenTableRecordPrefix(oldTableID), + NewKeyPrefix: tablecodec.GenTableRecordPrefix(tableID), + }, + }, + } + + newTableID := restore.GetRewriteTableID(oldTableID, rewriteRules) + require.Equal(t, tableID, newTableID) + } } type mockLogIter struct { From e7504a44761b6f309d3d227408c9a9a376360d3d Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 30 Dec 2022 19:15:30 +0800 Subject: [PATCH 13/20] adjust some factors Signed-off-by: Leavrth --- br/pkg/restore/split.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 80926a36f8113..a3be12149ef44 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -612,8 +612,8 @@ func SplitPoint( // scan from the range's start-key startKey = vStartKey } - // scan at most 128 regions into the region buffer - regions, err = split.ScanRegionsWithRetry(ctx, client, startKey, endKey, 128) + // scan at most 64 regions into the region buffer + regions, err = split.ScanRegionsWithRetry(ctx, client, startKey, endKey, 64) if err != nil { return false } @@ -711,7 +711,9 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error { regionSplitter := NewRegionSplitter(helper.client) for _, region := range scatterRegions { regionSplitter.waitForScatterRegion(ctx, region) - if time.Since(startTime) > split.ScatterWaitUpperInterval { + // It is too expensive to stop recovery and wait for a small number of regions + // to complete scatter, so the maximum waiting time is reduced to 1 minute. 
+ if time.Since(startTime) > time.Minute { break } } @@ -726,7 +728,7 @@ type LogFilesIterWithSplitHelper struct { next int } -const SplitFilesBufferSize = 2048 +const SplitFilesBufferSize = 4096 func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient) LogIter { return &LogFilesIterWithSplitHelper{ From 3697c8ce5cdd7f4aa9e672c47a603409a1f90fc4 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 4 Jan 2023 11:08:30 +0800 Subject: [PATCH 14/20] split regions in parallel Signed-off-by: Leavrth --- br/pkg/restore/split.go | 121 ++++++++++++++++++++++------------- br/pkg/restore/split_test.go | 8 +-- 2 files changed, 79 insertions(+), 50 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index a3be12149ef44..0b4e164d02ff1 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -7,6 +7,7 @@ import ( "context" "strconv" "strings" + "sync" "time" "github.com/opentracing/opentracing-go" @@ -441,6 +442,7 @@ type LogSplitHelper struct { client split.SplitClient pool *utils.WorkerPool eg *errgroup.Group + regionsCh chan []*split.RegionInfo } func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) *LogSplitHelper { @@ -448,7 +450,7 @@ func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) tableSplitter: make(map[int64]*split.SplitHelper), rules: rules, client: client, - pool: utils.NewWorkerPool(16, "split region"), + pool: utils.NewWorkerPool(128, "split region"), eg: nil, } } @@ -479,7 +481,7 @@ func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { }) } -type splitFunc = func(context.Context, *RegionSplitter, uint64, *split.RegionInfo, []split.Valued) ([]*split.RegionInfo, error) +type splitFunc = func(context.Context, *RegionSplitter, uint64, *split.RegionInfo, []split.Valued) error func (helper *LogSplitHelper) splitRegionByPoints( ctx context.Context, @@ -487,7 +489,7 @@ func (helper *LogSplitHelper) splitRegionByPoints( 
initialLength uint64, region *split.RegionInfo, valueds []split.Valued, -) ([]*split.RegionInfo, error) { +) error { var ( splitPoints [][]byte = make([][]byte, 0) lastKey []byte = region.Region.StartKey @@ -505,25 +507,32 @@ func (helper *LogSplitHelper) splitRegionByPoints( } if len(splitPoints) == 0 { - return nil, nil + return nil } - newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints) - if errSplit != nil { - log.Warn("failed to split regions by the scaned region, retry...") - _, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil) - ranges := make([]rtree.Range, 0, len(splitPoints)) - for _, point := range splitPoints { - ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point}) - startKey = point - } - helper.pool.ApplyOnErrorGroup(helper.eg, func() error { + helper.pool.ApplyOnErrorGroup(helper.eg, func() error { + newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints) + if errSplit != nil { + log.Warn("failed to split the scaned region", zap.Error(errSplit)) + _, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil) + ranges := make([]rtree.Range, 0, len(splitPoints)) + for _, point := range splitPoints { + ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point}) + startKey = point + } + return regionSplitter.Split(ctx, ranges, nil, false, func([][]byte) {}) - }) - return nil, nil - } - log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) - return newRegions, nil + } + select { + case <-ctx.Done(): + return nil + case helper.regionsCh <- newRegions: + + } + log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) + return nil + }) + return nil } // GetRewriteTableID gets rewrite table id by the rewrite rule and original table id @@ -545,7 +554,7 @@ func SplitPoint( client split.SplitClient, rewriteRules 
*RewriteRules, splitF splitFunc, -) ([]*split.RegionInfo, error) { +) error { // common status var ( err error = nil @@ -553,9 +562,7 @@ func SplitPoint( vEndKey []byte = nil endKey []byte = codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1)) - // scatterRegions is the region array that will be scattered - scatterRegions []*split.RegionInfo = make([]*split.RegionInfo, 0) - regionSplitter *RegionSplitter = NewRegionSplitter(client) + regionSplitter *RegionSplitter = NewRegionSplitter(client) ) // region traverse status var ( @@ -634,12 +641,10 @@ func SplitPoint( regionValueds = append(regionValueds, split.NewValued(vStartKey, regionInfo.Region.EndKey, endLength)) } // try to split the region - newRegions, errSplit := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) - if errSplit != nil { - err = errSplit + err = splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) + if err != nil { return false } - scatterRegions = append(scatterRegions, newRegions...) regionValueds = make([]split.Valued, 0) } if regionOverCount == 1 { @@ -666,24 +671,55 @@ func SplitPoint( }) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } if len(regionValueds) > 0 { // try to split the region - newRegions, err := splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) + err = splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } - scatterRegions = append(scatterRegions, newRegions...) 
} - return scatterRegions, nil + return nil } func (helper *LogSplitHelper) Split(ctx context.Context) error { var ectx context.Context + var wg sync.WaitGroup helper.eg, ectx = errgroup.WithContext(ctx) - scatterRegions := make([]*split.RegionInfo, 0) + helper.regionsCh = make(chan []*split.RegionInfo, 1024) + wg.Add(1) + go func() { + defer wg.Done() + scatterRegions := make([]*split.RegionInfo, 0) + receiveNewRegions: + for { + select { + case <-ectx.Done(): + return + case newRegions, ok := <-helper.regionsCh: + if !ok { + break receiveNewRegions + } + + scatterRegions = append(scatterRegions, newRegions...) + } + } + + startTime := time.Now() + regionSplitter := NewRegionSplitter(helper.client) + for _, region := range scatterRegions { + regionSplitter.waitForScatterRegion(ctx, region) + // It is too expensive to stop recovery and wait for a small number of regions + // to complete scatter, so the maximum waiting time is reduced to 1 minute. + if time.Since(startTime) > time.Minute { + break + } + } + + }() + for tableID, splitter := range helper.tableSplitter { delete(helper.tableSplitter, tableID) rewriteRule, exists := helper.rules[tableID] @@ -696,27 +732,20 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error { log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) continue } - newRegions, err := SplitPoint(ectx, newTableID, splitter, helper.client, rewriteRule, helper.splitRegionByPoints) - if err != nil { + if err := SplitPoint(ectx, newTableID, splitter, helper.client, rewriteRule, helper.splitRegionByPoints); err != nil { return errors.Trace(err) } - scatterRegions = append(scatterRegions, newRegions...) 
+ } + // wait for completion of splitting regions if err := helper.eg.Wait(); err != nil { return errors.Trace(err) } - startTime := time.Now() - regionSplitter := NewRegionSplitter(helper.client) - for _, region := range scatterRegions { - regionSplitter.waitForScatterRegion(ctx, region) - // It is too expensive to stop recovery and wait for a small number of regions - // to complete scatter, so the maximum waiting time is reduced to 1 minute. - if time.Since(startTime) > time.Minute { - break - } - } + // wait for completion of scattering regions + close(helper.regionsCh) + wg.Wait() return nil } diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 2f3b74debff5c..d3f37de250fae 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -765,7 +765,7 @@ func TestSplitPoint(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j")) client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) - _, err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { + err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { require.Equal(t, u, uint64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "f")) @@ -774,7 +774,7 @@ func TestSplitPoint(t *testing.T) { require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "d")) require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) require.Equal(t, len(v), 2) - return nil, nil + return nil }) require.NoError(t, err) } @@ -821,7 +821,7 @@ func TestSplitPoint2(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, 
"o"), keyWithTablePrefix(tableID+1, "a")) firstSplit := true - _, err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) ([]*split.RegionInfo, error) { + err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { if firstSplit { require.Equal(t, u, uint64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) @@ -846,7 +846,7 @@ func TestSplitPoint2(t *testing.T) { require.Equal(t, v[1].Value, uint64(100)) require.Equal(t, len(v), 2) } - return nil, nil + return nil }) require.NoError(t, err) } From 4159c1c29f5210de652738430257f20cf5665dd7 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 4 Jan 2023 19:21:46 +0800 Subject: [PATCH 15/20] split order by new table id Signed-off-by: Leavrth --- br/pkg/restore/split.go | 101 +++++++++++++++++++++++++---------- br/pkg/restore/split_test.go | 6 ++- 2 files changed, 78 insertions(+), 29 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 0b4e164d02ff1..f075799c3f161 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -5,6 +5,7 @@ package restore import ( "bytes" "context" + "sort" "strconv" "strings" "sync" @@ -434,6 +435,39 @@ func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *sst.RewriteRu return s, nil } +type rewriteSplitter struct { + rewriteKey []byte + tableID int64 + rule *RewriteRules + splitter *split.SplitHelper +} + +type splitHelperIterator struct { + tableSplitters []*rewriteSplitter +} + +func (iter *splitHelperIterator) Traverse(fn func(v split.Valued, endKey []byte, rule *RewriteRules) bool) { + for _, entry := range iter.tableSplitters { + endKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(entry.tableID+1)) + rule := entry.rule + entry.splitter.Traverse(func(v 
split.Valued) bool { + return fn(v, endKey, rule) + }) + } +} + +func NewSplitHelperIteratorForTest(helper *split.SplitHelper, tableID int64, rule *RewriteRules) *splitHelperIterator { + return &splitHelperIterator{ + tableSplitters: []*rewriteSplitter{ + { + tableID: tableID, + rule: rule, + splitter: helper, + }, + }, + } +} + const SplitThreShold = 128 * 1024 * 1024 // 128 MB type LogSplitHelper struct { @@ -455,6 +489,35 @@ func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) } } +func (helper *LogSplitHelper) iterator() *splitHelperIterator { + tableSplitters := make([]*rewriteSplitter, 0, len(helper.tableSplitter)) + for tableID, splitter := range helper.tableSplitter { + delete(helper.tableSplitter, tableID) + rewriteRule, exists := helper.rules[tableID] + if !exists { + log.Info("skip splitting due to no table id matched", zap.Int64("tableID", tableID)) + continue + } + newTableID := GetRewriteTableID(tableID, rewriteRule) + if newTableID == 0 { + log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) + continue + } + tableSplitters = append(tableSplitters, &rewriteSplitter{ + rewriteKey: codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(newTableID)), + tableID: newTableID, + rule: rewriteRule, + splitter: splitter, + }) + } + sort.Slice(tableSplitters, func(i, j int) bool { + return bytes.Compare(tableSplitters[i].rewriteKey, tableSplitters[j].rewriteKey) < 0 + }) + return &splitHelperIterator{ + tableSplitters: tableSplitters, + } +} + const splitFileThreshold = 1024 * 1024 // 1 MB func (helper *LogSplitHelper) skipFile(file *backuppb.DataFileInfo) bool { @@ -549,19 +612,12 @@ func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { // SplitPoint selects ranges overlapped with each region, and calls `splitF` to split the region func SplitPoint( ctx context.Context, - tableID int64, - splitHelper *split.SplitHelper, + iter *splitHelperIterator, client split.SplitClient, - 
rewriteRules *RewriteRules, splitF splitFunc, -) error { +) (err error) { // common status var ( - err error = nil - vStartKey []byte = nil - vEndKey []byte = nil - endKey []byte = codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1)) - regionSplitter *RegionSplitter = NewRegionSplitter(client) ) // region traverse status @@ -588,12 +644,16 @@ func SplitPoint( regionOverCount uint64 = 0 ) - splitHelper.Traverse(func(v split.Valued) bool { + iter.Traverse(func(v split.Valued, endKey []byte, rule *RewriteRules) bool { if v.Value == 0 { return true } + var ( + vStartKey []byte = nil + vEndKey []byte = nil + ) // use `vStartKey` and `vEndKey` to compare with region's key - vStartKey, vEndKey, err = GetRewriteEncodedKeys(v, rewriteRules) + vStartKey, vEndKey, err = GetRewriteEncodedKeys(v, rule) if err != nil { return false } @@ -720,22 +780,9 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error { }() - for tableID, splitter := range helper.tableSplitter { - delete(helper.tableSplitter, tableID) - rewriteRule, exists := helper.rules[tableID] - if !exists { - log.Info("skip splitting due to no table id matched", zap.Int64("tableID", tableID)) - continue - } - newTableID := GetRewriteTableID(tableID, rewriteRule) - if newTableID == 0 { - log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) - continue - } - if err := SplitPoint(ectx, newTableID, splitter, helper.client, rewriteRule, helper.splitRegionByPoints); err != nil { - return errors.Trace(err) - } - + iter := helper.iterator() + if err := SplitPoint(ectx, iter, helper.client, helper.splitRegionByPoints); err != nil { + return errors.Trace(err) } // wait for completion of splitting regions diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index d3f37de250fae..9bb1c4b7cd9ac 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -765,7 +765,8 @@ func TestSplitPoint(t *testing.T) { 
client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j")) client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) - err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { + iter := restore.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) + err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { require.Equal(t, u, uint64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "f")) @@ -821,7 +822,8 @@ func TestSplitPoint2(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "o"), keyWithTablePrefix(tableID+1, "a")) firstSplit := true - err := restore.SplitPoint(ctx, tableID, splitHelper, client, rewriteRules, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { + iter := restore.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) + err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { if firstSplit { require.Equal(t, u, uint64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) From 5cf2314d854eba12de3e56d0c97807b13629129b Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 6 Jan 2023 15:08:08 +0800 Subject: [PATCH 16/20] load split threshold from the tikv config Signed-off-by: Leavrth --- br/pkg/restore/client.go | 11 ++++- br/pkg/restore/split.go | 42 ++++++++++++------ br/pkg/restore/split/sum_sorted.go | 29 +++++++++---- br/pkg/restore/split/sum_sorted_test.go | 7 ++- br/pkg/restore/split_test.go | 31 +++++++------ br/pkg/task/stream.go | 5 ++- br/pkg/utils/db.go | 58 
+++++++++++++++++++++++++ 7 files changed, 143 insertions(+), 40 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index adaebd5f13819..f6bc5d50b9b46 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -53,6 +53,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/mathutil" + "github.com/pingcap/tidb/util/sqlexec" filter "github.com/pingcap/tidb/util/table-filter" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -1126,9 +1127,15 @@ func (rc *Client) SplitRanges(ctx context.Context, return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv) } -func (rc *Client) WrapLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules) LogIter { +func (rc *Client) WrapLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, g glue.Glue, store kv.Storage) (LogIter, error) { + se, err := g.CreateSession(store) + if err != nil { + return nil, errors.Trace(err) + } + execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) + splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx) client := split.NewSplitClient(rc.GetPDClient(), rc.GetTLSConfig(), false) - return NewLogFilesIterWithSplitHelper(iter, rules, client) + return NewLogFilesIterWithSplitHelper(iter, rules, client, splitSize, splitKeys), nil } // RestoreSSTFiles tries to restore the files. 
diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index f075799c3f161..3e70a33733ebc 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -468,8 +468,6 @@ func NewSplitHelperIteratorForTest(helper *split.SplitHelper, tableID int64, rul } } -const SplitThreShold = 128 * 1024 * 1024 // 128 MB - type LogSplitHelper struct { tableSplitter map[int64]*split.SplitHelper rules map[int64]*RewriteRules @@ -477,15 +475,21 @@ type LogSplitHelper struct { pool *utils.WorkerPool eg *errgroup.Group regionsCh chan []*split.RegionInfo + + splitThreSholdSize uint64 + splitThreSholdKeys int64 } -func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient) *LogSplitHelper { +func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) *LogSplitHelper { return &LogSplitHelper{ tableSplitter: make(map[int64]*split.SplitHelper), rules: rules, client: client, pool: utils.NewWorkerPool(128, "split region"), eg: nil, + + splitThreSholdSize: splitSize, + splitThreSholdKeys: splitKeys, } } @@ -540,16 +544,20 @@ func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { StartKey: file.StartKey, EndKey: file.EndKey, }, - Value: file.Length, + Value: split.Value{ + Size: file.Length, + Number: file.NumberOfEntries, + }, }) } -type splitFunc = func(context.Context, *RegionSplitter, uint64, *split.RegionInfo, []split.Valued) error +type splitFunc = func(context.Context, *RegionSplitter, uint64, int64, *split.RegionInfo, []split.Valued) error func (helper *LogSplitHelper) splitRegionByPoints( ctx context.Context, regionSplitter *RegionSplitter, initialLength uint64, + initialNumber int64, region *split.RegionInfo, valueds []split.Valued, ) error { @@ -557,16 +565,19 @@ func (helper *LogSplitHelper) splitRegionByPoints( splitPoints [][]byte = make([][]byte, 0) lastKey []byte = region.Region.StartKey length uint64 = initialLength + number int64 = initialNumber ) for _, v := range valueds { 
// decode will discard ts behind the key, which results in the same key for consecutive ranges - if v.Value+length > SplitThreShold && !bytes.Equal(lastKey, v.GetStartKey()) { + if !bytes.Equal(lastKey, v.GetStartKey()) && (v.Value.Size+length > helper.splitThreSholdSize || v.Value.Number+number > helper.splitThreSholdKeys) { _, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil) splitPoints = append(splitPoints, rawKey) length = 0 + number = 0 } lastKey = v.GetStartKey() - length += v.Value + length += v.Value.Size + number += v.Value.Number } if len(splitPoints) == 0 { @@ -637,6 +648,7 @@ func SplitPoint( regionInfo *split.RegionInfo = nil // intialLength is the length of the part of the first range overlapped with the region initialLength uint64 = 0 + initialNumber int64 = 0 ) // range status var ( @@ -645,7 +657,7 @@ func SplitPoint( ) iter.Traverse(func(v split.Valued, endKey []byte, rule *RewriteRules) bool { - if v.Value == 0 { + if v.Value.Number == 0 || v.Value.Size == 0 { return true } var ( @@ -694,14 +706,15 @@ func SplitPoint( // should split the last recorded region, // and then record this region as the region to be split if bytes.Compare(vEndKey, region.Region.EndKey) < 0 { - endLength := v.Value / regionOverCount + endLength := v.Value.Size / regionOverCount + endNumber := v.Value.Number / int64(regionOverCount) if len(regionValueds) > 0 && regionInfo != region { // add a part of the range as the end part if bytes.Compare(vStartKey, regionInfo.Region.EndKey) < 0 { - regionValueds = append(regionValueds, split.NewValued(vStartKey, regionInfo.Region.EndKey, endLength)) + regionValueds = append(regionValueds, split.NewValued(vStartKey, regionInfo.Region.EndKey, split.Value{Size: endLength, Number: endNumber})) } // try to split the region - err = splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) + err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) if err != nil { return false } @@ -719,6 
+732,7 @@ func SplitPoint( } else { // the region is overlapped with the last part of the range initialLength = endLength + initialNumber = endNumber } regionInfo = region // try the next range @@ -735,7 +749,7 @@ func SplitPoint( } if len(regionValueds) > 0 { // try to split the region - err = splitF(ctx, regionSplitter, initialLength, regionInfo, regionValueds) + err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) if err != nil { return errors.Trace(err) } @@ -806,10 +820,10 @@ type LogFilesIterWithSplitHelper struct { const SplitFilesBufferSize = 4096 -func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient) LogIter { +func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) LogIter { return &LogFilesIterWithSplitHelper{ iter: iter, - helper: NewLogSplitHelper(rules, client), + helper: NewLogSplitHelper(rules, client, splitSize, splitKeys), buffer: nil, next: 0, } diff --git a/br/pkg/restore/split/sum_sorted.go b/br/pkg/restore/split/sum_sorted.go index ad2865d0c096c..c4e9657900e35 100644 --- a/br/pkg/restore/split/sum_sorted.go +++ b/br/pkg/restore/split/sum_sorted.go @@ -12,11 +12,17 @@ import ( ) // Value is the value type of stored in the span tree. -type Value = uint64 +type Value struct { + Size uint64 + Number int64 +} // join finds the upper bound of two values. func join(a, b Value) Value { - return a + b + return Value{ + Size: a.Size + b.Size, + Number: a.Number + b.Number, + } } // Span is the type of an adjacent sub key space. 
@@ -39,7 +45,7 @@ func NewValued(startKey, endKey []byte, value Value) Valued { } func (v Valued) String() string { - return fmt.Sprintf("(%s, %.2f MB)", logutil.StringifyRange(v.Key), float64(v.Value)/1024/1024) + return fmt.Sprintf("(%s, %.2f MB, %d)", logutil.StringifyRange(v.Key), float64(v.Value.Size)/1024/1024, v.Value.Number) } func (v Valued) Less(other btree.Item) bool { @@ -64,7 +70,7 @@ type SplitHelper struct { // NewSplitHelper creates a set of a subset of spans, with the full key space as initial status func NewSplitHelper() *SplitHelper { t := btree.New(16) - t.ReplaceOrInsert(Valued{Value: 0, Key: Span{StartKey: []byte(""), EndKey: []byte("")}}) + t.ReplaceOrInsert(Valued{Value: Value{Size: 0, Number: 0}, Key: Span{StartKey: []byte(""), EndKey: []byte("")}}) return &SplitHelper{inner: t} } @@ -97,7 +103,10 @@ func (f *SplitHelper) mergeWithOverlap(val Valued, overlapped []Valued) { // Assert All overlapped ranges are deleted. // the new valued item's Value is equally dividedd into `len(overlapped)` shares - appendSize := val.Value / uint64(len(overlapped)) + appendValue := Value{ + Size: val.Value.Size / uint64(len(overlapped)), + Number: val.Value.Number / int64(len(overlapped)), + } var ( rightTrail *Valued leftTrail *Valued @@ -109,10 +118,11 @@ func (f *SplitHelper) mergeWithOverlap(val Valued, overlapped []Valued) { emitToCollected = func(rng Valued, standalone bool, split bool) { merged := rng.Value if split { - merged /= 2 + merged.Size /= 2 + merged.Number /= 2 } if !standalone { - merged = join(appendSize, merged) + merged = join(appendValue, merged) } rng.Value = merged f.inner.ReplaceOrInsert(rng) @@ -142,7 +152,10 @@ func (f *SplitHelper) mergeWithOverlap(val Valued, overlapped []Valued) { // a b c d // now the overlapped range should be divided into 3 equal parts // so modify the value to the 2/3x to be compatible with function `emitToCollected` - val := rightTrail.Value * 2 / 3 + val := Value{ + Size: rightTrail.Value.Size * 2 / 3, + 
Number: rightTrail.Value.Number * 2 / 3, + } leftTrail.Value = val overlapped[0].Value = val rightTrail.Value = val diff --git a/br/pkg/restore/split/sum_sorted_test.go b/br/pkg/restore/split/sum_sorted_test.go index 183af608677f1..3a3b3db6d90eb 100644 --- a/br/pkg/restore/split/sum_sorted_test.go +++ b/br/pkg/restore/split/sum_sorted_test.go @@ -18,8 +18,11 @@ func v(s, e string, val split.Value) split.Valued { } } -func mb(b uint64) uint64 { - return b * 1024 * 1024 +func mb(b uint64) split.Value { + return split.Value{ + Size: b * 1024 * 1024, + Number: int64(b), + } } func TestSumSorted(t *testing.T) { diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 9bb1c4b7cd9ac..1d6b8a4c6e849 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -756,9 +756,9 @@ func TestSplitPoint(t *testing.T) { // +-------------+----------+---------+ // region: a f h j splitHelper := split.NewSplitHelper() - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: 100}) - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: 200}) - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: 300}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: split.Value{Size: 100, Number: 100}}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: split.Value{Size: 200, Number: 200}}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}}) client := 
NewFakeSplitClient() client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f")) client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h")) @@ -766,8 +766,9 @@ func TestSplitPoint(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) iter := restore.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { + err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []split.Valued) error { require.Equal(t, u, uint64(0)) + require.Equal(t, o, int64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "f")) require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) @@ -805,11 +806,11 @@ func TestSplitPoint2(t *testing.T) { // +---------------+--+.....+----+------------+---------+ // region: a g >128 h m o splitHelper := split.NewSplitHelper() - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: 100}) - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: 200}) - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: 300}) - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: 200}) - splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: 200}) + 
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: split.Value{Size: 100, Number: 100}}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: split.Value{Size: 200, Number: 200}}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: split.Value{Size: 200, Number: 200}}) + splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: split.Value{Size: 200, Number: 200}}) client := NewFakeSplitClient() client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g")) client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0))) @@ -823,9 +824,10 @@ func TestSplitPoint2(t *testing.T) { firstSplit := true iter := restore.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, ri *split.RegionInfo, v []split.Valued) error { + err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []split.Valued) error { if firstSplit { require.Equal(t, u, uint64(0)) + require.Equal(t, o, int64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "g")) require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) @@ -834,18 +836,21 @@ func TestSplitPoint2(t *testing.T) 
{ require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) require.EqualValues(t, v[2].Key.StartKey, keyWithTablePrefix(tableID, "f")) require.EqualValues(t, v[2].Key.EndKey, keyWithTablePrefix(tableID, "g")) - require.Equal(t, v[2].Value, uint64(1)) + require.Equal(t, v[2].Value.Size, uint64(1)) + require.Equal(t, v[2].Value.Number, int64(1)) require.Equal(t, len(v), 3) firstSplit = false } else { require.Equal(t, u, uint64(1)) + require.Equal(t, o, int64(1)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "h")) require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "m")) require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "j")) require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "k")) require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "l")) require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "m")) - require.Equal(t, v[1].Value, uint64(100)) + require.Equal(t, v[1].Value.Size, uint64(100)) + require.Equal(t, v[1].Value.Number, int64(100)) require.Equal(t, len(v), 2) } return nil @@ -988,7 +993,7 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) { } mockIter := &mockLogIter{} ctx := context.Background() - logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, NewFakeSplitClient()) + logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, NewFakeSplitClient(), 144*1024*1024, 1440000) next := 0 for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) { require.NoError(t, r.Err) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 199494e30cbf5..91538b1602b33 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1240,7 +1240,10 @@ func restoreStream( if err != nil { return errors.Trace(err) } - logFilesIterWithSplit := client.WrapLogFilesIterWithSplitHelper(logFilesIter, rewriteRules) + logFilesIterWithSplit, err := client.WrapLogFilesIterWithSplitHelper(logFilesIter, 
rewriteRules, g, mgr.GetStorage()) + if err != nil { + return errors.Trace(err) + } pd := g.StartProgress(ctx, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress) err = withProgress(pd, func(p glue.Progress) error { return client.RestoreKVFiles(ctx, rewriteRules, logFilesIterWithSplit, cfg.PitrBatchCount, cfg.PitrBatchSize, updateStats, p.IncBy) diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 9574c06670573..231d54c2c4daa 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -8,8 +8,10 @@ import ( "strings" "sync" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/sqlexec" @@ -99,6 +101,62 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { return true, nil } +func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64) { + return getSplitSize(ctx), getSplitKeys(ctx) +} + +func getSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64 { + const defaultSplitSize = 96 * 1024 * 1024 + varStr := "show config where name = 'coprocessor.region-split-size' and type = 'tikv'" + rows, fields, err := ctx.ExecRestrictedSQL( + kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR), + nil, + varStr, + ) + if err != nil { + log.Warn("failed to get split size, use default value", logutil.ShortError(err)) + return defaultSplitSize + } + if len(rows) == 0 { + // use the default value + return defaultSplitSize + } + + d := rows[0].GetDatum(3, &fields[3].Column.FieldType) + splitSizeStr, err := d.ToString() + if err != nil { + log.Warn("failed to get split size, use default value", logutil.ShortError(err)) + return defaultSplitSize + } + splitSize, err := units.FromHumanSize(splitSizeStr) + if err != nil { + log.Warn("failed to get split size, use default value", logutil.ShortError(err)) + return defaultSplitSize + } + return uint64(splitSize) 
+} + +func getSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64 { + const defaultSplitKeys = 960000 + varStr := "show config where name = 'coprocessor.region-split-keys' and type = 'tikv'" + rows, fields, err := ctx.ExecRestrictedSQL( + kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR), + nil, + varStr, + ) + if err != nil { + log.Warn("failed to get split keys, use default value", logutil.ShortError(err)) + return defaultSplitKeys + } + if len(rows) == 0 { + // use the default value + return defaultSplitKeys + } + + d := rows[0].GetDatum(3, &fields[3].Column.FieldType) + return d.GetInt64() +} + func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error) { valStr := "show config where name = 'gc.ratio-threshold' and type = 'tikv'" rows, fields, errSQL := ctx.ExecRestrictedSQL( From e6d21f23e1b3b7baf62f408caa4f5cfe57c8c909 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 6 Jan 2023 16:55:35 +0800 Subject: [PATCH 17/20] fix error when parse the split keys Signed-off-by: Leavrth --- br/pkg/restore/client.go | 1 + br/pkg/utils/db.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index f6bc5d50b9b46..9c4778d00886b 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1134,6 +1134,7 @@ func (rc *Client) WrapLogFilesIterWithSplitHelper(iter LogIter, rules map[int64] } execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx) + log.Info("get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys)) client := split.NewSplitClient(rc.GetPDClient(), rc.GetTLSConfig(), false) return NewLogFilesIterWithSplitHelper(iter, rules, client, splitSize, splitKeys), nil } diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 231d54c2c4daa..8bf781036e6c0 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -5,6 +5,7 @@ package utils 
import ( "context" "database/sql" + "strconv" "strings" "sync" @@ -154,7 +155,17 @@ func getSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64 { } d := rows[0].GetDatum(3, &fields[3].Column.FieldType) - return d.GetInt64() + splitKeysStr, err := d.ToString() + if err != nil { + log.Warn("failed to get split keys, use default value", logutil.ShortError(err)) + return defaultSplitKeys + } + splitKeys, err := strconv.ParseInt(splitKeysStr, 10, 64) + if err != nil { + log.Warn("failed to get split keys, use default value", logutil.ShortError(err)) + return defaultSplitKeys + } + return splitKeys } func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error) { From 27bc7401601ca4bd8c8892ca0bd353097eaedac2 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 17 Jan 2023 13:59:48 +0800 Subject: [PATCH 18/20] fix lint Signed-off-by: Leavrth --- br/pkg/restore/BUILD.bazel | 1 + br/pkg/restore/split.go | 6 ++---- br/pkg/utils/BUILD.bazel | 1 + 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 772ec438976d7..87b4a1b251e4e 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -61,6 +61,7 @@ go_library( "//util/collate", "//util/hack", "//util/mathutil", + "//util/sqlexec", "//util/table-filter", "@com_github_emirpasic_gods//maps/treemap", "@com_github_go_sql_driver_mysql//:mysql", diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 3e70a33733ebc..17e04486587b9 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -601,7 +601,6 @@ func (helper *LogSplitHelper) splitRegionByPoints( case <-ctx.Done(): return nil case helper.regionsCh <- newRegions: - } log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) return nil @@ -661,8 +660,8 @@ func SplitPoint( return true } var ( - vStartKey []byte = nil - vEndKey []byte = nil + vStartKey []byte + vEndKey []byte ) // use `vStartKey` and 
`vEndKey` to compare with region's key vStartKey, vEndKey, err = GetRewriteEncodedKeys(v, rule) @@ -791,7 +790,6 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error { break } } - }() iter := helper.iterator() diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index c3bcc629183d5..1cad8d5628dee 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//util", "//util/sqlexec", "@com_github_cheggaaa_pb_v3//:pb", + "@com_github_docker_go_units//:go-units", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", From c0179b3a4097c7f3537a057a56d35231f21a64c1 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 19 Jan 2023 10:55:57 +0800 Subject: [PATCH 19/20] add unit test Signed-off-by: Leavrth --- br/pkg/restore/split_test.go | 44 ++++++++++++++++++++++++++++++++++++ br/pkg/utils/db.go | 6 ++--- br/pkg/utils/db_test.go | 41 +++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 1d6b8a4c6e849..6f2f3dfd9fb1a 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -1001,3 +1001,47 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) { require.Equal(t, []byte(fmt.Sprintf("a%d", next)), r.Item.StartKey) } } + +func regionInfo(startKey, endKey string) *split.RegionInfo { + return &split.RegionInfo{ + Region: &metapb.Region{ + StartKey: []byte(startKey), + EndKey: []byte(endKey), + }, + } +} + +func TestSplitCheckPartRegionConsistency(t *testing.T) { + var ( + startKey []byte = []byte("a") + endKey []byte = []byte("f") + err error = nil + ) + err = split.CheckPartRegionConsistency(startKey, endKey, nil) + require.Error(t, err) + err = split.CheckPartRegionConsistency(startKey, endKey, []*split.RegionInfo{ + regionInfo("b", "c"), + }) + require.Error(t, err) + err = split.CheckPartRegionConsistency(startKey, endKey, 
[]*split.RegionInfo{ + regionInfo("a", "c"), + regionInfo("d", "e"), + }) + require.Error(t, err) + err = split.CheckPartRegionConsistency(startKey, endKey, []*split.RegionInfo{ + regionInfo("a", "c"), + regionInfo("c", "d"), + }) + require.NoError(t, err) + err = split.CheckPartRegionConsistency(startKey, endKey, []*split.RegionInfo{ + regionInfo("a", "c"), + regionInfo("c", "d"), + regionInfo("d", "f"), + }) + require.NoError(t, err) + err = split.CheckPartRegionConsistency(startKey, endKey, []*split.RegionInfo{ + regionInfo("a", "c"), + regionInfo("c", "z"), + }) + require.NoError(t, err) +} diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 8bf781036e6c0..060df603d16cb 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -103,10 +103,10 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { } func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64) { - return getSplitSize(ctx), getSplitKeys(ctx) + return GetSplitSize(ctx), GetSplitKeys(ctx) } -func getSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64 { +func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64 { const defaultSplitSize = 96 * 1024 * 1024 varStr := "show config where name = 'coprocessor.region-split-size' and type = 'tikv'" rows, fields, err := ctx.ExecRestrictedSQL( @@ -137,7 +137,7 @@ func getSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64 { return uint64(splitSize) } -func getSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64 { +func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64 { const defaultSplitKeys = 960000 varStr := "show config where name = 'coprocessor.region-split-keys' and type = 'tikv'" rows, fields, err := ctx.ExecRestrictedSQL( diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go index 1334d868641f0..1004764b0d206 100644 --- a/br/pkg/utils/db_test.go +++ b/br/pkg/utils/db_test.go @@ -168,3 +168,44 @@ func TestGc(t *testing.T) { require.Nil(t, err) require.Equal(t, ratio, "-1.0") } + +func 
TestRegionSplitInfo(t *testing.T) { + // config format: + // MySQL [(none)]> show config where name = 'coprocessor.region-split-size'; + // +------+-------------------+-------------------------------+-------+ + // | Type | Instance | Name | Value | + // +------+-------------------+-------------------------------+-------+ + // | tikv | 127.0.0.1:20161 | coprocessor.region-split-size | 10MB | + // +------+-------------------+-------------------------------+-------+ + // MySQL [(none)]> show config where name = 'coprocessor.region-split-keys'; + // +------+-------------------+-------------------------------+--------+ + // | Type | Instance | Name | Value | + // +------+-------------------+-------------------------------+--------+ + // | tikv | 127.0.0.1:20161 | coprocessor.region-split-keys | 100000 | + // +------+-------------------+-------------------------------+--------+ + + fields := make([]*ast.ResultField, 4) + tps := []*types.FieldType{ + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + } + for i := 0; i < len(tps); i++ { + rf := new(ast.ResultField) + rf.Column = new(model.ColumnInfo) + rf.Column.FieldType = *tps[i] + fields[i] = rf + } + rows := make([]chunk.Row, 0, 1) + row := chunk.MutRowFromValues("tikv", "127.0.0.1:20161", "coprocessor.region-split-size", "10MB").ToRow() + rows = append(rows, row) + s := &mockRestrictedSQLExecutor{rows: rows, fields: fields} + require.Equal(t, utils.GetSplitSize(s), uint64(10000000)) + + rows = make([]chunk.Row, 0, 1) + row = chunk.MutRowFromValues("tikv", "127.0.0.1:20161", "coprocessor.region-split-keys", "100000").ToRow() + rows = append(rows, row) + s = &mockRestrictedSQLExecutor{rows: rows, fields: fields} + require.Equal(t, utils.GetSplitKeys(s), int64(100000)) +} From a8eb546890bbe298aa81c736a8bb303c79fac494 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 19 Jan 2023 11:22:05 +0800 Subject: [PATCH 
20/20] fix lint Signed-off-by: Leavrth --- br/pkg/restore/split_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 6f2f3dfd9fb1a..1b560a4e1474d 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -1015,7 +1015,7 @@ func TestSplitCheckPartRegionConsistency(t *testing.T) { var ( startKey []byte = []byte("a") endKey []byte = []byte("f") - err error = nil + err error ) err = split.CheckPartRegionConsistency(startKey, endKey, nil) require.Error(t, err)