Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: do resolve conflict job when other lightning has local dupes #41157

Merged
merged 12 commits into from
Feb 9, 2023
19 changes: 8 additions & 11 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type tableMetaMgr interface {
UpdateTableStatus(ctx context.Context, status metaStatus) error
UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error
CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (
needChecksum bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error)
otherHasDupe bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error)
FinishTable(ctx context.Context) error
}

Expand Down Expand Up @@ -370,7 +370,7 @@ func (m *dbTableMetaMgr) UpdateTableStatus(ctx context.Context, status metaStatu
}

func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (
needChecksum bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error,
otherHasDupe bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error,
) {
conn, err := m.session.Conn(ctx)
if err != nil {
Expand All @@ -393,7 +393,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
taskHasDuplicates bool
)
newStatus := metaStatusChecksuming
needChecksum = true
otherHasDupe = false
needRemoteDupe = true
err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
Expand Down Expand Up @@ -423,9 +423,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
return err
}

if taskHasDuplicates {
needChecksum = false
}
otherHasDupe = otherHasDupe || taskHasDuplicates

// skip finished meta
if status >= metaStatusFinished {
Expand All @@ -436,7 +434,6 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
if status >= metaStatusChecksuming {
newStatus = status
needRemoteDupe = status == metaStatusChecksuming
needChecksum = needChecksum && needRemoteDupe
return nil
}

Expand All @@ -445,7 +442,6 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks

if status < metaStatusChecksuming {
newStatus = metaStatusChecksumSkipped
needChecksum = false
needRemoteDupe = false
break
} else if status == metaStatusChecksuming {
Expand Down Expand Up @@ -475,12 +471,13 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
return false, false, nil, err
}

if needChecksum {
if !otherHasDupe && needRemoteDupe {
ck := verify.MakeKVChecksum(totalBytes, totalKvs, totalChecksum)
baseTotalChecksum = &ck
}
log.FromContext(ctx).Info("check table checksum", zap.String("table", m.tr.tableName),
zap.Bool("checksum", needChecksum), zap.String("new_status", newStatus.String()))
zap.Bool("otherHasDupe", otherHasDupe), zap.Bool("needRemoteDupe", needRemoteDupe),
zap.String("new_status", newStatus.String()))
return
}

Expand Down Expand Up @@ -1073,7 +1070,7 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum
}

func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) {
return true, true, &verify.KVChecksum{}, nil
return false, true, &verify.KVChecksum{}, nil
}

func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,11 +794,19 @@ func (tr *TableRestore) postProcess(
}
hasDupe = hasLocalDupe
}
failpoint.Inject("SlowDownCheckDupe", func(v failpoint.Value) {
sec := v.(int)
tr.logger.Warn("start to sleep several seconds before checking other dupe",
zap.Int("seconds", sec))
time.Sleep(time.Duration(sec) * time.Second)
})

needChecksum, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe)
otherHasDupe, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe)
if err != nil {
return false, err
}
needChecksum := !otherHasDupe && needRemoteDupe
hasDupe = hasDupe || otherHasDupe

