Skip to content

Commit

Permalink
master, config(dm): fix case-sensitive compatibility for dmctl (#5307)
Browse files Browse the repository at this point in the history
close #5255
  • Loading branch information
lance6716 committed Apr 28, 2022
1 parent 6d5b7fa commit 1e4814e
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 25 deletions.
2 changes: 1 addition & 1 deletion dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return c.AdjustCaseSensitive(ctx2, db)
return nil
}

// AdjustCaseSensitive adjust CaseSensitive from DB.
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
cfg1.From.User = user
cfg1.From.Password = password
cfg1.RelayDir = "relay-dir"
c.Assert(checkAndAdjustSourceConfigFunc(ctx, cfg1), IsNil) // adjust source config.
c.Assert(checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg1), IsNil) // adjust source config.
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"

Expand Down
28 changes: 25 additions & 3 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package master

import (
"context"
"database/sql"
"encoding/binary"
"fmt"
"math/rand"
Expand Down Expand Up @@ -86,7 +87,11 @@ var (
registerOnce sync.Once
runBackgroundOnce sync.Once

checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
// the difference of below functions is checkAndAdjustSourceConfigForDMCtlFunc will not AdjustCaseSensitive. It's a
// compatibility compromise.
// When we need to change the implementation of dmctl to OpenAPI, we should notice the user about this change.
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfigForDMCtl
)

// Server handles RPC requests for dm-master.
Expand Down Expand Up @@ -1311,15 +1316,19 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf
if err != nil {
return cfgs, err
}
if err := checkAndAdjustSourceConfigFunc(ctx, cfg); err != nil {
if err := checkAndAdjustSourceConfigForDMCtlFunc(ctx, cfg); err != nil {
return cfgs, err
}
cfgs[i] = cfg
}
return cfgs, nil
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
func innerCheckAndAdjustSourceConfig(
ctx context.Context,
cfg *config.SourceConfig,
hook func(sourceConfig *config.SourceConfig, ctx context.Context, db *sql.DB) error,
) error {
dbConfig := cfg.GenerateDBConfig()
fromDB, err := conn.DefaultDBProvider.Apply(dbConfig)
if err != nil {
Expand All @@ -1329,12 +1338,25 @@ func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) e
if err = cfg.Adjust(ctx, fromDB.DB); err != nil {
return err
}
if hook != nil {
if err = hook(cfg, ctx, fromDB.DB); err != nil {
return err
}
}
if _, err = cfg.Yaml(); err != nil {
return err
}
return cfg.Verify()
}

func checkAndAdjustSourceConfig(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, (*config.SourceConfig).AdjustCaseSensitive)
}

func checkAndAdjustSourceConfigForDMCtl(ctx context.Context, cfg *config.SourceConfig) error {
return innerCheckAndAdjustSourceConfig(ctx, cfg, nil)
}

