Skip to content

Commit

Permalink
fb/latency(cdc): agent fix inconsistent state by tolerant duplicate m…
Browse files Browse the repository at this point in the history
…essages. (pingcap#5791)

* agent should return absent directly.
* do not panic when remove table not exist.
* no need to check for table status.
  • Loading branch information
3AceShowHand authored and overvenus committed Jun 21, 2022
1 parent cf6596f commit 571ceca
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (p *processor) IsRemoveTableFinished(ctx context.Context, tableID model.Tab

table, exist := p.tables[tableID]
if !exist {
log.Panic("table should be removing but not found",
log.Warn("table should be removing but not found",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
Expand Down
11 changes: 10 additions & 1 deletion cdc/scheduler/internal/tp/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (a *agent) newTableStatus(tableID model.TableID) schedulepb.TableStatus {
// there is task that try to remove the table,
// return `stopping` instead of the real table state,
// to indicate that the remove table request was received.
if task.IsRemove == true {
if task.IsRemove == true && state != schedulepb.TableStateAbsent {
state = schedulepb.TableStateStopping
}
}
Expand Down Expand Up @@ -335,6 +335,15 @@ func (a *agent) handleMessageDispatchTableRequest(
Epoch: epoch,
status: dispatchTableTaskReceived,
}
if a.tableExec.GetTableMeta(task.TableID).State == pipeline.TableStateAbsent {
log.Warn("tpscheduler: agent ignore remove table request, "+
"since the table is absent",
zap.Any("tableID", task.TableID),
zap.String("capture", a.captureID),
zap.String("namespace", a.changeFeedID.Namespace),
zap.String("changefeed", a.changeFeedID.ID))
return
}
default:
log.Warn("tpscheduler: agent ignore unknown dispatch table request",
zap.String("capture", a.captureID),
Expand Down
23 changes: 23 additions & 0 deletions cdc/scheduler/internal/tp/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func TestAgentCollectTableStatus(t *testing.T) {
a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true}
status := a.newTableStatus(model.TableID(0))
require.Equal(t, schedulepb.TableStateStopping, status.State)

a.runningTasks[model.TableID(10)] = &dispatchTableTask{IsRemove: true}
status = a.newTableStatus(model.TableID(10))
require.Equal(t, schedulepb.TableStateAbsent, status.State)
}

func TestAgentHandleDispatchTableTask(t *testing.T) {
Expand Down Expand Up @@ -250,6 +254,25 @@ func TestAgentHandleMessageStopping(t *testing.T) {
require.True(t, addTableResponse.AddTable.Reject)
}

func TestAgentHandleRemoveTableRequest(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()
a.tableExec = newMockTableExecutor()

// remove a table not exist, should not generate the task.
removeTableRequest := &schedulepb.DispatchTableRequest{
Request: &schedulepb.DispatchTableRequest_RemoveTable{
RemoveTable: &schedulepb.RemoveTableRequest{
TableID: 2,
},
},
}

a.handleMessageDispatchTableRequest(removeTableRequest, a.epoch)
require.Len(t, a.runningTasks, 0)
}

func TestAgentHandleMessage(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 571ceca

Please sign in to comment.