Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Implement Raw Restore #104

Merged
merged 37 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1679ab5
Update kvproto
MyonKeminta Dec 10, 2019
ee18eaa
Implement raw restore
MyonKeminta Dec 12, 2019
a92fbad
Merge branch 'master' of https://github.com/pingcap/br into m/raw-res…
MyonKeminta Dec 12, 2019
838fe4c
fix build
MyonKeminta Dec 12, 2019
501eb9b
Set range for file importer
MyonKeminta Dec 12, 2019
f1c803d
Remove unnecessary comments
MyonKeminta Dec 12, 2019
cdd7449
check cf and support multi ranges in BackupMeta
MyonKeminta Dec 13, 2019
da2c5cd
Check files' cf; address comments
MyonKeminta Dec 13, 2019
3af0a4f
Merge branch 'master' of https://github.com/pingcap/br into m/raw-res…
MyonKeminta Dec 25, 2019
c7c7243
adjust structure to keep consistent with master
MyonKeminta Dec 25, 2019
d3accd6
Merge branch 'master' of https://github.com/pingcap/br into m/raw-res…
MyonKeminta Feb 6, 2020
5816304
Fix build
MyonKeminta Feb 6, 2020
78c1757
Merge branch 'master' of https://github.com/pingcap/br into m/raw-res…
MyonKeminta Feb 11, 2020
1a5c99b
Merge branch 'master' of https://github.com/pingcap/br into m/raw-res…
MyonKeminta Mar 4, 2020
3f9b16b
Fix build and make check, avoid accessing TiDB in rawkv mode
MyonKeminta Mar 5, 2020
5d54e71
Merge branch 'master' into m/raw-restore
3pointer Mar 5, 2020
65c9e43
Fix test
MyonKeminta Mar 5, 2020
07f1021
Merge branch 'm/raw-restore' of https://github.com/MyonKeminta/br int…
MyonKeminta Mar 5, 2020
4638167
Fix tests
MyonKeminta Mar 5, 2020
5bea24b
Fix broken logic after merging master
MyonKeminta Mar 10, 2020
429c548
Merge branch 'master' of https://github.com/pingcap/br into m/raw-res…
MyonKeminta Mar 10, 2020
42f50e4
Update pkg/task/restore_raw.go
MyonKeminta Mar 11, 2020
0acc3d8
Address comments
MyonKeminta Mar 11, 2020
51cf368
Address comments
MyonKeminta Mar 11, 2020
ecf46eb
Mark raw restore as experimental
MyonKeminta Mar 11, 2020
a106210
Merge branch 'master' into m/raw-restore
3pointer Mar 11, 2020
5c1540f
Merge branch 'm/raw-restore' of https://github.com/MyonKeminta/br int…
MyonKeminta Mar 11, 2020
3d14041
Fix build
MyonKeminta Mar 12, 2020
c4500a8
Address comments
MyonKeminta Mar 13, 2020
f1e7b05
test: Add check for deleting data and partial backup
MyonKeminta Mar 13, 2020
47b9558
Merge branch 'master' into m/raw-restore
3pointer Mar 16, 2020
c4e6a51
Fix build
MyonKeminta Mar 16, 2020
a1e0f0e
Add license header
MyonKeminta Mar 16, 2020
0bac6cc
Merge branch 'master' into m/raw-restore
sre-bot Mar 16, 2020
2606719
fix ci
3pointer Mar 16, 2020
35070c3
fix ci
3pointer Mar 17, 2020
2fa84cd
Merge branch 'master' into m/raw-restore
3pointer Mar 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/spf13/cobra"

"github.com/pingcap/br/pkg/gluetikv"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
Expand All @@ -21,11 +22,13 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {
}

func runBackupRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupRawConfig{Config: task.Config{LogProgress: HasLogFile()}}
cfg := task.BackupRawConfig{
RawKvConfig: task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}},
}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}
return task.RunBackupRaw(GetDefaultContext(), tidbGlue, cmdName, &cfg)
return task.RunBackupRaw(GetDefaultContext(), gluetikv.Glue{}, cmdName, &cfg)
}

// NewBackupCommand returns a full backup subcommand.
Expand Down
25 changes: 25 additions & 0 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/spf13/cobra"

"github.com/pingcap/br/pkg/gluetikv"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
Expand All @@ -19,6 +20,16 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
return task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

