Skip to content

Commit

Permalink
owner(ticdc): Add support for region-label to enable meta-region isol…
Browse files Browse the repository at this point in the history
…ation (#4937) (#5353)

close #4756, close #4762
  • Loading branch information
ti-chi-bot committed Jun 21, 2022
1 parent b373dbb commit 01dc212
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 14 deletions.
14 changes: 11 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
Expand All @@ -60,7 +60,7 @@ type Capture struct {
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
TimeAcquirer pdutil.TimeAcquirer

cancel context.CancelFunc

Expand Down Expand Up @@ -116,7 +116,7 @@ func (c *Capture) reset(ctx context.Context) error {
if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)
c.TimeAcquirer = pdutil.NewTimeAcquirer(c.pdClient)

if c.grpcPool != nil {
c.grpcPool.Close()
Expand Down Expand Up @@ -269,6 +269,14 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
return cerror.ErrCaptureSuicide.GenWithStackByArgs()
}

// Update meta-region label to ensure that meta region isolated from data regions.
err = pdutil.UpdateMetaLabel(ctx, c.pdClient)
if err != nil {
log.Warn("Fail to verify region label rule",
zap.Error(err),
zap.String("captureID", c.info.ID))
}

log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID))
owner := c.newOwner(c.pdClient)
c.setOwner(owner)
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/pingcap/tiflow/pkg/version"
Expand Down Expand Up @@ -227,7 +227,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
TimeAcquirer: pdutil.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Expand Down
6 changes: 3 additions & 3 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand All @@ -39,7 +39,7 @@ type GlobalVars struct {
CaptureInfo *model.CaptureInfo
EtcdClient *etcd.CDCEtcdClient
GrpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
TimeAcquirer pdutil.TimeAcquirer
}

// ChangefeedVars contains some vars which can be used anywhere in a pipeline
Expand Down Expand Up @@ -187,7 +187,7 @@ func NewContext4Test(baseCtx context.Context, withChangefeedVars bool) Context {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
TimeAcquirer: pdutil.NewTimeAcquirer4Test(),
})
if withChangefeedVars {
ctx = WithChangefeedVars(ctx, &ChangefeedVars{
Expand Down
34 changes: 34 additions & 0 deletions pkg/httputil/httputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
package httputil

import (
"context"
"io"
"net/http"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/pkg/security"
)

Expand Down Expand Up @@ -43,3 +46,34 @@ func NewClient(credential *security.Credential) (*Client, error) {
Client: http.Client{Transport: transport},
}, nil
}

// DoRequest sends an request and returns an HTTP response content.
func (c *Client) DoRequest(
ctx context.Context, url, method string, headers http.Header, body io.Reader,
) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, errors.Trace(err)
}

for key, values := range headers {
for _, v := range values {
req.Header.Add(key, v)
}
}

resp, err := c.Do(req)
if err != nil {
return nil, errors.Trace(err)
}
defer resp.Body.Close()

content, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Trace(err)
}
if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("[%d] %s", resp.StatusCode, content)
}
return content, nil
}
2 changes: 1 addition & 1 deletion pkg/pdtime/acquirer.go → pkg/pdutil/acquirer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pdtime
package pdutil

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pdtime
package pdutil

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/pdtime/main_test.go → pkg/pdutil/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pdtime
package pdutil

