Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNode (#4084) #4098

Merged
Merged
11 changes: 10 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,21 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent deadlock cause by checkpointTs
// fall back
n.flowController.Release(checkpointTs)

// the checkpointTs may fall back in some situation such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= n.checkpointTs {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)

n.flowController.Release(checkpointTs)
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,61 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2)
c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0)
}

type flushFlowController struct {
mockFlowController
releaseCounter int
}

func (c *flushFlowController) Release(resolvedTs uint64) {
c.releaseCounter++
}

type flushSink struct {
mockSink
}

// use to simulate the situation that resolvedTs return from sink manager
// fall back
var fallBackResolvedTs = uint64(10)

func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
if resolvedTs == fallBackResolvedTs {
return 0, nil
}
return resolvedTs, nil
}

// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always
// call flowController.Release to release the memory quota of the table to avoid
// deadlock if there is no error occur
func (s *outputSuite) TestFlushSinkReleaseFlowController(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test-flushSink",
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: cfg,
},
})
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController)
c.Assert(sNode.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
sNode.barrierTs = 10

cctx := pipeline.MockNodeContext4Test(nil, pipeline.TickMessage(), nil)
err := sNode.flushSink(cctx, uint64(8))
c.Assert(err, check.IsNil)
c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
c.Assert(flowController.releaseCounter, check.Equals, 1)
// resolvedTs will fall back in this call
err = sNode.flushSink(cctx, uint64(10))
c.Assert(err, check.IsNil)
c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
c.Assert(flowController.releaseCounter, check.Equals, 2)
}
2 changes: 1 addition & 1 deletion pkg/pipeline/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outp
return Message{}, nil
}

// MockNodeContext4Test creates a node context with a message and a output channel for tests.
// MockNodeContext4Test creates a node context with a message and an output channel for tests.
func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext {
return newNodeContext(ctx, msg, outputCh)
}