Skip to content

Commit

Permalink
owner(ticdc): Add support for region-label to enable meta-region isol…
Browse files Browse the repository at this point in the history
…ation (#4937)

close #4756, close #4762
  • Loading branch information
CharlesCheung96 committed May 7, 2022
1 parent 063b528 commit dadad88
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 54 deletions.
6 changes: 3 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/txnutil"
Expand Down Expand Up @@ -314,7 +314,7 @@ type CDCClient struct {

regionCache *tikv.RegionCache
kvStorage tikv.Storage
pdClock pdtime.Clock
pdClock pdutil.Clock
changefeed model.ChangeFeedID

regionLimiters *regionEventFeedLimiters
Expand All @@ -327,7 +327,7 @@ func NewCDCClient(
kvStorage tikv.Storage,
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
pdClock pdtime.Clock,
pdClock pdutil.Clock,
changefeed model.ChangeFeedID,
cfg *config.KVClientConfig,
) (c CDCKVClient) {
Expand Down
6 changes: 3 additions & 3 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
Expand Down Expand Up @@ -203,7 +203,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 1000000)
Expand Down Expand Up @@ -300,7 +300,7 @@ func prepareBench(b *testing.B, regionNum int) (
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 1000000)
Expand Down
54 changes: 27 additions & 27 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestNewClient(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cli := NewCDCClient(
context.Background(), pdClient, nil, grpcPool, regionCache, pdtime.NewClock4Test(),
context.Background(), pdClient, nil, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
require.NotNil(t, cli)
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
// Take care of the eventCh, it's used to output resolvedTs event or kv event
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -530,7 +530,7 @@ func TestHandleError(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -763,7 +763,7 @@ func TestClusterIDMismatch(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -835,7 +835,7 @@ func testHandleFeedEvent(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func TestStreamSendWithError(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -1409,7 +1409,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -1543,7 +1543,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -1754,7 +1754,7 @@ func TestIncompatibleTiKV(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
// NOTICE: eventCh may block the main logic of EventFeed
Expand Down Expand Up @@ -1835,7 +1835,7 @@ func TestNoPendingRegionError(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -1917,7 +1917,7 @@ func TestDropStaleRequest(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2031,7 +2031,7 @@ func TestResolveLock(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2137,7 +2137,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2293,7 +2293,7 @@ func testEventAfterFeedStop(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2477,7 +2477,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2696,7 +2696,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2795,7 +2795,7 @@ func TestFailRegionReentrant(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2880,7 +2880,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -2950,7 +2950,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -3031,7 +3031,7 @@ func testKVClientForceReconnect(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -3183,7 +3183,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 100)
Expand Down Expand Up @@ -3303,7 +3303,7 @@ func TestEvTimeUpdate(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -3428,7 +3428,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -3523,7 +3523,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
model.DefaultChangeFeedID(""),
config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down
4 changes: 2 additions & 2 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/puller/frontier"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -72,7 +72,7 @@ func NewPuller(
grpcPool kv.GrpcPool,
regionCache *tikv.RegionCache,
kvStorage tidbkv.Storage,
pdClock pdtime.Clock,
pdClock pdutil.Clock,
changefeed model.ChangeFeedID,
checkpointTs uint64,
spans []regionspan.Span,
Expand Down
6 changes: 3 additions & 3 deletions cdc/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
Expand Down Expand Up @@ -61,7 +61,7 @@ func newMockCDCKVClient(
kvStorage tikv.Storage,
grpcPool kv.GrpcPool,
regionCache *tikv.RegionCache,
pdClock pdtime.Clock,
pdClock pdutil.Clock,
changefeed model.ChangeFeedID,
cfg *config.KVClientConfig,
) kv.CDCKVClient {
Expand Down Expand Up @@ -127,7 +127,7 @@ func newPullerForTest(
regionCache := tikv.NewRegionCache(pdCli)
defer regionCache.Close()
plr := NewPuller(
ctx, pdCli, grpcPool, regionCache, store, pdtime.NewClock4Test(),
ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(),
model.DefaultChangeFeedID("changefeed-id-test"),
checkpointTs, spans, config.GetDefaultServerConfig().KVClient)
wg.Add(1)
Expand Down
32 changes: 32 additions & 0 deletions pkg/httputil/httputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package httputil

import (
"context"
"io"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -86,6 +87,37 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}

// DoRequest sends an request and returns an HTTP response content.
func (c *Client) DoRequest(
ctx context.Context, url, method string, headers http.Header, body io.Reader,
) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, errors.Trace(err)
}

for key, values := range headers {
for _, v := range values {
req.Header.Add(key, v)
}
}

resp, err := c.Do(req)
if err != nil {
return nil, errors.Trace(err)
}
defer resp.Body.Close()

content, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Trace(err)
}
if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("[%d] %s", resp.StatusCode, content)
}
return content, nil
}

// CloseIdleConnections closes any connections are now sitting idle.
// See http.Client.CloseIdleConnections.
func (c *Client) CloseIdleConnections() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pdtime/clock.go → pkg/pdutil/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pdtime
package pdutil

import (
"context"
Expand Down
Loading

0 comments on commit dadad88

Please sign in to comment.