Skip to content

Commit

Permalink
cdc: implement replication set (pingcap#5450)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Jun 23, 2022
1 parent 09cf059 commit 677031c
Show file tree
Hide file tree
Showing 7 changed files with 1,289 additions and 151 deletions.
1 change: 1 addition & 0 deletions cdc/scheduler/internal/tp/balance_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (b *balancer) Name() string {
}

func (b *balancer) Schedule(
checkpointTs model.Ts,
currentTables []model.TableID,
captures map[model.CaptureID]*model.CaptureInfo,
captureTables map[model.CaptureID]captureStatus,
Expand Down
5 changes: 4 additions & 1 deletion cdc/scheduler/internal/tp/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type scheduler interface {
Name() string
Schedule(
checkpointTs model.Ts,
currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
captureTables map[model.CaptureID]captureStatus,
Expand Down Expand Up @@ -71,7 +72,7 @@ func (c *coordinator) poll(
captureTables := c.manager.captureTableSets()
allTasks := make([]*scheduleTask, 0)
for _, sched := range c.scheduler {
tasks := sched.Schedule(currentTables, aliveCaptures, captureTables)
tasks := sched.Schedule(checkpointTs, currentTables, aliveCaptures, captureTables)
allTasks = append(allTasks, tasks...)
}
recvMsgs := c.recvMessages()
Expand All @@ -81,6 +82,8 @@ func (c *coordinator) poll(
return errors.Trace(err)
}
c.sendMessages(sentMsgs)

// checkpoint calcuation
return nil
}

Expand Down
Loading

0 comments on commit 677031c

Please sign in to comment.