diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index 3c281563439de..0275b61592a1c 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "@com_github_golang_protobuf//proto", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/logbackuppb", "@com_github_pingcap_kvproto//pkg/metapb", @@ -78,6 +79,7 @@ go_test( "//tablecodec", "//util/codec", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/logbackuppb", diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index b29cbd6956ae2..33c0e0898b66f 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -262,14 +262,16 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) { return case e, ok := <-ch: if !ok { + log.Info("[log backup advancer] Task watcher exits due to stream ends.") return } - log.Info("meet task event", zap.Stringer("event", &e)) + log.Info("[log backup advancer] Meet task event", zap.Stringer("event", &e)) if err := c.onTaskEvent(ctx, e); err != nil { if errors.Cause(e.Err) != context.Canceled { log.Error("listen task meet error, would reopen.", logutil.ShortError(err)) time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) }) } + log.Info("[log backup advancer] Task watcher exits due to some error.", logutil.ShortError(err)) return } } diff --git a/br/pkg/streamhelper/advancer_cliext.go b/br/pkg/streamhelper/advancer_cliext.go index 059475e62b2b2..f3d7e1f279d5f 100644 --- a/br/pkg/streamhelper/advancer_cliext.go +++ b/br/pkg/streamhelper/advancer_cliext.go @@ -7,13 +7,16 @@ import ( "context" "encoding/binary" "fmt" + "io" "strings" "github.com/golang/protobuf/proto" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/kv" clientv3 "go.etcd.io/etcd/client/v3" @@ -94,6 +97,9 @@ func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (Ta func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) { result := make([]TaskEvent, 0, len(resp.Events)) + if err := resp.Err(); err != nil { + return nil, err + } for _, event := range resp.Events { te, err := t.toTaskEvent(ctx, event) if err != nil { @@ -110,6 +116,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE handleResponse := func(resp clientv3.WatchResponse) bool { events, err := t.eventFromWatch(ctx, resp) if err != nil { + log.Warn("[log backup advancer] Meet error during receiving the task event.", logutil.ShortError(err)) ch <- errorEvent(err) return false } @@ -118,33 +125,44 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE } return true } + collectRemaining := func() { + log.Info("[log backup advancer] Start collecting remaining events in the channel.", zap.Int("remained", len(c))) + defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.") + for { + select { + case resp, ok := <-c: + if !ok { + return + } + if !handleResponse(resp) { + return + } + default: + return + } + } + } go func() { defer close(ch) for { select { case resp, ok := <-c: + failpoint.Inject("advancer_close_channel", func() { + // We cannot really close the channel, just simulating it. + ok = false + }) if !ok { + ch <- errorEvent(io.EOF) return } if !handleResponse(resp) { return } case <-ctx.Done(): - // drain the remain event from channel. - for { - select { - case resp, ok := <-c: - if !ok { - return - } - if !handleResponse(resp) { - return - } - default: - return - } - } + collectRemaining() + ch <- errorEvent(ctx.Err()) + return } } }() diff --git a/br/pkg/streamhelper/integration_test.go b/br/pkg/streamhelper/integration_test.go index 81572f6b7890d..4b989dc4ab2ba 100644 --- a/br/pkg/streamhelper/integration_test.go +++ b/br/pkg/streamhelper/integration_test.go @@ -7,12 +7,14 @@ import ( "context" "encoding/binary" "fmt" + "io" "net" "net/url" "path" "testing" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" @@ -143,6 +145,7 @@ func TestIntegration(t *testing.T) { t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) + t.Run("TestStreamClose", func(t *testing.T) { testStreamClose(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) } func TestChecking(t *testing.T) { @@ -295,6 +298,7 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) { taskInfo2 := simpleTask(taskName2, 4) require.NoError(t, metaCli.PutTask(ctx, taskInfo2)) require.NoError(t, metaCli.DeleteTask(ctx, taskName2)) + first := <-ch require.Equal(t, first.Type, streamhelper.EventAdd) require.Equal(t, first.Name, taskName) @@ -310,8 +314,44 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) { require.Equal(t, forth.Type, streamhelper.EventDel) require.Equal(t, forth.Name, taskName2) cancel() - _, ok := <-ch - require.False(t, ok) + fifth, ok := <-ch + require.True(t, ok) + require.Equal(t, fifth.Type, streamhelper.EventErr) + require.Error(t, fifth.Err, context.Canceled) + item, ok := <-ch + require.False(t, ok, "%v", item) +} + +func testStreamClose(t *testing.T, metaCli streamhelper.AdvancerExt) { + ctx := context.Background() + taskName := "close_simple" + taskInfo := simpleTask(taskName, 4) + + require.NoError(t, metaCli.PutTask(ctx, taskInfo)) + ch := make(chan streamhelper.TaskEvent, 1024) + require.NoError(t, metaCli.Begin(ctx, ch)) + require.NoError(t, metaCli.DeleteTask(ctx, taskName)) + first := <-ch + require.Equal(t, first.Type, streamhelper.EventAdd) + require.Equal(t, first.Name, taskName) + require.ElementsMatch(t, first.Ranges, simpleRanges(4)) + second := <-ch + require.Equal(t, second.Type, streamhelper.EventDel, "%s", second) + require.Equal(t, second.Name, taskName, "%s", second) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel", "return")) + defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel") + // We need to make the channel file some events hence we can simulate the closed channel. + taskName2 := "close_simple2" + taskInfo2 := simpleTask(taskName2, 4) + require.NoError(t, metaCli.PutTask(ctx, taskInfo2)) + require.NoError(t, metaCli.DeleteTask(ctx, taskName2)) + + third := <-ch + require.Equal(t, third.Type, streamhelper.EventErr) + require.Error(t, third.Err, io.EOF) + item, ok := <-ch + require.False(t, ok, "%#v", item) } func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {