From 571cecab45de244597e939dfb578ae835a30135a Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 8 Jun 2022 18:29:11 +0800 Subject: [PATCH] fb/latency(cdc): agent fix inconsistent state by tolerant duplicate messages. (#5791) * agent should return absent directly. * do not panic when remove table not exist. * no need to check for table status. --- cdc/processor/processor.go | 2 +- cdc/scheduler/internal/tp/agent.go | 11 ++++++++++- cdc/scheduler/internal/tp/agent_test.go | 23 +++++++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index c8f44435000..172c7e3ac10 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -307,7 +307,7 @@ func (p *processor) IsRemoveTableFinished(ctx context.Context, tableID model.Tab table, exist := p.tables[tableID] if !exist { - log.Panic("table should be removing but not found", + log.Warn("table should be removing but not found", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index 8d5cb4fea2d..bda8aef6dcd 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -220,7 +220,7 @@ func (a *agent) newTableStatus(tableID model.TableID) schedulepb.TableStatus { // there is task that try to remove the table, // return `stopping` instead of the real table state, // to indicate that the remove table request was received. - if task.IsRemove == true { + if task.IsRemove == true && state != schedulepb.TableStateAbsent { state = schedulepb.TableStateStopping } } @@ -335,6 +335,15 @@ func (a *agent) handleMessageDispatchTableRequest( Epoch: epoch, status: dispatchTableTaskReceived, } + if a.tableExec.GetTableMeta(task.TableID).State == pipeline.TableStateAbsent { + log.Warn("tpscheduler: agent ignore remove table request, "+ + "since the table is absent", + zap.Any("tableID", task.TableID), + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID)) + return + } default: log.Warn("tpscheduler: agent ignore unknown dispatch table request", zap.String("capture", a.captureID), diff --git a/cdc/scheduler/internal/tp/agent_test.go b/cdc/scheduler/internal/tp/agent_test.go index 2a4f7ae4e80..df57a3ae456 100644 --- a/cdc/scheduler/internal/tp/agent_test.go +++ b/cdc/scheduler/internal/tp/agent_test.go @@ -77,6 +77,10 @@ func TestAgentCollectTableStatus(t *testing.T) { a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true} status := a.newTableStatus(model.TableID(0)) require.Equal(t, schedulepb.TableStateStopping, status.State) + + a.runningTasks[model.TableID(10)] = &dispatchTableTask{IsRemove: true} + status = a.newTableStatus(model.TableID(10)) + require.Equal(t, schedulepb.TableStateAbsent, status.State) } func TestAgentHandleDispatchTableTask(t *testing.T) { @@ -250,6 +254,25 @@ func TestAgentHandleMessageStopping(t *testing.T) { require.True(t, addTableResponse.AddTable.Reject) } +func TestAgentHandleRemoveTableRequest(t *testing.T) { + t.Parallel() + + a := newBaseAgent4Test() + a.tableExec = newMockTableExecutor() + + // remove a table not exist, should not generate the task. + removeTableRequest := &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{ + TableID: 2, + }, + }, + } + + a.handleMessageDispatchTableRequest(removeTableRequest, a.epoch) + require.Len(t, a.runningTasks, 0) +} + func TestAgentHandleMessage(t *testing.T) { t.Parallel()