Skip to content

Commit

Permalink
ddl: add limiter when setting TiFlash replicas for all tables in a da…
Browse files Browse the repository at this point in the history
…tabase (#32526)

ref #32254
  • Loading branch information
CalvinNeo committed Mar 11, 2022
1 parent 9a4ca3c commit 7fcdab6
Show file tree
Hide file tree
Showing 14 changed files with 365 additions and 35 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ var (
// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
type DDL interface {
CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt, placementPolicyRef *model.PolicyRefInfo) error
AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
Expand Down
146 changes: 134 additions & 12 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ const (
longBlobMaxLength = 4294967295
// When setting the placement policy with "PLACEMENT POLICY `default`",
// it means to remove placement policy from the specified object.
defaultPlacementPolicyName = "default"
defaultPlacementPolicyName = "default"
tiflashCheckPendingTablesWaitTime = 3000 * time.Millisecond
// Once tiflashCheckPendingTablesLimit is reached, we trigger a limiter detection.
tiflashCheckPendingTablesLimit = 100
tiflashCheckPendingTablesRetry = 7
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt, placementPolicyRef *model.PolicyRefInfo) (err error) {
Expand Down Expand Up @@ -206,13 +210,82 @@ func (d *ddl) ModifySchemaDefaultPlacement(ctx sessionctx.Context, stmt *ast.Alt
return errors.Trace(err)
}

func (d *ddl) ModifySchemaSetTiFlashReplica(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt, tiflashReplica *ast.TiFlashReplicaSpec) error {
// getPendingTiFlashTableCount counts unavailable TiFlash replica by iterating all tables in infoCache.
func (d *ddl) getPendingTiFlashTableCount(sctx sessionctx.Context, originVersion int64, pendingCount uint32) (int64, uint32) {
is := d.GetInfoSchemaWithInterceptor(sctx)
dbInfos := is.AllSchemas()
// If there are no schema change since last time(can be weird).
if is.SchemaMetaVersion() == originVersion {
return originVersion, pendingCount
}
cnt := uint32(0)
for _, dbInfo := range dbInfos {
if util.IsMemOrSysDB(dbInfo.Name.L) {
continue
}
for _, tbl := range dbInfo.Tables {
if tbl.TiFlashReplica != nil && !tbl.TiFlashReplica.Available {
cnt += 1
}
}
}
return is.SchemaMetaVersion(), cnt
}

func isSessionDone(sctx sessionctx.Context) (bool, uint32) {
done := false
killed := atomic.LoadUint32(&sctx.GetSessionVars().Killed)
if killed == 1 {
done = true
}
failpoint.Inject("BatchAddTiFlashSendDone", func(val failpoint.Value) {
done = val.(bool)
})
return done, killed
}

func (d *ddl) waitPendingTableThreshold(sctx sessionctx.Context, schemaID int64, tableID int64, originVersion int64, pendingCount uint32, threshold uint32) (bool, int64, uint32, bool) {
configRetry := tiflashCheckPendingTablesRetry
configWaitTime := tiflashCheckPendingTablesWaitTime
failpoint.Inject("FastFailCheckTiFlashPendingTables", func(value failpoint.Value) {
configRetry = value.(int)
configWaitTime = time.Millisecond * 200
})

for retry := 0; retry < configRetry; retry += 1 {
done, killed := isSessionDone(sctx)
if done {
logutil.BgLogger().Info("abort batch add TiFlash replica", zap.Int64("schemaID", schemaID), zap.Uint32("isKilled", killed))
return true, originVersion, pendingCount, false
}
originVersion, pendingCount = d.getPendingTiFlashTableCount(sctx, originVersion, pendingCount)
delay := time.Duration(0)
if pendingCount >= threshold {
logutil.BgLogger().Info("too many unavailable tables, wait", zap.Uint32("threshold", threshold), zap.Uint32("currentPendingCount", pendingCount), zap.Int64("schemaID", schemaID), zap.Int64("tableID", tableID), zap.Duration("time", configWaitTime))
delay = configWaitTime
} else {
// If there are not many unavailable tables, we don't need a force check.
return false, originVersion, pendingCount, false
}
time.Sleep(delay)
}
logutil.BgLogger().Info("too many unavailable tables, timeout", zap.Int64("schemaID", schemaID), zap.Int64("tableID", tableID))
// If timeout here, we will trigger a ddl job, to force sync schema. However, it doesn't mean we remove limiter,
// so there is a force check immediately after that.
return false, originVersion, pendingCount, true
}

func (d *ddl) ModifySchemaSetTiFlashReplica(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt, tiflashReplica *ast.TiFlashReplicaSpec) error {
dbName := model.NewCIStr(stmt.Name)
is := d.GetInfoSchemaWithInterceptor(ctx)
is := d.GetInfoSchemaWithInterceptor(sctx)
dbInfo, ok := is.SchemaByName(dbName)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName.O)
}
if util.IsMemOrSysDB(dbInfo.Name.L) {
return errors.Trace(dbterror.ErrUnsupportedAlterReplicaForSysTable)
}

total := len(dbInfo.Tables)
succ := 0
skip := 0
Expand All @@ -222,18 +295,66 @@ func (d *ddl) ModifySchemaSetTiFlashReplica(ctx sessionctx.Context, stmt *ast.Al
if total == 0 {
return infoschema.ErrEmptyDatabase.GenWithStack("Empty database '%v'", dbName.O)
}
err := checkTiFlashReplicaCount(ctx, tiflashReplica.Count)
err := checkTiFlashReplicaCount(sctx, tiflashReplica.Count)
if err != nil {
return errors.Trace(err)
}

var originVersion int64 = 0
var pendingCount uint32 = 0
forceCheck := false

logutil.BgLogger().Info("start batch add TiFlash replicas", zap.Int("total", total), zap.Int64("schemaID", dbInfo.ID))
threshold := uint32(sctx.GetSessionVars().BatchPendingTiFlashCount)

for _, tbl := range dbInfo.Tables {
done, killed := isSessionDone(sctx)
if done {
logutil.BgLogger().Info("abort batch add TiFlash replica", zap.Int64("schemaID", dbInfo.ID), zap.Uint32("isKilled", killed))
return nil
}

tbReplicaInfo := tbl.TiFlashReplica
if !shouldModifyTiFlashReplica(tbReplicaInfo, tiflashReplica) {
logutil.BgLogger().Info("skip processing schema table", zap.Int64("tableID", tbl.ID), zap.Int64("schemaID", dbInfo.ID), zap.String("tableName", tbl.Name.String()), zap.String("schemaName", dbInfo.Name.String()))
logutil.BgLogger().Info("skip repeated processing table", zap.Int64("tableID", tbl.ID), zap.Int64("schemaID", dbInfo.ID), zap.String("tableName", tbl.Name.String()), zap.String("schemaName", dbInfo.Name.String()))
skip += 1
continue
}

// Ban setting replica count for tables in system database.
if tbl.TempTableType != model.TempTableNone {
logutil.BgLogger().Info("skip processing temporary table", zap.Int64("tableID", tbl.ID), zap.Int64("schemaID", dbInfo.ID), zap.String("tableName", tbl.Name.String()), zap.String("schemaName", dbInfo.Name.String()))
skip += 1
continue
}

charsetOk := true
// Ban setting replica count for tables which has charset not supported by TiFlash
for _, col := range tbl.Cols() {
_, ok := charset.TiFlashSupportedCharsets[col.Charset]
if !ok {
charsetOk = false
break
}
}
if !charsetOk {
logutil.BgLogger().Info("skip processing schema table, unsupported charset", zap.Int64("tableID", tbl.ID), zap.Int64("schemaID", dbInfo.ID), zap.String("tableName", tbl.Name.String()), zap.String("schemaName", dbInfo.Name.String()))
skip += 1
continue
}

// Alter `tiflashCheckPendingTablesLimit` tables are handled, we need to check if we have reached threshold.
if (succ+fail)%tiflashCheckPendingTablesLimit == 0 || forceCheck {
// We can execute one probing ddl to the latest schema, if we timeout in `pendingFunc`.
// However, we shall mark `forceCheck` to true, because we may still reach `threshold`.
finished := false
finished, originVersion, pendingCount, forceCheck = d.waitPendingTableThreshold(sctx, dbInfo.ID, tbl.ID, originVersion, pendingCount, threshold)
if finished {
logutil.BgLogger().Info("abort batch add TiFlash replica", zap.Int64("schemaID", dbInfo.ID))
return nil
}
}

job := &model.Job{
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
Expand All @@ -242,7 +363,7 @@ func (d *ddl) ModifySchemaSetTiFlashReplica(ctx sessionctx.Context, stmt *ast.Al
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{*tiflashReplica},
}
err := d.doDDLJob(ctx, job)
err := d.doDDLJob(sctx, job)
err = d.callHookOnChanged(err)
if err != nil {
oneFail = tbl.ID
Expand All @@ -257,7 +378,8 @@ func (d *ddl) ModifySchemaSetTiFlashReplica(ctx sessionctx.Context, stmt *ast.Al
failStmt = fmt.Sprintf("(including table %v)", oneFail)
}
msg := fmt.Sprintf("In total %v tables: %v succeed, %v failed%v, %v skipped", total, succ, fail, failStmt, skip)
ctx.GetSessionVars().StmtCtx.SetMessage(msg)
sctx.GetSessionVars().StmtCtx.SetMessage(msg)
logutil.BgLogger().Info("finish batch add TiFlash replica", zap.Int64("schemaID", dbInfo.ID))
return nil
}

Expand Down Expand Up @@ -335,7 +457,7 @@ func checkMultiSchemaSpecs(_sctx sessionctx.Context, specs []*ast.DatabaseOption
return nil
}

func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) {
func (d *ddl) AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) {
// Resolve target charset and collation from options.
var (
toCharset, toCollate string
Expand All @@ -344,7 +466,7 @@ func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (
tiflashReplica *ast.TiFlashReplicaSpec
)

err = checkMultiSchemaSpecs(ctx, stmt.Options)
err = checkMultiSchemaSpecs(sctx, stmt.Options)
if err != nil {
return err
}
Expand Down Expand Up @@ -380,17 +502,17 @@ func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (
}

if isAlterCharsetAndCollate {
if err = d.ModifySchemaCharsetAndCollate(ctx, stmt, toCharset, toCollate); err != nil {
if err = d.ModifySchemaCharsetAndCollate(sctx, stmt, toCharset, toCollate); err != nil {
return err
}
}
if isAlterPlacement {
if err = d.ModifySchemaDefaultPlacement(ctx, stmt, placementPolicyRef); err != nil {
if err = d.ModifySchemaDefaultPlacement(sctx, stmt, placementPolicyRef); err != nil {
return err
}
}
if isTiFlashReplica {
if err = d.ModifySchemaSetTiFlashReplica(ctx, stmt, tiflashReplica); err != nil {
if err = d.ModifySchemaSetTiFlashReplica(sctx, stmt, tiflashReplica); err != nil {
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
for _, store := range pollTiFlashContext.TiFlashStores {
s := store
if err := d.UpdateTiFlashHTTPAddress(&s); err != nil {
logutil.BgLogger().Error("Update TiFlash status address failed", zap.Error(err))
}
}

Expand Down Expand Up @@ -447,6 +446,9 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
return false, err
}
}
failpoint.Inject("skipUpdateTableReplicaInfoInLoop", func() {
failpoint.Continue()
})
// Will call `onUpdateFlashReplicaStatus` to update `TiFlashReplica`.
if err := d.UpdateTableReplicaInfo(ctx, tb.ID, avail); err != nil {
if infoschema.ErrTableNotExists.Equal(err) && tb.IsPartition {
Expand Down
Loading

0 comments on commit 7fcdab6

Please sign in to comment.