Skip to content

Commit

Permalink
Merge branch 'release-5.4' into cherry-pick-4937-to-release-5.4
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jun 21, 2022
2 parents 61bb40c + 48e2761 commit ba5ad3a
Show file tree
Hide file tree
Showing 56 changed files with 1,790 additions and 292 deletions.
237 changes: 161 additions & 76 deletions cmd/kafka-consumer/main.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ ErrSyncerReplaceEventNotExist,[code=36066:class=sync-unit:scope=internal:level=h
ErrSyncerParseDDL,[code=36067:class=sync-unit:scope=internal:level=high], "Message: parse DDL: %s, Workaround: Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed."
ErrSyncerUnsupportedStmt,[code=36068:class=sync-unit:scope=internal:level=high], "Message: `%s` statement not supported in %s mode"
ErrSyncerGetEvent,[code=36069:class=sync-unit:scope=upstream:level=high], "Message: get binlog event error: %v, Workaround: Please check if the binlog file could be parsed by `mysqlbinlog`."
ErrSyncerDownstreamTableNotFound,[code=36070:class=sync-unit:scope=internal:level=high], "Message: downstream table %s not found"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium], "Message: nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium], "Message: op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium], "Message: operate request without --sharding specified not valid"
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return c.AdjustCaseSensitive(ctx2, db)
return nil
}

// AdjustCaseSensitive adjust CaseSensitive from DB.
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
cfg1.From.User = user
cfg1.From.Password = password
cfg1.RelayDir = "relay-dir"
c.Assert(checkAndAdjustSourceConfigFunc(ctx, cfg1), IsNil) // adjust source config.
c.Assert(checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg1), IsNil) // adjust source config.
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"

Expand Down
40 changes: 32 additions & 8 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package master

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/http/httputil"

"github.com/pingcap/failpoint"

ginmiddleware "github.com/deepmap/oapi-codegen/pkg/gin-middleware"
"github.com/gin-gonic/gin"
Expand All @@ -43,9 +46,8 @@ const (
docJSONBasePath = "/api/v1/dm.json"
)

