Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <wenqimou@gmail.com>
  • Loading branch information
Tristan1900 committed Aug 30, 2024
1 parent 60c7e61 commit c71db0e
Show file tree
Hide file tree
Showing 16 changed files with 3,507 additions and 6,741 deletions.
8,358 changes: 1,676 additions & 6,682 deletions DEPS.bzl

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {

if err := task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg); err != nil {
log.Error("failed to restore", zap.Error(err))
printWorkaroundOnFullRestoreError(command, err)
printWorkaroundOnFullRestoreError(err)
return errors.Trace(err)
}
return nil
}

// print workaround when we met not fresh or incompatible cluster error on full cluster restore
func printWorkaroundOnFullRestoreError(command *cobra.Command, err error) {
func printWorkaroundOnFullRestoreError(err error) {
if !errors.ErrorEqual(err, berrors.ErrRestoreNotFreshCluster) &&
!errors.ErrorEqual(err, berrors.ErrRestoreIncompatibleSys) {
return
Expand Down
12 changes: 8 additions & 4 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/checkpoint"
"github.com/pingcap/tidb/br/pkg/checksum"
Expand Down Expand Up @@ -416,7 +417,7 @@ func ApplyKVFilesWithBatchMethod(
return nil
}

func ApplyKVFilesWithSingelMethod(
func ApplyKVFilesWithSingleMethod(
ctx context.Context,
files LogIter,
applyFunc func(file []*LogDataFileInfo, kvCount int64, size uint64),
Expand Down Expand Up @@ -457,6 +458,8 @@ func (rc *LogClient) RestoreKVFiles(
pitrBatchSize uint32,
updateStats func(kvCount uint64, size uint64),
onProgress func(cnt int64),
cipherInfo *backuppb.CipherInfo,
masterKeys []*encryptionpb.MasterKey,
) error {
var (
err error
Expand All @@ -468,7 +471,7 @@ func (rc *LogClient) RestoreKVFiles(
defer func() {
if err == nil {
elapsed := time.Since(start)
log.Info("Restore KV files", zap.Duration("take", elapsed))
log.Info("Restored KV files", zap.Duration("take", elapsed))
summary.CollectSuccessUnit("files", fileCount, elapsed)
}
}()
Expand Down Expand Up @@ -528,7 +531,8 @@ func (rc *LogClient) RestoreKVFiles(
}
}()

return rc.fileImporter.ImportKVFiles(ectx, files, rule, rc.shiftStartTS, rc.startTS, rc.restoreTS, supportBatch)
return rc.fileImporter.ImportKVFiles(ectx, files, rule, rc.shiftStartTS, rc.startTS, rc.restoreTS,
supportBatch, cipherInfo, masterKeys)
})
}
}
Expand All @@ -537,7 +541,7 @@ func (rc *LogClient) RestoreKVFiles(
if supportBatch {
err = ApplyKVFilesWithBatchMethod(ectx, logIter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc, &applyWg)
} else {
err = ApplyKVFilesWithSingelMethod(ectx, logIter, applyFunc, &applyWg)
err = ApplyKVFilesWithSingleMethod(ectx, logIter, applyFunc, &applyWg)
}
return errors.Trace(err)
})
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/log_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) {
}
}

