Skip to content

Commit

Permalink
sink(ticdc): migrate test-infra to testify for cdc/sink (#5270)
Browse files Browse the repository at this point in the history
close #2907
  • Loading branch information
CharlesCheung96 committed May 6, 2022
1 parent 3e8e246 commit 6f91a84
Show file tree
Hide file tree
Showing 19 changed files with 834 additions and 716 deletions.
4 changes: 3 additions & 1 deletion cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func TestCleanBufferedData(t *testing.T) {
t.Parallel()

tblID := model.TableID(1)
b := newBufferSink(newBlackHoleSink(context.TODO()), 5)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
b := newBufferSink(newBlackHoleSink(ctx), 5)
b.buffer[tblID] = []*model.RowChangedEvent{}
_, ok := b.buffer[tblID]
require.True(t, ok)
Expand Down
122 changes: 120 additions & 2 deletions cdc/sink/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package codec
import (
"context"
"encoding/json"
"io"
"net/http"
"testing"
"time"

"github.com/jarcoal/httpmock"
"github.com/linkedin/goavro/v2"
"github.com/pingcap/check"
"github.com/pingcap/errors"
Expand All @@ -34,14 +38,16 @@ import (
"go.uber.org/zap"
)

func Test(t *testing.T) { check.TestingT(t) }

type avroBatchEncoderSuite struct {
encoder *AvroEventBatchEncoder
}

var _ = check.Suite(&avroBatchEncoderSuite{})

func (s *avroBatchEncoderSuite) SetUpSuite(c *check.C) {
startHTTPInterceptForTestingRegistry(c)
startAvroHTTPInterceptForTestingRegistry(c)

keyManager, err := NewAvroSchemaManager(context.Background(), &security.Credential{}, "http://127.0.0.1:8081", "-key")
c.Assert(err, check.IsNil)
Expand All @@ -57,7 +63,7 @@ func (s *avroBatchEncoderSuite) SetUpSuite(c *check.C) {
}

func (s *avroBatchEncoderSuite) TearDownSuite(c *check.C) {
stopHTTPInterceptForTestingRegistry()
stopAvroHTTPInterceptForTestingRegistry()
}

func setBinChsClnFlag(ft *types.FieldType) *types.FieldType {
Expand Down Expand Up @@ -379,3 +385,115 @@ func (s *avroBatchEncoderSuite) TestAvroEncode(c *check.C) {
err = s.encoder.AppendRowChangedEvent(testCaseUpdate)
c.Check(err, check.IsNil)
}

func startAvroHTTPInterceptForTestingRegistry(c *check.C) {
httpmock.Activate()

registry := mockRegistry{
subjects: make(map[string]*mockRegistrySchema),
newID: 1,
}

httpmock.RegisterResponder("GET", "http://127.0.0.1:8081", httpmock.NewStringResponder(200, "{}"))

httpmock.RegisterResponder("POST", `=~^http://127.0.0.1:8081/subjects/(.+)/versions`,
func(req *http.Request) (*http.Response, error) {
subject, err := httpmock.GetSubmatch(req, 1)
if err != nil {
return nil, err
}
reqBody, err := io.ReadAll(req.Body)
if err != nil {
return nil, err
}
var reqData registerRequest
err = json.Unmarshal(reqBody, &reqData)
if err != nil {
return nil, err
}

// c.Assert(reqData.SchemaType, check.Equals, "AVRO")

var respData registerResponse
registry.mu.Lock()
item, exists := registry.subjects[subject]
if !exists {
item = &mockRegistrySchema{
content: reqData.Schema,
version: 0,
ID: registry.newID,
}
registry.subjects[subject] = item
respData.ID = registry.newID
} else {
if item.content == reqData.Schema {
respData.ID = item.ID
} else {
item.content = reqData.Schema
item.version++
item.ID = registry.newID
respData.ID = registry.newID
}
}
registry.newID++
registry.mu.Unlock()
return httpmock.NewJsonResponse(200, &respData)
})

httpmock.RegisterResponder("GET", `=~^http://127.0.0.1:8081/subjects/(.+)/versions/latest`,
func(req *http.Request) (*http.Response, error) {
subject, err := httpmock.GetSubmatch(req, 1)
if err != nil {
return httpmock.NewStringResponse(500, "Internal Server Error"), err
}

registry.mu.Lock()
item, exists := registry.subjects[subject]
registry.mu.Unlock()
if !exists {
return httpmock.NewStringResponse(404, ""), nil
}

var respData lookupResponse
respData.Schema = item.content
respData.Name = subject
respData.RegistryID = item.ID

return httpmock.NewJsonResponse(200, &respData)
})

httpmock.RegisterResponder("DELETE", `=~^http://127.0.0.1:8081/subjects/(.+)`,
func(req *http.Request) (*http.Response, error) {
subject, err := httpmock.GetSubmatch(req, 1)
if err != nil {
return nil, err
}

registry.mu.Lock()
defer registry.mu.Unlock()
_, exists := registry.subjects[subject]
if !exists {
return httpmock.NewStringResponse(404, ""), nil
}

delete(registry.subjects, subject)
return httpmock.NewStringResponse(200, ""), nil
})

failCounter := 0
httpmock.RegisterResponder("POST", `=~^http://127.0.0.1:8081/may-fail`,
func(req *http.Request) (*http.Response, error) {
data, _ := io.ReadAll(req.Body)
c.Assert(len(data), check.Greater, 0)
c.Assert(int64(len(data)), check.Equals, req.ContentLength)
if failCounter < 3 {
failCounter++
return httpmock.NewStringResponse(422, ""), nil
}
return httpmock.NewStringResponse(200, ""), nil
})
}

func stopAvroHTTPInterceptForTestingRegistry() {
httpmock.DeactivateAndReset()
}
Loading

0 comments on commit 6f91a84

Please sign in to comment.