// redirectRequestToLeaderMW a middleware auto redirect request to leader.
// because the leader has some data in memory, only the leader can process the request.
func (s *Server) redirectRequestToLeaderMW() gin.HandlerFunc {
// reverseRequestToLeaderMW reverses request to leader.
func (s *Server) reverseRequestToLeaderMW(tlsCfg *tls.Config) gin.HandlerFunc {
return func(c *gin.Context) {
ctx2 := c.Request.Context()
isLeader, _ := s.isLeaderAndNeedForward(ctx2)
Expand All @@ -58,14 +60,36 @@ func (s *Server) redirectRequestToLeaderMW() gin.HandlerFunc {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
c.Redirect(http.StatusTemporaryRedirect, fmt.Sprintf("http://%s%s", leaderOpenAPIAddr, c.Request.RequestURI))
c.AbortWithStatus(http.StatusTemporaryRedirect)

failpoint.Inject("MockNotSetTls", func() {
tlsCfg = nil
})
// simpleProxy just reverses to leader host
simpleProxy := httputil.ReverseProxy{
Director: func(req *http.Request) {
if tlsCfg != nil {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
req.URL.Host = leaderOpenAPIAddr
req.Host = leaderOpenAPIAddr
},
}
if tlsCfg != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsCfg
simpleProxy.Transport = transport
}
log.L().Info("reverse request to leader", zap.String("Request URL", c.Request.URL.String()), zap.String("leader", leaderOpenAPIAddr), zap.Bool("hasTLS", tlsCfg != nil))
simpleProxy.ServeHTTP(c.Writer, c.Request)
c.Abort()
}
}
}

// InitOpenAPIHandles init openapi handlers.
func (s *Server) InitOpenAPIHandles() error {
func (s *Server) InitOpenAPIHandles(tlsCfg *tls.Config) error {
swagger, err := openapi.GetSwagger()
if err != nil {
return err
Expand All @@ -77,7 +101,7 @@ func (s *Server) InitOpenAPIHandles() error {
// middlewares
r.Use(gin.Recovery())
r.Use(openapi.ZapLogger(log.L().WithFields(zap.String("component", "openapi")).Logger))
r.Use(s.redirectRequestToLeaderMW())
r.Use(s.reverseRequestToLeaderMW(tlsCfg))
r.Use(terrorHTTPErrorHandler())
// use validation middleware to check all requests against the OpenAPI schema.
r.Use(ginmiddleware.OapiRequestValidator(swagger))
Expand Down
160 changes: 156 additions & 4 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
package master

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/DATA-DOG/go-sqlmock"
"github.com/deepmap/oapi-codegen/pkg/testutil"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -75,7 +81,7 @@ func (t *openAPISuite) SetUpTest(c *check.C) {
c.Assert(ha.ClearTestInfoOperation(t.etcdTestCli), check.IsNil)
}

func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
func (t *openAPISuite) TestReverseRequestToLeader(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -134,9 +140,155 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
c.Assert(resultListSource.Data, check.HasLen, 0)
c.Assert(resultListSource.Total, check.Equals, 0)

// list source not from leader will get a redirect
result2 := testutil.NewRequest().Get(baseURL).GoWithHTTPHandler(t.testT, s2.openapiHandles)
c.Assert(result2.Code(), check.Equals, http.StatusTemporaryRedirect)
// list source from non-leader will get result too
result2, err := HTTPTestWithTestResponseRecorder(testutil.NewRequest().Get(baseURL), s2.openapiHandles)
c.Assert(err, check.IsNil)
c.Assert(result2.Code(), check.Equals, http.StatusOK)
var resultListSource2 openapi.GetSourceListResponse
c.Assert(result2.UnmarshalBodyToObject(&resultListSource2), check.IsNil)
c.Assert(resultListSource2.Data, check.HasLen, 0)
c.Assert(resultListSource2.Total, check.Equals, 0)
}

func (t *openAPISuite) TestReverseRequestToHttpsLeader(c *check.C) {
pwd, err := os.Getwd()
require.NoError(t.testT, err)
caPath := pwd + "/tls_for_test/ca.pem"
certPath := pwd + "/tls_for_test/dm.pem"
keyPath := pwd + "/tls_for_test/dm.key"

// master1
masterAddr1 := tempurl.Alloc()[len("http://"):]
peerAddr1 := tempurl.Alloc()[len("http://"):]
cfg1 := NewConfig()
require.NoError(t.testT, cfg1.Parse([]string{
"--name=dm-master-tls-1",
fmt.Sprintf("--data-dir=%s", t.testT.TempDir()),
fmt.Sprintf("--master-addr=https://%s", masterAddr1),
fmt.Sprintf("--advertise-addr=https://%s", masterAddr1),
fmt.Sprintf("--peer-urls=https://%s", peerAddr1),
fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr1),
fmt.Sprintf("--initial-cluster=dm-master-tls-1=https://%s", peerAddr1),
"--ssl-ca=" + caPath,
"--ssl-cert=" + certPath,
"--ssl-key=" + keyPath,
}))
cfg1.OpenAPI = true
s1 := NewServer(cfg1)
ctx1, cancel1 := context.WithCancel(context.Background())
require.NoError(t.testT, s1.Start(ctx1))
defer func() {
cancel1()
s1.Close()
}()
// wait the first one become the leader
require.True(t.testT, utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s1.election.IsLeader() && s1.scheduler.Started()
}))

// master2
masterAddr2 := tempurl.Alloc()[len("http://"):]
peerAddr2 := tempurl.Alloc()[len("http://"):]
cfg2 := NewConfig()
require.NoError(t.testT, cfg2.Parse([]string{
"--name=dm-master-tls-2",
fmt.Sprintf("--data-dir=%s", t.testT.TempDir()),
fmt.Sprintf("--master-addr=https://%s", masterAddr2),
fmt.Sprintf("--advertise-addr=https://%s", masterAddr2),
fmt.Sprintf("--peer-urls=https://%s", peerAddr2),
fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr2),
"--ssl-ca=" + caPath,
"--ssl-cert=" + certPath,
"--ssl-key=" + keyPath,
}))
cfg2.OpenAPI = true
cfg2.Join = s1.cfg.MasterAddr // join to an existing cluster
s2 := NewServer(cfg2)
ctx2, cancel2 := context.WithCancel(context.Background())
require.NoError(t.testT, s2.Start(ctx2))
defer func() {
cancel2()
s2.Close()
}()
// wait the second master ready
require.False(t.testT, utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s2.election.IsLeader()
}))

baseURL := "/api/v1/sources"
// list source from leader
result := testutil.NewRequest().Get(baseURL).GoWithHTTPHandler(t.testT, s1.openapiHandles)
require.Equal(t.testT, http.StatusOK, result.Code())
var resultListSource openapi.GetSourceListResponse
require.NoError(t.testT, result.UnmarshalBodyToObject(&resultListSource))
require.Len(t.testT, resultListSource.Data, 0)
require.Equal(t.testT, 0, resultListSource.Total)

// with tls, list source not from leader will get result too
result, err = HTTPTestWithTestResponseRecorder(testutil.NewRequest().Get(baseURL), s2.openapiHandles)
require.NoError(t.testT, err)
require.Equal(t.testT, http.StatusOK, result.Code())
var resultListSource2 openapi.GetSourceListResponse
require.NoError(t.testT, result.UnmarshalBodyToObject(&resultListSource2))
require.Len(t.testT, resultListSource2.Data, 0)
require.Equal(t.testT, 0, resultListSource2.Total)