// runRestoreRawCommand builds a RestoreRawConfig (with progress logging
// enabled when HasLogFile reports true), fills it from the command's flags and
// runs the raw restore task using the TiKV-only glue.
// It returns the flag-parsing error, if any, or the result of task.RunRestoreRaw.
func runRestoreRawCommand(command *cobra.Command, cmdName string) error {
	baseCfg := task.Config{LogProgress: HasLogFile()}
	cfg := task.RestoreRawConfig{
		RawKvConfig: task.RawKvConfig{Config: baseCfg},
	}

	err := cfg.ParseFromFlags(command.Flags())
	if err != nil {
		return err
	}

	// Raw restore never goes through TiDB, so the TiKV-only glue is used.
	return task.RunRestoreRaw(GetDefaultContext(), gluetikv.Glue{}, cmdName, &cfg)
}

// NewRestoreCommand returns a restore subcommand
func NewRestoreCommand() *cobra.Command {
command := &cobra.Command{
Expand All @@ -43,6 +54,7 @@ func NewRestoreCommand() *cobra.Command {
newFullRestoreCommand(),
newDbRestoreCommand(),
newTableRestoreCommand(),
newRawRestoreCommand(),
)
task.DefineRestoreFlags(command.PersistentFlags())

Expand Down Expand Up @@ -83,3 +95,16 @@ func newTableRestoreCommand() *cobra.Command {
task.DefineTableFlags(command)
return command
}

func newRawRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "(experimental) restore a raw kv range to TiKV cluster",
RunE: func(cmd *cobra.Command, _ []string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cobra commands should just handle cli options and then call a function. Then it will be easy to create other interfaces to br or to use as a library.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't know. I just tried to keep my code consistent with other commands here. @5kbpers Do you have any idea?

return runRestoreRawCommand(cmd, "Raw restore")
},
}

task.DefineRawRestoreFlags(command)
return command
}
18 changes: 6 additions & 12 deletions pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import (

"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/gluetikv"
)

// Glue is an implementation of glue.Glue using a new TiDB session.
type Glue struct{}
type Glue struct {
tikvGlue gluetikv.Glue
}

type tidbSession struct {
se session.Session
Expand All @@ -41,15 +42,8 @@ func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
}

// Open implements glue.Glue
func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
if option.CAPath != "" {
conf := config.GetGlobalConfig()
conf.Security.ClusterSSLCA = option.CAPath
conf.Security.ClusterSSLCert = option.CertPath
conf.Security.ClusterSSLKey = option.KeyPath
config.StoreGlobalConfig(conf)
}
return tikv.Driver{}.Open(path)
func (g Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
return g.tikvGlue.Open(path, option)
}

// OwnsStorage implements glue.Glue
Expand Down
41 changes: 41 additions & 0 deletions pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package gluetikv

import (
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/br/pkg/glue"
)

// Glue is an implementation of glue.Glue that accesses only TiKV without TiDB.
// Its GetDomain and CreateSession methods return nil values, so it is meant for
// tasks that need neither a TiDB domain nor a session (it is the glue passed to
// the raw kv backup and restore tasks).
type Glue struct{}

// GetDomain implements glue.Glue.
// It always returns a nil domain and a nil error; store is ignored.
func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
return nil, nil
}

// CreateSession implements glue.Glue.
// It always returns a nil session and a nil error; store is ignored.
// Callers must therefore check the returned session for nil before using it.
func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
return nil, nil
}

// Open implements glue.Glue.
// When option carries a CA path, the cluster TLS settings (CA, certificate and
// key paths) are written into the global TiDB config before the storage at
// path is opened through the TiKV driver.
func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
	if useTLS := option.CAPath != ""; useTLS {
		globalCfg := config.GetGlobalConfig()
		globalCfg.Security.ClusterSSLCA = option.CAPath
		globalCfg.Security.ClusterSSLCert = option.CertPath
		globalCfg.Security.ClusterSSLKey = option.KeyPath
		config.StoreGlobalConfig(globalCfg)
	}

	var driver tikv.Driver
	return driver.Open(path)
}

// OwnsStorage implements glue.Glue.
// It always reports true.
// NOTE(review): presumably this tells the caller that the storage returned by
// Open belongs to it — confirm against the glue.Glue interface.
func (Glue) OwnsStorage() bool {
return true
}
136 changes: 129 additions & 7 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
package restore

