
pitr: prevent from restore point to cluster running log backup #40871

Merged
Commits (20)
d56f62c
prevent from restore point to cluster running log backup
Leavrth Jan 30, 2023
2035e4b
close the etcd client in time
Leavrth Jan 30, 2023
c6402dc
resolve call cycle
Leavrth Feb 3, 2023
287e06c
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
Leavrth Feb 3, 2023
4392e9a
Update br/pkg/task/stream.go
Leavrth Feb 3, 2023
12a5731
add more info in error
Leavrth Feb 7, 2023
0b3c1cb
add integration test
Leavrth Feb 7, 2023
a41629b
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
joccau Feb 8, 2023
18316df
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
c7a8f3c
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
joccau Feb 8, 2023
ab99a2a
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
e72baff
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
6367ce1
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
05a5bae
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
63f2f83
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
55ba745
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
dd7277c
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
d9b6fb3
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 8, 2023
98fc96c
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 9, 2023
ff86497
Merge branch 'master' into prevent_from_restore_point_to_cluster_runn…
ti-chi-bot Feb 9, 2023
3 changes: 1 addition & 2 deletions br/pkg/backup/push.go
@@ -218,10 +218,9 @@ func (push *pushDown) pushBackup(
 	if len(errMsg) <= 0 {
 		errMsg = errPb.Msg
 	}
-	return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s",
+	return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s",
 		store.GetId(),
 		redact.String(store.GetAddress()),
-		req.StorageBackend.String(),
 		errMsg,
 	)
 }
9 changes: 5 additions & 4 deletions br/pkg/restore/client.go
@@ -2101,15 +2101,16 @@ func (rc *Client) RestoreKVFiles(
 		return errors.Trace(err)
 	})

+	if err = eg.Wait(); err != nil {
Contributor: why move this here?

Contributor Author: The log info below needs skipFile, which is updated by these worker functions, so we have to wait for those functions to finish first and only then print the log info.

+		summary.CollectFailureUnit("file", err)
+		log.Error("restore files failed", zap.Error(err))
+	}
+
 	log.Info("total skip files due to table id not matched", zap.Int("count", skipFile))
 	if skipFile > 0 {
 		log.Debug("table id in full backup storage", zap.Any("tables", rules))
 	}

-	if err = eg.Wait(); err != nil {
-		summary.CollectFailureUnit("file", err)
-		log.Error("restore files failed", zap.Error(err))
-	}
 	return errors.Trace(err)
 }

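The review thread above is about ordering: skipFile is incremented by the errgroup workers, so the summary log line is only meaningful after eg.Wait() has returned. A small, self-contained sketch of that pattern (not BR code; an atomic counter stands in for skipFile, and the file count is made up for illustration):

```go
package main

import (
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

func main() {
	var skipped atomic.Int64 // stands in for skipFile
	var eg errgroup.Group

	for i := 0; i < 8; i++ {
		i := i
		eg.Go(func() error {
			if i%2 == 0 { // pretend every other file has a mismatched table id
				skipped.Add(1)
			}
			return nil
		})
	}

	// Wait first: only after all workers have finished is the counter complete.
	if err := eg.Wait(); err != nil {
		fmt.Println("restore files failed:", err)
	}
	fmt.Println("total skipped files:", skipped.Load())
}
```

Logging the counter before Wait() would race with (or run ahead of) the workers, which is why the diff moves eg.Wait() above the log.Info call.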
7 changes: 7 additions & 0 deletions br/pkg/task/restore.go
@@ -490,13 +490,20 @@ func IsStreamRestore(cmdName string) bool {

 // RunRestore starts a restore task inside the current goroutine.
 func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
+	if err := checkTaskExists(c, cfg); err != nil {
+		return errors.Annotate(err, "failed to check task exits")
+	}
+
 	config.UpdateGlobal(func(conf *config.Config) {
 		conf.KeyspaceName = cfg.KeyspaceName
 	})
 	if IsStreamRestore(cmdName) {
 		return RunStreamRestore(c, g, cmdName, cfg)
 	}
+	return runRestore(c, g, cmdName, cfg)
+}

+func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
 	cfg.Adjust()
 	defer summary.Summary(cmdName)
 	ctx, cancel := context.WithCancel(c)
34 changes: 30 additions & 4 deletions br/pkg/task/stream.go
@@ -868,8 +868,8 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 	return nil
 }

-func checkConfigForStatus(cfg *StreamConfig) error {
-	if len(cfg.PD) == 0 {
+func checkConfigForStatus(pd []string) error {
+	if len(pd) == 0 {
 		return errors.Annotatef(berrors.ErrInvalidArgument,
 			"the command needs access to PD, please specify `-u` or `--pd`")
 	}
@@ -919,7 +919,7 @@ func RunStreamStatus(
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}

-	if err := checkConfigForStatus(cfg); err != nil {
+	if err := checkConfigForStatus(cfg.PD); err != nil {
 		return err
 	}
 	ctl, err := makeStatusController(ctx, cfg, g)
@@ -1034,6 +1034,32 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 	return nil
 }

+// checkTaskExists checks whether there is a log backup task running.
+// If so, return an error.
+func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error {
+	if err := checkConfigForStatus(cfg.PD); err != nil {
+		return err
+	}
+	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+	if err != nil {
+		return err
+	}
+	cli := streamhelper.NewMetaDataClient(etcdCLI)
+	defer func() {
+		if err := cli.Close(); err != nil {
+			log.Error("failed to close the etcd client", zap.Error(err))
+		}
+	}()
+	tasks, err := cli.GetAllTasks(ctx)
+	if err != nil {
+		return err
+	}
+	if len(tasks) > 0 {
+		return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster.", tasks[0].Info.Name)
+	}
+	return nil
+}
+
 // RunStreamRestore restores stream log.
 func RunStreamRestore(
 	c context.Context,
@@ -1095,7 +1121,7 @@ func RunStreamRestore(
 	logStorage := cfg.Config.Storage
 	cfg.Config.Storage = cfg.FullBackupStorage
 	// TiFlash replica is restored to down-stream on 'pitr' currently.
-	if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil {
+	if err = runRestore(ctx, g, FullRestoreCmd, cfg); err != nil {
 		return errors.Trace(err)
 	}
 	cfg.Config.Storage = logStorage
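Taken together, the new checkTaskExists is wired into RunRestore so that every restore path (full, point, and PITR) is rejected while a log backup task is still registered in etcd. A minimal, self-contained sketch of that behaviour, with a hypothetical fakeTaskClient standing in for the etcd-backed streamhelper.MetaDataClient (this is illustrative only, not the PR's integration test):

```go
package main

import (
	"context"
	"fmt"
)

// fakeTaskClient stands in for the etcd-backed metadata client used in the PR.
type fakeTaskClient struct{ tasks []string }

func (f *fakeTaskClient) GetAllTasks(_ context.Context) ([]string, error) {
	return f.tasks, nil
}

// checkNoTask mirrors the essence of checkTaskExists: refuse to proceed while
// any log backup task is registered.
func checkNoTask(ctx context.Context, cli *fakeTaskClient) error {
	tasks, err := cli.GetAllTasks(ctx)
	if err != nil {
		return err
	}
	if len(tasks) > 0 {
		return fmt.Errorf("log backup task is running: %s, please stop the task before restore", tasks[0])
	}
	return nil
}

func main() {
	ctx := context.Background()
	// A cluster with a running log backup task must reject the restore.
	if err := checkNoTask(ctx, &fakeTaskClient{tasks: []string{"pitr"}}); err == nil {
		panic("expected restore to be rejected while a log backup task is running")
	}
	// A cluster with no registered task may proceed.
	if err := checkNoTask(ctx, &fakeTaskClient{tasks: nil}); err != nil {
		panic(err)
	}
	fmt.Println("pre-flight guard behaves as expected")
}
```

In the real code the check talks to etcd and closes the client as soon as the check is done (the "close the etcd client in time" commit), and the error message also tells the operator to re-create the log backup task and take a fresh full backup once the PITR operation has finished.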