Skip to content

Commit

Permalink
tp: clean up metrics and fix a bug that mistakenly ignores scheduled tasks (
Browse files Browse the repository at this point in the history
pingcap#5906)

* tp: clean up metrics and add logs
* tp: clean up the running task when a capture has shut down
* tests: skip check move table results

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Jun 24, 2022
1 parent 3ff716d commit 4bca8cd
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 43 deletions.
9 changes: 9 additions & 0 deletions cdc/scheduler/internal/tp/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func (c *captureManager) HandleAliveCaptureUpdate(
for id, capture := range c.Captures {
c.changes.Init[id] = capture.Tables
}
log.Info("tpscheduler: all capture initialized",
zap.Int("captureCount", len(c.Captures)))
c.initialized = true
}

Expand All @@ -253,3 +255,10 @@ func (c *captureManager) CollectMetrics() {
Set(float64(len(capture.Tables)))
}
}

// CleanMetrics deletes all per-capture metric series owned by this
// changefeed so that stale gauges do not linger after it is closed.
func (c *captureManager) CleanMetrics() {
	changefeed := c.changefeedID
	for _, capture := range c.Captures {
		captureTableGauge.
			DeleteLabelValues(changefeed.Namespace, changefeed.ID, capture.Addr)
	}
}
8 changes: 8 additions & 0 deletions cdc/scheduler/internal/tp/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func newCoordinator(
schedulers: schedulers,
replicationM: newReplicationManager(cfg.MaxTaskConcurrency, changefeedID),
captureM: newCaptureManager(changefeedID, revision, cfg.HeartbeatTick),
changefeedID: changefeedID,
tasksCounter: make(map[struct {
scheduler string
task string
Expand Down Expand Up @@ -169,6 +170,13 @@ func (c *coordinator) Close(ctx context.Context) {
defer c.mu.Unlock()

_ = c.trans.Close()
c.captureM.CleanMetrics()
c.replicationM.CleanMetrics()

log.Info("tpscheduler: coordinator closed",
zap.Any("ownerRev", c.captureM.OwnerRev),
zap.String("namespace", c.changefeedID.Namespace),
zap.String("name", c.changefeedID.ID))
}

// ===========
Expand Down
54 changes: 51 additions & 3 deletions cdc/scheduler/internal/tp/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,15 @@ func (r *replicationManager) HandleCaptureChanges(
if changes.Removed != nil {
for _, table := range r.tables {
for captureID := range changes.Removed {
msgs, err := table.handleCaptureShutdown(captureID)
msgs, affected, err := table.handleCaptureShutdown(captureID)
if err != nil {
return nil, errors.Trace(err)
}
sentMsgs = append(sentMsgs, msgs...)
if affected {
// Cleanup its running task.
delete(r.runningTasks, table.TableID)
}
}
}
}
Expand Down Expand Up @@ -354,11 +358,13 @@ func (r *replicationManager) handleBurstBalanceTasks(
for _, task := range task.RemoveTables {
perCapture[task.CaptureID]++
}
fields := make([]zap.Field, 0, len(perCapture))
fields := make([]zap.Field, 0, len(perCapture)+3)
for captureID, count := range perCapture {
fields = append(fields, zap.Int(captureID, count))
}
fields = append(fields, zap.Int("total", len(task.AddTables)+len(task.RemoveTables)))
fields = append(fields, zap.Int("addTable", len(task.AddTables)))
fields = append(fields, zap.Int("removeTable", len(task.RemoveTables)))
fields = append(fields, zap.Int("moveTable", len(task.MoveTables)))
log.Info("tpscheduler: handle burst balance task", fields...)

sentMsgs := make([]*schedulepb.Message, 0, len(task.AddTables))
Expand Down Expand Up @@ -491,3 +497,45 @@ func (r *replicationManager) CollectMetrics() {
Set(float64(counter))
}
}

// CleanMetrics deletes every metric series owned by this changefeed so
// that stale gauges and counters do not linger after it is closed.
func (r *replicationManager) CleanMetrics() {
	cf := r.changefeedID
	tableGauge.
		DeleteLabelValues(cf.Namespace, cf.ID)
	slowestTableIDGauge.
		DeleteLabelValues(cf.Namespace, cf.ID)
	slowestTableStateGauge.
		DeleteLabelValues(cf.Namespace, cf.ID)
	slowestTableCheckpointTsGauge.
		DeleteLabelValues(cf.Namespace, cf.ID)
	slowestTableResolvedTsGauge.
		DeleteLabelValues(cf.Namespace, cf.ID)
	metricAcceptScheduleTask := acceptScheduleTaskCounter.MustCurryWith(map[string]string{
		"namespace": cf.Namespace, "changefeed": cf.ID,
	})
	// Drop every task-type label value emitted by the scheduler.
	for _, task := range []string{"addTable", "removeTable", "moveTable", "burstBalance"} {
		metricAcceptScheduleTask.DeleteLabelValues(task)
	}
	// Delete the per-state table gauge for every known replication set
	// state. Note: the original code also counted tables per state, but
	// those counts were never read — deletion only needs the state label
	// values themselves, so no iteration over r.tables is required.
	for _, state := range []ReplicationSetState{
		ReplicationSetStateUnknown,
		ReplicationSetStateAbsent,
		ReplicationSetStatePrepare,
		ReplicationSetStateCommit,
		ReplicationSetStateReplicating,
		ReplicationSetStateRemoving,
	} {
		tableStateGauge.
			DeleteLabelValues(cf.Namespace, cf.ID, state.String())
	}
}
40 changes: 40 additions & 0 deletions cdc/scheduler/internal/tp/replication_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,3 +676,43 @@ func TestReplicationManagerHandleCaptureChanges(t *testing.T) {
require.Equal(t, ReplicationSetStateReplicating, r.tables[3].State)
require.Equal(t, ReplicationSetStateAbsent, r.tables[4].State)
}

func TestReplicationManagerHandleCaptureChangesDuringAddTable(t *testing.T) {
	t.Parallel()

	r := newReplicationManager(1, model.ChangeFeedID{})
	accepted := make(chan int, 1)

	// scheduleAddTable submits an add-table task for table 1 on capture
	// "1" and asserts that the manager accepts it and records it as a
	// running task.
	scheduleAddTable := func() {
		msgs, err := r.HandleTasks([]*scheduleTask{{
			addTable: &addTable{TableID: 1, CaptureID: "1"},
			accept: func() {
				accepted <- 1
			},
		}})
		require.Nil(t, err)
		require.Len(t, msgs, 1)
		require.NotNil(t, r.runningTasks[1])
		require.Equal(t, 1, <-accepted)
	}

	scheduleAddTable()

	// Capture "1" shuts down while the add-table task is in flight; the
	// table must fall back to Absent and the running task must be purged.
	removed := captureChanges{Removed: map[string][]schedulepb.TableStatus{
		"1": {{TableID: 1, State: schedulepb.TableStatePreparing}},
	}}
	msgs, err := r.HandleCaptureChanges(&removed, 0)
	require.Nil(t, err)
	require.Len(t, msgs, 0)
	require.Len(t, r.tables, 1)
	require.Equal(t, ReplicationSetStateAbsent, r.tables[1].State)
	require.Nil(t, r.runningTasks[1])

	// Because the stale running task was cleaned up, a fresh task for the
	// same table must be accepted.
	scheduleAddTable()
}
11 changes: 8 additions & 3 deletions cdc/scheduler/internal/tp/replication_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,19 +675,24 @@ func (r *ReplicationSet) hasRemoved() bool {
return r.State == ReplicationSetStateRemoving && len(r.Captures) == 0
}

// handleCaptureShutdown handle capture shutdown event.
// Besides returning messages and errors, it also returns a bool to indicate
// whether r is affected by the capture shutdown.
func (r *ReplicationSet) handleCaptureShutdown(
captureID model.CaptureID,
) ([]*schedulepb.Message, error) {
) ([]*schedulepb.Message, bool, error) {
_, ok := r.Captures[captureID]
if !ok {
return nil, nil
// r is not affected by the capture shutdown.
return nil, false, nil
}
// The capture has shutdown, the table has stopped.
status := schedulepb.TableStatus{
TableID: r.TableID,
State: schedulepb.TableStateStopped,
}
return r.poll(&status, captureID)
msgs, err := r.poll(&status, captureID)
return msgs, true, errors.Trace(err)
}

func (r *ReplicationSet) updateCheckpoint(checkpoint schedulepb.Checkpoint) {
Expand Down
41 changes: 31 additions & 10 deletions cdc/scheduler/internal/tp/replication_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,11 +795,13 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, ReplicationSetStatePrepare, r.State)
require.Equal(t, from, r.Secondary)

affected := false
// Secondary shutdown during Prepare, Prepare -> Absent
t.Run("AddTableSecondaryShutdownDuringPrepare", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(from)
msgs, affected, err = rClone.handleCaptureShutdown(from)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
Expand All @@ -821,8 +823,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Secondary shutdown during Commit, Commit -> Absent
t.Run("AddTableSecondaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(from)
msgs, affected, err = rClone.handleCaptureShutdown(from)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
Expand All @@ -844,8 +847,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Primary shutdown during Replicating, Replicating -> Absent
t.Run("AddTablePrimaryShutdownDuringReplicating", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(from)
msgs, affected, err = rClone.handleCaptureShutdown(from)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
Expand All @@ -864,16 +868,18 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Primary shutdown during Prepare, Prepare -> Prepare
t.Run("MoveTablePrimaryShutdownDuringPrepare", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Primary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Primary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, map[string]struct{}{dest: {}}, rClone.Captures)
require.Equal(t, "", rClone.Primary)
require.Equal(t, dest, rClone.Secondary)
require.Equal(t, ReplicationSetStatePrepare, rClone.State)
// Secondary shutdown after primary shutdown, Prepare -> Absent
msgs, err = rClone.handleCaptureShutdown(rClone.Secondary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Secondary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
Expand All @@ -883,8 +889,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Primary shutdown during Prepare, Prepare -> Prepare
t.Run("MoveTableSecondaryShutdownDuringPrepare", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Secondary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Secondary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, map[string]struct{}{from: {}}, rClone.Captures)
require.Equal(t, from, rClone.Primary)
Expand All @@ -906,8 +913,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Original primary shutdown during Commit, Commit -> Commit
t.Run("MoveTableOriginalPrimaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Primary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Primary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 1)
require.EqualValues(t, &schedulepb.Message{
To: dest,
Expand All @@ -927,8 +935,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, "", rClone.Secondary)
require.Equal(t, ReplicationSetStateCommit, rClone.State)
// New primary shutdown after original primary shutdown, Commit -> Absent
msgs, err = rClone.handleCaptureShutdown(dest)
msgs, affected, err = rClone.handleCaptureShutdown(dest)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
Expand All @@ -939,8 +948,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Secondary shutdown during Commit, Commit -> Commit
t.Run("MoveTableSecondaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Secondary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Secondary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, map[string]struct{}{from: {}}, rClone.Captures)
require.Equal(t, from, rClone.Primary)
Expand Down Expand Up @@ -1019,8 +1029,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, "", r.Secondary)
t.Run("MoveTableNewPrimaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Primary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Primary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
Expand All @@ -1038,6 +1049,16 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, ReplicationSetStateReplicating, r.State)
require.Equal(t, dest, r.Primary)
require.Equal(t, "", r.Secondary)

// Unknown capture shutdown has no effect.
t.Run("UnknownCaptureShutdown", func(t *testing.T) {
rClone := clone(r)
msgs, affected, err = rClone.handleCaptureShutdown("unknown")
require.Nil(t, err)
require.False(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, r, rClone)
})
}

func TestReplicationSetMoveTableWithHeartbeatResponse(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions cdc/scheduler/internal/tp/scheduler_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ func (b *basicScheduler) Schedule(
for captureID := range captures {
captureIDs = append(captureIDs, captureID)
}
const logTableIDThreshold = 50
tableField := zap.Skip()
if len(newTables) < logTableIDThreshold {
tableField = zap.Int64s("tableIDs", newTables)
}
log.Info("tpscheduler: burst add table",
tableField, zap.Strings("captureIDs", captureIDs))
tasks = append(
tasks, newBurstBalanceAddTables(checkpointTs, newTables, captureIDs))
if len(newTables) == len(currentTables) {
Expand Down
27 changes: 0 additions & 27 deletions tests/integration_tests/move_table/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,6 @@ func (c *cluster) moveAllTables(ctx context.Context, sourceCapture, targetCaptur
log.Info("moved table successful", zap.Int64("tableID", table.ID))
}

for counter := 0; counter < maxCheckSourceEmptyRetries; counter++ {
err := retry.Do(ctx, func() error {
return c.refreshInfo(ctx)
}, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(5+1), retry.WithIsRetryableErr(cerrors.IsRetryableError))
if err != nil {
log.Warn("error refreshing cluster info", zap.Error(err))
}

tables, ok := c.captures[sourceCapture]
if !ok {
log.Warn("source capture is gone", zap.String("sourceCapture", sourceCapture))
return errors.New("source capture is gone")
}

if len(tables) == 0 {
log.Info("source capture is now empty", zap.String("sourceCapture", sourceCapture))
break
}

if counter != maxCheckSourceEmptyRetries {
log.Debug("source capture is not empty, will try again", zap.String("sourceCapture", sourceCapture))
time.Sleep(time.Second * 10)
} else {
return errors.New("source capture is not empty after retries")
}
}

return nil
}

Expand Down

0 comments on commit 4bca8cd

Please sign in to comment.