Skip to content

Commit

Permalink
Merge branch 'release-5.3' into cherry-pick-4937-to-release-5.3
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jun 16, 2022
2 parents 5fbb966 + c6fdb3b commit 56bcf38
Show file tree
Hide file tree
Showing 89 changed files with 1,653 additions and 353 deletions.
18 changes: 18 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func (o *Owner) WriteDebugInfo(w io.Writer) {
// AsyncStop stops the owner asynchronously
func (o *Owner) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
o.cleanStaleMetrics()
}

func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
Expand Down Expand Up @@ -273,6 +274,7 @@ func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it.
func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) {
log.Info("Start bootstrapping", zap.Any("state", state))
o.cleanStaleMetrics()
fixChangefeedInfos(state)
}

Expand All @@ -281,13 +283,28 @@ func fixChangefeedInfos(state *orchestrator.GlobalReactorState) {
for _, changefeedState := range state.Changefeeds {
if changefeedState != nil {
changefeedState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
}
info.FixIncompatible()
return info, true, nil
})
}
}
}

func (o *Owner) cleanStaleMetrics() {
// The gauge metrics of the Owner should be reset
// each time a new owner is launched, in case the previous owner
// has crashed and has not cleaned up the stale metrics values.
changefeedCheckpointTsGauge.Reset()
changefeedCheckpointTsLagGauge.Reset()
changefeedResolvedTsGauge.Reset()
changefeedResolvedTsLagGauge.Reset()
ownerMaintainTableNumGauge.Reset()
changefeedStatusGauge.Reset()
}

func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
Expand All @@ -297,6 +314,7 @@ func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {

ownerMaintainTableNumGauge.Reset()
changefeedStatusGauge.Reset()

for changefeedID, changefeedState := range state.Changefeeds {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Init(tableID model.TableID) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
6 changes: 5 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,11 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
tableNameStr = tableName.QuoteString()
}

sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs, p.redoManager)
sink, err := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs, p.redoManager)
if err != nil {
return nil, errors.Trace(err)
}

table := tablepipeline.NewTablePipeline(
ctx,
p.mounter,
Expand Down
31 changes: 23 additions & 8 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const (
// RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
Expand Down Expand Up @@ -58,6 +67,13 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor
return s3storage, nil
}

// logFormat2ParseFormat converts redo log file name format to the space separated
// format, which can be read and parsed by sscanf. Besides remove the suffix `%s`
// which is used as file name extension, since we will parse extension first.
func logFormat2ParseFormat(fmtStr string) string {
return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s")
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
Expand All @@ -66,7 +82,9 @@ func ParseLogFileName(name string) (uint64, string, error) {
}

// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), LogEXT)+SortLogEXT
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
Expand All @@ -75,15 +93,12 @@ func ParseLogFileName(name string) (uint64, string, error) {
return 0, "", nil
}

var commitTs, d1 uint64
var s1, s2, fileType string
var commitTs uint64
var s1, s2, fileType, uid string
// the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
formatStr := "%s %s %d %s %d" + LogEXT
if ext == TmpEXT {
formatStr += TmpEXT
}
formatStr := logFormat2ParseFormat(RedoLogFileFormatV1)
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, &s1, &s2, &d1, &fileType, &commitTs)
_, err := fmt.Sscanf(name, formatStr, &s1, &s2, &fileType, &commitTs, &uid)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
}
Expand Down
77 changes: 69 additions & 8 deletions cdc/redo/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package common
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestParseLogFileName(t *testing.T) {
type arg struct {
name string
}
// the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
tests := []struct {
name string
args arg
Expand All @@ -36,39 +35,99 @@ func TestParseLogFileName(t *testing.T) {
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT),
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT),
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
Expand All @@ -90,7 +149,9 @@ func TestParseLogFileName(t *testing.T) {
{
name: "err wrong format ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf("%s_%s_%s_%d%s%s", /* a wrong format */
"cp", "test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantErr: ".*bad log name*.",
},
Expand Down
34 changes: 32 additions & 2 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -117,6 +118,11 @@ type ManagerOptions struct {
type cacheRows struct {
tableID model.TableID
rows []*model.RowChangedEvent
// When calling FlushLog for a table, we must ensure that all data of this
// table has been written to underlying writer. Since the EmitRowChangedEvents
// and FlushLog of the same table can't be executed concurrently, we can
// insert a simple barrier data into data stream to achieve this goal.
flushCallback chan struct{}
}

// ManagerImpl manages redo log writer, buffers un-persistent redo logs, calculates
Expand Down Expand Up @@ -214,6 +220,9 @@ func (m *ManagerImpl) Enabled() bool {
// error ErrBufferLogTimeout will be returned.
// TODO: if the API is truly non-blocking, we should return an error immediatel
// when the log buffer channel is full.
// TODO: After buffer sink in sink node is removed, there is no batch mechanism
// before sending row changed events to redo manager, the original log buffer
// design may have performance issue.
func (m *ManagerImpl) EmitRowChangedEvents(
ctx context.Context,
tableID model.TableID,
Expand Down Expand Up @@ -248,6 +257,20 @@ func (m *ManagerImpl) FlushLog(
return nil
}
defer atomic.StoreInt64(&m.flushing, 0)

// Adding a barrier to data stream, to ensure all logs of this table has been
// written to underlying writer.
flushCallbackCh := make(chan struct{})
m.logBuffer <- cacheRows{
tableID: tableID,
flushCallback: flushCallbackCh,
}
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-flushCallbackCh:
}

return m.writer.FlushLog(ctx, tableID, resolvedTs)
}

Expand Down Expand Up @@ -322,8 +345,11 @@ func (m *ManagerImpl) updateTableResolvedTs(ctx context.Context) error {
return err
}
minResolvedTs := uint64(math.MaxUint64)
for tableID, rts := range rtsMap {
m.rtsMap[tableID] = rts
for tableID := range m.rtsMap {
if rts, ok := rtsMap[tableID]; ok {
m.rtsMap[tableID] = rts
}
rts := m.rtsMap[tableID]
if rts < minResolvedTs {
minResolvedTs = rts
}
Expand Down Expand Up @@ -359,6 +385,10 @@ func (m *ManagerImpl) bgWriteLog(ctx context.Context, errCh chan<- error) {
case <-ctx.Done():
return
case cache := <-m.logBuffer:
if cache.flushCallback != nil {
close(cache.flushCallback)
continue
}
logs := make([]*model.RedoRowChangedEvent, 0, len(cache.rows))
for _, row := range cache.rows {
logs = append(logs, RowToRedo(row))
Expand Down
Loading

0 comments on commit 56bcf38

Please sign in to comment.