tp: add metrics (pingcap#5823)
* tp: fill resolved ts when add a new table

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: relax invariant check

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: add metrics

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: record tasks in burstBalance

Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus committed Jun 24, 2022
1 parent 8f6c468 commit eb3e7ca
Showing 11 changed files with 345 additions and 49 deletions.
3 changes: 2 additions & 1 deletion cdc/metrics.go
@@ -20,6 +20,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/cdc/puller"
redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/scheduler"
sink "github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/cdc/sorter"
@@ -54,14 +55,14 @@ func init() {
actor.InitMetrics(registry)
orchestrator.InitMetrics(registry)
p2p.InitMetrics(registry)
// Sorter metrics
sorter.InitMetrics(registry)
memory.InitMetrics(registry)
unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
db.InitMetrics(registry)
kafka.InitMetrics(registry)
scheduler.InitMetrics(registry)
// TiKV client metrics, including metrics about resolved and region cache.
originalRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = registry
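The scheduler.InitMetrics call registered above refers to collectors declared in a scheduler metrics file that is not part of the hunks shown here. As a rough orientation only, the sketch below shows how the two collectors used later in this diff (captureTableGauge and scheduleTaskCounter) might be declared; the label names are inferred from the WithLabelValues calls in capture_manager.go and coordinator.go, while the metric names, help strings, and package layout are assumptions, not the actual tiflow code.

// Hypothetical metrics declarations for the tp scheduler. Only the two
// variable identifiers are taken from this diff; everything else is assumed.
package tp

import "github.com/prometheus/client_golang/prometheus"

var (
	// Current number of tables replicated by each capture, labeled by
	// changefeed namespace, changefeed ID, and capture advertise address.
	captureTableGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "scheduler",
			Name:      "capture_table_count",
			Help:      "The number of tables replicated per capture.",
		}, []string{"namespace", "changefeed", "capture"})

	// Total schedule tasks generated, labeled additionally by scheduler
	// name and task type.
	scheduleTaskCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "ticdc",
			Subsystem: "scheduler",
			Name:      "task_count",
			Help:      "The total number of generated schedule tasks.",
		}, []string{"namespace", "changefeed", "scheduler", "task"})
)

// InitMetrics registers the collectors with the given registry; cdc/metrics.go
// is expected to reach this, directly or via scheduler.InitMetrics, at start-up.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(captureTableGauge)
	registry.MustRegister(scheduleTaskCounter)
}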
26 changes: 21 additions & 5 deletions cdc/scheduler/internal/tp/capture_manager.go
@@ -57,10 +57,11 @@ type CaptureStatus struct {
Epoch schedulepb.ProcessorEpoch
State CaptureState
Tables []schedulepb.TableStatus
Addr string
}

func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus {
return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized}
func newCaptureStatus(rev schedulepb.OwnerRevision, addr string) *CaptureStatus {
return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized, Addr: addr}
}

func (c *CaptureStatus) handleHeartbeatResponse(
@@ -100,13 +101,19 @@ type captureManager struct {
// A logical clock counter, for heartbeat.
tickCounter int
heartbeatTick int

changefeedID model.ChangeFeedID
}

func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *captureManager {
func newCaptureManager(
changefeedID model.ChangeFeedID, rev schedulepb.OwnerRevision, heartbeatTick int,
) *captureManager {
return &captureManager{
OwnerRev: rev,
Captures: make(map[model.CaptureID]*CaptureStatus),
heartbeatTick: heartbeatTick,

changefeedID: changefeedID,
}
}

@@ -178,10 +185,10 @@ func (c *captureManager) HandleAliveCaptureUpdate(
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) []*schedulepb.Message {
msgs := make([]*schedulepb.Message, 0)
for id := range aliveCaptures {
for id, info := range aliveCaptures {
if _, ok := c.Captures[id]; !ok {
// A new capture.
c.Captures[id] = newCaptureStatus(c.OwnerRev)
c.Captures[id] = newCaptureStatus(c.OwnerRev, info.AdvertiseAddr)
log.Info("tpscheduler: find a new capture", zap.String("capture", id))
msgs = append(msgs, &schedulepb.Message{
To: id,
@@ -232,3 +239,12 @@ func (c *captureManager) TakeChanges() *captureChanges {
c.changes = nil
return changes
}

func (c *captureManager) CollectMetrics() {
cf := c.changefeedID
for _, capture := range c.Captures {
captureTableGauge.
WithLabelValues(cf.Namespace, cf.ID, capture.Addr).
Set(float64(len(capture.Tables)))
}
}
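CollectMetrics sets one gauge child per capture, keyed by the changefeed namespace, the changefeed ID, and the advertise address recorded when the capture was first seen. A hedged sketch of how this could be exercised in a test follows; it assumes the gauge declared in the earlier sketch plus the testutil helper from github.com/prometheus/client_golang/prometheus/testutil, and the test itself is illustrative, not part of this commit.

// Illustrative only: verifies that CollectMetrics reports the table count
// per capture address. Field and helper names follow the diff above.
func TestCaptureManagerCollectMetricsSketch(t *testing.T) {
	t.Parallel()

	cm := newCaptureManager(
		model.ChangeFeedID{Namespace: "default", ID: "cf"}, schedulepb.OwnerRevision{}, 2)
	cm.Captures["1"] = &CaptureStatus{
		Addr:   "127.0.0.1:8300",
		Tables: make([]schedulepb.TableStatus, 3), // pretend the capture owns 3 tables
	}
	cm.CollectMetrics()

	require.Equal(t, 3.0, testutil.ToFloat64(
		captureTableGauge.WithLabelValues("default", "cf", "127.0.0.1:8300")))
}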
8 changes: 4 additions & 4 deletions cdc/scheduler/internal/tp/capture_manager_test.go
@@ -26,7 +26,7 @@ func TestCaptureStatusHandleHeartbeatResponse(t *testing.T) {

rev := schedulepb.OwnerRevision{Revision: 1}
epoch := schedulepb.ProcessorEpoch{Epoch: "test"}
c := newCaptureStatus(rev)
c := newCaptureStatus(rev, "")
require.Equal(t, CaptureStateUninitialized, c.State)

// Uninitialized -> Initialized
@@ -50,7 +50,7 @@ func TestCaptureManagerHandleAliveCaptureUpdate(t *testing.T) {
t.Parallel()

rev := schedulepb.OwnerRevision{}
cm := newCaptureManager(rev, 2)
cm := newCaptureManager(model.ChangeFeedID{}, rev, 2)
ms := map[model.CaptureID]*model.CaptureInfo{
"1": {}, "2": {}, "3": {},
}
@@ -120,7 +120,7 @@ func TestCaptureManagerHandleMessages(t *testing.T) {
"1": {},
"2": {},
}
cm := newCaptureManager(rev, 2)
cm := newCaptureManager(model.ChangeFeedID{}, rev, 2)
require.False(t, cm.CheckAllCaptureInitialized())

// Initial handle alive captures.
@@ -168,7 +168,7 @@ func TestCaptureManagerTick(t *testing.T) {
t.Parallel()

rev := schedulepb.OwnerRevision{}
cm := newCaptureManager(rev, 2)
cm := newCaptureManager(model.ChangeFeedID{}, rev, 2)

// No heartbeat if there is no capture.
msgs := cm.Tick(nil)
51 changes: 45 additions & 6 deletions cdc/scheduler/internal/tp/coordinator.go
@@ -16,6 +16,7 @@ package tp
import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -28,7 +29,10 @@ import (
"go.uber.org/zap"
)

const checkpointCannotProceed = internal.CheckpointCannotProceed
const (
checkpointCannotProceed = internal.CheckpointCannotProceed
metricsInterval = 10 * time.Second
)

var _ internal.Scheduler = (*coordinator)(nil)

@@ -40,30 +44,35 @@ type coordinator struct {
schedulers map[schedulerType]scheduler
replicationM *replicationManager
captureM *captureManager

lastCollectTime time.Time
changefeedID model.ChangeFeedID
tasksCounter map[struct{ scheduler, task string }]int
}

// NewCoordinator returns a two phase scheduler.
func NewCoordinator(
ctx context.Context,
captureID model.CaptureID,
changeFeedID model.ChangeFeedID,
changefeedID model.ChangeFeedID,
checkpointTs model.Ts,
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
trans, err := newTransport(ctx, changeFeedID, schedulerRole, messageServer, messageRouter)
trans, err := newTransport(ctx, changefeedID, schedulerRole, messageServer, messageRouter)
if err != nil {
return nil, errors.Trace(err)
}
coord := newCoordinator(captureID, ownerRevision, cfg)
coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
coord.trans = trans
return coord, nil
}

func newCoordinator(
captureID model.CaptureID,
changefeedID model.ChangeFeedID,
ownerRevision int64,
cfg *config.SchedulerConfig,
) *coordinator {
@@ -78,8 +87,12 @@ func newCoordinator(
revision: revision,
captureID: captureID,
schedulers: schedulers,
replicationM: newReplicationManager(cfg.MaxTaskConcurrency),
captureM: newCaptureManager(revision, cfg.HeartbeatTick),
replicationM: newReplicationManager(cfg.MaxTaskConcurrency, changefeedID),
captureM: newCaptureManager(changefeedID, revision, cfg.HeartbeatTick),
tasksCounter: make(map[struct {
scheduler string
task string
}]int),
}
}

@@ -190,6 +203,12 @@ func (c *coordinator) poll(
zap.String("scheduler", scheduler.Name()))
}
allTasks = append(allTasks, tasks...)
for _, t := range tasks {
name := struct {
scheduler, task string
}{scheduler: scheduler.Name(), task: t.Name()}
c.tasksCounter[name]++
}
}

// Handle generated schedule tasks.
@@ -205,6 +224,8 @@ func (c *coordinator) poll(
return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err)
}

c.maybeCollectMetrics()

// Checkpoint calculation
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables)
return newCheckpointTs, newResolvedTs, nil
@@ -250,3 +271,21 @@ func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message)
}
return c.trans.Send(ctx, msgs)
}

func (c *coordinator) maybeCollectMetrics() {
now := time.Now()
if now.Sub(c.lastCollectTime) < metricsInterval {
return
}
c.lastCollectTime = now

cf := c.replicationM.changefeedID
for name, counter := range c.tasksCounter {
scheduleTaskCounter.
WithLabelValues(cf.Namespace, cf.ID, name.scheduler, name.task).
Add(float64(counter))
c.tasksCounter[name] = 0
}
c.replicationM.CollectMetrics()
c.captureM.CollectMetrics()
}
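maybeCollectMetrics batches task counts in the coordinator's tasksCounter map and pushes them to Prometheus at most once every metricsInterval, resetting each pending delta after the flush; the anonymous struct key keeps the map comparable without declaring a named type. The snippet below restates that throttled delta-flush pattern in isolation, with placeholder names only, for readers who want to reuse it outside the coordinator.

// A self-contained sketch of the throttled delta-flush pattern; the metric
// and type names here are placeholders, not tiflow identifiers.
package taskmetrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var taskCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "schedule_task_total"},
	[]string{"scheduler", "task"})

type flusher struct {
	lastFlush time.Time
	interval  time.Duration
	counts    map[struct{ scheduler, task string }]int
}

func (f *flusher) record(scheduler, task string) {
	f.counts[struct{ scheduler, task string }{scheduler, task}]++
}

func (f *flusher) maybeFlush() {
	now := time.Now()
	if now.Sub(f.lastFlush) < f.interval {
		return // throttle: flush at most once per interval
	}
	f.lastFlush = now
	for key, n := range f.counts {
		taskCounter.WithLabelValues(key.scheduler, key.task).Add(float64(n))
		f.counts[key] = 0 // keep the key, clear the pending delta
	}
}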
35 changes: 25 additions & 10 deletions cdc/scheduler/internal/tp/coordinator_test.go
@@ -74,7 +74,7 @@ func TestCoordinatorSendMsgs(t *testing.T) {
captureID: "0",
trans: trans,
}
coord.captureM = newCaptureManager(coord.revision, 0)
coord.captureM = newCaptureManager(model.ChangeFeedID{}, coord.revision, 0)
coord.sendMsgs(
ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}})

@@ -145,7 +145,7 @@ func TestCoordinatorRecvMsgs(t *testing.T) {
func TestCoordinatorHeartbeat(t *testing.T) {
t.Parallel()

coord := newCoordinator("a", 1, &config.SchedulerConfig{
coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
@@ -201,7 +201,7 @@

func TestCoordinatorAddCapture(t *testing.T) {
t.Parallel()
coord := newCoordinator("a", 1, &config.SchedulerConfig{
coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
@@ -258,7 +258,7 @@ func TestCoordinatorRemoveCapture(t *testing.T) {
func TestCoordinatorRemoveCapture(t *testing.T) {
t.Parallel()

coord := newCoordinator("a", 1, &config.SchedulerConfig{
coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
@@ -341,9 +341,14 @@ func BenchmarkCoordinatorInit(b *testing.B) {
coord = &coordinator{
trans: &mockTrans{},
schedulers: schedulers,
replicationM: newReplicationManager(10),
replicationM: newReplicationManager(10, model.ChangeFeedID{}),
// Disable heartbeat.
captureM: newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt),
captureM: newCaptureManager(
model.ChangeFeedID{}, schedulepb.OwnerRevision{}, math.MaxInt),
tasksCounter: make(map[struct {
scheduler string
task string
}]int),
}
name = fmt.Sprintf("InitTable %d", total)
return name, coord, currentTables, captures
@@ -360,7 +365,8 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) {
const captureCount = 8
captures = map[model.CaptureID]*model.CaptureInfo{}
// Always heartbeat.
captureM := newCaptureManager(schedulepb.OwnerRevision{}, 0)
captureM := newCaptureManager(
model.ChangeFeedID{}, schedulepb.OwnerRevision{}, 0)
captureM.initialized = true
for i := 0; i < captureCount; i++ {
captures[fmt.Sprint(i)] = &model.CaptureInfo{}
@@ -375,8 +381,12 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) {
coord = &coordinator{
trans: &mockTrans{},
schedulers: schedulers,
replicationM: newReplicationManager(10),
replicationM: newReplicationManager(10, model.ChangeFeedID{}),
captureM: captureM,
tasksCounter: make(map[struct {
scheduler string
task string
}]int),
}
name = fmt.Sprintf("Heartbeat %d", total)
return name, coord, currentTables, captures
@@ -393,13 +403,14 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
const captureCount = 8
captures = map[model.CaptureID]*model.CaptureInfo{}
// Disable heartbeat.
captureM := newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt)
captureM := newCaptureManager(
model.ChangeFeedID{}, schedulepb.OwnerRevision{}, math.MaxInt)
captureM.initialized = true
for i := 0; i < captureCount; i++ {
captures[fmt.Sprint(i)] = &model.CaptureInfo{}
captureM.Captures[fmt.Sprint(i)] = &CaptureStatus{State: CaptureStateInitialized}
}
replicationM := newReplicationManager(10)
replicationM := newReplicationManager(10, model.ChangeFeedID{})
currentTables = make([]model.TableID, 0, total)
heartbeatResp := make(map[model.CaptureID]*schedulepb.Message)
for i := 0; i < total; i++ {
@@ -447,6 +458,10 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
schedulers: schedulers,
replicationM: replicationM,
captureM: captureM,
tasksCounter: make(map[struct {
scheduler string
task string
}]int),
}
name = fmt.Sprintf("HeartbeatResponse %d", total)
return name, coord, currentTables, captures