if needRemoteDupe && rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
opts := &kv.SessionOptions{
Expand Down
41 changes: 41 additions & 0 deletions br/tests/lightning_duplicate_resolution_incremental/config1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[lightning]
task-info-schema-name = 'lightning_task_info_dupe_resolve_incremental'
index-concurrency = 10
table-concurrency = 10

[tikv-importer]
backend = "local"
on-duplicate = "replace"
duplicate-resolution = "remove"
incremental-import = true

[checkpoint]
enable = true
schema = "tidb_lightning_checkpoint_dupe_resolve_incremental1"
driver = "mysql"

[[mydumper.files]]
pattern = '(?i).*(-schema-trigger|-schema-post)\.sql$'
type = 'ignore'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$'
schema = '$1'
type = 'schema-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$'
schema = '$1'
table = '$2'
type = 'table-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)\.0\.sql$'
schema = '$1'
table = '$2'
key = '0'
type = 'sql'

[post-restore]
analyze = false
checksum = "optional"
41 changes: 41 additions & 0 deletions br/tests/lightning_duplicate_resolution_incremental/config2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[lightning]
task-info-schema-name = 'lightning_task_info_dupe_resolve_incremental'
index-concurrency = 10
table-concurrency = 10

[tikv-importer]
backend = "local"
on-duplicate = "replace"
duplicate-resolution = "remove"
incremental-import = true

[checkpoint]
enable = true
schema = "tidb_lightning_checkpoint_dupe_resolve_incremental2"
driver = "mysql"

[[mydumper.files]]
pattern = '(?i).*(-schema-trigger|-schema-post)\.sql$'
type = 'ignore'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$'
schema = '$1'
type = 'schema-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$'
schema = '$1'
table = '$2'
type = 'table-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)\.1\.sql$'
schema = '$1'
table = '$2'
key = '1'
type = 'sql'

[post-restore]
analyze = false
checksum = "optional"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create schema dup_resolve_detect;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
create table ta (
id varchar(11) not null primary key nonclustered, -- use varchar here to make sure _tidb_rowid will be generated
name varchar(20) not null,
size bigint not null,
unique key uni_name(name)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
insert into ta values (3, '3c49f3bd', 6194643990092531757);
insert into ta values (13, '1da87b44', 3724743701402246028);
insert into ta values (6, '8b080186', 4840750639653607661);
insert into ta values (1, 'c83c0e6a', 5057094372111243649);
insert into ta values (12, 'dd73baf5', 2295098755414696158);
insert into ta values (4, '1cf99fa1', 2520784525406914042);
insert into ta values (11, 'b238a0e6', 3314555604794199537);
insert into ta values (10, 'a489c47a', 7706822128523578708);
insert into ta values (10, '9a54941e', 4969369552499069659);
insert into ta values (2, 'e7c90179', 1305347797378229715);
insert into ta values (9, '75e0344a', 500154046394880294);
insert into ta values (9, 'c3e8fc36', 5880042654284780409);
insert into ta values (6, 'd6835599', 2703142091339420770);
insert into ta values (5, 'c4a9c3a3', 6725275961959702206);
insert into ta values (14, 'eb1ab0dd', 5442878220607642694);
insert into ta values (7, '78e166f4', 7062852002089313920);
insert into ta values (8, '20986b65', 5485014514564267319);
insert into ta values (8, '9bd4d7a9', 9085469020413045798);
insert into ta values (15, 'd4aa9a8a', 546189610059969690);
insert into ta values (7, 'a7870c06', 3615729521258364152);
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
insert into ta values (111, 'bcf4e75f', 3304674741328415661);
insert into ta values (112, 'c08078e9', 7464585077725645791);
insert into ta values (113, 'ca05b4b2', 1280363363179468054);
insert into ta values (114, '8a094c96', 107578474892900608);
insert into ta values (115, 'f38efac2', 5273601814057696410);
insert into ta values (116, '5bf0cb56', 7276272767003446282);
insert into ta values (117, 'c8836b45', 653431702983792793);
insert into ta values (118, '7470ba67', 5617407618564683998);
insert into ta values (119, '466e1e95', 6827370124386922419);
insert into ta values (120, '41df97f3', 2296443172527920942);
insert into ta values (121, 'bd644f43', 6038622426427289955);
insert into ta values (122, '96aeb918', 1496857236328804363);
insert into ta values (123, '232448f7', 1199921720244646472);
insert into ta values (124, 'd296d6e4', 5705035255191089143);
insert into ta values (125, '194ec1d8', 6895413645725179445);
insert into ta values (126, 'a53238ec', 1527836891202149330);
62 changes: 62 additions & 0 deletions br/tests/lightning_duplicate_resolution_incremental/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

check_cluster_version 5 2 0 'duplicate detection' || exit 0

LOG_FILE1="$TEST_DIR/lightning-duplicate-resolution1.log"
LOG_FILE2="$TEST_DIR/lightning-duplicate-resolution2.log"

# let lightning run a bit slow to avoid some table in the first lightning finish too fast.
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownCheckDupe=return(10)"
run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_incremental.sorted1" \
--enable-checkpoint=1 --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config1.toml" &

counter=0
while [ $counter -lt 10 ]; do
if grep -Fq "start to sleep several seconds before checking other dupe" "$LOG_FILE1"; then
echo "lightning 1 already starts waiting for dupe"
break
fi
((counter += 1))
echo "waiting for lightning 1 starts"
sleep 1
done

if [ $counter -ge 10 ]; then
echo "fail to wait for lightning 1 starts"
exit 1
fi

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_incremental.sorted2" \
--enable-checkpoint=1 --log-file "$LOG_FILE2" --config "tests/$TEST_NAME/config2.toml" &
sleepymole marked this conversation as resolved.
Show resolved Hide resolved

wait

export GO_FAILPOINTS=""

# Ensure table is consistent.
run_sql 'admin check table dup_resolve_detect.ta'

# Check data correctness
run_sql 'select count(*), sum(id) from dup_resolve_detect.ta where id < 100'
check_contains 'count(*): 10'
check_contains 'sum(id): 80'

run_sql 'select count(*), sum(id) from dup_resolve_detect.ta where id > 100'
check_contains 'count(*): 16'
check_contains 'sum(id): 1896'