Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: adaption for keyspace feature #40532

Merged
merged 33 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d8fcd54
BR adaption for keyspace feature
iosmanthus Sep 22, 2022
cfc10d8
make tidb br forward compatible
iosmanthus Oct 18, 2022
fd4318b
Merge branch 'master' of github.com:pingcap/tidb into keyspace-br
iosmanthus Jan 12, 2023
31296ef
make check
iosmanthus Jan 12, 2023
cdd76f0
fix lint
iosmanthus Jan 12, 2023
79872e6
review by self
iosmanthus Jan 12, 2023
e158abe
Merge branch 'master' of github.com:pingcap/tidb into keyspace-br
iosmanthus Jan 16, 2023
4858bf9
fix forward compatibility of keyspace br
iosmanthus Jan 19, 2023
d3b6dfb
Merge branch 'master' of github.com:pingcap/tidb into keyspace-br
iosmanthus Jan 19, 2023
8ce0a61
fix goimports
iosmanthus Jan 19, 2023
9f51eb6
fix goimports
iosmanthus Jan 19, 2023
ca4f381
fix goimports
iosmanthus Jan 19, 2023
72827d1
fix unit test compile
iosmanthus Jan 19, 2023
63bdfd7
make bazel_prepare
iosmanthus Jan 19, 2023
96d54e6
Merge branch 'master' of github.com:pingcap/tidb into keyspace-br
iosmanthus Jan 19, 2023
576bd89
goimports -w ./
iosmanthus Jan 28, 2023
8f5c086
Merge branch 'master' of github.com:pingcap/tidb into keyspace-br
iosmanthus Jan 28, 2023
25d0f4e
sync bazel deps
hawkingrei Jan 28, 2023
0f3d739
Merge branch 'master' into keyspace-br
hawkingrei Jan 28, 2023
47d51a0
fix goimports
iosmanthus Jan 28, 2023
2a07b71
add some comments for keyspace br restore
iosmanthus Jan 30, 2023
704e049
Merge branch 'master' into keyspace-br
iosmanthus Jan 30, 2023
f9bc6d9
Merge branch 'master' into keyspace-br
iosmanthus Jan 31, 2023
0a219ea
fix bazel ignore path
iosmanthus Jan 31, 2023
7d7baa2
Merge branch 'master' of github.com:pingcap/tidb into keyspace-br
iosmanthus Jan 31, 2023
cdee5a2
Merge branch 'master' into keyspace-br
iosmanthus Jan 31, 2023
cc31140
Merge branch 'master' into keyspace-br
iosmanthus Feb 1, 2023
b7912b4
Merge branch 'master' into keyspace-br
iosmanthus Feb 1, 2023
e1282d3
revert ignore path
iosmanthus Feb 1, 2023
cc199e5
Merge branch 'master' into keyspace-br
iosmanthus Feb 1, 2023
5ff8d30
Merge branch 'master' into keyspace-br
iosmanthus Feb 1, 2023
0270ae2
Merge branch 'master' into keyspace-br
ti-chi-bot Feb 2, 2023
c43a34a
Merge branch 'master' into keyspace-br
ti-chi-bot Feb 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4831,7 +4831,6 @@ def go_deps():
sum = "h1:CYjC+xzdPvbV65gi6Dr4YowKcmLo045pm18L0DhdELM=",
version = "v0.2.0",
)

go_repository(
name = "com_google_cloud_go_gsuiteaddons",
build_file_proto_mode = "disable",
Expand Down Expand Up @@ -5044,7 +5043,6 @@ def go_deps():
sum = "h1:u6EznTGzIdsyOsvm+Xkw0aSuKFXQlyjGE9a4exk6iNQ=",
version = "v1.3.1",
)

go_repository(
name = "com_google_cloud_go_recaptchaenterprise_v2",
build_file_proto_mode = "disable",
Expand Down Expand Up @@ -5249,7 +5247,6 @@ def go_deps():
sum = "h1:/CsSTkbmO9HC8iQpxbK8ATms3OQaX3YQUeTMGCxlaK4=",
version = "v1.2.0",
)

go_repository(
name = "com_google_cloud_go_vision_v2",
build_file_proto_mode = "disable",
Expand Down
46 changes: 37 additions & 9 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type ExecutorBuilder struct {
oldTable *metautil.Table

concurrency uint

oldKeyspace []byte
newKeyspace []byte
}

// NewExecutorBuilder returns a new executor builder.
Expand All @@ -51,9 +54,26 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
return builder
}

func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
builder.oldKeyspace = keyspace
return builder
}

func (builder *ExecutorBuilder) SetNewKeyspace(keyspace []byte) *ExecutorBuilder {
builder.newKeyspace = keyspace
return builder
}

