Skip to content

Commit

Permalink
*: add tiflash replica sync progress (#14713)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Feb 12, 2020
1 parent 5c4f457 commit 7cd8ba3
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 7 deletions.
63 changes: 63 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
ServerInformationPath = "/tidb/server/info"
// ServerMinStartTSPath stores the server min start timestamp.
ServerMinStartTSPath = "/tidb/server/minstartts"
// TiFlashTableSyncProgressPath stores the tiflash table replica sync progress.
TiFlashTableSyncProgressPath = "/tiflash/table/sync"
// keyOpDefaultRetryCnt is the default retry count for etcd store.
keyOpDefaultRetryCnt = 5
// keyOpDefaultTimeout is the default time out for etcd store.
Expand Down Expand Up @@ -170,6 +172,67 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
return is.getAllServerInfo(ctx)
}

// UpdateTiFlashTableSyncProgress writes the TiFlash replica sync progress of
// the table identified by tid to etcd, under the key
// TiFlashTableSyncProgressPath + "/" + tid.
//
// The progress value is stored as fixed-point text with two decimal places.
// If the global info syncer has no etcd client, nothing is written and nil is
// returned. Otherwise the result of the etcd put is returned as-is.
func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progress float64) error {
	syncer, err := getGlobalInfoSyncer()
	if err != nil {
		return err
	}
	cli := syncer.etcdCli
	if cli == nil {
		// No etcd client: there is nowhere to persist the progress.
		return nil
	}
	etcdKey := TiFlashTableSyncProgressPath + "/" + strconv.FormatInt(tid, 10)
	value := strconv.FormatFloat(progress, 'f', 2, 64)
	return util.PutKVToEtcd(ctx, cli, keyOpDefaultRetryCnt, etcdKey, value)
}

// DeleteTiFlashTableSyncProgress removes the TiFlash replica sync progress
// entry of the table identified by tid from etcd.
//
// If the global info syncer has no etcd client, nothing is deleted and nil is
// returned. Otherwise the result of the etcd delete is returned as-is.
func DeleteTiFlashTableSyncProgress(tid int64) error {
	syncer, err := getGlobalInfoSyncer()
	switch {
	case err != nil:
		return err
	case syncer.etcdCli == nil:
		// No etcd client: there is no stored progress to remove.
		return nil
	}
	return util.DeleteKeyFromEtcd(
		fmt.Sprintf("%s/%d", TiFlashTableSyncProgressPath, tid),
		syncer.etcdCli,
		keyOpDefaultRetryCnt,
		keyOpDefaultTimeout,
	)
}

