Skip to content

Commit

Permalink
br: support reset_tiflash after ebs restoration (#40124) (#40447)
Browse files Browse the repository at this point in the history
close #40208
  • Loading branch information
fengou1 committed Jan 10, 2023
1 parent ee52655 commit ee5d6a1
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 2 deletions.
92 changes: 92 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,74 @@ func (rc *Client) SetWithSysTable(withSysTable bool) {
rc.withSysTable = withSysTable
}

func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
recorder := tiflashrec.New()

expectTiFlashStoreCount := uint64(0)
needTiFlash := false
for _, s := range allSchema {
for _, t := range s.Tables {
if t.TiFlashReplica != nil {
expectTiFlashStoreCount = mathutil.Max(expectTiFlashStoreCount, t.TiFlashReplica.Count)
recorder.AddTable(t.ID, *t.TiFlashReplica)
needTiFlash = true
}
}
}
if !needTiFlash {
log.Info("no need to set tiflash replica, since there is no tables enable tiflash replica")
return nil
}
// we wait for ten minutes to wait tiflash starts.
// since tiflash only starts when set unmark recovery mode finished.
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = utils.WithRetry(timeoutCtx, func() error {
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
log.Info("get tiflash store count for resetting TiFlash Replica",
zap.Uint64("count", tiFlashStoreCount))
if err != nil {
return errors.Trace(err)
}
if tiFlashStoreCount < expectTiFlashStoreCount {
log.Info("still waiting for enough tiflash store start",
zap.Uint64("expect", expectTiFlashStoreCount),
zap.Uint64("actual", tiFlashStoreCount),
)
return errors.New("tiflash store count is less than expected")
}
return nil
}, &waitTiFlashBackoffer{
Attempts: 30,
BaseBackoff: 4 * time.Second,
})
if err != nil {
return err
}

sqls := recorder.GenerateResetAlterTableDDLs(info)
log.Info("Generating SQLs for resetting tiflash replica",
zap.Strings("sqls", sqls))

return g.UseOneShotSession(storage, false, func(se glue.Session) error {
for _, sql := range sqls {
if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
logutil.ShortError(errExec),
zap.String("sql", sql),
)
}
}
return nil
})
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down Expand Up @@ -2721,3 +2789,27 @@ func CheckNewCollationEnable(
log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled))
return nil
}

type waitTiFlashBackoffer struct {
Attempts int
BaseBackoff time.Duration
}

// NextBackoff returns a duration to wait before retrying again
func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
bo := b.BaseBackoff
b.Attempts--
if b.Attempts == 0 {
return 0
}
b.BaseBackoff *= 2
if b.BaseBackoff > 32*time.Second {
b.BaseBackoff = 32 * time.Second
}
return bo
}

// Attempt returns the remain attempt times
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}
52 changes: 50 additions & 2 deletions br/pkg/restore/tiflashrec/tiflash_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,46 @@ func (r *TiFlashRecorder) Rewrite(oldID int64, newID int64) {
}
}

func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(id)
if !ok {
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
}
schema, ok := info.SchemaByTable(table.Meta())
if !ok {
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
}
// Currently, we didn't backup tiflash cluster volume during volume snapshot backup,
// But the table has replica info after volume restoration.
// We should reset it to 0, then set it back. otherwise, it will return error when alter tiflash replica.
altTableSpec, err := alterTableSpecOf(replica, true)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
}
items = append(items, fmt.Sprintf(
"ALTER TABLE %s %s",
utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
altTableSpec),
)
altTableSpec, err = alterTableSpecOf(replica, false)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
}
items = append(items, fmt.Sprintf(
"ALTER TABLE %s %s",
utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
altTableSpec),
)
})
return items
}

func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
Expand All @@ -92,7 +132,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
}
altTableSpec, err := alterTableSpecOf(replica)
altTableSpec, err := alterTableSpecOf(replica, false)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
Expand All @@ -106,14 +146,22 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s
return items
}

func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) {
func alterTableSpecOf(replica model.TiFlashReplicaInfo, reset bool) (string, error) {
spec := &ast.AlterTableSpec{
Tp: ast.AlterTableSetTiFlashReplica,
TiFlashReplica: &ast.TiFlashReplicaSpec{
Count: replica.Count,
Labels: replica.LocationLabels,
},
}
if reset {
spec = &ast.AlterTableSpec{
Tp: ast.AlterTableSetTiFlashReplica,
TiFlashReplica: &ast.TiFlashReplicaSpec{
Count: 0,
},
}
}

buf := bytes.NewBuffer(make([]byte, 0, 32))
restoreCx := format.NewRestoreCtx(
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/restore/tiflashrec/tiflash_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,32 @@ func TestGenSql(t *testing.T) {
"ALTER TABLE `test`.`evils` SET TIFLASH REPLICA 1 LOCATION LABELS 'kIll''; OR DROP DATABASE test --', 'dEaTh with " + `\\"quoting\\"` + "'",
})
}

func TestGenResetSql(t *testing.T) {
tInfo := func(id int, name string) *model.TableInfo {
return &model.TableInfo{
ID: int64(id),
Name: model.NewCIStr(name),
}
}
fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{
tInfo(1, "fruits"),
tInfo(2, "whisper"),
})
rec := tiflashrec.New()
rec.AddTable(1, model.TiFlashReplicaInfo{
Count: 1,
})
rec.AddTable(2, model.TiFlashReplicaInfo{
Count: 2,
LocationLabels: []string{"climate"},
})

sqls := rec.GenerateResetAlterTableDDLs(fakeInfo)
require.ElementsMatch(t, sqls, []string{
"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 0",
"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 2 LOCATION LABELS 'climate'",
"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 0",
"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 1",
})
}
4 changes: 4 additions & 0 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
return errors.Trace(err)
}

// since we cannot reset tiflash automaticlly. so we should start it manually
if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil {
return errors.Trace(err)
}
progress.Close()
summary.CollectDuration("restore duration", time.Since(startAll))
summary.SetSuccessStatus(true)
Expand Down

0 comments on commit ee5d6a1

Please sign in to comment.