// Build builds a checksum executor.
func (builder *ExecutorBuilder) Build() (*Executor, error) {
reqs, err := buildChecksumRequest(builder.table, builder.oldTable, builder.ts, builder.concurrency)
reqs, err := buildChecksumRequest(
builder.table,
builder.oldTable,
builder.ts,
builder.concurrency,
builder.oldKeyspace,
builder.newKeyspace,
)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -65,6 +85,8 @@ func buildChecksumRequest(
oldTable *metautil.Table,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) ([]*kv.Request, error) {
var partDefs []model.PartitionDefinition
if part := newTable.Partition; part != nil {
Expand All @@ -76,7 +98,7 @@ func buildChecksumRequest(
if oldTable != nil {
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency)
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -91,7 +113,7 @@ func buildChecksumRequest(
}
}
}
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency)
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -108,9 +130,11 @@ func buildRequest(
oldTableID int64,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) ([]*kv.Request, error) {
reqs := make([]*kv.Request, 0)
req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency)
req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -139,7 +163,7 @@ func buildRequest(
}
}
req, err = buildIndexRequest(
tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency)
tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -156,12 +180,14 @@ func buildTableRequest(
oldTableID int64,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldTable != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
NewPrefix: tablecodec.GenTableRecordPrefix(tableID),
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.GenTableRecordPrefix(oldTableID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.GenTableRecordPrefix(tableID)...),
}
}

Expand Down Expand Up @@ -195,12 +221,14 @@ func buildIndexRequest(
oldIndexInfo *model.IndexInfo,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldIndexInfo != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID),
NewPrefix: tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID),
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
}
}
checksum := &tipb.ChecksumRequest{
Expand Down
1 change: 1 addition & 0 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/utils",
"//br/pkg/version",
"//config",
"//domain",
"//kv",
"@com_github_docker_go_units//:go-units",
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -174,7 +175,8 @@ func NewMgr(
}

// Disable GC because TiDB enables GC already.
storage, err := g.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddrs), securityOption)
path := fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", pdAddrs, config.GetGlobalKeyspaceName())
storage, err := g.Open(path, securityOption)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/metautil/metafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ type MetaReader struct {

// NewMetaReader creates MetaReader.
func NewMetaReader(
backpMeta *backuppb.BackupMeta,
backupMeta *backuppb.BackupMeta,
storage storage.ExternalStorage,
cipher *backuppb.CipherInfo) *MetaReader {
return &MetaReader{
storage: storage,
backupMeta: backpMeta,
backupMeta: backupMeta,
cipher: cipher,
}
}
Expand Down
25 changes: 21 additions & 4 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ type Client struct {

// the successfully preallocated table IDs.
preallocedTableIDs *tidalloc.PreallocIDs

// the rewrite mode of the downloaded SST files in TiKV.
rewriteMode RewriteMode
}

// NewRestoreClient returns a new RestoreClient.
Expand Down Expand Up @@ -317,6 +320,14 @@ func (rc *Client) GetBatchDdlSize() uint {
return rc.batchDdlSize
}

func (rc *Client) SetRewriteMode(mode RewriteMode) {
rc.rewriteMode = mode
}

func (rc *Client) GetRewriteMode() RewriteMode {
return rc.rewriteMode
}

// Close a client.
func (rc *Client) Close() {
// rc.db can be nil in raw kv mode.
Expand Down Expand Up @@ -346,7 +357,7 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool) {
metaClient := split.NewSplitClient(rc.pdClient, rc.tlsConf, isRawKvMode)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, rc.rewriteMode)
}

func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
Expand Down Expand Up @@ -870,7 +881,7 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma
}
})
if err != nil {
log.Error("create tables fail")
log.Error("create tables fail", zap.Error(err))
return err
}
for _, ct := range cts {
Expand Down Expand Up @@ -1042,7 +1053,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
rc.SetRateLimit(42)
rc.SetConcurrency(concurrency)
rc.hasSpeedLimited = false
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false)
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, RewriteModeLegacy)
return rc.setSpeedLimit(ctx, rc.rateLimit)
}

Expand Down Expand Up @@ -1182,7 +1193,7 @@ func (rc *Client) RestoreSSTFiles(
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
}()
return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.backupMeta.ApiVersion)
return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion())
})
}

Expand Down Expand Up @@ -1434,6 +1445,8 @@ func (rc *Client) execChecksum(
exe, err := checksum.NewExecutorBuilder(tbl.Table, startTS).
SetOldTable(tbl.OldTable).
SetConcurrency(concurrency).
SetOldKeyspace(tbl.RewriteRule.OldKeyspace).
SetNewKeyspace(tbl.RewriteRule.NewKeyspace).
Build()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2762,6 +2775,10 @@ func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas {
return schemas
}

func CheckKeyspaceBREnable(ctx context.Context, pdClient pd.Client) error {
return version.CheckClusterVersion(ctx, pdClient, version.CheckVersionForKeyspaceBR)
}

func CheckNewCollationEnable(
backupNewCollationEnable string,
g glue.Glue,
Expand Down
Loading