func parseSourceConfig(contents []string) ([]*config.SourceConfig, error) {
cfgs := make([]*config.SourceConfig, len(contents))
for i, content := range contents {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ func (t *testMaster) SetUpSuite(c *check.C) {
t.workerClients = make(map[string]workerrpc.Client)
t.saveMaxRetryNum = maxRetryNum
maxRetryNum = 2
checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
checkAndAdjustSourceConfigForDMCtlFunc = checkAndNoAdjustSourceConfigMock
}

func (t *testMaster) TearDownSuite(c *check.C) {
maxRetryNum = t.saveMaxRetryNum
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfig
}

func (t *testMaster) SetUpTest(c *check.C) {
Expand Down
3 changes: 2 additions & 1 deletion dm/pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,13 @@ func FetchLowerCaseTableNamesSetting(ctx context.Context, conn *sql.Conn) (Lower
return LowerCaseTableNamesFlavor(res), nil
}

// GetDBCaseSensitive returns the case sensitive setting of target db.
// GetDBCaseSensitive returns the case-sensitive setting of target db.
func GetDBCaseSensitive(ctx context.Context, db *sql.DB) (bool, error) {
conn, err := db.Conn(ctx)
if err != nil {
return true, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer conn.Close()
lcFlavor, err := FetchLowerCaseTableNamesSetting(ctx, conn)
if err != nil {
return true, err
Expand Down
14 changes: 6 additions & 8 deletions dm/tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,13 @@ function test_query_timeout() {
# don't know why CI has turned on Event Scheduler
run_sql_both_source 'SET GLOBAL event_scheduler = OFF;'

# there's only 2 rows in result, one for dm-worker's source-level status, one for SHOW PROCESSLIST
# there's only 1 row in result, which is for SHOW PROCESSLIST
run_sql_source1 'SHOW PROCESSLIST;'
check_rows_equal 2
check_rows_equal 1

run_sql_source2 'SHOW PROCESSLIST;'
check_rows_equal 2
check_rows_equal 1

# there's only 1 row in result, which is for SHOW PROCESSLIST
run_sql_tidb 'SHOW PROCESSLIST;'
check_rows_equal 1

Expand Down Expand Up @@ -131,14 +130,13 @@ function test_query_timeout() {
"stop-task $ILLEGAL_CHAR_NAME" \
"\"result\": true" 3

# there's only 2 rows in result, one for dm-worker's source-level status, one for SHOW PROCESSLIST
# there's only 1 row in result, which is for SHOW PROCESSLIST
run_sql_source1 'SHOW PROCESSLIST;'
check_rows_equal 2
check_rows_equal 1

run_sql_source2 'SHOW PROCESSLIST;'
check_rows_equal 2
check_rows_equal 1

# there's only 1 row in result, which is for SHOW PROCESSLIST
run_sql_tidb 'SHOW PROCESSLIST;'
check_rows_equal 1

Expand Down
1 change: 1 addition & 0 deletions dm/tests/case_sensitive/data/db2.prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ create table Upper_Table (
PRIMARY KEY (id));
insert into Upper_Table (name, ts) values ('Arya', now()), ('Bran', '2021-05-11 10:01:05'), ('Sansa', NULL);

-- if case-insensitive, this should report conflict with Upper_Table
create table upper_table(id int NOT NULL PRIMARY KEY);

-- test block-allow-list
Expand Down
51 changes: 42 additions & 9 deletions dm/tests/case_sensitive/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,32 @@ source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME
API_VERSION="v1alpha1"

function run() {
function prepare_sensitive_task() {
cp $cur/data/db1.prepare.sql $WORK_DIR/db1.prepare.sql
cp $cur/data/db2.prepare.sql $WORK_DIR/db2.prepare.sql
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
}

function prepare_insensitive_task() {
cp $cur/data/db1.prepare.sql $WORK_DIR/db1.prepare.sql
cp $cur/data/db2.prepare.sql $WORK_DIR/db2.prepare.sql
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml

sed -i "/sensitive/d" $WORK_DIR/dm-task.yaml
sed -i "/create table upper_table/d" $WORK_DIR/db2.prepare.sql
}

function run_with_prepared() {
run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
inject_points=(
"github.com/pingcap/tiflow/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
"github.com/pingcap/tiflow/dm/relay/NewUpstreamServer=return(true)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'
# manually create the route table
run_sql 'CREATE DATABASE IF NOT EXISTS `UPPER_DB_ROUTE`' $TIDB_PORT $TIDB_PASSWORD
Expand All @@ -40,7 +55,6 @@ function run() {
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# start DM task only
cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta"
# check task has started
check_metric $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"test\",worker=\"worker1\"}" 10 1 3
Expand Down Expand Up @@ -82,15 +96,13 @@ function run() {

# test block-allow-list by the way
run_sql "show databases;" $TIDB_PORT $TIDB_PASSWORD
check_not_contains "Upper_Db_IGNORE"
check_contains "Upper_DB1"
check_contains "lower_db"
# test route-rule
check_contains "UPPER_DB_ROUTE"

run_sql "show tables from UPPER_DB_ROUTE" $TIDB_PORT $TIDB_PASSWORD
check_contains "do_table_route"
check_not_contains "Do_table_ignore"
run_sql_tidb_with_retry "select count(*) from UPPER_DB_ROUTE.do_table_route" "count(*): 5"

# test binlog event filter
Expand All @@ -101,16 +113,37 @@ function run() {
# ensure the truncate is ignored and the new row is inserted
run_sql_tidb_with_retry "select count(*) from UPPER_DB_ROUTE.do_table_route" "count(*): 6"

dmctl_stop_task test
dmctl_operate_source stop $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source stop $WORK_DIR/source2.yaml $SOURCE_ID2

export GO_FAILPOINTS=''
}

function check_ignore_when_sensitive() {
run_sql "show databases;" $TIDB_PORT $TIDB_PASSWORD
check_not_contains "Upper_Db_IGNORE"
run_sql "show tables from UPPER_DB_ROUTE" $TIDB_PORT $TIDB_PASSWORD
check_not_contains "Do_table_ignore"
}

trap cleanup_process EXIT
trap "cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE sync_diff_inspector" EXIT
trap "cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE Upper_Db_IGNORE sync_diff_inspector" EXIT

# also cleanup dm processes in case of last run failed
cleanup_process $*
cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE
run
cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE Upper_Db_IGNORE

prepare_sensitive_task
run_with_prepared
check_ignore_when_sensitive

cleanup_process $*
cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE Upper_Db_IGNORE

prepare_insensitive_task
run_with_prepared

cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"

0 comments on commit 1e4814e

Please sign in to comment.