From 782f08c5f458ed8a21ec31667a4ef70653bcfaf9 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Wed, 4 Sep 2024 20:39:28 +0800 Subject: [PATCH] br: restore merge split ranges with small data (#55662) close pingcap/tidb#53532 --- br/pkg/restore/snap_client/BUILD.bazel | 7 +- br/pkg/restore/snap_client/batcher.go | 142 ---- br/pkg/restore/snap_client/client.go | 2 +- br/pkg/restore/snap_client/export_test.go | 8 +- br/pkg/restore/snap_client/import.go | 302 +++++---- br/pkg/restore/snap_client/import_test.go | 6 +- br/pkg/restore/snap_client/pipeline_items.go | 55 +- br/pkg/restore/snap_client/tikv_sender.go | 343 ++++++---- .../restore/snap_client/tikv_sender_test.go | 609 ++++++++++++++++++ br/pkg/restore/utils/rewrite_rule.go | 8 +- br/pkg/task/BUILD.bazel | 1 + br/pkg/task/restore.go | 7 + br/tests/br_split_region_fail/run.sh | 2 +- 13 files changed, 1054 insertions(+), 438 deletions(-) delete mode 100644 br/pkg/restore/snap_client/batcher.go diff --git a/br/pkg/restore/snap_client/BUILD.bazel b/br/pkg/restore/snap_client/BUILD.bazel index 3dbb1260c46e0..e10541b3b4d00 100644 --- a/br/pkg/restore/snap_client/BUILD.bazel +++ b/br/pkg/restore/snap_client/BUILD.bazel @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "snap_client", srcs = [ - "batcher.go", "client.go", "import.go", "pipeline_items.go", @@ -30,7 +29,6 @@ go_library( "//br/pkg/restore/internal/snap_split", "//br/pkg/restore/split", "//br/pkg/restore/utils", - "//br/pkg/rtree", "//br/pkg/storage", "//br/pkg/summary", "//br/pkg/utils", @@ -68,6 +66,7 @@ go_library( "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", ], ) @@ -85,9 +84,10 @@ go_test( ], embed = [":snap_client"], flaky = True, - shard_count = 16, + shard_count = 18, deps = [ "//br/pkg/errors", + "//br/pkg/glue", "//br/pkg/gluetidb", "//br/pkg/metautil", "//br/pkg/mock", @@ -97,6 +97,7 @@ go_test( "//br/pkg/utils", "//br/pkg/utiltest", "//pkg/domain", + "//pkg/kv", "//pkg/meta/model", "//pkg/parser/model", "//pkg/parser/mysql", diff --git a/br/pkg/restore/snap_client/batcher.go b/br/pkg/restore/snap_client/batcher.go deleted file mode 100644 index 39ba91d36a8fb..0000000000000 --- a/br/pkg/restore/snap_client/batcher.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package snapclient - -import ( - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/tidb/br/pkg/glue" - "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/br/pkg/summary" -) - -// DrainResult is the collection of some ranges and theirs metadata. -type DrainResult struct { - // TablesToSend are tables that would be send at this batch. - TablesToSend []CreatedTable - // BlankTablesAfterSend are tables that will be full-restored after this batch send. - BlankTablesAfterSend []CreatedTable - // RewriteRules are the rewrite rules for the tables. - // the key is the table id after rewritten. - RewriteRulesMap map[int64]*utils.RewriteRules - Ranges []rtree.RangeStats - // Record which part of ranges belongs to the table - TableEndOffsetInRanges []int -} - -// Files returns all files of this drain result. 
-func (result DrainResult) Files() []TableIDWithFiles { - tableIDWithFiles := make([]TableIDWithFiles, 0, len(result.TableEndOffsetInRanges)) - var startOffset int = 0 - for i, endOffset := range result.TableEndOffsetInRanges { - tableID := result.TablesToSend[i].Table.ID - ranges := result.Ranges[startOffset:endOffset] - // each range has at least a default file + a write file - files := make([]*backuppb.File, 0, len(ranges)*2) - for _, rg := range ranges { - files = append(files, rg.Files...) - } - var rules *utils.RewriteRules - if r, ok := result.RewriteRulesMap[tableID]; ok { - rules = r - } - tableIDWithFiles = append(tableIDWithFiles, TableIDWithFiles{ - TableID: tableID, - Files: files, - RewriteRules: rules, - }) - - // update start offset - startOffset = endOffset - } - - return tableIDWithFiles -} - -func newDrainResult() DrainResult { - return DrainResult{ - TablesToSend: make([]CreatedTable, 0), - BlankTablesAfterSend: make([]CreatedTable, 0), - RewriteRulesMap: utils.EmptyRewriteRulesMap(), - Ranges: make([]rtree.RangeStats, 0), - TableEndOffsetInRanges: make([]int, 0), - } -} - -// fileterOutRanges filter out the files from `drained-range` that exists in the checkpoint set. -func filterOutRanges(checkpointSet map[string]struct{}, drained []rtree.RangeStats, updateCh glue.Progress) []rtree.RangeStats { - progress := int(0) - totalKVs := uint64(0) - totalBytes := uint64(0) - for i, rg := range drained { - newFiles := make([]*backuppb.File, 0, len(rg.Files)) - for _, f := range rg.Files { - rangeKey := getFileRangeKey(f.Name) - if _, exists := checkpointSet[rangeKey]; exists { - // the range has been import done, so skip it and - // update the summary information - progress += 1 - totalKVs += f.TotalKvs - totalBytes += f.TotalBytes - } else { - newFiles = append(newFiles, f) - } - } - // the newFiles may be empty - drained[i].Files = newFiles - } - if progress > 0 { - // (split/scatter + download/ingest) / (default cf + write cf) - updateCh.IncBy(int64(progress) * 2 / 2) - summary.CollectSuccessUnit(summary.TotalKV, progress, totalKVs) - summary.CollectSuccessUnit(summary.SkippedKVCountByCheckpoint, progress, totalKVs) - summary.CollectSuccessUnit(summary.TotalBytes, progress, totalBytes) - summary.CollectSuccessUnit(summary.SkippedBytesByCheckpoint, progress, totalBytes) - } - return drained -} - -// drainRanges 'drains' ranges from current tables. -// for example, let a '-' character be a range, assume we have: -// |---|-----|-------| -// |t1 |t2 |t3 | -// after we run drainRanges() with batchSizeThreshold = 6, let '*' be the ranges will be sent this batch : -// |***|***--|-------| -// |t1 |t2 |-------| -// -// drainRanges() will return: -// TablesToSend: [t1, t2] (so we can make them enter restore mode) -// BlankTableAfterSend: [t1] (so we can make them leave restore mode after restoring this batch) -// RewriteRules: rewrite rules for [t1, t2] (so we can restore them) -// Ranges: those stared ranges (so we can restore them) -// -// then, it will leaving the batcher's cachedTables like this: -// |--|-------| -// |t2|t3 | -// as you can see, all restored ranges would be removed. 
-func drainRanges( - tableWithRanges []TableWithRange, - checkpointSetWithTableID map[int64]map[string]struct{}, - updateCh glue.Progress, -) DrainResult { - result := newDrainResult() - - for offset, thisTable := range tableWithRanges { - t, exists := checkpointSetWithTableID[thisTable.Table.ID] - - result.RewriteRulesMap[thisTable.Table.ID] = thisTable.RewriteRule - result.TablesToSend = append(result.TablesToSend, thisTable.CreatedTable) - result.BlankTablesAfterSend = append(result.BlankTablesAfterSend, thisTable.CreatedTable) - // let's 'drain' the ranges of current table. This op must not make the batch full. - if exists { - result.Ranges = append(result.Ranges, filterOutRanges(t, thisTable.Range, updateCh)...) - } else { - result.Ranges = append(result.Ranges, thisTable.Range...) - } - result.TableEndOffsetInRanges = append(result.TableEndOffsetInRanges, len(result.Ranges)) - // clear the table length. - tableWithRanges[offset].Range = []rtree.RangeStats{} - } - - return result -} diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index d7c5a1d599bcb..be54507c23ed0 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -1060,7 +1060,7 @@ func (rc *SnapClient) WaitForFilesRestored(ctx context.Context, files []*backupp log.Info("import sst files done", logutil.Files(files)) updateCh.Inc() }() - return rc.fileImporter.ImportSSTFiles(ectx, []*backuppb.File{fileReplica}, restoreutils.EmptyRewriteRule(), rc.cipher, rc.backupMeta.ApiVersion) + return rc.fileImporter.ImportSSTFiles(ectx, []TableIDWithFiles{{Files: []*backuppb.File{fileReplica}, RewriteRules: restoreutils.EmptyRewriteRule()}}, rc.cipher, rc.backupMeta.ApiVersion) }) } if err := eg.Wait(); err != nil { diff --git a/br/pkg/restore/snap_client/export_test.go b/br/pkg/restore/snap_client/export_test.go index 74ddcf9ae8600..22b8868217933 100644 --- a/br/pkg/restore/snap_client/export_test.go +++ b/br/pkg/restore/snap_client/export_test.go @@ -33,9 +33,11 @@ var ( RestoreLabelKey = restoreLabelKey RestoreLabelValue = restoreLabelValue - GetSSTMetaFromFile = getSSTMetaFromFile - GetKeyRangeByMode = getKeyRangeByMode - MapTableToFiles = mapTableToFiles + GetSSTMetaFromFile = getSSTMetaFromFile + GetKeyRangeByMode = getKeyRangeByMode + MapTableToFiles = mapTableToFiles + GetFileRangeKey = getFileRangeKey + GetSortedPhysicalTables = getSortedPhysicalTables ) // MockClient create a fake Client used to test. diff --git a/br/pkg/restore/snap_client/import.go b/br/pkg/restore/snap_client/import.go index cdab5a678628a..a335f651977d7 100644 --- a/br/pkg/restore/snap_client/import.go +++ b/br/pkg/restore/snap_client/import.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/codec" kvutil "github.com/tikv/client-go/v2/util" - "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" @@ -275,8 +274,7 @@ func getKeyRangeByMode(mode KvMode) func(f *backuppb.File, rules *restoreutils.R // getKeyRangeForFiles gets the maximum range on files. 
func (importer *SnapFileImporter) getKeyRangeForFiles( - files []*backuppb.File, - rewriteRules *restoreutils.RewriteRules, + filesGroup []TableIDWithFiles, ) ([]byte, []byte, error) { var ( startKey, endKey []byte @@ -284,38 +282,35 @@ func (importer *SnapFileImporter) getKeyRangeForFiles( err error ) getRangeFn := getKeyRangeByMode(importer.kvMode) - for _, f := range files { - start, end, err = getRangeFn(f, rewriteRules) - if err != nil { - return nil, nil, errors.Trace(err) - } - if len(startKey) == 0 || bytes.Compare(start, startKey) < 0 { - startKey = start - } - if len(endKey) == 0 || bytes.Compare(endKey, end) < 0 { - endKey = end + for _, files := range filesGroup { + for _, f := range files.Files { + start, end, err = getRangeFn(f, files.RewriteRules) + if err != nil { + return nil, nil, errors.Trace(err) + } + if len(startKey) == 0 || bytes.Compare(start, startKey) < 0 { + startKey = start + } + if len(endKey) == 0 || bytes.Compare(endKey, end) < 0 { + endKey = end + } } } - log.Debug("rewrite file keys", logutil.Files(files), - logutil.Key("startKey", startKey), logutil.Key("endKey", endKey)) return startKey, endKey, nil } // ImportSSTFiles tries to import a file. -// All rules must contain encoded keys. +// Assert 1: All rewrite rules must contain raw key prefix. +// Assert 2: len(filesGroup[any].Files) > 0. func (importer *SnapFileImporter) ImportSSTFiles( ctx context.Context, - files []*backuppb.File, - rewriteRules *restoreutils.RewriteRules, + filesGroup []TableIDWithFiles, cipher *backuppb.CipherInfo, apiVersion kvrpcpb.APIVersion, ) error { - start := time.Now() - log.Debug("import file", logutil.Files(files)) - // Rewrite the start key and end key of file to scan regions - startKey, endKey, err := importer.getKeyRangeForFiles(files, rewriteRules) + startKey, endKey, err := importer.getKeyRangeForFiles(filesGroup) if err != nil { return errors.Trace(err) } @@ -328,69 +323,53 @@ func (importer *SnapFileImporter) ImportSSTFiles( return errors.Trace(errScanRegion) } - log.Debug("scan regions", logutil.Files(files), zap.Int("count", len(regionInfos))) + log.Debug("scan regions", logutil.Key("start key", startKey), logutil.Key("end key", endKey), zap.Int("count", len(regionInfos))) + start := time.Now() // Try to download and ingest the file in every region - regionLoop: for _, regionInfo := range regionInfos { info := regionInfo // Try to download file. 
- downloadMetas, errDownload := importer.download(ctx, info, files, rewriteRules, cipher, apiVersion) + downloadMetas, errDownload := importer.download(ctx, info, filesGroup, cipher, apiVersion) if errDownload != nil { - for _, e := range multierr.Errors(errDownload) { - switch errors.Cause(e) { // nolint:errorlint - case berrors.ErrKVRewriteRuleNotFound, berrors.ErrKVRangeIsEmpty: - // Skip this region - log.Warn("download file skipped", - logutil.Files(files), - logutil.Region(info.Region), - logutil.Key("startKey", startKey), - logutil.Key("endKey", endKey), - logutil.Key("file-simple-start", files[0].StartKey), - logutil.Key("file-simple-end", files[0].EndKey), - logutil.ShortError(e)) - continue regionLoop - } - } log.Warn("download file failed, retry later", - logutil.Files(files), logutil.Region(info.Region), logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), logutil.ShortError(errDownload)) return errors.Trace(errDownload) } - log.Debug("download file done", - zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)), - logutil.Key("start", files[0].StartKey), logutil.Key("end", files[0].EndKey)) + log.Debug("download file done", zap.Stringer("take", time.Since(start)), + logutil.Key("start", startKey), logutil.Key("end", endKey)) start = time.Now() - if errIngest := importer.ingest(ctx, files, info, downloadMetas); errIngest != nil { + if errIngest := importer.ingest(ctx, info, downloadMetas); errIngest != nil { log.Warn("ingest file failed, retry later", - logutil.Files(files), + logutil.Key("start", startKey), + logutil.Key("end", endKey), logutil.SSTMetas(downloadMetas), logutil.Region(info.Region), zap.Error(errIngest)) return errors.Trace(errIngest) } - log.Debug("ingest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start))) - } - - for _, f := range files { - summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs) - summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes) + log.Debug("ingest file done", logutil.Key("start", startKey), logutil.Key("end", endKey), zap.Stringer("take", time.Since(start))) } return nil }, utils.NewImportSSTBackoffer()) if err != nil { - log.Error("import sst file failed after retry, stop the whole progress", logutil.Files(files), zap.Error(err)) + log.Error("import sst file failed after retry, stop the whole progress", zapFilesGroup(filesGroup), zap.Error(err)) return errors.Trace(err) } + for _, files := range filesGroup { + for _, f := range files.Files { + summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes) + } + } return nil } // getSSTMetaFromFile compares the keys in file, region and rewrite rules, then returns a sst conn. // The range of the returned sst meta is [regionRule.NewKeyPrefix, append(regionRule.NewKeyPrefix, 0xff)]. 
func getSSTMetaFromFile( - id []byte, file *backuppb.File, region *metapb.Region, regionRule *import_sstpb.RewriteRule, @@ -452,8 +431,9 @@ func getSSTMetaFromFile( logutil.Key("startKey", rangeStart), logutil.Key("endKey", rangeEnd)) + uid := uuid.New() return &import_sstpb.SSTMeta{ - Uuid: id, + Uuid: uid[:], CfName: cfName, Range: &import_sstpb.Range{ Start: rangeStart, @@ -472,21 +452,18 @@ func getSSTMetaFromFile( func (importer *SnapFileImporter) download( ctx context.Context, regionInfo *split.RegionInfo, - files []*backuppb.File, - rewriteRules *restoreutils.RewriteRules, + filesGroup []TableIDWithFiles, cipher *backuppb.CipherInfo, apiVersion kvrpcpb.APIVersion, ) ([]*import_sstpb.SSTMeta, error) { - var ( - downloadMetas = make([]*import_sstpb.SSTMeta, 0, len(files)) - ) + var downloadMetas []*import_sstpb.SSTMeta errDownload := utils.WithRetry(ctx, func() error { var e error // we treat Txn kv file as Raw kv file. because we don't have table id to decode if importer.kvMode == Raw || importer.kvMode == Txn { - downloadMetas, e = importer.downloadRawKVSST(ctx, regionInfo, files, cipher, apiVersion) + downloadMetas, e = importer.downloadRawKVSST(ctx, regionInfo, filesGroup, cipher, apiVersion) } else { - downloadMetas, e = importer.downloadSST(ctx, regionInfo, files, rewriteRules, cipher, apiVersion) + downloadMetas, e = importer.downloadSST(ctx, regionInfo, filesGroup, cipher, apiVersion) } failpoint.Inject("restore-storage-error", func(val failpoint.Value) { @@ -499,11 +476,11 @@ func (importer *SnapFileImporter) download( e = status.Error(codes.Unavailable, "the connection to TiKV has been cut by a neko, meow :3") }) if isDecryptSstErr(e) { - log.Info("fail to decrypt when download sst, try again with no-crypt", logutil.Files(files)) + log.Info("fail to decrypt when download sst, try again with no-crypt", zapFilesGroup(filesGroup)) if importer.kvMode == Raw || importer.kvMode == Txn { - downloadMetas, e = importer.downloadRawKVSST(ctx, regionInfo, files, nil, apiVersion) + downloadMetas, e = importer.downloadRawKVSST(ctx, regionInfo, filesGroup, nil, apiVersion) } else { - downloadMetas, e = importer.downloadSST(ctx, regionInfo, files, rewriteRules, nil, apiVersion) + downloadMetas, e = importer.downloadSST(ctx, regionInfo, filesGroup, nil, apiVersion) } } if e != nil { @@ -516,18 +493,28 @@ func (importer *SnapFileImporter) download( return downloadMetas, errDownload } +// Notice that the KvMode must be TiDB. func (importer *SnapFileImporter) buildDownloadRequest( file *backuppb.File, rewriteRules *restoreutils.RewriteRules, regionInfo *split.RegionInfo, cipher *backuppb.CipherInfo, ) (*import_sstpb.DownloadRequest, import_sstpb.SSTMeta, error) { - uid := uuid.New() - id := uid[:] // Get the rewrite rule for the file. 
fileRule := restoreutils.FindMatchedRewriteRule(file, rewriteRules) if fileRule == nil { - return nil, import_sstpb.SSTMeta{}, errors.Trace(berrors.ErrKVRewriteRuleNotFound) + log.Warn("download file skipped", logutil.Region(regionInfo.Region), zap.Error(berrors.ErrKVRewriteRuleNotFound)) + return nil, import_sstpb.SSTMeta{}, nil + } + + // Check whether the range of the file overlaps with the region + encodedStartKey := restoreutils.RewriteAndEncodeRawKey(file.StartKey, fileRule) + if len(regionInfo.Region.EndKey) > 0 && bytes.Compare(encodedStartKey, regionInfo.Region.EndKey) >= 0 { + return nil, import_sstpb.SSTMeta{}, nil + } + encodedEndKey := restoreutils.RewriteAndEncodeRawKey(file.EndKey, fileRule) + if bytes.Compare(encodedEndKey, regionInfo.Region.StartKey) <= 0 { + return nil, import_sstpb.SSTMeta{}, nil } // For the legacy version of TiKV, we need to encode the key prefix, since in the legacy @@ -544,7 +531,7 @@ func (importer *SnapFileImporter) buildDownloadRequest( rule.NewKeyPrefix = restoreutils.EncodeKeyPrefix(fileRule.GetNewKeyPrefix()) } - sstMeta, err := getSSTMetaFromFile(id, file, regionInfo.Region, &rule, importer.rewriteMode) + sstMeta, err := getSSTMetaFromFile(file, regionInfo.Region, &rule, importer.rewriteMode) if err != nil { return nil, import_sstpb.SSTMeta{}, err } @@ -571,8 +558,7 @@ func (importer *SnapFileImporter) buildDownloadRequest( func (importer *SnapFileImporter) downloadSST( ctx context.Context, regionInfo *split.RegionInfo, - files []*backuppb.File, - rewriteRules *restoreutils.RewriteRules, + filesGroup []TableIDWithFiles, cipher *backuppb.CipherInfo, apiVersion kvrpcpb.APIVersion, ) ([]*import_sstpb.SSTMeta, error) { @@ -580,14 +566,20 @@ func (importer *SnapFileImporter) downloadSST( downloadMetasMap := make(map[string]import_sstpb.SSTMeta) resultMetasMap := make(map[string]*import_sstpb.SSTMeta) downloadReqsMap := make(map[string]*import_sstpb.DownloadRequest) - for _, file := range files { - req, sstMeta, err := importer.buildDownloadRequest(file, rewriteRules, regionInfo, cipher) - if err != nil { - return nil, errors.Trace(err) + for _, files := range filesGroup { + for _, file := range files.Files { + req, sstMeta, err := importer.buildDownloadRequest(file, files.RewriteRules, regionInfo, cipher) + if err != nil { + return nil, errors.Trace(err) + } + // the range of the file does not overlap with the region + if req == nil { + continue + } + sstMeta.ApiVersion = apiVersion + downloadMetasMap[file.Name] = sstMeta + downloadReqsMap[file.Name] = req } - sstMeta.ApiVersion = apiVersion - downloadMetasMap[file.Name] = sstMeta - downloadReqsMap[file.Name] = req } eg, ectx := errgroup.WithContext(ctx) @@ -603,11 +595,7 @@ func (importer *SnapFileImporter) downloadSST( defer func() { importer.releaseToken(tokenCh) }() - for _, file := range files { - req, ok := downloadReqsMap[file.Name] - if !ok { - return errors.New("not found file key for download request") - } + for fileName, req := range downloadReqsMap { var err error var resp *import_sstpb.DownloadResponse resp, err = utils.WithRetryV2(ectx, utils.NewDownloadSSTBackoffer(), func(ctx context.Context) (*import_sstpb.DownloadResponse, error) { @@ -622,31 +610,32 @@ func (importer *SnapFileImporter) downloadSST( return errors.Annotate(berrors.ErrKVDownloadFailed, resp.GetError().GetMessage()) } if resp.GetIsEmpty() { - return errors.Trace(berrors.ErrKVRangeIsEmpty) + log.Warn("download file skipped", zap.String("filename", fileName), + logutil.Region(regionInfo.Region), 
zap.Error(berrors.ErrKVRangeIsEmpty)) + continue } mu.Lock() - sstMeta, ok := downloadMetasMap[file.Name] + sstMeta, ok := downloadMetasMap[fileName] if !ok { mu.Unlock() - return errors.Errorf("not found file %s for download sstMeta", file.Name) + return errors.Errorf("not found file %s for download sstMeta", fileName) } sstMeta.Range = &import_sstpb.Range{ Start: restoreutils.TruncateTS(resp.Range.GetStart()), End: restoreutils.TruncateTS(resp.Range.GetEnd()), } - resultMetasMap[file.Name] = &sstMeta + resultMetasMap[fileName] = &sstMeta mu.Unlock() log.Debug("download from peer", + zap.String("filename", fileName), logutil.Region(regionInfo.Region), - logutil.File(file), logutil.Peer(peer), logutil.Key("resp-range-start", resp.Range.Start), logutil.Key("resp-range-end", resp.Range.End), zap.Bool("resp-isempty", resp.IsEmpty), zap.Uint32("resp-crc32", resp.Crc32), - zap.Int("len files", len(files)), ) } return nil @@ -661,85 +650,94 @@ func (importer *SnapFileImporter) downloadSST( func (importer *SnapFileImporter) downloadRawKVSST( ctx context.Context, regionInfo *split.RegionInfo, - files []*backuppb.File, + filesGroup []TableIDWithFiles, cipher *backuppb.CipherInfo, apiVersion kvrpcpb.APIVersion, ) ([]*import_sstpb.SSTMeta, error) { - downloadMetas := make([]*import_sstpb.SSTMeta, 0, len(files)) - for _, file := range files { - uid := uuid.New() - id := uid[:] - // Empty rule - var rule import_sstpb.RewriteRule - sstMeta, err := getSSTMetaFromFile(id, file, regionInfo.Region, &rule, RewriteModeLegacy) - if err != nil { - return nil, err - } + downloadMetas := make([]*import_sstpb.SSTMeta, 0, len(filesGroup)*2+1) + for _, files := range filesGroup { + for _, file := range files.Files { + // Empty rule + var rule import_sstpb.RewriteRule + sstMeta, err := getSSTMetaFromFile(file, regionInfo.Region, &rule, RewriteModeLegacy) + if err != nil { + return nil, err + } - // Cut the SST file's range to fit in the restoring range. - if bytes.Compare(importer.rawStartKey, sstMeta.Range.GetStart()) > 0 { - sstMeta.Range.Start = importer.rawStartKey - } - if len(importer.rawEndKey) > 0 && - (len(sstMeta.Range.GetEnd()) == 0 || bytes.Compare(importer.rawEndKey, sstMeta.Range.GetEnd()) <= 0) { - sstMeta.Range.End = importer.rawEndKey - sstMeta.EndKeyExclusive = true - } - if bytes.Compare(sstMeta.Range.GetStart(), sstMeta.Range.GetEnd()) > 0 { - return nil, errors.Trace(berrors.ErrKVRangeIsEmpty) - } + // Cut the SST file's range to fit in the restoring range. 
+ if bytes.Compare(importer.rawStartKey, sstMeta.Range.GetStart()) > 0 { + sstMeta.Range.Start = importer.rawStartKey + } + if len(importer.rawEndKey) > 0 && + (len(sstMeta.Range.GetEnd()) == 0 || bytes.Compare(importer.rawEndKey, sstMeta.Range.GetEnd()) <= 0) { + sstMeta.Range.End = importer.rawEndKey + sstMeta.EndKeyExclusive = true + } + if bytes.Compare(sstMeta.Range.GetStart(), sstMeta.Range.GetEnd()) > 0 { + log.Warn("download file skipped", zap.String("filename", file.Name), + logutil.Region(regionInfo.Region), zap.Error(berrors.ErrKVRangeIsEmpty)) + continue + } - req := &import_sstpb.DownloadRequest{ - Sst: *sstMeta, - StorageBackend: importer.backend, - Name: file.GetName(), - RewriteRule: rule, - IsRawKv: true, - CipherInfo: cipher, - StorageCacheId: importer.cacheKey, - } - log.Debug("download SST", logutil.SSTMeta(sstMeta), logutil.Region(regionInfo.Region)) - - var atomicResp atomic.Pointer[import_sstpb.DownloadResponse] - eg, ectx := errgroup.WithContext(ctx) - for _, p := range regionInfo.Region.GetPeers() { - peer := p - eg.Go(func() error { - resp, err := importer.importClient.DownloadSST(ectx, peer.GetStoreId(), req) - if err != nil { - return errors.Trace(err) - } - if resp.GetError() != nil { - return errors.Annotate(berrors.ErrKVDownloadFailed, resp.GetError().GetMessage()) - } - if resp.GetIsEmpty() { - return errors.Trace(berrors.ErrKVRangeIsEmpty) - } + req := &import_sstpb.DownloadRequest{ + Sst: *sstMeta, + StorageBackend: importer.backend, + Name: file.GetName(), + RewriteRule: rule, + IsRawKv: true, + CipherInfo: cipher, + StorageCacheId: importer.cacheKey, + } + log.Debug("download SST", logutil.SSTMeta(sstMeta), logutil.Region(regionInfo.Region)) + + var atomicResp atomic.Pointer[import_sstpb.DownloadResponse] + eg, ectx := errgroup.WithContext(ctx) + for _, p := range regionInfo.Region.GetPeers() { + peer := p + eg.Go(func() error { + resp, err := importer.importClient.DownloadSST(ectx, peer.GetStoreId(), req) + if err != nil { + return errors.Trace(err) + } + if resp.GetError() != nil { + return errors.Annotate(berrors.ErrKVDownloadFailed, resp.GetError().GetMessage()) + } + if resp.GetIsEmpty() { + log.Warn("download file skipped", zap.String("filename", file.Name), + logutil.Region(regionInfo.Region), zap.Error(berrors.ErrKVRangeIsEmpty)) + return nil + } - atomicResp.Store(resp) - return nil - }) - } + atomicResp.Store(resp) + return nil + }) + } - if err := eg.Wait(); err != nil { - return nil, err - } + if err := eg.Wait(); err != nil { + return nil, err + } - downloadResp := atomicResp.Load() - sstMeta.Range.Start = downloadResp.Range.GetStart() - sstMeta.Range.End = downloadResp.Range.GetEnd() - sstMeta.ApiVersion = apiVersion - downloadMetas = append(downloadMetas, sstMeta) + downloadResp := atomicResp.Load() + if downloadResp == nil { + continue + } + sstMeta.Range.Start = downloadResp.Range.GetStart() + sstMeta.Range.End = downloadResp.Range.GetEnd() + sstMeta.ApiVersion = apiVersion + downloadMetas = append(downloadMetas, sstMeta) + } } return downloadMetas, nil } func (importer *SnapFileImporter) ingest( ctx context.Context, - files []*backuppb.File, info *split.RegionInfo, downloadMetas []*import_sstpb.SSTMeta, ) error { + if len(downloadMetas) == 0 { + return nil + } tokenCh := importer.ingestTokensMap.acquireTokenCh(info.Leader.GetStoreId(), importer.concurrencyPerStore) select { case <-ctx.Done(): @@ -780,7 +778,6 @@ func (importer *SnapFileImporter) ingest( } // do not get region info, wait a second and GetRegion() again. 
log.Warn("ingest get region by key return nil", logutil.Region(info.Region), - logutil.Files(files), logutil.SSTMetas(downloadMetas), ) time.Sleep(time.Second) @@ -791,7 +788,6 @@ func (importer *SnapFileImporter) ingest( return errors.Trace(berrors.ErrKVEpochNotMatch) } log.Debug("ingest sst returns not leader error, retry it", - logutil.Files(files), logutil.SSTMetas(downloadMetas), logutil.Region(info.Region), zap.Stringer("newLeader", newInfo.Leader)) diff --git a/br/pkg/restore/snap_client/import_test.go b/br/pkg/restore/snap_client/import_test.go index 71d5a758f39c7..762beb3784d22 100644 --- a/br/pkg/restore/snap_client/import_test.go +++ b/br/pkg/restore/snap_client/import_test.go @@ -103,7 +103,7 @@ func TestGetSSTMetaFromFile(t *testing.T) { StartKey: []byte("t2abc"), EndKey: []byte("t3a"), } - sstMeta, err := snapclient.GetSSTMetaFromFile([]byte{}, file, region, rule, snapclient.RewriteModeLegacy) + sstMeta, err := snapclient.GetSSTMetaFromFile(file, region, rule, snapclient.RewriteModeLegacy) require.Nil(t, err) require.Equal(t, "t2abc", string(sstMeta.GetRange().GetStart())) require.Equal(t, "t2\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff", string(sstMeta.GetRange().GetEnd())) @@ -171,7 +171,7 @@ func TestSnapImporter(t *testing.T) { files, rules := generateFiles() for _, file := range files { importer.WaitUntilUnblock() - err = importer.ImportSSTFiles(ctx, []*backuppb.File{file}, rules, nil, kvrpcpb.APIVersion_V1) + err = importer.ImportSSTFiles(ctx, []snapclient.TableIDWithFiles{{Files: []*backuppb.File{file}, RewriteRules: rules}}, nil, kvrpcpb.APIVersion_V1) require.NoError(t, err) } err = importer.Close() @@ -192,7 +192,7 @@ func TestSnapImporterRaw(t *testing.T) { files, rules := generateFiles() for _, file := range files { importer.WaitUntilUnblock() - err = importer.ImportSSTFiles(ctx, []*backuppb.File{file}, rules, nil, kvrpcpb.APIVersion_V1) + err = importer.ImportSSTFiles(ctx, []snapclient.TableIDWithFiles{{Files: []*backuppb.File{file}, RewriteRules: rules}}, nil, kvrpcpb.APIVersion_V1) require.NoError(t, err) } err = importer.Close() diff --git a/br/pkg/restore/snap_client/pipeline_items.go b/br/pkg/restore/snap_client/pipeline_items.go index f10b2a25b9f3b..f76417a636c4a 100644 --- a/br/pkg/restore/snap_client/pipeline_items.go +++ b/br/pkg/restore/snap_client/pipeline_items.go @@ -22,9 +22,9 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/pkg/domain/infosync" @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/util/engine" pdhttp "github.com/tikv/pd/client/http" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" ) @@ -51,16 +52,10 @@ type CreatedTable struct { OldTable *metautil.Table } -func defaultOutputTableChan() chan *CreatedTable { - return make(chan *CreatedTable, defaultChannelSize) -} - -// TableWithRange is a CreatedTable that has been bind to some of key ranges. -type TableWithRange struct { - CreatedTable - - // Range has been rewrited by rewrite rules. 
- Range []rtree.RangeStats +type PhysicalTable struct { + NewPhysicalID int64 + OldPhysicalID int64 + RewriteRules *restoreutils.RewriteRules } type TableIDWithFiles struct { @@ -73,6 +68,44 @@ type TableIDWithFiles struct { RewriteRules *restoreutils.RewriteRules } +type zapFilesGroupMarshaler []TableIDWithFiles + +// MarshalLogObjectForFiles is an internal util function to zap something having `Files` field. +func MarshalLogObjectForFiles(files []TableIDWithFiles, encoder zapcore.ObjectEncoder) error { + return zapFilesGroupMarshaler(files).MarshalLogObject(encoder) +} + +func (fgs zapFilesGroupMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + elements := make([]string, 0) + total := 0 + totalKVs := uint64(0) + totalBytes := uint64(0) + totalSize := uint64(0) + for _, fg := range fgs { + for _, f := range fg.Files { + total += 1 + elements = append(elements, f.GetName()) + totalKVs += f.GetTotalKvs() + totalBytes += f.GetTotalBytes() + totalSize += f.GetSize_() + } + } + encoder.AddInt("total", total) + _ = encoder.AddArray("files", logutil.AbbreviatedArrayMarshaler(elements)) + encoder.AddUint64("totalKVs", totalKVs) + encoder.AddUint64("totalBytes", totalBytes) + encoder.AddUint64("totalSize", totalSize) + return nil +} + +func zapFilesGroup(filesGroup []TableIDWithFiles) zap.Field { + return zap.Object("files", zapFilesGroupMarshaler(filesGroup)) +} + +func defaultOutputTableChan() chan *CreatedTable { + return make(chan *CreatedTable, defaultChannelSize) +} + func concurrentHandleTablesCh( ctx context.Context, inCh <-chan *CreatedTable, diff --git a/br/pkg/restore/snap_client/tikv_sender.go b/br/pkg/restore/snap_client/tikv_sender.go index 89c535ad57fbf..e6e2336fdc724 100644 --- a/br/pkg/restore/snap_client/tikv_sender.go +++ b/br/pkg/restore/snap_client/tikv_sender.go @@ -21,7 +21,6 @@ import ( "strings" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" @@ -37,6 +36,31 @@ import ( "golang.org/x/sync/errgroup" ) +func getSortedPhysicalTables(createdTables []*CreatedTable) []*PhysicalTable { + physicalTables := make([]*PhysicalTable, 0, len(createdTables)) + for _, createdTable := range createdTables { + physicalTables = append(physicalTables, &PhysicalTable{ + NewPhysicalID: createdTable.Table.ID, + OldPhysicalID: createdTable.OldTable.Info.ID, + RewriteRules: createdTable.RewriteRule, + }) + + partitionIDMap := restoreutils.GetPartitionIDMap(createdTable.Table, createdTable.OldTable.Info) + for oldID, newID := range partitionIDMap { + physicalTables = append(physicalTables, &PhysicalTable{ + NewPhysicalID: newID, + OldPhysicalID: oldID, + RewriteRules: createdTable.RewriteRule, + }) + } + } + // sort the physical table by downstream stream physical id + sort.Slice(physicalTables, func(a, b int) bool { + return physicalTables[a].NewPhysicalID < physicalTables[b].NewPhysicalID + }) + return physicalTables +} + // mapTableToFiles makes a map that mapping table ID to its backup files. // aware that one file can and only can hold one table. func mapTableToFiles(files []*backuppb.File) (map[int64][]*backuppb.File, int) { @@ -66,49 +90,85 @@ func mapTableToFiles(files []*backuppb.File) (map[int64][]*backuppb.File, int) { return result, maxSplitKeyCount } +// filterOutFiles filters out files that exist in the checkpoint set. 
+func filterOutFiles(checkpointSet map[string]struct{}, files []*backuppb.File, updateCh glue.Progress) []*backuppb.File { + progress := int(0) + totalKVs := uint64(0) + totalBytes := uint64(0) + newFiles := make([]*backuppb.File, 0, len(files)) + for _, file := range files { + rangeKey := getFileRangeKey(file.Name) + if _, exists := checkpointSet[rangeKey]; exists { + // the range has been import done, so skip it and + // update the summary information + progress += 1 + totalKVs += file.TotalKvs + totalBytes += file.TotalBytes + } else { + newFiles = append(newFiles, file) + } + } + if progress > 0 { + // (split/scatter + download/ingest) / (default cf + write cf) + updateCh.IncBy(int64(progress) * 2 / 2) + summary.CollectSuccessUnit(summary.TotalKV, progress, totalKVs) + summary.CollectSuccessUnit(summary.SkippedKVCountByCheckpoint, progress, totalKVs) + summary.CollectSuccessUnit(summary.TotalBytes, progress, totalBytes) + summary.CollectSuccessUnit(summary.SkippedBytesByCheckpoint, progress, totalBytes) + } + return newFiles +} + +// If there are many tables with only a few rows, the number of merged SSTs will be too large. +// So set a threshold to avoid it. +const MergedRangeCountThreshold = 1536 + // SortAndValidateFileRanges sort, merge and validate files by tables and yields tables with range. func SortAndValidateFileRanges( createdTables []*CreatedTable, allFiles []*backuppb.File, + checkpointSetWithTableID map[int64]map[string]struct{}, splitSizeBytes, splitKeyCount uint64, -) ([][]byte, []TableWithRange, error) { - // sort the created table by downstream stream table id - sort.Slice(createdTables, func(a, b int) bool { - return createdTables[a].Table.ID < createdTables[b].Table.ID - }) + splitOnTable bool, + updateCh glue.Progress, +) ([][]byte, [][]TableIDWithFiles, error) { + sortedPhysicalTables := getSortedPhysicalTables(createdTables) // mapping table ID to its backup files fileOfTable, hintSplitKeyCount := mapTableToFiles(allFiles) // sort, merge, and validate files in each tables, and generate split keys by the way var ( // to generate region split keys, merge the small ranges over the adjacent tables - sortedSplitKeys = make([][]byte, 0, hintSplitKeyCount) + sortedSplitKeys = make([][]byte, 0, hintSplitKeyCount) + groupSize = uint64(0) + groupCount = uint64(0) + lastKey []byte = nil - tableWithRanges = make([]TableWithRange, 0, len(createdTables)) + // group the files by the generated split keys + tableIDWithFilesGroup = make([][]TableIDWithFiles, 0, hintSplitKeyCount) + lastFilesGroup []TableIDWithFiles = nil + + // statistic + mergedRangeCount = 0 ) log.Info("start to merge ranges", zap.Uint64("kv size threshold", splitSizeBytes), zap.Uint64("kv count threshold", splitKeyCount)) - for _, table := range createdTables { - files := fileOfTable[table.OldTable.Info.ID] - if partitions := table.OldTable.Info.Partition; partitions != nil { - for _, partition := range partitions.Definitions { - files = append(files, fileOfTable[partition.ID]...) - } - } + for _, table := range sortedPhysicalTables { + files := fileOfTable[table.OldPhysicalID] for _, file := range files { - if err := restoreutils.ValidateFileRewriteRule(file, table.RewriteRule); err != nil { + if err := restoreutils.ValidateFileRewriteRule(file, table.RewriteRules); err != nil { return nil, nil, errors.Trace(err) } } // Merge small ranges to reduce split and scatter regions. // Notice that the files having the same start key and end key are in the same range. 
		sortedRanges, stat, err := restoreutils.MergeAndRewriteFileRanges(
-			files, table.RewriteRule, splitSizeBytes, splitKeyCount)
+			files, table.RewriteRules, splitSizeBytes, splitKeyCount)
		if err != nil {
			return nil, nil, errors.Trace(err)
		}
		log.Info("merge and validate file",
-			zap.Stringer("database", table.OldTable.DB.Name),
-			zap.Stringer("table", table.Table.Name),
+			zap.Int64("new physical ID", table.NewPhysicalID),
+			zap.Int64("old physical ID", table.OldPhysicalID),
			zap.Int("Files(total)", stat.TotalFiles),
			zap.Int("File(write)", stat.TotalWriteCFFile),
			zap.Int("File(default)", stat.TotalDefaultCFFile),
@@ -119,16 +179,103 @@ func SortAndValidateFileRanges(
			zap.Int("Merged(keys avg)", stat.MergedRegionKeysAvg),
			zap.Int("Merged(bytes avg)", stat.MergedRegionBytesAvg))

+		// Skip some ranges if they are recorded in the checkpoint.
+		// Notice that ranges are skipped only after the split keys are selected, so that the split keys
+		// always stay the same.
+		checkpointSet := checkpointSetWithTableID[table.NewPhysicalID]
+
+		// Generate the split keys. Notice that the way to generate split keys must be deterministic
+		// and independent of the current cluster region distribution. Therefore, when a restore fails, the
+		// generated split keys stay the same the next time the restore is retried.
+		//
+		// Here suppose that all the ranges are in one region at the beginning.
+		// In general, the ids of the tables created in the previous stage are continuous because:
+		//
+		//  1. Before creating tables, the cluster global id has been allocated up to ${GLOBAL_ID};
+		//  2. Suppose the ids of the tables to be created are {t_i}, where t_i < t_j if i < j.
+		//  3. BR preallocates the global id from ${GLOBAL_ID} to t_max, so the table ids that are larger
+		//     than ${GLOBAL_ID} keep the same downstream ids.
+		//  4. Then BR creates tables, and the table ids that are less than or equal to ${GLOBAL_ID} are
+		//     allocated to [t_max + 1, ...) in the downstream cluster.
+		//  5. Therefore, the BR-created tables usually have continuous ids.
+		//
+		// Besides, the prefix of an existing region's start key and end key should not be `t{restored_table_id}`.
		for _, rg := range sortedRanges {
-			sortedSplitKeys = append(sortedSplitKeys, rg.EndKey)
+			// split key generation
+			afterMergedGroupSize := groupSize + rg.Size
+			afterMergedGroupCount := groupCount + rg.Count
+			if afterMergedGroupSize > splitSizeBytes || afterMergedGroupCount > splitKeyCount || mergedRangeCount > MergedRangeCountThreshold {
+				log.Info("merge ranges across tables due to kv size/count or merged count threshold exceeded",
+					zap.Uint64("merged kv size", groupSize),
+					zap.Uint64("merged kv count", groupCount),
+					zap.Int("merged range count", mergedRangeCount))
+				groupSize, groupCount = rg.Size, rg.Count
+				mergedRangeCount = 0
+				// cannot merge files anymore, so generate a new split key
+				if lastKey != nil {
+					sortedSplitKeys = append(sortedSplitKeys, lastKey)
+				}
+				// then generate a new files group
+				if lastFilesGroup != nil {
+					tableIDWithFilesGroup = append(tableIDWithFilesGroup, lastFilesGroup)
+					// reset lastFilesGroup immediately because it is not always updated in each loop iteration.
+					lastFilesGroup = nil
+				}
+			} else {
+				groupSize, groupCount = afterMergedGroupSize, afterMergedGroupCount
+			}
+			// override the previous key, which may not become a split key.
+			lastKey = rg.EndKey
+			// mergedRangeCount is incremented by the number of files before checkpoint filtering, so that the split keys
+			// stay the same as those from the previous execution.
+			mergedRangeCount += len(rg.Files)
+			// The checkpoint filters out files that were already imported in previous restore executions.
+			// Notice that files are filtered out only after the split keys are selected, so that the split keys
+			// always stay the same.
+			newFiles := filterOutFiles(checkpointSet, rg.Files, updateCh)
+			// append the new files into the group
+			if len(newFiles) > 0 {
+				if len(lastFilesGroup) == 0 || lastFilesGroup[len(lastFilesGroup)-1].TableID != table.NewPhysicalID {
+					lastFilesGroup = append(lastFilesGroup, TableIDWithFiles{
+						TableID:      table.NewPhysicalID,
+						Files:        nil,
+						RewriteRules: table.RewriteRules,
+					})
+				}
+				lastFilesGroup[len(lastFilesGroup)-1].Files = append(lastFilesGroup[len(lastFilesGroup)-1].Files, newFiles...)
+			}
		}
-		tableWithRanges = append(tableWithRanges, TableWithRange{
-			CreatedTable: *table,
-			Range:        sortedRanges,
-		})
+		// If the config split-table/split-region-on-table is enabled, skip merging ranges across tables.
+		if splitOnTable {
+			log.Info("merge ranges across tables due to split on table",
+				zap.Uint64("merged kv size", groupSize),
+				zap.Uint64("merged kv count", groupCount),
+				zap.Int("merged range count", mergedRangeCount))
+			groupSize, groupCount = 0, 0
+			mergedRangeCount = 0
+			// Besides, drop the table's last key, which might otherwise be chosen as a split key, because
+			// there is already a table split key.
+			lastKey = nil
+			if lastFilesGroup != nil {
+				tableIDWithFilesGroup = append(tableIDWithFilesGroup, lastFilesGroup)
+				lastFilesGroup = nil
+			}
+		}
+	}
+	// append the key of the last range anyway
+	if lastKey != nil {
+		sortedSplitKeys = append(sortedSplitKeys, lastKey)
+	}
+	// append the last files group anyway
+	if lastFilesGroup != nil {
+		log.Info("merge ranges across tables due to the last group",
+			zap.Uint64("merged kv size", groupSize),
+			zap.Uint64("merged kv count", groupCount),
+			zap.Int("merged range count", mergedRangeCount))
+		tableIDWithFilesGroup = append(tableIDWithFilesGroup, lastFilesGroup)
	}
-	return sortedSplitKeys, tableWithRanges, nil
+	return sortedSplitKeys, tableIDWithFilesGroup, nil
 }

 func (rc *SnapClient) RestoreTables(
@@ -138,6 +285,7 @@ func (rc *SnapClient) RestoreTables(
 	allFiles []*backuppb.File,
 	checkpointSetWithTableID map[int64]map[string]struct{},
 	splitSizeBytes, splitKeyCount uint64,
+	splitOnTable bool,
 	updateCh glue.Progress,
 ) error {
 	if err := placementRuleManager.SetPlacementRule(ctx, createdTables); err != nil {
@@ -151,25 +299,24 @@ func (rc *SnapClient) RestoreTables(
 	}()

 	start := time.Now()
-	sortedSplitKeys, tableWithRanges, err := SortAndValidateFileRanges(createdTables, allFiles, splitSizeBytes, splitKeyCount)
+	sortedSplitKeys, tableIDWithFilesGroup, err := SortAndValidateFileRanges(createdTables, allFiles, checkpointSetWithTableID, splitSizeBytes, splitKeyCount, splitOnTable, updateCh)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	drainResult := drainRanges(tableWithRanges, checkpointSetWithTableID, updateCh)
-	log.Info("Merge ranges", zap.Duration("take", time.Since(start)))
+	log.Info("Restore Stage Duration", zap.String("stage", "merge ranges"), zap.Duration("take", time.Since(start)))

 	start = time.Now()
 	if err = rc.SplitPoints(ctx, sortedSplitKeys, updateCh, false); err != nil {
 		return errors.Trace(err)
 	}
-	log.Info("Split regions", zap.Duration("take", time.Since(start)))
+	log.Info("Restore Stage Duration", zap.String("stage", "split regions"), zap.Duration("take", time.Since(start)))

 	start = time.Now()
-	if err = rc.RestoreSSTFiles(ctx, drainResult.Files(), updateCh); err != nil {
+	if err = 
rc.RestoreSSTFiles(ctx, tableIDWithFilesGroup, updateCh); err != nil { return errors.Trace(err) } elapsed := time.Since(start) - log.Info("Retore files", zap.Duration("take", elapsed)) + log.Info("Restore Stage Duration", zap.String("stage", "restore files"), zap.Duration("take", elapsed)) summary.CollectSuccessUnit("files", len(allFiles), elapsed) return nil @@ -216,109 +363,67 @@ func getFileRangeKey(f string) string { return f[:idx] } -// isFilesBelongToSameRange check whether two files are belong to the same range with different cf. -func isFilesBelongToSameRange(f1, f2 string) bool { - return getFileRangeKey(f1) == getFileRangeKey(f2) -} - -func drainFilesByRange(files []*backuppb.File) ([]*backuppb.File, []*backuppb.File) { - if len(files) == 0 { - return nil, nil - } - idx := 1 - for idx < len(files) { - if !isFilesBelongToSameRange(files[idx-1].Name, files[idx].Name) { - break - } - idx++ - } - - return files[:idx], files[idx:] -} - // RestoreSSTFiles tries to restore the files. func (rc *SnapClient) RestoreSSTFiles( ctx context.Context, - tableIDWithFiles []TableIDWithFiles, + tableIDWithFilesGroup [][]TableIDWithFiles, updateCh glue.Progress, -) (err error) { - start := time.Now() - fileCount := 0 - defer func() { - elapsed := time.Since(start) - if err == nil { - log.Info("Restore files", zap.Duration("take", elapsed)) - summary.CollectSuccessUnit("files", fileCount, elapsed) - } - }() - - log.Debug("start to restore files", zap.Int("files", fileCount)) - - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - eg, ectx := errgroup.WithContext(ctx) - err = rc.setSpeedLimit(ctx, rc.rateLimit) - if err != nil { +) error { + if err := rc.setSpeedLimit(ctx, rc.rateLimit); err != nil { return errors.Trace(err) } - var rangeFiles []*backuppb.File - var leftFiles []*backuppb.File -LOOPFORTABLE: - for _, tableIDWithFile := range tableIDWithFiles { - tableID := tableIDWithFile.TableID - files := tableIDWithFile.Files - rules := tableIDWithFile.RewriteRules - fileCount += len(files) - for rangeFiles, leftFiles = drainFilesByRange(files); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles) { - if ectx.Err() != nil { - log.Warn("Restoring encountered error and already stopped, give up remained files.", - zap.Int("remained", len(leftFiles)), - logutil.ShortError(ectx.Err())) - // We will fetch the error from the errgroup then (If there were). - // Also note if the parent context has been canceled or something, - // breaking here directly is also a reasonable behavior. 
- break LOOPFORTABLE - } - filesReplica := rangeFiles - rc.fileImporter.WaitUntilUnblock() - rc.workerPool.ApplyOnErrorGroup(eg, func() (restoreErr error) { - fileStart := time.Now() - defer func() { - if restoreErr == nil { - log.Info("import files done", logutil.Files(filesReplica), - zap.Duration("take", time.Since(fileStart))) - updateCh.Inc() - } - }() - if importErr := rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()); importErr != nil { - return errors.Trace(importErr) + eg, ectx := errgroup.WithContext(ctx) + for _, tableIDWithFiles := range tableIDWithFilesGroup { + if ectx.Err() != nil { + log.Warn("Restoring encountered error and already stopped, give up remained files.", + logutil.ShortError(ectx.Err())) + // We will fetch the error from the errgroup then (If there were). + // Also note if the parent context has been canceled or something, + // breaking here directly is also a reasonable behavior. + break + } + filesReplica := tableIDWithFiles + rc.fileImporter.WaitUntilUnblock() + rc.workerPool.ApplyOnErrorGroup(eg, func() (restoreErr error) { + fileStart := time.Now() + defer func() { + if restoreErr == nil { + log.Info("import files done", zapFilesGroup(filesReplica), + zap.Duration("take", time.Since(fileStart))) + updateCh.Inc() } + }() + if importErr := rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()); importErr != nil { + return errors.Trace(importErr) + } - // the data of this range has been import done - if rc.checkpointRunner != nil && len(filesReplica) > 0 { - rangeKey := getFileRangeKey(filesReplica[0].Name) - // The checkpoint range shows this ranges of kvs has been restored into - // the table corresponding to the table-id. - if err := checkpoint.AppendRangesForRestore(ectx, rc.checkpointRunner, tableID, rangeKey); err != nil { - return errors.Trace(err) + // the data of this range has been import done + if rc.checkpointRunner != nil && len(filesReplica) > 0 { + for _, filesGroup := range filesReplica { + rangeKeySet := make(map[string]struct{}) + for _, file := range filesGroup.Files { + rangeKey := getFileRangeKey(file.Name) + // Assert that the files having the same rangeKey are all in the current filesGroup.Files + rangeKeySet[rangeKey] = struct{}{} + } + for rangeKey := range rangeKeySet { + // The checkpoint range shows this ranges of kvs has been restored into + // the table corresponding to the table-id. 
+ if err := checkpoint.AppendRangesForRestore(ectx, rc.checkpointRunner, filesGroup.TableID, rangeKey); err != nil { + return errors.Trace(err) + } } } - return nil - }) - } + } + + return nil + }) } if err := eg.Wait(); err != nil { summary.CollectFailureUnit("file", err) - log.Error( - "restore files failed", - zap.Error(err), - ) + log.Error("restore files failed", zap.Error(err)) return errors.Trace(err) } // Once the parent context canceled and there is no task running in the errgroup, diff --git a/br/pkg/restore/snap_client/tikv_sender_test.go b/br/pkg/restore/snap_client/tikv_sender_test.go index d58c8f73439a2..b23ec6298d40f 100644 --- a/br/pkg/restore/snap_client/tikv_sender_test.go +++ b/br/pkg/restore/snap_client/tikv_sender_test.go @@ -15,11 +15,18 @@ package snapclient_test import ( + "fmt" + "math/rand" "testing" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/metautil" snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/stretchr/testify/require" ) @@ -64,3 +71,605 @@ func TestMapTableToFiles(t *testing.T) { require.Equal(t, filesOfTable2, result[2]) require.Equal(t, 3, hintSplitKeyCount) } + +func newPartitionID(ids []int64) *model.PartitionInfo { + definitions := make([]model.PartitionDefinition, 0, len(ids)) + for i, id := range ids { + definitions = append(definitions, model.PartitionDefinition{ + ID: id, + Name: pmodel.NewCIStr(fmt.Sprintf("%d", i)), + }) + } + return &model.PartitionInfo{Definitions: definitions} +} + +func newCreatedTable(oldTableID, newTableID int64, oldPartitionIDs, newPartitionIDs []int64) *snapclient.CreatedTable { + return &snapclient.CreatedTable{ + Table: &model.TableInfo{ + ID: newTableID, + Partition: newPartitionID(newPartitionIDs), + }, + OldTable: &metautil.Table{ + Info: &model.TableInfo{ + ID: oldTableID, + Partition: newPartitionID(oldPartitionIDs), + }, + }, + } +} + +func physicalIDs(physicalTables []*snapclient.PhysicalTable) (oldIDs, newIDs []int64) { + oldIDs = make([]int64, 0, len(physicalTables)) + newIDs = make([]int64, 0, len(physicalTables)) + for _, table := range physicalTables { + oldIDs = append(oldIDs, table.OldPhysicalID) + newIDs = append(newIDs, table.NewPhysicalID) + } + + return oldIDs, newIDs +} + +func TestGetSortedPhysicalTables(t *testing.T) { + createdTables := []*snapclient.CreatedTable{ + newCreatedTable(100, 200, []int64{32, 145, 324}, []int64{900, 23, 54}), + newCreatedTable(300, 400, []int64{322, 11245, 343224}, []int64{9030, 22353, 5354}), + } + physicalTables := snapclient.GetSortedPhysicalTables(createdTables) + oldIDs, newIDs := physicalIDs(physicalTables) + require.Equal(t, []int64{145, 324, 100, 300, 32, 343224, 322, 11245}, oldIDs) + require.Equal(t, []int64{23, 54, 200, 400, 900, 5354, 9030, 22353}, newIDs) +} + +type MockUpdateCh struct { + glue.Progress +} + +func (m MockUpdateCh) IncBy(cnt int64) {} + +func generateCreatedTables(t *testing.T, upstreamTableIDs []int64, upstreamPartitionIDs map[int64][]int64, downstreamID func(upstream int64) int64) []*snapclient.CreatedTable { + createdTables := make([]*snapclient.CreatedTable, 0, len(upstreamTableIDs)) + triggerID := 0 + for _, upstreamTableID := range upstreamTableIDs { + downstreamTableID := downstreamID(upstreamTableID) + 
createdTable := &snapclient.CreatedTable{ + Table: &model.TableInfo{ + ID: downstreamTableID, + Name: pmodel.NewCIStr(fmt.Sprintf("tbl-%d", upstreamTableID)), + Indices: []*model.IndexInfo{ + {Name: pmodel.NewCIStr("idx1"), ID: 1}, + {Name: pmodel.NewCIStr("idx2"), ID: 2}, + {Name: pmodel.NewCIStr("idx3"), ID: 3}, + }, + }, + OldTable: &metautil.Table{ + DB: &model.DBInfo{Name: pmodel.NewCIStr("test")}, + Info: &model.TableInfo{ + ID: upstreamTableID, + Indices: []*model.IndexInfo{ + {Name: pmodel.NewCIStr("idx1"), ID: 1}, + {Name: pmodel.NewCIStr("idx2"), ID: 2}, + {Name: pmodel.NewCIStr("idx3"), ID: 3}, + }, + }, + }, + } + partitionIDs, exists := upstreamPartitionIDs[upstreamTableID] + if exists { + triggerID += 1 + downDefs := make([]model.PartitionDefinition, 0, len(partitionIDs)) + upDefs := make([]model.PartitionDefinition, 0, len(partitionIDs)) + for _, partitionID := range partitionIDs { + downDefs = append(downDefs, model.PartitionDefinition{ + Name: pmodel.NewCIStr(fmt.Sprintf("p_%d", partitionID)), + ID: downstreamID(partitionID), + }) + upDefs = append(upDefs, model.PartitionDefinition{ + Name: pmodel.NewCIStr(fmt.Sprintf("p_%d", partitionID)), + ID: partitionID, + }) + } + createdTable.OldTable.Info.Partition = &model.PartitionInfo{ + Definitions: upDefs, + } + createdTable.Table.Partition = &model.PartitionInfo{ + Definitions: downDefs, + } + } + // generate rewrite rules + createdTable.RewriteRule = restoreutils.GetRewriteRules(createdTable.Table, createdTable.OldTable.Info, 0, true) + createdTables = append(createdTables, createdTable) + } + + require.Equal(t, len(upstreamPartitionIDs), triggerID) + disorderTables(createdTables) + return createdTables +} + +func disorderTables(createdTables []*snapclient.CreatedTable) { + // Each position will be replaced by a random table + rand.Shuffle(len(createdTables), func(i, j int) { + createdTables[i], createdTables[j] = createdTables[j], createdTables[i] + }) +} + +func file(tableID int64, startRow, endRow int, totalKvs, totalBytes uint64, cf string) *backuppb.File { + return &backuppb.File{ + Name: fmt.Sprintf("file_%d_%d_%s.sst", tableID, startRow, cf), + StartKey: tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(startRow)), + EndKey: tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(endRow)), + TotalKvs: totalKvs, + TotalBytes: totalBytes, + Cf: cf, + } +} + +func key(tableID int64, row int) []byte { + return tablecodec.EncodeRowKeyWithHandle(downstreamID(tableID), kv.IntHandle(row)) +} + +func files(physicalTableID int64, startRows []int, cfs []string) snapclient.TableIDWithFiles { + files := make([]*backuppb.File, 0, len(startRows)) + for i, startRow := range startRows { + files = append(files, &backuppb.File{Name: fmt.Sprintf("file_%d_%d_%s.sst", physicalTableID, startRow, cfs[i])}) + } + return snapclient.TableIDWithFiles{ + TableID: downstreamID(physicalTableID), + Files: files, + } +} + +func downstreamID(upstream int64) int64 { return upstream + ((999-upstream)%10+1)*1000 } + +func cptKey(tableID int64, startRow int, cf string) string { + return snapclient.GetFileRangeKey(fmt.Sprintf("file_%d_%d_%s.sst", tableID, startRow, cf)) +} + +func TestSortAndValidateFileRanges(t *testing.T) { + updateCh := MockUpdateCh{} + + d := restoreutils.DefaultCFName + w := restoreutils.WriteCFName + cases := []struct { + // created tables + upstreamTableIDs []int64 + upstreamPartitionIDs map[int64][]int64 + + // files + files []*backuppb.File + + // checkpoint set + checkpointSetWithTableID map[int64]map[string]struct{} + + // 
config + splitSizeBytes uint64 + splitKeyCount uint64 + splitOnTable bool + + // expected result + splitKeys [][]byte + tableIDWithFilesGroups [][]snapclient.TableIDWithFiles + }{ + { // large sst, split-on-table, no checkpoint + upstreamTableIDs: []int64{100, 200, 300}, + upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}}, + // downstream id: [100:10100] [101:9101] [102:8102] [103:7103] + // downstream id: [200:10200] [201:9201] [202:8202] [203:7203] + // downstream id: [300:10300] [301:9301] [302:8302] [303:7303] + // sorted physical: [103, 203, 303, (102), (202), (302), 101, 201, 301, (100), 200, 300] + files: []*backuppb.File{ + file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d), + file(102, 1, 2, 100, 100, w), + file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d), + file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d), + file(302, 1, 2, 100, 100, w), + }, + checkpointSetWithTableID: nil, + splitSizeBytes: 80, + splitKeyCount: 80, + splitOnTable: true, + splitKeys: [][]byte{ + /*split table key*/ key(202, 2), /*split table key*/ + }, + tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{ + {files(102, []int{1}, []string{w})}, + {files(202, []int{1, 1}, []string{w, d})}, + {files(202, []int{2, 2}, []string{w, d})}, + {files(302, []int{1}, []string{w})}, + {files(100, []int{1, 1}, []string{w, d})}, + }, + }, + { // large sst, split-on-table, checkpoint + upstreamTableIDs: []int64{100, 200, 300}, + upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}}, + files: []*backuppb.File{ + file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d), + file(102, 1, 2, 100, 100, w), + file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d), + file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d), + file(302, 1, 2, 100, 100, w), + }, + checkpointSetWithTableID: map[int64]map[string]struct{}{ + downstreamID(100): {cptKey(100, 1, w): struct{}{}}, + downstreamID(202): {cptKey(202, 1, w): struct{}{}}, + }, + splitSizeBytes: 80, + splitKeyCount: 80, + splitOnTable: true, + splitKeys: [][]byte{ + /*split table key*/ key(202, 2), /*split table key*/ + }, + tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{ + {files(102, []int{1}, []string{w})}, + //{files(202, []int{1, 1}, []string{w, d})}, + {files(202, []int{2, 2}, []string{w, d})}, + {files(302, []int{1}, []string{w})}, + //{files(100, []int{1, 1}, []string{w, d})}, + }, + }, + { // large sst, no split-on-table, no checkpoint + upstreamTableIDs: []int64{100, 200, 300}, + upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}}, + files: []*backuppb.File{ + file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d), + file(102, 1, 2, 100, 100, w), + file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d), + file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d), + file(302, 1, 2, 100, 100, w), + }, + checkpointSetWithTableID: nil, + splitSizeBytes: 80, + splitKeyCount: 80, + splitOnTable: false, + splitKeys: [][]byte{ + key(102, 2), key(202, 2), key(202, 3), key(302, 2), key(100, 2), + }, + tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{ + {files(102, []int{1}, []string{w})}, + {files(202, []int{1, 1}, []string{w, d})}, + {files(202, []int{2, 2}, []string{w, d})}, + {files(302, []int{1}, []string{w})}, + {files(100, []int{1, 1}, []string{w, d})}, + }, + }, + { // large sst, no split-on-table, checkpoint + upstreamTableIDs: []int64{100, 200, 300}, + 
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: map[int64]map[string]struct{}{
+                downstreamID(100): {cptKey(100, 1, w): struct{}{}},
+                downstreamID(202): {cptKey(202, 1, w): struct{}{}},
+            },
+            splitSizeBytes: 80,
+            splitKeyCount: 80,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(102, 2), key(202, 2), key(202, 3), key(302, 2), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                //{files(202, []int{1, 1}, []string{w, d})},
+                {files(202, []int{2, 2}, []string{w, d})},
+                {files(302, []int{1}, []string{w})},
+                //{files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 1, split-table, no checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: nil,
+            splitSizeBytes: 350,
+            splitKeyCount: 350,
+            splitOnTable: true,
+            splitKeys: [][]byte{
+                key(202, 2), /*split table key*/
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                {files(202, []int{1, 1}, []string{w, d})},
+                {files(202, []int{2, 2}, []string{w, d})},
+                {files(302, []int{1}, []string{w})},
+                {files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 1, split-table, checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: map[int64]map[string]struct{}{
+                downstreamID(100): {cptKey(100, 1, w): struct{}{}},
+                downstreamID(202): {cptKey(202, 1, w): struct{}{}},
+            },
+            splitSizeBytes: 350,
+            splitKeyCount: 350,
+            splitOnTable: true,
+            splitKeys: [][]byte{
+                key(202, 2), /*split table key*/
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                // {files(202, []int{1, 1}, []string{w, d})},
+                {files(202, []int{2, 2}, []string{w, d})},
+                {files(302, []int{1}, []string{w})},
+                // {files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 1, no split-table, no checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: nil,
+            splitSizeBytes: 350,
+            splitKeyCount: 350,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(202, 2), key(302, 2), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w}), files(202, []int{1, 1}, []string{w, d})},
+                {files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})},
+                {files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 1, no split-table, checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: map[int64]map[string]struct{}{
+                downstreamID(100): {cptKey(100, 1, w): struct{}{}},
+                downstreamID(202): {cptKey(202, 1, w): struct{}{}},
+            },
+            splitSizeBytes: 350,
+            splitKeyCount: 350,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(202, 2), key(302, 2), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                {files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})},
+            },
+        },
+        { // small sst 2, split-table, no checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: nil,
+            splitSizeBytes: 450,
+            splitKeyCount: 450,
+            splitOnTable: true,
+            splitKeys: [][]byte{},
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                {files(202, []int{1, 1, 2, 2}, []string{w, d, w, d})},
+                {files(302, []int{1}, []string{w})},
+                {files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 2, split-table, checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: map[int64]map[string]struct{}{
+                downstreamID(100): {cptKey(100, 1, w): struct{}{}},
+                downstreamID(202): {cptKey(202, 1, w): struct{}{}},
+            },
+            splitSizeBytes: 450,
+            splitKeyCount: 450,
+            splitOnTable: true,
+            splitKeys: [][]byte{},
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                {files(202, []int{2, 2}, []string{w, d})},
+                {files(302, []int{1}, []string{w})},
+            },
+        },
+        { // small sst 2, no split-table, no checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: nil,
+            splitSizeBytes: 450,
+            splitKeyCount: 450,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(102, 2), key(202, 3), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                {files(202, []int{1, 1, 2, 2}, []string{w, d, w, d})},
+                {files(302, []int{1}, []string{w}), files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 2, no split-table, checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: map[int64]map[string]struct{}{
+                downstreamID(100): {cptKey(100, 1, w): struct{}{}},
+                downstreamID(202): {cptKey(202, 1, w): struct{}{}},
+            },
+            splitSizeBytes: 450,
+            splitKeyCount: 450,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(102, 2), key(202, 3), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                {files(202, []int{2, 2}, []string{w, d})},
+                {files(302, []int{1}, []string{w})},
+            },
+        },
+        { // small sst 3, no split-table, no checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: nil,
+            splitSizeBytes: 501,
+            splitKeyCount: 501,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(202, 3), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w}), files(202, []int{1, 1, 2, 2}, []string{w, d, w, d})},
+                {files(302, []int{1}, []string{w}), files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 3, no split-table, checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 100, 100, w), file(202, 2, 3, 100, 100, d),
+                file(302, 1, 2, 100, 100, w),
+            },
+            checkpointSetWithTableID: map[int64]map[string]struct{}{
+                downstreamID(100): {cptKey(100, 1, w): struct{}{}},
+                downstreamID(202): {cptKey(202, 1, w): struct{}{}},
+            },
+            splitSizeBytes: 501,
+            splitKeyCount: 501,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(202, 3), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w}), files(202, []int{2, 2}, []string{w, d})},
+                {files(302, []int{1}, []string{w})},
+            },
+        },
+        { // small sst 4, no split-table, no checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 400, 400, w), file(202, 2, 3, 80, 80, d),
+                file(302, 1, 2, 10, 10, w),
+            },
+            checkpointSetWithTableID: nil,
+            splitSizeBytes: 501,
+            splitKeyCount: 501,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(202, 2), key(302, 2), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w}), files(202, []int{1, 1}, []string{w, d})},
+                {files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})},
+                {files(100, []int{1, 1}, []string{w, d})},
+            },
+        },
+        { // small sst 4, no split-table, checkpoint
+            upstreamTableIDs: []int64{100, 200, 300},
+            upstreamPartitionIDs: map[int64][]int64{100: {101, 102, 103}, 200: {201, 202, 203}, 300: {301, 302, 303}},
+            files: []*backuppb.File{
+                file(100, 1, 2, 100, 100, w), file(100, 1, 2, 100, 100, d),
+                file(102, 1, 2, 100, 100, w),
+                file(202, 1, 2, 100, 100, w), file(202, 1, 2, 100, 100, d),
+                file(202, 2, 3, 400, 400, w), file(202, 2, 3, 80, 80, d),
+                file(302, 1, 2, 10, 10, w),
+            },
+            checkpointSetWithTableID: map[int64]map[string]struct{}{
+                downstreamID(100): {cptKey(100, 1, w): struct{}{}},
+                downstreamID(202): {cptKey(202, 1, w): struct{}{}},
+            },
+            splitSizeBytes: 501,
+            splitKeyCount: 501,
+            splitOnTable: false,
+            splitKeys: [][]byte{
+                key(202, 2), key(302, 2), key(100, 2),
+            },
+            tableIDWithFilesGroups: [][]snapclient.TableIDWithFiles{
+                {files(102, []int{1}, []string{w})},
+                {files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})},
+            },
+        },
+    }
+
+    for i, cs := range cases {
+        t.Log(i)
+        createdTables := generateCreatedTables(t, cs.upstreamTableIDs, cs.upstreamPartitionIDs, downstreamID)
+        splitKeys, tableIDWithFilesGroups, err := snapclient.SortAndValidateFileRanges(createdTables, cs.files, cs.checkpointSetWithTableID, cs.splitSizeBytes, cs.splitKeyCount, cs.splitOnTable, updateCh)
+        require.NoError(t, err)
+        require.Equal(t, cs.splitKeys, splitKeys)
+        require.Equal(t, len(cs.tableIDWithFilesGroups), len(tableIDWithFilesGroups))
+        for i, expectFilesGroup := range cs.tableIDWithFilesGroups {
+            actualFilesGroup := tableIDWithFilesGroups[i]
+            require.Equal(t, len(expectFilesGroup), len(actualFilesGroup))
+            for j, expectFiles := range expectFilesGroup {
+                actualFiles := actualFilesGroup[j]
+                require.Equal(t, expectFiles.TableID, actualFiles.TableID)
+                for k, expectFile := range expectFiles.Files {
+                    actualFile := actualFiles.Files[k]
+                    require.Equal(t, expectFile.Name, actualFile.Name)
+                }
+            }
+        }
+    }
+}
diff --git a/br/pkg/restore/utils/rewrite_rule.go b/br/pkg/restore/utils/rewrite_rule.go
index 053d5550766a1..eca06a58bee6a 100644
--- a/br/pkg/restore/utils/rewrite_rule.go
+++ b/br/pkg/restore/utils/rewrite_rule.go
@@ -238,12 +238,17 @@ func rewriteRawKey(key []byte, rewriteRules *RewriteRules) ([]byte, *import_sstp
     }
     if len(key) > 0 {
         rule := matchOldPrefix(key, rewriteRules)
-        ret := bytes.Replace(key, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1)
-        return codec.EncodeBytes([]byte{}, ret), rule
+        return RewriteAndEncodeRawKey(key, rule), rule
     }
     return nil, nil
 }
 
+// RewriteAndEncodeRawKey replaces the rule's old key prefix with its new key prefix and encodes the result with codec.EncodeBytes.
+func RewriteAndEncodeRawKey(key []byte, rule *import_sstpb.RewriteRule) []byte {
+    ret := bytes.Replace(key, rule.GetOldKeyPrefix(), rule.GetNewKeyPrefix(), 1)
+    return codec.EncodeBytes([]byte{}, ret)
+}
+
 func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule {
     for _, rule := range rewriteRules.Data {
         if bytes.HasPrefix(key, rule.GetOldKeyPrefix()) {
diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel
index 89bbcfac685b3..e62938e7fab66 100644
--- a/br/pkg/task/BUILD.bazel
+++ b/br/pkg/task/BUILD.bazel
@@ -50,6 +50,7 @@ go_library(
         "//br/pkg/utils",
         "//br/pkg/version",
         "//pkg/config",
+        "//pkg/ddl",
         "//pkg/domain",
        "//pkg/infoschema",
        "//pkg/kv",
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 82e7963e7148c..3463df2cbba97 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -8,6 +8,7 @@ import (
     "fmt"
     "slices"
     "strings"
+    "sync/atomic"
     "time"
 
     "github.com/docker/go-units"
@@ -32,6 +33,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/utils"
     "github.com/pingcap/tidb/br/pkg/version"
     "github.com/pingcap/tidb/pkg/config"
+    "github.com/pingcap/tidb/pkg/ddl"
     "github.com/pingcap/tidb/pkg/domain"
     "github.com/pingcap/tidb/pkg/infoschema"
     "github.com/pingcap/tidb/pkg/kv"
@@ -1119,6 +1121,11 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
     }
     if err := client.RestoreTables(ctx, placementRuleManager, createdTables, files, checkpointSetWithTableID,
         kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value,
+        // If the command comes from the BR binary, ddl.EnableSplitTableRegion is always 0;
+        // if it comes from BRIE SQL, it follows the TiDB config `split-table`.
+        // Note that the TiKV config `split-region-on-table` only splits regions that already hold data,
+        // so it may take effect only after the restore finishes; enabling the TiDB config `split-table` is recommended instead.
+        atomic.LoadUint32(&ddl.EnableSplitTableRegion) == 1,
         updateCh,
     ); err != nil {
         return errors.Trace(err)
diff --git a/br/tests/br_split_region_fail/run.sh b/br/tests/br_split_region_fail/run.sh
index 87751695fab00..cc35015271231 100644
--- a/br/tests/br_split_region_fail/run.sh
+++ b/br/tests/br_split_region_fail/run.sh
@@ -48,7 +48,7 @@ echo "restore start..."
 unset BR_LOG_TO_TERM
 GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/split/not-leader-error=1*return(true)->1*return(false);\
 github.com/pingcap/tidb/br/pkg/restore/split/somewhat-retryable-error=3*return(true)" \
-run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 --log-file $LOG || true
+run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 --merge-region-key-count 1 --log-file $LOG || true
 BR_LOG_TO_TERM=1
 
 grep "a error occurs on split region" $LOG && \
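
As a minimal usage sketch of the RewriteAndEncodeRawKey helper exported from br/pkg/restore/utils in the hunk above: the table IDs (upstream 100 mapped to downstream 10100, mirroring the comments in the test cases) and the sample row handle are illustrative assumptions, not values taken from this patch.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/import_sstpb"
	"github.com/pingcap/tidb/br/pkg/restore/utils"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/tablecodec"
)

func main() {
	// Assumed mapping: upstream table 100 is restored as downstream table 10100.
	rule := &import_sstpb.RewriteRule{
		OldKeyPrefix: tablecodec.EncodeTablePrefix(100),
		NewKeyPrefix: tablecodec.EncodeTablePrefix(10100),
	}
	// A raw row key belonging to upstream table 100.
	rawKey := tablecodec.EncodeRowKeyWithHandle(100, kv.IntHandle(1))
	// The helper swaps the table prefix and encodes the rewritten key with
	// codec.EncodeBytes, the comparable form used for split keys.
	fmt.Printf("%X\n", utils.RewriteAndEncodeRawKey(rawKey, rule))
}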