Skip to content

Commit

Permalink
owner, gcutil: always update service GC safepoint when owner finds ne…
Browse files Browse the repository at this point in the history
…w changefeeds (#2512) (#2854)
  • Loading branch information
ti-chi-bot committed Sep 23, 2021
1 parent a933190 commit f06bbdc
Show file tree
Hide file tree
Showing 23 changed files with 666 additions and 325 deletions.
4 changes: 2 additions & 2 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Capture struct {
cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func() *owner.Owner
newOwner func(pd.Client) *owner.Owner
}

// NewCapture returns a new Capture instance
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}

log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID))
owner := c.newOwner()
owner := c.newOwner(c.pdClient)
c.setOwner(owner)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval)
c.setOwner(nil)
Expand Down
8 changes: 6 additions & 2 deletions cdc/capture/http_validatior.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -62,8 +63,11 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
changefeedConfig.StartTS = oracle.ComposeTS(ts, logical)
}

if err := util.CheckSafetyOfStartTs(ctx, capture.pdClient, changefeedConfig.ID, changefeedConfig.StartTS); err != nil {
if err != cerror.ErrStartTsBeforeGC {
// Ensure the start ts is valid in the next 1 hour.
const ensureTTL = 60 * 60
if err := gc.EnsureChangefeedStartTsSafety(
ctx, capture.pdClient, changefeedConfig.ID, ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
return nil, cerror.ErrPDEtcdAPIError.Wrap(err)
}
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -299,7 +300,9 @@ func (o *Owner) newChangeFeed(
log.Info("Find new changefeed", zap.Stringer("info", info),
zap.String("changefeed", id), zap.Uint64("checkpoint ts", checkpointTs))
if info.Config.CheckGCSafePoint {
err := util.CheckSafetyOfStartTs(ctx, o.pdClient, id, checkpointTs)
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx, o.pdClient, id, ensureTTL, checkpointTs)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
23 changes: 18 additions & 5 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/ticdc/cdc/model"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -39,7 +40,7 @@ type changefeed struct {
scheduler *scheduler
barriers *barriers
feedStateManager *feedStateManager
gcManager GcManager
gcManager gc.Manager

schema *schemaWrap4Owner
sink AsyncSink
Expand Down Expand Up @@ -68,7 +69,7 @@ type changefeed struct {
newSink func(ctx cdcContext.Context) (AsyncSink, error)
}

func newChangefeed(id model.ChangeFeedID, gcManager GcManager) *changefeed {
func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
c := &changefeed{
id: id,
scheduler: newScheduler(),
Expand All @@ -86,7 +87,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager GcManager) *changefeed {
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager GcManager,
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
) *changefeed {
Expand Down Expand Up @@ -122,7 +123,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor
func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
if err := c.gcManager.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
if err := c.gcManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -202,7 +203,19 @@ LOOP:
failpoint.Return(errors.New("failpoint injected retriable error"))
})
if c.state.Info.Config.CheckGCSafePoint {
err := util.CheckSafetyOfStartTs(ctx, ctx.GlobalVars().PDClient, c.state.ID, checkpointTs)
// Check TiDB GC safepoint does not exceed the checkpoint.
//
// We update TTL to 10 minutes,
// 1. to delete the service GC safepoint effectively,
// 2. in case owner update TiCDC service GC safepoint fails.
//
// Also it unblocks TiDB GC, because the service GC safepoint is set to
// 1 hour TTL during creating changefeed.
//
// See more gc doc.
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx, ctx.GlobalVars().PDClient, c.state.ID, ensureTTL, checkpointTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
11 changes: 7 additions & 4 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -107,10 +108,12 @@ type changefeedSuite struct {

func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState,
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) {
ctx.GlobalVars().PDClient = &mockPDClient{updateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
}}
gcManager := newGCManager()
ctx.GlobalVars().PDClient = &gc.MockPDClient{
UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
},
}
gcManager := gc.NewManager(ctx.GlobalVars().PDClient)
cf := newChangefeed4Test(ctx.ChangefeedVars().ID, gcManager, func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
return &mockDDLPuller{resolvedTs: startTs - 1}, nil
}, func(ctx cdcContext.Context) (AsyncSink, error) {
Expand Down
171 changes: 0 additions & 171 deletions cdc/owner/gc_manager_test.go

This file was deleted.

Loading

0 comments on commit f06bbdc

Please sign in to comment.