Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add more retryable error for retryer (#43022) #43611

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"@com_github_klauspost_compress//snappy",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
Expand Down Expand Up @@ -70,6 +71,7 @@ go_test(
],
embed = [":storage"],
flaky = True,
shard_count = 43,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand All @@ -80,6 +82,7 @@ go_test(
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_mock//gomock",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
],
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -943,7 +944,21 @@ func isDeadlineExceedError(err error) bool {
return strings.Contains(err.Error(), "context deadline exceeded")
}

func isConnectionResetError(err error) bool {
return strings.Contains(err.Error(), "read: connection reset")
}

func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
// for unit test
failpoint.Inject("replace-error-to-connection-reset-by-peer", func(_ failpoint.Value) {
log.Info("original error", zap.Error(r.Error))
if r.Error != nil {
r.Error = errors.New("read tcp *.*.*.*:*->*.*.*.*:*: read: connection reset by peer")
}
})
if isConnectionResetError(r.Error) {
return true
}
if isDeadlineExceedError(r.Error) && r.HTTPRequest.URL.Host == ec2MetaAddress {
// fast fail for unreachable linklocal address in EC2 containers.
log.Warn("failed to get EC2 metadata. skipping.", logutil.ShortError(r.Error))
Expand Down
59 changes: 58 additions & 1 deletion br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/golang/mock/gomock"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/mock"
. "github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -285,6 +287,8 @@ func TestS3Storage(t *testing.T) {
sendCredential bool
}

require.NoError(t, os.Setenv("AWS_ACCESS_KEY_ID", "ab"))
require.NoError(t, os.Setenv("AWS_SECRET_ACCESS_KEY", "cd"))
s := createGetBucketRegionServer("us-west-2", 200, true)
defer s.Close()

Expand Down Expand Up @@ -407,7 +411,15 @@ func TestS3Storage(t *testing.T) {
}

func TestS3URI(t *testing.T) {
backend, err := ParseBackend("s3://bucket/prefix/", nil)
accessKey := "ab"
secretAccessKey := "cd"
options := &BackendOptions{
S3: S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err := ParseBackend("s3://bucket/prefix/", options)
require.NoError(t, err)
storage, err := New(context.Background(), backend, &ExternalStorageOptions{})
require.NoError(t, err)
Expand Down Expand Up @@ -1193,3 +1205,48 @@ func TestObjectLock(t *testing.T) {
)
require.Equal(t, true, s.storage.IsObjectLockEnabled())
}

func TestRetryError(t *testing.T) {
var count int32 = 0
var errString string = "read tcp *.*.*.*:*->*.*.*.*:*: read: connection reset by peer"
var lock sync.Mutex
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "PUT" {
var curCnt int32
t.Log(r.URL)
lock.Lock()
count += 1
curCnt = count
lock.Unlock()
if curCnt < 2 {
// write an cannot-retry error, but we modify the error to specific error, so client would retry.
w.WriteHeader(403)
return
}
}

w.WriteHeader(200)
}))

defer server.Close()
t.Log(server.URL)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/replace-error-to-connection-reset-by-peer", "return(true)"))
defer func() {
failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/replace-error-to-connection-reset-by-peer")
}()

ctx := context.Background()
s, err := NewS3Storage(&backuppb.S3{
Endpoint: server.URL,
Bucket: "test",
Prefix: "retry",
AccessKey: "none",
SecretAccessKey: "none",
ForcePathStyle: true,
}, &ExternalStorageOptions{})
require.NoError(t, err)
err = s.WriteFile(ctx, "reset", []byte(errString))
require.NoError(t, err)
require.Equal(t, count, int32(2))
}
4 changes: 2 additions & 2 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
// bo.attempt--
e := errors.Cause(err)
switch e { // nolint:errorlint
case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows:
case nil, context.Canceled, context.DeadlineExceeded, sql.ErrNoRows:
// Excepted error, finish the operation
bo.delayTime = 0
bo.attempt = 0
case berrors.ErrRestoreTotalKVMismatch:
case berrors.ErrRestoreTotalKVMismatch, io.EOF:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package utils_test

import (
"context"
"io"
"testing"
"time"

Expand Down Expand Up @@ -101,13 +102,16 @@ func TestPdBackoffWithRetryableError(t *testing.T) {
gRPCError := status.Error(codes.Unavailable, "transport is closing")
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
if counter == 2 {
return io.EOF
}
return gRPCError
}, backoffer)
require.Equal(t, 16, counter)
require.Equal(t, []error{
gRPCError,
gRPCError,
gRPCError,
io.EOF,
gRPCError,
gRPCError,
gRPCError,
Expand Down