import (
"testing"
Expand Down
116 changes: 116 additions & 0 deletions pkg/pdutil/region_label.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pdutil

import (
"bytes"
"context"
"net/http"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/retry"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

const (
regionLabelPrefix = "/pd/api/v1/config/region-label/rules"

// Split the default rule by `6e000000000000000000f8` to keep metadata region
// isolated from the normal data area.
addMetaJSON = `{
"sets": [
{
"id": "ticdc/meta",
"labels": [
{
"key": "data-type",
"value": "meta"
}
],
"rule_type": "key-range",
"data": [
{
"start_key": "6d00000000000000f8",
"end_key": "6e00000000000000f8"
}
]
}
]
}`
)

var defaultMaxRetry uint64 = 3

// pdAPIClient is api client of Placement Driver.
type pdAPIClient struct {
pdClient pd.Client
dialClient *httputil.Client
}

// newPDApiClient create a new pdAPIClient.
func newPDApiClient(pdClient pd.Client) (*pdAPIClient, error) {
conf := config.GetGlobalServerConfig()
dialClient, err := httputil.NewClient(conf.Security)
if err != nil {
return nil, errors.Trace(err)
}
return &pdAPIClient{
pdClient: pdClient,
dialClient: dialClient,
}, nil
}

// UpdateMetaLabel is a reentrant function that updates the meta-region label of upstream cluster.
func UpdateMetaLabel(ctx context.Context, pdClient pd.Client) error {
pc, err := newPDApiClient(pdClient)
if err != nil {
return err
}
defer pc.dialClient.CloseIdleConnections()

err = retry.Do(ctx, func() error {
err = pc.patchMetaLabel(ctx)
if err != nil {
log.Error("Fail to add meta region label to PD", zap.Error(err))
return err
}

log.Info("Succeed to add meta region label to PD")
return nil
}, retry.WithMaxTries(defaultMaxRetry), retry.WithIsRetryableErr(func(err error) bool {
switch errors.Cause(err) {
case context.Canceled:
return false
}
return true
}))
return err
}

func (pc *pdAPIClient) patchMetaLabel(ctx context.Context) error {
url := pc.pdClient.GetLeaderAddr() + regionLabelPrefix
header := http.Header{"Content-Type": {"application/json"}}
content := []byte(addMetaJSON)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err := pc.dialClient.DoRequest(ctx, url, http.MethodPatch,
header, bytes.NewReader(content))
return errors.Trace(err)
}
85 changes: 85 additions & 0 deletions pkg/pdutil/region_lable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pdutil

import (
"context"
"net/http"
"net/http/httptest"
"testing"

cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)

type mockPDClient struct {
pd.Client
testServer *httptest.Server
url string
}

func (m *mockPDClient) GetLeaderAddr() string {
return m.url
}

func newMockPDClient(normal bool) *mockPDClient {
mock := &mockPDClient{}
status := http.StatusOK
if !normal {
status = http.StatusNotFound
}
mock.testServer = httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(status)
},
))
mock.url = mock.testServer.URL

return mock
}

func TestMetaLabelNormal(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockClient := newMockPDClient(true)

err := UpdateMetaLabel(ctx, mockClient)
require.Nil(t, err)
mockClient.testServer.Close()
}

func TestMetaLabelFail(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockClient := newMockPDClient(false)
pc, err := newPDApiClient(mockClient)
require.Nil(t, err)
mockClient.url = "http://127.0.1.1:2345"

// test url error
err = pc.patchMetaLabel(ctx)
require.NotNil(t, err)

// test 404
mockClient.url = mockClient.testServer.URL
err = pc.patchMetaLabel(ctx)
require.Regexp(t, ".*404.*", err)

err = UpdateMetaLabel(ctx, mockClient)
require.ErrorIs(t, err, cerror.ErrReachMaxTry)
mockClient.testServer.Close()
}
6 changes: 3 additions & 3 deletions pkg/txnutil/gc/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (
"testing"
"time"

"github.com/pingcap/tiflow/pkg/pdtime"

"github.com/pingcap/check"
"github.com/pingcap/errors"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -96,7 +95,8 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
gcManager.isTiCDCBlockGC = true
ctx := context.Background()

TimeAcquirer := pdtime.NewTimeAcquirer(mockPDClient)
TimeAcquirer := pdutil.NewTimeAcquirer(mockPDClient)

go TimeAcquirer.Run(ctx)
time.Sleep(1 * time.Second)
defer TimeAcquirer.Stop()
Expand Down
Loading

0 comments on commit 01dc212

Please sign in to comment.