Skip to content

Commit

Permalink
pkg/sink(ticdc): iterate all Kafka configs to support KOP (#8893) (#8966)
Browse files Browse the repository at this point in the history

close #8892
  • Loading branch information
ti-chi-bot committed May 17, 2023
1 parent c164be5 commit 5484207
Show file tree
Hide file tree
Showing 84 changed files with 601 additions and 481 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
run:
go: 1.18
go: 1.19
linters:
enable:
- unconvert
Expand Down
3 changes: 2 additions & 1 deletion cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ func decodeRowV1(b []byte, tableInfo *model.TableInfo, tz *time.Location) (map[i

// decodeRowV2 decodes value data using new encoding format.
// Ref: https://github.com/pingcap/tidb/pull/12634
// https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
//
// https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
func decodeRowV2(data []byte, columns []rowcodec.ColInfo, tz *time.Location) (map[int64]types.Datum, error) {
decoder := rowcodec.NewDatumMapDecoder(columns, tz)
datums, err := decoder.DecodeToDatumMap(data, nil)
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,8 @@ func (s *snapshot) doDropTable(tbInfo *model.TableInfo, currentTs uint64) {

// truncateTable truncate the table with the given ID, and replace it with a new `tbInfo`.
// NOTE: after a table is truncated:
// * physicalTableByID(id) will return nil;
// * IsTruncateTableID(id) should return true.
// - physicalTableByID(id) will return nil;
// - IsTruncateTableID(id) should return true.
func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs uint64) (err error) {
old, ok := s.physicalTableByID(id)
if !ok {
Expand Down
8 changes: 4 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,10 +1150,10 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R
// receiveFromStream receives gRPC messages from a stream continuously and sends
// messages to region worker, if `stream.Recv` meets error, this routine will exit
// silently. As for regions managed by this routine, there are two situations:
// 1. established regions: a `nil` event will be sent to region worker, and region
// worker call `s.onRegionFail` to re-establish these regions.
// 2. pending regions: call `s.onRegionFail` for each pending region before this
// routine exits to establish these regions.
// 1. established regions: a `nil` event will be sent to region worker, and region
// worker call `s.onRegionFail` to re-establish these regions.
// 2. pending regions: call `s.onRegionFail` for each pending region before this
// routine exits to establish these regions.
func (s *eventFeedSession) receiveFromStream(
ctx context.Context,
g *errgroup.Group,
Expand Down
30 changes: 15 additions & 15 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2766,10 +2766,10 @@ func TestResolveLockNoCandidate(t *testing.T) {
// TestFailRegionReentrant tests that one region could fail over multiple times,
// kv client must avoid duplicated `onRegionFail` call for the same region.
// In this test
// 1. An `unknownErr` is sent to kv client first to trigger `handleSingleRegionError` in region worker.
// 2. We delay the kv client to re-create a new region request by 500ms via failpoint.
// 3. Before new region request is fired, simulate kv client `stream.Recv` returns an error, the stream
// handler will signal region worker to exit, which will evict all active region states then.
// 1. An `unknownErr` is sent to kv client first to trigger `handleSingleRegionError` in region worker.
// 2. We delay the kv client to re-create a new region request by 500ms via failpoint.
// 3. Before new region request is fired, simulate kv client `stream.Recv` returns an error, the stream
// handler will signal region worker to exit, which will evict all active region states then.
func TestFailRegionReentrant(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -2849,17 +2849,17 @@ func TestFailRegionReentrant(t *testing.T) {

// TestClientV1UnlockRangeReentrant tests clientV1 can handle region reconnection
// with unstable TiKV store correctly. The test workflow is as follows:
// 1. kv client establishes two regions request, naming region-1, region-2, they
// belong to the same TiKV store.
// 2. The region-1 is firstly established, yet region-2 has some delay after its
// region state is inserted into `pendingRegions`
// 3. At this time the TiKV store crashes and `stream.Recv` returns error. In the
// defer function of `receiveFromStream`, all pending regions will be cleaned
// up, which means the region lock will be unlocked once for these regions.
// 4. In step-2, the region-2 continues to run, it can't get store stream which
// has been deleted in step-3, so it will create new stream but fails because
// of unstable TiKV store, at this point, the kv client should handle with the
// pending region correctly.
// 1. kv client establishes two regions request, naming region-1, region-2, they
// belong to the same TiKV store.
// 2. The region-1 is firstly established, yet region-2 has some delay after its
// region state is inserted into `pendingRegions`
// 3. At this time the TiKV store crashes and `stream.Recv` returns error. In the
// defer function of `receiveFromStream`, all pending regions will be cleaned
// up, which means the region lock will be unlocked once for these regions.
// 4. In step-2, the region-2 continues to run, it can't get store stream which
// has been deleted in step-3, so it will create new stream but fails because
// of unstable TiKV store, at this point, the kv client should handle with the
// pending region correctly.
func TestClientV1UnlockRangeReentrant(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
Expand Down
15 changes: 7 additions & 8 deletions cdc/processor/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ The TablePipeline listens to the kv change logs of a specified table(with its ma
The relationship between the three module is as follows:
One Capture(with processor role) -> Processor Manager -> Processor(changefeed1) -> TablePipeline(tableA)
╲ ╲
╲ -> TablePipeline(tableB)
-> Processor(changefeed2) -> TablePipeline(tableC)
-> TablePipeline(tableD)
╲ ╲
╲ -> TablePipeline(tableB)
-> Processor(changefeed2) -> TablePipeline(tableC)
-> TablePipeline(tableD)
*/
package processor
4 changes: 3 additions & 1 deletion cdc/processor/pipeline/actor_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func NewActorNode(
}

// TryRun gets message from parentNode and handles it until there is no more message to come
// or message handling is blocking
//
// or message handling is blocking
//
// only one message will be cached
func (n *ActorNode) TryRun(ctx context.Context) error {
processedCount := 0
Expand Down
1 change: 0 additions & 1 deletion cdc/redo/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,5 @@ A record has a length field and a logical Log data. The length field is a 64-bit
When applying redo logs from the cli, it will select files in the specific dir to open based on the startTs and endTs sent from the cli, or download logs from s3 first if that is enabled,
then sort the event records in each file based on commitTs; after sorting, the new sorted file name should be CaptureID_ChangeFeedID_CreateTime_FileType_MaxCommitTSOfAllEventInTheFile.log.sort.
*/
package redo
3 changes: 2 additions & 1 deletion cdc/redo/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (
"go.uber.org/zap"
)

//go:generate mockery --name=RedoLogReader --inpackage
// RedoLogReader is a reader abstraction for redo log storage layer
//
//go:generate mockery --name=RedoLogReader --inpackage
type RedoLogReader interface {
io.Closer

Expand Down
3 changes: 2 additions & 1 deletion cdc/redo/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import (
"golang.org/x/sync/errgroup"
)

//go:generate mockery --name=RedoLogWriter --inpackage
// RedoLogWriter defines the interfaces used to write redo log, all operations are thread-safe.
//
//go:generate mockery --name=RedoLogWriter --inpackage
type RedoLogWriter interface {
// WriteLog writes RedoRowChangedEvent to row log file.
WriteLog(ctx context.Context, tableID int64, rows []*model.RedoRowChangedEvent) error
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mq/codec/craft/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

/// create string from byte slice without copying
// / create string from byte slice without copying
func unsafeBytesToString(b []byte) string {
	// Reinterpret the slice header as a string header instead of copying
	// the bytes; the returned string shares memory with b.
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}

/// Primitive type decoders
// / Primitive type decoders
func decodeUint8(bits []byte) ([]byte, byte, error) {
if len(bits) < 1 {
return bits, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("buffer underflow")
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/codec/craft/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func encodeString(bits []byte, data string) []byte {
return append(bits, data...)
}

/// Chunk encoders
// / Chunk encoders
func encodeStringChunk(bits []byte, data []string) []byte {
for _, s := range data {
bits = encodeUvarint(bits, uint64(len(s)))
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/codec/craft/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func decodeHeaders(bits []byte, numHeaders int, allocator *SliceAllocator, dict
}, nil
}

/// Column group in columnar layout
// / Column group in columnar layout
type columnGroup struct {
ty byte
names []string
Expand Down
17 changes: 11 additions & 6 deletions cdc/sink/mq/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,18 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s
return "", err
}

if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
// For compatibility with KOP, we check all returned values.
// 1. Kafka only returns requested configs.
// 2. KOP returns all configs.
for _, entry := range configEntries {
if entry.Name == brokerConfigName {
return entry.Value, nil
}
}

return configEntries[0].Value, nil
log.Warn("Kafka config item not found",
zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
}

// getTopicConfig gets topic config by name.
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mq/producer/pulsar/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
// 1. All option in url queries start with lowercase chars, e.g. `tlsAllowInsecureConnection`, `maxConnectionsPerBroker`.
// 2. Use `auth` to config authentication plugin type, `auth.*` to config auth params.
// See:
// 1. https://pulsar.apache.org/docs/en/reference-cli-tools/#pulsar-client
// 2. https://github.com/apache/pulsar-client-go/tree/master/pulsar/internal/auth
// 1. https://pulsar.apache.org/docs/en/reference-cli-tools/#pulsar-client
// 2. https://github.com/apache/pulsar-client-go/tree/master/pulsar/internal/auth
//
// For example:
// pulsar://{host}/{topic}?auth=token&auth.token={token}
Expand Down
6 changes: 3 additions & 3 deletions cdc/sorter/leveldb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func (r *reader) outputBufferedResolvedEvents(buffer *outputBuffer) {
// them later.
//
// It returns:
// * a bool to indicate whether it has read the last Next or not.
// * an uint64, if it is not 0, it means all resolved events before the ts
// - a bool to indicate whether it has read the last Next or not.
// - an uint64, if it is not 0, it means all resolved events before the ts
// are outputted.
// * an error if it occurs.
// - an error if it occurs.
//
// Note: outputBuffer must be empty.
func (r *reader) outputIterEvents(
Expand Down
8 changes: 4 additions & 4 deletions cmd/cdc/hack/dbus_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func init() {
// It tries different techniques employed by different operating systems,
// returning the first valid address it finds, or an empty string.
//
// * /run/user/<uid>/bus if this exists, it *is* the bus socket. present on
// Ubuntu 18.04
// * /run/user/<uid>/dbus-session: if this exists, it can be parsed for the bus
// address. present on Ubuntu 16.04
// - /run/user/<uid>/bus if this exists, it *is* the bus socket. present on
// Ubuntu 18.04
// - /run/user/<uid>/dbus-session: if this exists, it can be parsed for the bus
// address. present on Ubuntu 16.04
//
// See https://dbus.freedesktop.org/doc/dbus-launch.1.html
func canDiscoverDbusSessionBusAddress() bool {
Expand Down
13 changes: 7 additions & 6 deletions dm/chaos/cases/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ const (

// createTableToSmithSchema converts a `CREATE TABLE` statement to a schema representation needed by go-sqlsmith.
// for one column in `columns`:
// - columns[0]: DB name
// - columns[1]: table name
// - columns[2]: table type (only use `BASE TABLE` now)
// - columns[3]: column name
// - columns[4]: column type
// - columns[0]: DB name
// - columns[1]: table name
// - columns[2]: table type (only use `BASE TABLE` now)
// - columns[3]: column name
// - columns[4]: column type
//
// for `indexes`:
// - list of index name
// - list of index name
func createTableToSmithSchema(dbName, sql string) (columns [][5]string, indexes []string, err error) {
parser2 := parser.New() // we should have clear `SQL_MODE.
stmt, err := parser2.ParseOneStmt(sql, "", "")
Expand Down
1 change: 1 addition & 0 deletions dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var getAllServerIDFunc = utils.GetAllServerID
// SampleSourceConfig is sample config file of source.
// The embed source.yaml is a copy of dm/master/source.yaml, because embed
// can only match regular files in the current directory and subdirectories.
//
//go:embed source.yaml
var SampleSourceConfig string

Expand Down
1 change: 1 addition & 0 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ type SubTaskConfig struct {
}

// SampleSubtaskConfig is the content of subtask.toml in current folder.
//
//go:embed subtask.toml
var SampleSubtaskConfig string

Expand Down
2 changes: 1 addition & 1 deletion dm/dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
1 change: 1 addition & 0 deletions dm/dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
)

// SampleConfig is sample config of dm-master.
//
//go:embed dm-master.toml
var SampleConfig string

Expand Down
9 changes: 5 additions & 4 deletions dm/dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ func startEtcd(etcdCfg *embed.Config,
// - if local persistent data exists (in fact, it's not join):
// - just restart if `member` already exists (already joined before)
// - read `initial-cluster` back from local persistent data to restart (just like bootstrapping)
//
// - if local persistent data not exist:
// 1. fetch member list from the cluster to check if we can join now.
// 2. call `member add` to add the member info into the cluster.
// 3. generate config for join (`initial-cluster` and `initial-cluster-state`).
// 4. save `initial-cluster` in local persistent data for later restarting.
// 1. fetch member list from the cluster to check if we can join now.
// 2. call `member add` to add the member info into the cluster.
// 3. generate config for join (`initial-cluster` and `initial-cluster-state`).
// 4. save `initial-cluster` in local persistent data for later restarting.
//
// NOTE: A member can't join to another cluster after it has joined a previous one.
func prepareJoinEtcd(cfg *Config) error {
Expand Down
30 changes: 19 additions & 11 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,37 +51,45 @@ const (
// Cases trigger a source-to-worker bound try:
// - a worker from Offline to Free:
// - receive keep-alive.
//
// - a worker from Bound to Free:
// - trigger by unbound: `a source removed`.
//
// - a new source added:
// - add source request from user.
//
// - a source unbound from another worker:
// - trigger by unbound: `a worker from Bound to Offline`.
// - TODO(csuzhangxc): design a strategy to ensure the old worker already shutdown its work.
//
// Cases trigger a source-to-worker unbound try.
// - a worker from Bound to Offline:
// - lost keep-alive.
//
// - a source removed:
// - remove source request from user.
//
// TODO: try to handle the return `err` of etcd operations,
// because may put into etcd, but the response to the etcd client interrupted.
//
// because may put into etcd, but the response to the etcd client interrupted.
//
// Relay scheduling:
// - scheduled by source
// DM-worker will enable relay according to its bound source, in current implementation, it will read `enable-relay`
// of source config and decide whether to enable relay.
// turn on `enable-relay`:
// - scheduled by source
// DM-worker will enable relay according to its bound source, in current implementation, it will read `enable-relay`
// of source config and decide whether to enable relay.
// turn on `enable-relay`:
// - use `enable-relay: true` when create source
// - `start-relay -s source` to dynamically change `enable-relay`
// turn off `enable-relay`:
// turn off `enable-relay`:
// - use `enable-relay: false` when create source
// - `stop-relay -s source` to dynamically change `enable-relay`
// - found conflict schedule type with (source, worker) when scheduler bootstrap
// - scheduled by (source, worker)
// DM-worker will check if relay is assigned to it no matter it's bound or not. In current implementation, it will
// read UpstreamRelayWorkerKeyAdapter in etcd.
// add UpstreamRelayWorkerKeyAdapter:
// - scheduled by (source, worker)
// DM-worker will check if relay is assigned to it no matter it's bound or not. In current implementation, it will
// read UpstreamRelayWorkerKeyAdapter in etcd.
// add UpstreamRelayWorkerKeyAdapter:
// - use `start-relay -s source -w worker`
// remove UpstreamRelayWorkerKeyAdapter:
// remove UpstreamRelayWorkerKeyAdapter:
// - use `stop-relay -s source -w worker`
// - remove worker by `offline-member`
type Scheduler struct {
Expand Down
6 changes: 5 additions & 1 deletion dm/dm/master/scheduler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,19 @@ type WorkerStage string
// - Relay -> Offline, lost keep-alive.
// - Relay -> Free, stop relay.
// - Relay -> Bound, old bound worker becomes offline so bind source to this worker, which has started relay.
//
// invalid transformation:
// - Offline -> Bound, must become Free first.
// - Offline -> Relay, must become Free first.
//
// in Bound stage relay can be turned on/off, the difference with Bound-Relay transformation is
// - Bound stage turning on/off represents a bound DM worker receives start-relay/stop-relay, source bound relation is
// not changed.
// - Bound-Relay transformation represents source bound relation is changed.
//
// caller should ensure the correctness when invoking the transformation methods below successively. For example, call ToBound
// twice with different arguments.
//
// twice with different arguments.
const (
WorkerOffline WorkerStage = "offline" // the worker is not online yet.
WorkerFree WorkerStage = "free" // the worker is online, but no upstream source assigned to it yet.
Expand Down
Loading

0 comments on commit 5484207

Please sign in to comment.