import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"math"
"sort"
Expand Down Expand Up @@ -96,33 +98,96 @@ func (rc *Client) IsOnline() bool {

// Close a client
func (rc *Client) Close() {
rc.db.Close()
// rc.db can be nil in raw kv mode.
if rc.db != nil {
rc.db.Close()
}
rc.cancel()
log.Info("Restore client closed")
}

// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient
func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.StorageBackend) error {
databases, err := utils.LoadBackupTables(backupMeta)
if err != nil {
return errors.Trace(err)
if !backupMeta.IsRawKv {
databases, err := utils.LoadBackupTables(backupMeta)
if err != nil {
return errors.Trace(err)
}
rc.databases = databases
}
var ddlJobs []*model.Job
err = json.Unmarshal(backupMeta.GetDdls(), &ddlJobs)
err := json.Unmarshal(backupMeta.GetDdls(), &ddlJobs)
3pointer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
rc.databases = databases
rc.ddlJobs = ddlJobs
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
importClient := NewImportClient(metaClient, rc.tlsConf)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, rc.rateLimit)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, backupMeta.IsRawKv, rc.rateLimit)
return nil
}

// IsRawKvMode checks whether the backup data is in raw kv format, in which case transactional recover is forbidden.
// It reads the IsRawKv flag of rc.backupMeta, which is assigned by InitBackupMeta.
// NOTE(review): calling this before InitBackupMeta would presumably dereference a nil backupMeta — confirm.
func (rc *Client) IsRawKvMode() bool {
return rc.backupMeta.IsRawKv
}

// GetFilesInRawRange gets all files that are in the given range or intersects with the given range.
func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backup.File, error) {
if !rc.IsRawKvMode() {
return nil, errors.New("the backup data is not in raw kv mode")
}

for _, rawRange := range rc.backupMeta.RawRanges {
// First check whether the given range is backup-ed. If not, we cannot perform the restore.
if rawRange.Cf != cf {
continue
}

if (len(rawRange.EndKey) > 0 && bytes.Compare(startKey, rawRange.EndKey) >= 0) ||
(len(endKey) > 0 && bytes.Compare(rawRange.StartKey, endKey) >= 0) {
// The restoring range is totally out of the current range. Skip it.
continue
}

if bytes.Compare(startKey, rawRange.StartKey) < 0 ||
utils.CompareEndKey(endKey, rawRange.EndKey) > 0 {
// Only partial of the restoring range is in the current backup-ed range. So the given range can't be fully
// restored.
return nil, errors.New("no backup data in the range")
3pointer marked this conversation as resolved.
Show resolved Hide resolved
}

// We have found the range that contains the given range. Find all necessary files.
files := make([]*backup.File, 0)

for _, file := range rc.backupMeta.Files {
if file.Cf != cf {
continue
}

if len(file.EndKey) > 0 && bytes.Compare(file.EndKey, startKey) < 0 {
// The file is before the range to be restored.
continue
}
if len(endKey) > 0 && bytes.Compare(endKey, file.StartKey) <= 0 {
// The file is after the range to be restored.
// The specified endKey is exclusive, so when it equals to a file's startKey, the file is still skipped.
continue
}

files = append(files, file)
}

// There should be at most one backed up range that covers the restoring range.
return files, nil
}

return nil, errors.New("no backup data in the range")
}

// SetConcurrency sets the concurrency of dbs tables files
func (rc *Client) SetConcurrency(c uint) {
rc.workerPool = utils.NewWorkerPool(c, "file")
Expand Down Expand Up @@ -323,6 +388,63 @@ func (rc *Client) RestoreFiles(
return nil
}

// RestoreRaw tries to restore raw keys in the specified range.
func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh chan<- struct{}) error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
log.Info("Restore Raw",
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)),
zap.Duration("take", elapsed))
}()
errCh := make(chan error, len(rc.databases))
3pointer marked this conversation as resolved.
Show resolved Hide resolved
wg := new(sync.WaitGroup)
defer close(errCh)

err := rc.fileImporter.SetRawRange(startKey, endKey)
if err != nil {

return errors.Trace(err)
}

emptyRules := &RewriteRules{}
for _, file := range files {
wg.Add(1)
fileReplica := file
rc.workerPool.Apply(
func() {
defer wg.Done()
select {
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, emptyRules):
updateCh <- struct{}{}
}
})
}
for range files {
err := <-errCh
if err != nil {
rc.cancel()
wg.Wait()
log.Error(
"restore raw range failed",
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)),
zap.Error(err),
)
return err
}
}
log.Info(
"finish to restore raw range",
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)),
)
return nil
}

// SwitchToImportMode switches the TiKV cluster to import mode.
func (rc *Client) SwitchToImportMode(ctx context.Context) error {
return rc.switchTiKVMode(ctx, import_sstpb.SwitchMode_Import)
Expand Down
4 changes: 4 additions & 0 deletions pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
if err != nil {
return nil, errors.Trace(err)
}
// The session may be nil in raw kv mode
if se == nil {
return nil, nil
}
// Set SQL mode to None for avoiding SQL compatibility problem
err = se.Execute(context.Background(), "set @@sql_mode=''")
if err != nil {
Expand Down
Loading