From bbe7c3b1318d0d92103c9cbf9dcf60ea11b3b733 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Sat, 7 May 2022 17:39:03 +0800 Subject: [PATCH] owner(ticdc): Add support for region-label to enable meta-region isolation (#4937) close pingcap/tiflow#4756, close pingcap/tiflow#4762 --- cdc/capture/capture.go | 14 ++- cdc/owner/changefeed_test.go | 4 +- pkg/context/context.go | 6 +- pkg/httputil/httputil.go | 34 +++++++ pkg/{pdtime => pdutil}/acquirer.go | 2 +- pkg/{pdtime => pdutil}/acquirer_test.go | 2 +- pkg/{pdtime => pdutil}/main_test.go | 2 +- pkg/pdutil/region_label.go | 116 ++++++++++++++++++++++++ pkg/pdutil/region_lable_test.go | 85 +++++++++++++++++ pkg/txnutil/gc/gc_manager_test.go | 6 +- 10 files changed, 257 insertions(+), 14 deletions(-) rename pkg/{pdtime => pdutil}/acquirer.go (99%) rename pkg/{pdtime => pdutil}/acquirer_test.go (98%) rename pkg/{pdtime => pdutil}/main_test.go (97%) create mode 100644 pkg/pdutil/region_label.go create mode 100644 pkg/pdutil/region_lable_test.go diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 419f3e6b7cf..145e1422cf8 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -34,7 +34,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" - "github.com/pingcap/tiflow/pkg/pdtime" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/version" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3/concurrency" @@ -60,7 +60,7 @@ type Capture struct { kvStorage tidbkv.Storage etcdClient *etcd.CDCEtcdClient grpcPool kv.GrpcPool - TimeAcquirer pdtime.TimeAcquirer + TimeAcquirer pdutil.TimeAcquirer cancel context.CancelFunc @@ -112,7 +112,7 @@ func (c *Capture) reset(ctx context.Context) error { if c.TimeAcquirer != nil { c.TimeAcquirer.Stop() } - c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient) + c.TimeAcquirer = pdutil.NewTimeAcquirer(c.pdClient) if c.grpcPool != nil { c.grpcPool.Close() @@ -265,6 +265,14 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { return cerror.ErrCaptureSuicide.GenWithStackByArgs() } + // Update meta-region label to ensure that meta region isolated from data regions. + err = pdutil.UpdateMetaLabel(ctx, c.pdClient) + if err != nil { + log.Warn("Fail to verify region label rule", + zap.Error(err), + zap.String("captureID", c.info.ID)) + } + log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID)) owner := c.newOwner(c.pdClient) c.setOwner(owner) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 6a9f9657d66..5135ae2bb70 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -29,7 +29,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" "github.com/pingcap/tiflow/pkg/orchestrator" - "github.com/pingcap/tiflow/pkg/pdtime" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/pingcap/tiflow/pkg/version" @@ -227,7 +227,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { AdvertiseAddr: "127.0.0.1:0000", Version: version.ReleaseVersion, }, - TimeAcquirer: pdtime.NewTimeAcquirer4Test(), + TimeAcquirer: pdutil.NewTimeAcquirer4Test(), }) ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: "changefeed-id-test", diff --git a/pkg/context/context.go b/pkg/context/context.go index 863f44051ad..3de8216166d 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/etcd" - "github.com/pingcap/tiflow/pkg/pdtime" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -39,7 +39,7 @@ type GlobalVars struct { CaptureInfo *model.CaptureInfo EtcdClient *etcd.CDCEtcdClient GrpcPool kv.GrpcPool - TimeAcquirer pdtime.TimeAcquirer + TimeAcquirer pdutil.TimeAcquirer } // ChangefeedVars contains some vars which can be used anywhere in a pipeline @@ -187,7 +187,7 @@ func NewContext4Test(baseCtx context.Context, withChangefeedVars bool) Context { AdvertiseAddr: "127.0.0.1:0000", Version: version.ReleaseVersion, }, - TimeAcquirer: pdtime.NewTimeAcquirer4Test(), + TimeAcquirer: pdutil.NewTimeAcquirer4Test(), }) if withChangefeedVars { ctx = WithChangefeedVars(ctx, &ChangefeedVars{ diff --git a/pkg/httputil/httputil.go b/pkg/httputil/httputil.go index 012a84dc4d6..fa1b077516a 100644 --- a/pkg/httputil/httputil.go +++ b/pkg/httputil/httputil.go @@ -14,8 +14,11 @@ package httputil import ( + "context" + "io" "net/http" + "github.com/pingcap/errors" "github.com/pingcap/tiflow/pkg/security" ) @@ -43,3 +46,34 @@ func NewClient(credential *security.Credential) (*Client, error) { Client: http.Client{Transport: transport}, }, nil } + +// DoRequest sends an request and returns an HTTP response content. +func (c *Client) DoRequest( + ctx context.Context, url, method string, headers http.Header, body io.Reader, +) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, method, url, body) + if err != nil { + return nil, errors.Trace(err) + } + + for key, values := range headers { + for _, v := range values { + req.Header.Add(key, v) + } + } + + resp, err := c.Do(req) + if err != nil { + return nil, errors.Trace(err) + } + defer resp.Body.Close() + + content, err := io.ReadAll(resp.Body) + if err != nil { + return nil, errors.Trace(err) + } + if resp.StatusCode != http.StatusOK { + return nil, errors.Errorf("[%d] %s", resp.StatusCode, content) + } + return content, nil +} diff --git a/pkg/pdtime/acquirer.go b/pkg/pdutil/acquirer.go similarity index 99% rename from pkg/pdtime/acquirer.go rename to pkg/pdutil/acquirer.go index 4f9a3ad93ad..3d7de75e6bf 100644 --- a/pkg/pdtime/acquirer.go +++ b/pkg/pdutil/acquirer.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pdtime +package pdutil import ( "context" diff --git a/pkg/pdtime/acquirer_test.go b/pkg/pdutil/acquirer_test.go similarity index 98% rename from pkg/pdtime/acquirer_test.go rename to pkg/pdutil/acquirer_test.go index 55b2950192e..02b37b0ba54 100644 --- a/pkg/pdtime/acquirer_test.go +++ b/pkg/pdutil/acquirer_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pdtime +package pdutil import ( "context" diff --git a/pkg/pdtime/main_test.go b/pkg/pdutil/main_test.go similarity index 97% rename from pkg/pdtime/main_test.go rename to pkg/pdutil/main_test.go index 24c9b4e26ad..bee2202fb0d 100644 --- a/pkg/pdtime/main_test.go +++ b/pkg/pdutil/main_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pdtime +package pdutil import ( "testing" diff --git a/pkg/pdutil/region_label.go b/pkg/pdutil/region_label.go new file mode 100644 index 00000000000..3aa513c0d4d --- /dev/null +++ b/pkg/pdutil/region_label.go @@ -0,0 +1,116 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "bytes" + "context" + "net/http" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/httputil" + "github.com/pingcap/tiflow/pkg/retry" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" +) + +const ( + regionLabelPrefix = "/pd/api/v1/config/region-label/rules" + + // Split the default rule by `6e000000000000000000f8` to keep metadata region + // isolated from the normal data area. + addMetaJSON = `{ + "sets": [ + { + "id": "ticdc/meta", + "labels": [ + { + "key": "data-type", + "value": "meta" + } + ], + "rule_type": "key-range", + "data": [ + { + "start_key": "6d00000000000000f8", + "end_key": "6e00000000000000f8" + } + ] + } + ] + }` +) + +var defaultMaxRetry int64 = 3 + +// pdAPIClient is api client of Placement Driver. +type pdAPIClient struct { + pdClient pd.Client + dialClient *httputil.Client +} + +// newPDApiClient create a new pdAPIClient. +func newPDApiClient(ctx context.Context, pdClient pd.Client) (*pdAPIClient, error) { + conf := config.GetGlobalServerConfig() + dialClient, err := httputil.NewClient(conf.Security) + if err != nil { + return nil, errors.Trace(err) + } + return &pdAPIClient{ + pdClient: pdClient, + dialClient: dialClient, + }, nil +} + +// UpdateMetaLabel is a reentrant function that updates the meta-region label of upstream cluster. +func UpdateMetaLabel(ctx context.Context, pdClient pd.Client) error { + pc, err := newPDApiClient(ctx, pdClient) + if err != nil { + return err + } + defer pc.dialClient.CloseIdleConnections() + + err = retry.Do(ctx, func() error { + err = pc.patchMetaLabel(ctx) + if err != nil { + log.Error("Fail to add meta region label to PD", zap.Error(err)) + return err + } + + log.Info("Succeed to add meta region label to PD") + return nil + }, retry.WithMaxTries(defaultMaxRetry), retry.WithIsRetryableErr(func(err error) bool { + switch errors.Cause(err) { + case context.Canceled: + return false + } + return true + })) + return err +} + +func (pc *pdAPIClient) patchMetaLabel(ctx context.Context) error { + url := pc.pdClient.GetLeaderAddr() + regionLabelPrefix + header := http.Header{"Content-Type": {"application/json"}} + content := []byte(addMetaJSON) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + _, err := pc.dialClient.DoRequest(ctx, url, http.MethodPatch, + header, bytes.NewReader(content)) + return errors.Trace(err) +} diff --git a/pkg/pdutil/region_lable_test.go b/pkg/pdutil/region_lable_test.go new file mode 100644 index 00000000000..44b107083e3 --- /dev/null +++ b/pkg/pdutil/region_lable_test.go @@ -0,0 +1,85 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" +) + +type mockPDClient struct { + pd.Client + testServer *httptest.Server + url string +} + +func (m *mockPDClient) GetLeaderAddr() string { + return m.url +} + +func newMockPDClient(ctx context.Context, normal bool) *mockPDClient { + mock := &mockPDClient{} + status := http.StatusOK + if !normal { + status = http.StatusNotFound + } + mock.testServer = httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(status) + }, + )) + mock.url = mock.testServer.URL + + return mock +} + +func TestMetaLabelNormal(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockClient := newMockPDClient(ctx, true) + + err := UpdateMetaLabel(ctx, mockClient) + require.Nil(t, err) + mockClient.testServer.Close() +} + +func TestMetaLabelFail(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockClient := newMockPDClient(ctx, false) + pc, err := newPDApiClient(ctx, mockClient) + require.Nil(t, err) + mockClient.url = "http://127.0.1.1:2345" + + // test url error + err = pc.patchMetaLabel(ctx) + require.NotNil(t, err) + + // test 404 + mockClient.url = mockClient.testServer.URL + err = pc.patchMetaLabel(ctx) + require.Regexp(t, ".*404.*", err) + + err = UpdateMetaLabel(ctx, mockClient) + require.ErrorIs(t, err, cerror.ErrReachMaxTry) + mockClient.testServer.Close() +} diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index 368f879cd39..7bbdb6e5700 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -18,12 +18,11 @@ import ( "testing" "time" - "github.com/pingcap/tiflow/pkg/pdtime" - "github.com/pingcap/check" "github.com/pingcap/errors" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/tikv/client-go/v2/oracle" ) @@ -96,7 +95,8 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) { gcManager.isTiCDCBlockGC = true ctx := context.Background() - TimeAcquirer := pdtime.NewTimeAcquirer(mockPDClient) + TimeAcquirer := pdutil.NewTimeAcquirer(mockPDClient) + go TimeAcquirer.Run(ctx) time.Sleep(1 * time.Second) defer TimeAcquirer.Stop()