tp: clean up metrics and fix a bug that mistakenly ignores schedule tasks #5906

Merged
merged 3 commits on Jun 20, 2022
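
The scheduler keeps at most one in-flight schedule task per table in `runningTasks`, and new tasks for a table are ignored while an entry for that table exists. Before this change, a capture shutdown left the affected table's entry behind, so later schedule tasks for that table were mistakenly ignored; the fix deletes the stale entry whenever `handleCaptureShutdown` reports the table as affected, and the PR also adds `CleanMetrics` so per-changefeed metric series are removed when the coordinator closes. A minimal sketch of the gating and the fix, with illustrative names rather than the exact tiflow types:

```go
package sketch

// scheduleTask is an illustrative stand-in for the scheduler's task type.
type scheduleTask struct {
	TableID int64
	accept  func()
}

// manager is an illustrative stand-in for replicationManager; it keeps at
// most one in-flight task per table in runningTasks.
type manager struct {
	runningTasks map[int64]*scheduleTask
}

func newManager() *manager {
	return &manager{runningTasks: make(map[int64]*scheduleTask)}
}

// HandleTasks ignores tasks for tables that already have a running task.
func (m *manager) HandleTasks(tasks []*scheduleTask) {
	for _, task := range tasks {
		if _, ok := m.runningTasks[task.TableID]; ok {
			// A stale entry here is exactly what blocked new tasks before the fix.
			continue
		}
		m.runningTasks[task.TableID] = task
		if task.accept != nil {
			task.accept()
		}
	}
}

// onCaptureShutdown mirrors the fix: once a shutdown is known to affect a
// table, its running task is deleted so future tasks are accepted again.
func (m *manager) onCaptureShutdown(tableID int64, affected bool) {
	if affected {
		delete(m.runningTasks, tableID)
	}
}
```
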
9 changes: 9 additions & 0 deletions cdc/scheduler/internal/tp/capture_manager.go
@@ -229,6 +229,8 @@ func (c *captureManager) HandleAliveCaptureUpdate(
for id, capture := range c.Captures {
c.changes.Init[id] = capture.Tables
}
log.Info("tpscheduler: all capture initialized",
zap.Int("captureCount", len(c.Captures)))
c.initialized = true
}

@@ -253,3 +255,10 @@ func (c *captureManager) CollectMetrics() {
Set(float64(len(capture.Tables)))
}
}

func (c *captureManager) CleanMetrics() {
cf := c.changefeedID
for _, capture := range c.Captures {
captureTableGauge.DeleteLabelValues(cf.Namespace, cf.ID, capture.Addr)
}
}
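
`CleanMetrics` relies on prometheus/client_golang's `DeleteLabelValues`, which removes a labeled series from a metric vector so the gauge stops exporting values for a changefeed that no longer exists. A minimal sketch of that pattern; the metric options and label names below are illustrative stand-ins, not the real tiflow definitions:

```go
package sketch

import "github.com/prometheus/client_golang/prometheus"

// captureTableGauge is an illustrative stand-in for the scheduler's
// per-capture table-count gauge.
var captureTableGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "scheduler",
		Name:      "table_count",
		Help:      "Number of tables replicated by each capture.",
	},
	[]string{"namespace", "changefeed", "capture"},
)

// setTableCount records a value for one capture of one changefeed.
func setTableCount(ns, cf, captureAddr string, n int) {
	captureTableGauge.WithLabelValues(ns, cf, captureAddr).Set(float64(n))
}

// cleanTableCount deletes the series for every known capture so the gauge
// stops reporting values for a removed changefeed.
func cleanTableCount(ns, cf string, captureAddrs []string) {
	for _, addr := range captureAddrs {
		captureTableGauge.DeleteLabelValues(ns, cf, addr)
	}
}
```
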
8 changes: 8 additions & 0 deletions cdc/scheduler/internal/tp/coordinator.go
@@ -95,6 +95,7 @@ func newCoordinator(
schedulers: schedulers,
replicationM: newReplicationManager(cfg.MaxTaskConcurrency, changefeedID),
captureM: newCaptureManager(changefeedID, revision, cfg.HeartbeatTick),
changefeedID: changefeedID,
tasksCounter: make(map[struct {
scheduler string
task string
@@ -169,6 +170,13 @@ func (c *coordinator) Close(ctx context.Context) {
defer c.mu.Unlock()

_ = c.trans.Close()
c.captureM.CleanMetrics()
c.replicationM.CleanMetrics()

log.Info("tpscheduler: coordinator closed",
zap.Any("ownerRev", c.captureM.OwnerRev),
zap.String("namespace", c.changefeedID.Namespace),
zap.String("name", c.changefeedID.ID))
}

// ===========
54 changes: 51 additions & 3 deletions cdc/scheduler/internal/tp/replication_manager.go
@@ -128,11 +128,15 @@ func (r *replicationManager) HandleCaptureChanges(
if changes.Removed != nil {
for _, table := range r.tables {
for captureID := range changes.Removed {
msgs, err := table.handleCaptureShutdown(captureID)
msgs, affected, err := table.handleCaptureShutdown(captureID)
if err != nil {
return nil, errors.Trace(err)
}
sentMsgs = append(sentMsgs, msgs...)
if affected {
// Clean up its running task.
delete(r.runningTasks, table.TableID)
}
}
}
}
@@ -354,11 +358,13 @@ func (r *replicationManager) handleBurstBalanceTasks(
for _, task := range task.RemoveTables {
perCapture[task.CaptureID]++
}
fields := make([]zap.Field, 0, len(perCapture))
fields := make([]zap.Field, 0, len(perCapture)+3)
for captureID, count := range perCapture {
fields = append(fields, zap.Int(captureID, count))
}
fields = append(fields, zap.Int("total", len(task.AddTables)+len(task.RemoveTables)))
fields = append(fields, zap.Int("addTable", len(task.AddTables)))
fields = append(fields, zap.Int("removeTable", len(task.RemoveTables)))
fields = append(fields, zap.Int("moveTable", len(task.MoveTables)))
log.Info("tpscheduler: handle burst balance task", fields...)

sentMsgs := make([]*schedulepb.Message, 0, len(task.AddTables))
@@ -491,3 +497,45 @@ func (r *replicationManager) CollectMetrics() {
Set(float64(counter))
}
}

func (r *replicationManager) CleanMetrics() {
cf := r.changefeedID
tableGauge.
DeleteLabelValues(cf.Namespace, cf.ID)
slowestTableIDGauge.
DeleteLabelValues(cf.Namespace, cf.ID)
slowestTableStateGauge.
DeleteLabelValues(cf.Namespace, cf.ID)
slowestTableCheckpointTsGauge.
DeleteLabelValues(cf.Namespace, cf.ID)
slowestTableResolvedTsGauge.
DeleteLabelValues(cf.Namespace, cf.ID)
metricAcceptScheduleTask := acceptScheduleTaskCounter.MustCurryWith(map[string]string{
"namespace": cf.Namespace, "changefeed": cf.ID,
})
metricAcceptScheduleTask.DeleteLabelValues("addTable")
metricAcceptScheduleTask.DeleteLabelValues("removeTable")
metricAcceptScheduleTask.DeleteLabelValues("moveTable")
metricAcceptScheduleTask.DeleteLabelValues("burstBalance")
var stateCounters [6]int
for _, table := range r.tables {
switch table.State {
case ReplicationSetStateUnknown:
stateCounters[ReplicationSetStateUnknown]++
case ReplicationSetStateAbsent:
stateCounters[ReplicationSetStateAbsent]++
case ReplicationSetStatePrepare:
stateCounters[ReplicationSetStatePrepare]++
case ReplicationSetStateCommit:
stateCounters[ReplicationSetStateCommit]++
case ReplicationSetStateReplicating:
stateCounters[ReplicationSetStateReplicating]++
case ReplicationSetStateRemoving:
stateCounters[ReplicationSetStateRemoving]++
}
}
for s := range stateCounters {
tableStateGauge.
DeleteLabelValues(cf.Namespace, cf.ID, ReplicationSetState(s).String())
}
}
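
For the accepted-task counter, the namespace and changefeed labels are fixed up front with `MustCurryWith`, so cleanup only has to delete the remaining `task` label values. A compact sketch of currying plus deletion on a `CounterVec`, again with an illustrative metric definition rather than the real one:

```go
package sketch

import "github.com/prometheus/client_golang/prometheus"

// acceptScheduleTaskCounter is an illustrative stand-in for the scheduler's
// accepted-task counter.
var acceptScheduleTaskCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "scheduler",
		Name:      "task_accept_total",
		Help:      "Total number of accepted schedule tasks.",
	},
	[]string{"namespace", "changefeed", "task"},
)

// cleanTaskCounters fixes namespace/changefeed via currying, then deletes
// one series per task kind, mirroring replicationManager.CleanMetrics.
func cleanTaskCounters(ns, cf string) {
	curried := acceptScheduleTaskCounter.MustCurryWith(prometheus.Labels{
		"namespace": ns, "changefeed": cf,
	})
	for _, task := range []string{"addTable", "removeTable", "moveTable", "burstBalance"} {
		curried.DeleteLabelValues(task)
	}
}
```
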
40 changes: 40 additions & 0 deletions cdc/scheduler/internal/tp/replication_manager_test.go
@@ -676,3 +676,43 @@ func TestReplicationManagerHandleCaptureChanges(t *testing.T) {
require.Equal(t, ReplicationSetStateReplicating, r.tables[3].State)
require.Equal(t, ReplicationSetStateAbsent, r.tables[4].State)
}

func TestReplicationManagerHandleCaptureChangesDuringAddTable(t *testing.T) {
t.Parallel()

r := newReplicationManager(1, model.ChangeFeedID{})
addTableCh := make(chan int, 1)

msgs, err := r.HandleTasks([]*scheduleTask{{
addTable: &addTable{TableID: 1, CaptureID: "1"},
accept: func() {
addTableCh <- 1
},
}})
require.Nil(t, err)
require.Len(t, msgs, 1)
require.NotNil(t, r.runningTasks[1])
require.Equal(t, 1, <-addTableCh)

changes := captureChanges{Removed: map[string][]schedulepb.TableStatus{
"1": {{TableID: 1, State: schedulepb.TableStatePreparing}},
}}
msgs, err = r.HandleCaptureChanges(&changes, 0)
require.Nil(t, err)
require.Len(t, msgs, 0)
require.Len(t, r.tables, 1)
require.Equal(t, ReplicationSetStateAbsent, r.tables[1].State)
require.Nil(t, r.runningTasks[1])

// New task must be accepted.
msgs, err = r.HandleTasks([]*scheduleTask{{
addTable: &addTable{TableID: 1, CaptureID: "1"},
accept: func() {
addTableCh <- 1
},
}})
require.Nil(t, err)
require.Len(t, msgs, 1)
require.NotNil(t, r.runningTasks[1])
require.Equal(t, 1, <-addTableCh)
}
11 changes: 8 additions & 3 deletions cdc/scheduler/internal/tp/replication_set.go
@@ -675,19 +675,24 @@ func (r *ReplicationSet) hasRemoved() bool {
return r.State == ReplicationSetStateRemoving && len(r.Captures) == 0
}

// handleCaptureShutdown handles the capture shutdown event.
// Besides returning messages and errors, it also returns a bool to indicate
// whether r is affected by the capture shutdown.
func (r *ReplicationSet) handleCaptureShutdown(
captureID model.CaptureID,
) ([]*schedulepb.Message, error) {
) ([]*schedulepb.Message, bool, error) {
_, ok := r.Captures[captureID]
if !ok {
return nil, nil
// r is not affected by the capture shutdown.
return nil, false, nil
}
// The capture has shut down, so the table has stopped.
status := schedulepb.TableStatus{
TableID: r.TableID,
State: schedulepb.TableStateStopped,
}
return r.poll(&status, captureID)
msgs, err := r.poll(&status, captureID)
return msgs, true, errors.Trace(err)
}

func (r *ReplicationSet) updateCheckpoint(checkpoint schedulepb.Checkpoint) {
41 changes: 31 additions & 10 deletions cdc/scheduler/internal/tp/replication_set_test.go
@@ -795,11 +795,13 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, ReplicationSetStatePrepare, r.State)
require.Equal(t, from, r.Secondary)

affected := false
// Secondary shutdown during Prepare, Prepare -> Absent
t.Run("AddTableSecondaryShutdownDuringPrepare", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(from)
msgs, affected, err = rClone.handleCaptureShutdown(from)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
@@ -821,8 +823,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Secondary shutdown during Commit, Commit -> Absent
t.Run("AddTableSecondaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(from)
msgs, affected, err = rClone.handleCaptureShutdown(from)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
@@ -844,8 +847,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Primary shutdown during Replicating, Replicating -> Absent
t.Run("AddTablePrimaryShutdownDuringReplicating", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(from)
msgs, affected, err = rClone.handleCaptureShutdown(from)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
@@ -864,16 +868,18 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Primary shutdown during Prepare, Prepare -> Prepare
t.Run("MoveTablePrimaryShutdownDuringPrepare", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Primary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Primary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, map[string]struct{}{dest: {}}, rClone.Captures)
require.Equal(t, "", rClone.Primary)
require.Equal(t, dest, rClone.Secondary)
require.Equal(t, ReplicationSetStatePrepare, rClone.State)
// Secondary shutdown after primary shutdown, Prepare -> Absent
msgs, err = rClone.handleCaptureShutdown(rClone.Secondary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Secondary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
@@ -883,8 +889,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Primary shutdown during Prepare, Prepare -> Prepare
t.Run("MoveTableSecondaryShutdownDuringPrepare", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Secondary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Secondary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, map[string]struct{}{from: {}}, rClone.Captures)
require.Equal(t, from, rClone.Primary)
@@ -906,8 +913,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Original primary shutdown during Commit, Commit -> Commit
t.Run("MoveTableOriginalPrimaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Primary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Primary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 1)
require.EqualValues(t, &schedulepb.Message{
To: dest,
@@ -927,8 +935,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, "", rClone.Secondary)
require.Equal(t, ReplicationSetStateCommit, rClone.State)
// New primary shutdown after original primary shutdown, Commit -> Absent
msgs, err = rClone.handleCaptureShutdown(dest)
msgs, affected, err = rClone.handleCaptureShutdown(dest)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
@@ -939,8 +948,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
// Secondary shutdown during Commit, Commit -> Commit
t.Run("MoveTableSecondaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Secondary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Secondary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, map[string]struct{}{from: {}}, rClone.Captures)
require.Equal(t, from, rClone.Primary)
@@ -1019,8 +1029,9 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, "", r.Secondary)
t.Run("MoveTableNewPrimaryShutdownDuringCommit", func(t *testing.T) {
rClone := clone(r)
msgs, err = rClone.handleCaptureShutdown(rClone.Primary)
msgs, affected, err = rClone.handleCaptureShutdown(rClone.Primary)
require.Nil(t, err)
require.True(t, affected)
require.Len(t, msgs, 0)
require.Empty(t, rClone.Captures)
require.Equal(t, "", rClone.Primary)
@@ -1038,6 +1049,16 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
require.Equal(t, ReplicationSetStateReplicating, r.State)
require.Equal(t, dest, r.Primary)
require.Equal(t, "", r.Secondary)

// Unknown capture shutdown has no effect.
t.Run("UnknownCaptureShutdown", func(t *testing.T) {
rClone := clone(r)
msgs, affected, err = rClone.handleCaptureShutdown("unknown")
require.Nil(t, err)
require.False(t, affected)
require.Len(t, msgs, 0)
require.EqualValues(t, r, rClone)
})
}

func TestReplicationSetMoveTableWithHeartbeatResponse(t *testing.T) {
7 changes: 7 additions & 0 deletions cdc/scheduler/internal/tp/scheduler_basic.go
@@ -77,6 +77,13 @@ func (b *basicScheduler) Schedule(
for captureID := range captures {
captureIDs = append(captureIDs, captureID)
}
const logTableIDThreshold = 50
tableField := zap.Skip()
if len(newTables) < logTableIDThreshold {
tableField = zap.Int64s("tableIDs", newTables)
}
log.Info("tpscheduler: burst add table",
tableField, zap.Strings("captureIDs", captureIDs))
tasks = append(
tasks, newBurstBalanceAddTables(checkpointTs, newTables, captureIDs))
if len(newTables) == len(currentTables) {
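
`zap.Skip()` is a no-op field in go.uber.org/zap, which lets the table-ID list be attached conditionally without wrapping the whole log call in a branch; above, the IDs are only logged when there are fewer than 50 of them. A small standalone sketch of the same pattern (logger setup and field names are illustrative):

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	newTables := []int64{1, 2, 3}

	// Only include the full ID list when it is small enough to be readable.
	const logTableIDThreshold = 50
	tableField := zap.Skip() // no-op field, logs nothing
	if len(newTables) < logTableIDThreshold {
		tableField = zap.Int64s("tableIDs", newTables)
	}
	logger.Info("burst add table", tableField, zap.Int("tableCount", len(newTables)))
}
```
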
27 changes: 0 additions & 27 deletions tests/integration_tests/move_table/main.go
@@ -177,33 +177,6 @@ func (c *cluster) moveAllTables(ctx context.Context, sourceCapture, targetCaptur
log.Info("moved table successful", zap.Int64("tableID", table.ID))
}

for counter := 0; counter < maxCheckSourceEmptyRetries; counter++ {
err := retry.Do(ctx, func() error {
return c.refreshInfo(ctx)
}, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(5+1), retry.WithIsRetryableErr(cerrors.IsRetryableError))
if err != nil {
log.Warn("error refreshing cluster info", zap.Error(err))
}

tables, ok := c.captures[sourceCapture]
if !ok {
log.Warn("source capture is gone", zap.String("sourceCapture", sourceCapture))
return errors.New("source capture is gone")
}

if len(tables) == 0 {
log.Info("source capture is now empty", zap.String("sourceCapture", sourceCapture))
break
}

if counter != maxCheckSourceEmptyRetries {
log.Debug("source capture is not empty, will try again", zap.String("sourceCapture", sourceCapture))
time.Sleep(time.Second * 10)
} else {
return errors.New("source capture is not empty after retries")
}
}

return nil
}