// without tls, list source not from leader will be 502
require.NoError(t.testT, failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls", `return()`))
result, err = HTTPTestWithTestResponseRecorder(testutil.NewRequest().Get(baseURL), s2.openapiHandles)
require.NoError(t.testT, err)
require.Equal(t.testT, http.StatusBadGateway, result.Code())
require.NoError(t.testT, failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls"))
}

// httptest.ResponseRecorder is not http.CloseNotifier, will panic when test reverse proxy.
// We need to implement the interface ourselves.
// ref: https://github.com/gin-gonic/gin/blob/ce20f107f5dc498ec7489d7739541a25dcd48463/context_test.go#L1747-L1765
type TestResponseRecorder struct {
*httptest.ResponseRecorder
closeChannel chan bool
}

func (r *TestResponseRecorder) CloseNotify() <-chan bool {
return r.closeChannel
}

func CreateTestResponseRecorder() *TestResponseRecorder {
return &TestResponseRecorder{
httptest.NewRecorder(),
make(chan bool, 1),
}
}

func HTTPTestWithTestResponseRecorder(r *testutil.RequestBuilder, handler http.Handler) (*testutil.CompletedRequest, error) {
if r == nil {
return nil, nil
}
if r.Error != nil {
return nil, r.Error
}
var bodyReader io.Reader
if r.Body != nil {
bodyReader = bytes.NewReader(r.Body)
}

req := httptest.NewRequest(r.Method, r.Path, bodyReader)
for h, v := range r.Headers {
req.Header.Add(h, v)
}
if host, ok := r.Headers["Host"]; ok {
req.Host = host
}
for _, c := range r.Cookies {
req.AddCookie(c)
}

rec := CreateTestResponseRecorder()
handler.ServeHTTP(rec, req)

return &testutil.CompletedRequest{
Recorder: rec.ResponseRecorder,
}, nil
}

func (t *openAPISuite) TestOpenAPIWillNotStartInDefaultConfig(c *check.C) {
Expand Down
35 changes: 31 additions & 4 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package master

import (
"context"
"database/sql"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -82,7 +83,11 @@ var (
registerOnce sync.Once
runBackgroundOnce sync.Once

checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
// the difference of below functions is checkAndAdjustSourceConfigForDMCtlFunc will not AdjustCaseSensitive. It's a
// compatibility compromise.
// When we need to change the implementation of dmctl to OpenAPI, we should notice the user about this change.
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfigForDMCtl
)

// Server handles RPC requests for dm-master.
Expand Down Expand Up @@ -192,7 +197,12 @@ func (s *Server) Start(ctx context.Context) (err error) {
"/debug/": getDebugHandler(),
}
if s.cfg.OpenAPI {
if initOpenAPIErr := s.InitOpenAPIHandles(); initOpenAPIErr != nil {
// tls3 is used to openapi reverse proxy
tls3, err1 := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err1 != nil {
return terror.ErrMasterTLSConfigNotValid.Delegate(err1)
}
if initOpenAPIErr := s.InitOpenAPIHandles(tls3.TLSConfig()); initOpenAPIErr != nil {
return terror.ErrOpenAPICommonError.Delegate(initOpenAPIErr)
}
userHandles["/api/v1/"] = s.openapiHandles
Expand Down Expand Up @@ -1199,15 +1209,19 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf
if err != nil {
return cfgs, err
}
if err := checkAndAdjustSourceConfigFunc(ctx, cfg); err != nil {
if err := checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg); err != nil {
return cfgs, err
}
cfgs[i] = cfg
}
return cfgs, nil
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
func innerCheckAndAdjustSourceConfig(
ctx context.Context,
cfg *config.SourceConfig,
hook func(sourceConfig *config.SourceConfig, ctx context.Context, db *sql.DB) error,
) error {
dbConfig := cfg.GenerateDBConfig()
fromDB, err := conn.DefaultDBProvider.Apply(dbConfig)
if err != nil {
Expand All @@ -1217,12 +1231,25 @@ func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) e
if err = cfg.Adjust(ctx, fromDB.DB); err != nil {
return err
}
if hook != nil {
if err = hook(cfg, ctx, fromDB.DB); err != nil {
return err
}
}
if _, err = cfg.Yaml(); err != nil {
return err
}
return cfg.Verify()
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, (*config.SourceConfig).AdjustCaseSensitive)
}

func checkAndAdjustSourceConfigForDMCtl(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, nil)
}

func parseSourceConfig(contents []string) ([]*config.SourceConfig, error) {
cfgs := make([]*config.SourceConfig, len(contents))
for i, content := range contents {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ func (t *testMaster) SetUpSuite(c *check.C) {
t.workerClients = make(map[string]workerrpc.Client)
t.saveMaxRetryNum = maxRetryNum
maxRetryNum = 2
checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
checkAndAdjustSourceConfigForDMCtlFunc = checkAndNoAdjustSourceConfigMock
}

func (t *testMaster) TearDownSuite(c *check.C) {
maxRetryNum = t.saveMaxRetryNum
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfig
}

func (t *testMaster) SetUpTest(c *check.C) {
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskCon
return st.Update(ctx, cfg)
}

// OperateSubTask stop/resume/pause sub task.
// OperateSubTask stop/resume/pause sub task.
func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error {
w.Lock()
defer w.Unlock()
Expand Down
Loading

0 comments on commit ba5ad3a

Please sign in to comment.