// GetTiFlashTableSyncProgress gets the sync progress of all TiFlash table
// replicas stored in etcd under TiFlashTableSyncProgressPath.
//
// The returned map is keyed by the ID parsed from each etcd key, and the value
// is the progress parsed from the stored text. Entries whose key or value
// cannot be parsed are logged and skipped.
//
// If the global info syncer has no etcd client, an empty map and a nil error
// are returned. The etcd read is attempted up to keyOpDefaultRetryCnt times,
// each attempt bounded by keyOpDefaultTimeout. If ctx is done, or every
// attempt fails, a nil map and the corresponding error are returned so that
// callers can tell "no progress recorded" apart from "progress unavailable".
func GetTiFlashTableSyncProgress(ctx context.Context) (map[int64]float64, error) {
	is, err := getGlobalInfoSyncer()
	if err != nil {
		return nil, err
	}
	progressMap := make(map[int64]float64)
	if is.etcdCli == nil {
		return progressMap, nil
	}
	prefix := TiFlashTableSyncProgressPath + "/"
	var lastErr error
	for i := 0; i < keyOpDefaultRetryCnt; i++ {
		// Retrying is pointless once the caller has given up.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		// Bound each attempt so an unresponsive etcd cannot block the caller
		// forever when ctx itself carries no deadline.
		childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
		resp, err := is.etcdCli.Get(childCtx, prefix, clientv3.WithPrefix())
		cancel()
		if err != nil {
			logutil.BgLogger().Info("get tiflash table replica sync progress failed, continue checking.", zap.Error(err))
			lastErr = err
			continue
		}
		for _, kv := range resp.Kvs {
			// The prefix query guarantees every key starts with prefix, so the
			// remainder of the key is the table ID text.
			tid, err := strconv.ParseInt(string(kv.Key[len(prefix):]), 10, 64)
			if err != nil {
				logutil.BgLogger().Info("invalid tiflash table replica sync progress key.", zap.String("key", string(kv.Key)))
				continue
			}
			progress, err := strconv.ParseFloat(string(kv.Value), 64)
			if err != nil {
				logutil.BgLogger().Info("invalid tiflash table replica sync progress value.",
					zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
				continue
			}
			progressMap[tid] = progress
		}
		return progressMap, nil
	}
	// Every attempt failed: report the last error instead of silently
	// pretending that no progress is recorded.
	return nil, lastErr
}

func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
allInfo := make(map[string]*ServerInfo)
if is.etcdCli == nil {
Expand Down
23 changes: 21 additions & 2 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,7 @@ var tableTableTiFlashReplicaCols = []columnInfo{
{"REPLICA_COUNT", mysql.TypeLonglong, 64, 0, nil, nil},
{"LOCATION_LABELS", mysql.TypeVarchar, 64, 0, nil, nil},
{"AVAILABLE", mysql.TypeTiny, 1, 0, nil, nil},
{"PROGRESS", mysql.TypeDouble, 22, 0, nil, nil},
}

var tableInspectionResultCols = []columnInfo{
Expand Down Expand Up @@ -2312,20 +2313,38 @@ func dataForTiDBClusterInfo(ctx sessionctx.Context) ([][]types.Datum, error) {
}

// dataForTableTiFlashReplica constructs data for table tiflash replica info.
func dataForTableTiFlashReplica(schemas []*model.DBInfo) [][]types.Datum {
func dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum {
var rows [][]types.Datum
progressMap, err := infosync.GetTiFlashTableSyncProgress(context.Background())
if err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
}
for _, schema := range schemas {
for _, tbl := range schema.Tables {
if tbl.TiFlashReplica == nil {
continue
}
progress := 1.0
if !tbl.TiFlashReplica.Available {
if pi := tbl.GetPartitionInfo(); pi != nil && len(pi.Definitions) > 0 {
progress = 0
for _, p := range pi.Definitions {
// TODO: need check partition replica available.
progress += progressMap[p.ID]
}
progress = progress / float64(len(pi.Definitions))
} else {
progress = progressMap[tbl.ID]
}
}
record := types.MakeDatums(
schema.Name.O, // TABLE_SCHEMA
tbl.Name.O, // TABLE_NAME
tbl.ID, // TABLE_ID
int64(tbl.TiFlashReplica.Count), // REPLICA_COUNT
strings.Join(tbl.TiFlashReplica.LocationLabels, ","), // LOCATION_LABELS
tbl.TiFlashReplica.Available, // AVAILABLE
progress, // PROGRESS
)
rows = append(rows, record)
}
Expand Down Expand Up @@ -2515,7 +2534,7 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
case TableClusterInfo:
fullRows, err = dataForTiDBClusterInfo(ctx)
case tableTiFlashReplica:
fullRows = dataForTableTiFlashReplica(dbs)
fullRows = dataForTableTiFlashReplica(ctx, dbs)
case TableMetricTables:
fullRows = dataForMetricTables(ctx)
// Data for cluster memory table.
Expand Down
4 changes: 2 additions & 2 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,11 @@ func (s *testTableSuite) TestForTableTiFlashReplica(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, index idx(a))")
tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';")
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0"))
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE, PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0 0"))
tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tbl.Meta().TiFlashReplica.Available = true
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1"))
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE, PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1 1"))
}

func (s *testClusterTableSuite) TestForMetricTables(c *C) {
Expand Down
18 changes: 15 additions & 3 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,10 @@ func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

type tableFlashReplicaStatus struct {
// The JSON field names are shared with TiFlash; coordinate with the TiFlash
// team before renaming any of them.
// ID is logged as the "table ID" when a status report is handled.
ID int64 `json:"id"`
RegionCount uint64 `json:"region_count"`
ID int64 `json:"id"`
// RegionCount is the number of regions that need to be synced.
RegionCount uint64 `json:"region_count"`
// FlashRegionCount is the number of regions that have already finished syncing.
FlashRegionCount uint64 `json:"flash_region_count"`
}

Expand All @@ -779,10 +781,20 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http
writeError(w, err)
return
}
err = do.DDL().UpdateTableReplicaInfo(s, status.ID, status.checkTableFlashReplicaAvailable())
available := status.checkTableFlashReplicaAvailable()
err = do.DDL().UpdateTableReplicaInfo(s, status.ID, available)
if err != nil {
writeError(w, err)
}
if available {
err = infosync.DeleteTiFlashTableSyncProgress(status.ID)
} else {
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, float64(status.FlashRegionCount)/float64(status.RegionCount))
}
if err != nil {
writeError(w, err)
}

logutil.BgLogger().Info("handle flash replica report", zap.Int64("table ID", status.ID), zap.Uint64("region count",
status.RegionCount),
zap.Uint64("flash region count", status.FlashRegionCount),
Expand Down

0 comments on commit 7cd8ba3

Please sign in to comment.