Skip to content

Commit

Permalink
owner(ticdc): fix changefeed backoff (pingcap#9687)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Sep 7, 2023
1 parent 2ab025e commit f9b6d24
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 46 deletions.
50 changes: 11 additions & 39 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ const (
defaultBackoffMaxElapsedTime = 30 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

// If all states recorded in window are 'normal', it can be assumed that the changefeed
// is running steady. And then if we enter a state other than normal at next tick,
// the backoff must be reset.
defaultStateWindowSize = 512
)

// feedStateManager manages the ReactorState of a changefeed
Expand All @@ -59,7 +54,7 @@ type feedStateManager struct {
shouldBeRemoved bool

adminJobQueue []*model.AdminJob
stateHistory [defaultStateWindowSize]model.FeedState
isRetrying bool
lastErrorRetryTime time.Time // time of last error for a changefeed
lastErrorRetryCheckpointTs model.Ts // checkpoint ts of last retry
lastWarningReportCheckpointTs model.Ts // checkpoint ts of last warning report
Expand Down Expand Up @@ -93,26 +88,6 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
return f
}

// isChangefeedStable check if there are states other than 'normal' in this sliding window.
func (m *feedStateManager) isChangefeedStable() bool {
for _, val := range m.stateHistory {
if val != model.StateNormal {
return false
}
}

return true
}

// shiftStateWindow shift the sliding window
func (m *feedStateManager) shiftStateWindow(state model.FeedState) {
for i := 0; i < defaultStateWindowSize-1; i++ {
m.stateHistory[i] = m.stateHistory[i+1]
}

m.stateHistory[defaultStateWindowSize-1] = state
}

func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
Expand All @@ -128,7 +103,6 @@ func (m *feedStateManager) Tick(
}
}

m.shiftStateWindow(state.Info.State)
m.checkAndInitLastRetryCheckpointTs(state.Status)

m.state = state
Expand Down Expand Up @@ -167,13 +141,13 @@ func (m *feedStateManager) Tick(
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
log.Warn("The changefeed won't be restarted "+
"as it has been experiencing failures for "+
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration(
"maxElapsedTime",
m.errBackoff.MaxElapsedTime,
),
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Time("lastRetryTime", m.lastErrorRetryTime),
zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs),
)
m.shouldBeRunning = false
m.patchState(model.StateFailed)
Expand Down Expand Up @@ -563,13 +537,10 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
})
}

// If we enter into an abnormal state 'error' for this changefeed now
// but haven't seen abnormal states in a sliding window (512 ticks),
// it can be assumed that this changefeed meets a sudden change from a stable condition.
// So we can reset the exponential backoff and re-backoff from the InitialInterval.
// TODO: this detection policy should be added into unit test.
if m.isChangefeedStable() {
// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
}
}

Expand Down Expand Up @@ -646,6 +617,7 @@ func (m *feedStateManager) checkAndChangeState() {
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs))
m.patchState(model.StateNormal)
m.isRetrying = false
}
}

Expand Down
210 changes: 203 additions & 7 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
Expand All @@ -46,19 +47,18 @@ func (p *mockPD) GetTS(_ context.Context) (int64, int64, error) {

// newFeedStateManager4Test creates feedStateManager for test
func newFeedStateManager4Test(
initialIntervalInMs time.Duration,
maxIntervalInMs time.Duration,
maxElapsedTimeInMs time.Duration,
initialIntervalInMs, maxIntervalInMs, maxElapsedTimeInMs int,
multiplier float64,
) *feedStateManager {
f := new(feedStateManager)
f.upstream = new(upstream.Upstream)
f.upstream.PDClient = &mockPD{}
f.upstream.PDClock = pdutil.NewClock4Test()

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = initialIntervalInMs * time.Millisecond
f.errBackoff.MaxInterval = maxIntervalInMs * time.Millisecond
f.errBackoff.MaxElapsedTime = maxElapsedTimeInMs * time.Millisecond
f.errBackoff.InitialInterval = time.Duration(initialIntervalInMs) * time.Millisecond
f.errBackoff.MaxInterval = time.Duration(maxIntervalInMs) * time.Millisecond
f.errBackoff.MaxElapsedTime = time.Duration(maxElapsedTimeInMs) * time.Millisecond
f.errBackoff.Multiplier = multiplier
f.errBackoff.RandomizationFactor = 0

Expand Down Expand Up @@ -690,7 +690,7 @@ func TestBackoffNeverStops(t *testing.T) {
func TestUpdateChangefeedEpoch(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
// Set a long backoff time
manager := newFeedStateManager4Test(time.Hour, time.Hour, 0, 1.0)
manager := newFeedStateManager4Test(int(time.Hour), int(time.Hour), 0, 1.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
Expand Down Expand Up @@ -739,3 +739,199 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
}
}
}

func TestHandleWarning(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test when an warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test when the changefeed is in warning state, and the checkpointTs is not progressing,
// the changefeed state will remain warning
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. test when the changefeed is in warning state, and the checkpointTs is progressing,
// the changefeed state will be changed to normal
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 201,
}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 4. test when the changefeed is in warning state, and the checkpointTs is not progressing
// for defaultBackoffMaxElapsedTime time, the changefeed state will be changed to failed
// and it will stop running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
// mock the checkpointTs is not progressing for defaultBackoffMaxElapsedTime time
manager.checkpointTsAdvanced = manager.
checkpointTsAdvanced.Add(-(defaultBackoffMaxElapsedTime + 1))
// resolveTs = 202 > checkpointTs = 201
manager.Tick(state, 202)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateFailed, state.Info.State)
require.False(t, manager.ShouldRunning())
}

func TestErrorAfterWarning(t *testing.T) {
t.Parallel()

maxElapsedTimeInMs := 2000
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test when an warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test when the changefeed is in warning state, and the checkpointTs is not progressing,
// the changefeed state will remain warning
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. Sleep maxElapsedTimeInMs to wait backoff timeout. And when an error occurs after an warning,
// the backoff will be reseted, and changefeed state will be changed to warning and it will still
// keep running.
time.Sleep(time.Millisecond * time.Duration(maxElapsedTimeInMs))
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()

manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StatePending, state.Info.State)
require.False(t, manager.ShouldRunning())
manager.Tick(state, 0)

// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())
}

0 comments on commit f9b6d24

Please sign in to comment.