Skip to content

Commit

Permalink
Merge branch 'master' into version-check-alpha
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Aug 11, 2022
2 parents 0ea9814 + 4acca1e commit 64c0cd7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
5 changes: 4 additions & 1 deletion dm/syncer/shardddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ func (ddl *ShardDDL) HandleQueryEvent(ev *replication.QueryEvent, ec eventContex

if qec.shardingReSync != nil {
qec.shardingReSync.currLocation = *qec.currentLocation
if binlog.CompareLocation(qec.shardingReSync.currLocation, qec.shardingReSync.latestLocation, ddl.s.cfg.EnableGTID) >= 0 {
// TODO: refactor this, see https://github.com/pingcap/tiflow/issues/6691
// for optimistic ddl, we can resync idemponent ddl.
cmp := binlog.CompareLocation(qec.shardingReSync.currLocation, qec.shardingReSync.latestLocation, ddl.s.cfg.EnableGTID)
if cmp > 0 || (cmp == 0 && ddl.s.cfg.ShardMode != config.ShardOptimistic) {
ddl.logger.Info("re-replicate shard group was completed", zap.String("event", "query"), zap.Stringer("queryEventContext", qec))
return qec.closeShardingResync()
} else if ddl.s.cfg.ShardMode != config.ShardOptimistic {
Expand Down
18 changes: 18 additions & 0 deletions engine/jobmaster/dm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"
"time"

"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
Expand Down Expand Up @@ -217,6 +218,9 @@ func (c *JobCfg) fromDMTaskConfig(dmTaskCfg *dmconfig.TaskConfig) error {
}

func (c *JobCfg) adjust() error {
if err := c.verifySourceID(); err != nil {
return err
}
dmTaskCfg, err := c.toDMTaskConfig()
if err != nil {
return err
Expand All @@ -227,6 +231,20 @@ func (c *JobCfg) adjust() error {
return c.fromDMTaskConfig(dmTaskCfg)
}

func (c *JobCfg) verifySourceID() error {
sourceIDs := make(map[string]struct{})
for i, upstream := range c.Upstreams {
if upstream.SourceID == "" {
return errors.Errorf("source-id of %s upstream is empty", humanize.Ordinal(i+1))
}
if _, ok := sourceIDs[upstream.SourceID]; ok {
return errors.Errorf("source-id %s is duplicated", upstream.SourceID)
}
sourceIDs[upstream.SourceID] = struct{}{}
}
return nil
}

// TaskCfg shares same struct as JobCfg, but it only serves one upstream.
// TaskCfg can be converted to an equivalent DM subtask by ToDMSubTaskCfg.
type TaskCfg JobCfg
Expand Down
4 changes: 4 additions & 0 deletions engine/jobmaster/dm/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func TestJobCfg(t *testing.T) {
require.Equal(t, content3, content)

require.Error(t, jobCfg.DecodeFile("./job_not_exist.yaml"))
jobCfg.Upstreams[0].SourceID = ""
require.EqualError(t, jobCfg.adjust(), "source-id of 1st upstream is empty")
jobCfg.Upstreams[0].SourceID = jobCfg.Upstreams[1].SourceID
require.EqualError(t, jobCfg.adjust(), fmt.Sprintf("source-id %s is duplicated", jobCfg.Upstreams[0].SourceID))
}

func TestTaskCfg(t *testing.T) {
Expand Down

0 comments on commit 64c0cd7

Please sign in to comment.