logclient.ApplyKVFilesWithSingelMethod(
logclient.ApplyKVFilesWithSingleMethod(
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
applyFunc,
Expand Down Expand Up @@ -1284,7 +1284,7 @@ func TestApplyKVFilesWithBatchMethod5(t *testing.T) {
require.Equal(t, backuppb.FileType_Delete, types[len(types)-1])

types = make([]backuppb.FileType, 0)
logclient.ApplyKVFilesWithSingelMethod(
logclient.ApplyKVFilesWithSingleMethod(
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
applyFunc,
Expand Down
24 changes: 18 additions & 6 deletions br/pkg/restore/log_client/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -101,6 +102,8 @@ func (importer *LogFileImporter) ImportKVFiles(
startTS uint64,
restoreTS uint64,
supportBatch bool,
cipherInfo *backuppb.CipherInfo,
masterKeys []*encryptionpb.MasterKey,
) error {
var (
startKey []byte
Expand All @@ -111,7 +114,7 @@ func (importer *LogFileImporter) ImportKVFiles(

if !supportBatch && len(files) > 1 {
return errors.Annotatef(berrors.ErrInvalidArgument,
"do not support batch apply but files count:%v > 1", len(files))
"do not support batch apply, file count: %v > 1", len(files))
}
log.Debug("import kv files", zap.Int("batch file count", len(files)))

Expand Down Expand Up @@ -143,7 +146,8 @@ func (importer *LogFileImporter) ImportKVFiles(
if len(subfiles) == 0 {
return RPCResultOK()
}
return importer.importKVFileForRegion(ctx, subfiles, rule, shiftStartTS, startTS, restoreTS, r, supportBatch)
return importer.importKVFileForRegion(ctx, subfiles, rule, shiftStartTS, startTS, restoreTS, r, supportBatch,
cipherInfo, masterKeys)
})
return errors.Trace(err)
}
Expand Down Expand Up @@ -184,9 +188,11 @@ func (importer *LogFileImporter) importKVFileForRegion(
restoreTS uint64,
info *split.RegionInfo,
supportBatch bool,
cipherInfo *backuppb.CipherInfo,
masterKeys []*encryptionpb.MasterKey,
) RPCResult {
// Try to download file.
result := importer.downloadAndApplyKVFile(ctx, files, rule, info, shiftStartTS, startTS, restoreTS, supportBatch)
result := importer.downloadAndApplyKVFile(ctx, files, rule, info, shiftStartTS, startTS, restoreTS, supportBatch, cipherInfo, masterKeys)
if !result.OK() {
errDownload := result.Err
for _, e := range multierr.Errors(errDownload) {
Expand Down Expand Up @@ -216,7 +222,9 @@ func (importer *LogFileImporter) downloadAndApplyKVFile(
startTS uint64,
restoreTS uint64,
supportBatch bool,
) RPCResult {
cipherInfo *backuppb.CipherInfo,
masterKeys []*encryptionpb.MasterKey) RPCResult {

leader := regionInfo.Leader
if leader == nil {
return RPCResultFromError(errors.Annotatef(berrors.ErrPDLeaderNotFound,
Expand Down Expand Up @@ -276,6 +284,8 @@ func (importer *LogFileImporter) downloadAndApplyKVFile(
RewriteRules: rewriteRules,
Context: reqCtx,
StorageCacheId: importer.cacheKey,
CipherInfo: cipherInfo,
MasterKeys: masterKeys,
}
} else {
req = &import_sstpb.ApplyRequest{
Expand All @@ -284,16 +294,18 @@ func (importer *LogFileImporter) downloadAndApplyKVFile(
RewriteRule: *rewriteRules[0],
Context: reqCtx,
StorageCacheId: importer.cacheKey,
CipherInfo: cipherInfo,
MasterKeys: masterKeys,
}
}

log.Debug("apply kv file", logutil.Leader(leader))
log.Debug("applying kv file", logutil.Leader(leader))
resp, err := importer.importClient.ApplyKVFile(ctx, leader.GetStoreId(), req)
if err != nil {
return RPCResultFromError(errors.Trace(err))
}
if resp.GetError() != nil {
logutil.CL(ctx).Warn("import meet error", zap.Stringer("error", resp.GetError()))
logutil.CL(ctx).Warn("import has error", zap.Stringer("error", resp.GetError()))
return RPCResultFromPBError(resp.GetError())
}
return RPCResultOK()
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"backup_raw.go",
"backup_txn.go",
"common.go",
"encryption.go",
"restore.go",
"restore_data.go",
"restore_ebs_meta.go",
Expand Down Expand Up @@ -106,12 +107,13 @@ go_test(
"backup_test.go",
"common_test.go",
"config_test.go",
"encryption_test.go",
"restore_test.go",
"stream_test.go",
],
embed = [":task"],
flaky = True,
shard_count = 34,
shard_count = 39,
deps = [
"//br/pkg/backup",
"//br/pkg/config",
Expand Down
45 changes: 45 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ const (
crypterAES256KeyLen = 32

flagFullBackupType = "type"

masterKeysDelimiter = ","
flagMasterKeysConfig = "master-keys"
defaultMasterKeyScheme = "plaintext"
)

const (
Expand Down Expand Up @@ -260,8 +264,15 @@ type Config struct {
// GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed.
GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"`

// Plaintext data key mainly used for full/snapshot backup and restore.
// Could be used in log backup and restore but not recommended in a serious environment since data key is stored
// in pd in plaintext.
CipherInfo backuppb.CipherInfo `json:"-" toml:"-"`

// Master key based encryption for log restore.
// More than one can be specified for log restore if master key rotated during log backup.
MasterKeys []encryptionpb.MasterKey `json:"master-keys" toml:"master-keys"`

// whether there's explicit filter
ExplicitFilter bool `json:"-" toml:"-"`

Expand Down Expand Up @@ -321,6 +332,12 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
"the batch size of downloading metadata, such as log restore metadata for truncate or restore")

flags.String(flagMasterKeysConfig, defaultMasterKeyScheme, "Master key configs for point in time restore "+
"can use comma separated string to specify multiple master key backends if log backup had master key rotation."+
"example: \"local:///path/to/master/key/file,"+
"aws-kms:///{key-id}?AWS_ACCESS_KEY_ID={access-key}&AWS_SECRET_ACCESS_KEY={secret-key}&REGION={region},"+
"azure-kms:///{key-name}/{key-version}?AZURE_TENANT_ID={tenant-id}&AZURE_CLIENT_ID={client-id}&AZURE_CLIENT_SECRET={client-secret}&AZURE_VAULT_NAME={vault-name},"+
"gcp-kms:///projects/{project-id}/locations/{location}/keyRings/{keyring}/cryptoKeys/{key-name}?AUTH=specified&CREDENTIALS={credentials}\"")
_ = flags.MarkHidden(flagMetadataDownloadBatchSize)

storage.DefineFlags(flags)
Expand Down Expand Up @@ -622,13 +639,41 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if err = cfg.parseAndValidateMasterKeyInfo(flags); err != nil {
return errors.Trace(err)
}

if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil {
return errors.Trace(err)
}

return cfg.normalizePDURLs()
}

func (cfg *Config) parseAndValidateMasterKeyInfo(flags *pflag.FlagSet) error {
masterKeyString, err := flags.GetString(flagMasterKeysConfig)
if err != nil {
return errors.Errorf("master key flag '%s' is not defined: %v", flagMasterKeysConfig, err)
}

if masterKeyString == "" || masterKeyString == defaultMasterKeyScheme {
return nil
}

masterKeyStrings := strings.Split(masterKeyString, ",")
cfg.MasterKeys = make([]encryptionpb.MasterKey, 0, len(masterKeyStrings))

for _, keyString := range masterKeyStrings {
masterKey, err := validateAndParseMasterKeyString(strings.TrimSpace(keyString))
if err != nil {
return errors.Wrapf(err, "invalid master key configuration: %s", keyString)
}
cfg.MasterKeys = append(cfg.MasterKeys, masterKey)
}

return nil
}

// NewMgr creates a new mgr at the given PD address.
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
Expand Down
Loading

0 comments on commit c71db0e

Please sign in to comment.