Skip to content

Commit

Permalink
Add unit tests for kv/importer and restore/checkpoints, plus some bug…
Browse files Browse the repository at this point in the history
… fixes (pingcap#191)

* kv: add unit test for importer (based on a mocked gRPC client)

* restore: remove the unnecessary SHOW CREATE TABLE calls

We can now reconstruct the table info directly from the HTTP reply, so the
SHOW CREATE TABLE results are now useless. Better drop them.

* restore: add unit test for tidb.go

* common: ensure sqlmock errors are not retryable

* restore: fix error where checkpoint status of index engine is not updated

Also, made the WholeTableEngineID constant public.

* restore: rename a confusing variable

* restore: fix bug where --checkpoint-error-destroy=all skips index engine

* restore: prevent NPE when getting missing table from file checkpoint

* restore: add unit tests for checkpoints

* kv: address importer comments

* kv: also exposes the mock Importer constructor to other tests
  • Loading branch information
kennytm committed May 21, 2019
1 parent e98dbf8 commit 78bb37c
Show file tree
Hide file tree
Showing 16 changed files with 1,993 additions and 78 deletions.
6 changes: 3 additions & 3 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName s
}

for _, table := range targetTables {
for engineID := 0; engineID < table.EnginesCount; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, int32(engineID))
closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, int32(engineID))
for engineID := table.MinEngineID; engineID <= table.MaxEngineID; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err)
lastErr = err
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ module github.com/pingcap/tidb-lightning

require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.3.3
github.com/coreos/go-semver v0.2.0
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.2.1
github.com/golang/mock v1.1.1
github.com/grpc-ecosystem/grpc-gateway v1.6.4 // indirect
github.com/joho/sqltocsv v0.0.0-20190321025444-a9e6f980056c
github.com/kr/pretty v0.1.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DATA-DOG/go-sqlmock v1.3.3 h1:CWUqKXe0s8A2z6qCgkP4Kru7wC11YoAnoupUKFDnH08=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -56,6 +58,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg=
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.0.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
17 changes: 16 additions & 1 deletion lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
"context"
"database/sql"
"encoding/json"
stderrors "errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"reflect"
"regexp"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -150,6 +153,13 @@ func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, ar
})
}

// sqlmock uses fmt.Errorf to produce expectation failures, which will cause
// unnecessary retry if not specially handled >:(
var stdFatalErrorsRegexp = regexp.MustCompile(
`^call to (?s:.*) was not expected|arguments do not match:|could not match actual sql`,
)
var stdErrorType = reflect.TypeOf(stderrors.New(""))

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
Expand All @@ -174,7 +184,12 @@ func IsRetryableError(err error) bool {
}
default:
switch status.Code(err) {
case codes.Unknown, codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
case codes.Unknown:
if reflect.TypeOf(err) == stdErrorType {
return !stdFatalErrorsRegexp.MatchString(err.Error())
}
return true
default:
return false
Expand Down
6 changes: 6 additions & 0 deletions lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common_test
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand All @@ -24,6 +25,7 @@ import (

"github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/common"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -109,4 +111,8 @@ func (s *utilSuite) TestIsRetryableError(c *C) {
c.Assert(common.IsRetryableError(status.Error(codes.OutOfRange, "")), IsTrue)
c.Assert(common.IsRetryableError(status.Error(codes.Unavailable, "")), IsTrue)
c.Assert(common.IsRetryableError(status.Error(codes.DataLoss, "")), IsTrue)

// sqlmock errors
c.Assert(common.IsRetryableError(fmt.Errorf("call to database Close was not expected")), IsFalse)
c.Assert(common.IsRetryableError(errors.New("call to database Close was not expected")), IsTrue)
}
40 changes: 29 additions & 11 deletions lightning/kv/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

const (
maxRetryTimes int = 3 // tikv-importer has done retry internally. so we don't retry many times.
retryBackoffTime = time.Second * 3
maxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
defaultRetryBackoffTime = time.Second * 3
)

/*
Expand Down Expand Up @@ -71,9 +71,10 @@ Usual workflow:
// Importer represents a gRPC connection to tikv-importer. This type is
// goroutine safe: you can share this instance and execute any method anywhere.
type Importer struct {
conn *grpc.ClientConn
cli kv.ImportKVClient
pdAddr string
conn *grpc.ClientConn
cli kv.ImportKVClient
pdAddr string
retryBackoffTime time.Duration
}

// NewImporter creates a new connection to tikv-importer. A single connection
Expand All @@ -85,15 +86,32 @@ func NewImporter(ctx context.Context, importServerAddr string, pdAddr string) (*
}

return &Importer{
conn: conn,
cli: kv.NewImportKVClient(conn),
pdAddr: pdAddr,
conn: conn,
cli: kv.NewImportKVClient(conn),
pdAddr: pdAddr,
retryBackoffTime: defaultRetryBackoffTime,
}, nil
}

// NewMockImporter creates an *unconnected* importer based on a custom
// ImportKVClient. This is provided for testing only. Do not use this function
// outside of tests.
func NewMockImporter(cli kv.ImportKVClient, pdAddr string) *Importer {
return &Importer{
conn: nil,
cli: cli,
pdAddr: pdAddr,
retryBackoffTime: 0,
}
}

// Close the importer connection.
func (importer *Importer) Close() {
importer.conn.Close()
if importer.conn != nil {
if err := importer.conn.Close(); err != nil {
log.L().Warn("close importer gRPC connection failed", zap.Error(err))
}
}
}

// SwitchMode switches the TiKV cluster to another operation mode.
Expand Down Expand Up @@ -266,7 +284,7 @@ func (stream *WriteStream) Put(kvs []kvec.KvPair) error {
break
}
stream.engine.logger.Error("send write stream failed", log.ShortError(sendErr))
time.Sleep(retryBackoffTime)
time.Sleep(stream.engine.importer.retryBackoffTime)
}
return errors.Trace(sendErr)
}
Expand Down Expand Up @@ -356,7 +374,7 @@ func (engine *ClosedEngine) Import(ctx context.Context) error {
return errors.Trace(err)
}
task.Warn("import spuriously failed, going to retry again", log.ShortError(err))
time.Sleep(retryBackoffTime)
time.Sleep(engine.importer.retryBackoffTime)
}

return errors.Annotatef(err, "[%s] import reach max retry %d and still failed", engine.uuid, maxRetryTimes)
Expand Down
Loading

0 comments on commit 78bb37c

Please sign in to comment.