br: migrate pitr id map to the system table mysql.tidb_pitr_id_map #55871

Merged: 22 commits, Sep 13, 2024
Changes from 10 commits
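Context for the new system table: the queries added in br/pkg/restore/log_client/client.go below read and write four columns, restored_ts, upstream_cluster_id, segment_id, and id_map, one row per 512 KiB segment of the marshaled map. The actual DDL is provided by session.CreatePITRIDMap in pkg/session and is not part of this diff; the constant below is only a rough sketch of what that definition plausibly looks like, inferred from those queries.

```go
package sketch

// Rough sketch of the DDL behind session.CreatePITRIDMap; the real definition
// lives in pkg/session and may differ in column types, defaults, and indexes.
// Column names are taken from the SELECT/REPLACE statements in this PR.
// MEDIUMBLOB is used here because one segment can be up to 512 KiB.
const createPITRIDMapSketch = `CREATE TABLE IF NOT EXISTS mysql.tidb_pitr_id_map (
    restored_ts         BIGINT UNSIGNED NOT NULL,
    upstream_cluster_id BIGINT UNSIGNED NOT NULL,
    segment_id          BIGINT UNSIGNED NOT NULL,
    id_map              MEDIUMBLOB NOT NULL,
    PRIMARY KEY (restored_ts, upstream_cluster_id, segment_id)
);`
```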
6 changes: 5 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -44,6 +44,7 @@ go_library(
"//pkg/util/redact",
"//pkg/util/table-filter",
"@com_github_fatih_color//:color",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
@@ -82,9 +83,10 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 40,
shard_count = 41,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/gluetidb",
"//br/pkg/mock",
"//br/pkg/restore/internal/import_client",
@@ -97,8 +99,10 @@
"//br/pkg/utiltest",
"//pkg/domain",
"//pkg/kv",
"//pkg/session",
"//pkg/store/pdtypes",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util/codec",
"//pkg/util/table-filter",
90 changes: 62 additions & 28 deletions br/pkg/restore/log_client/client.go
@@ -28,6 +28,7 @@ import (
"time"

"github.com/fatih/color"
"github.com/gogo/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -95,6 +96,8 @@ type LogClient struct {
// Cannot use `restoreTS` directly, because a schema created in the `full backup` may be newer than `restoreTS`.
currentTS uint64

upstreamClusterID uint64

*LogFileManager

workerPool *tidbutil.WorkerPool
@@ -167,6 +170,11 @@ func (rc *LogClient) SetConcurrency(c uint) {
rc.workerPool = tidbutil.NewWorkerPool(c, "file")
}

func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64) {
log.Info("upstream cluster id", zap.Uint64("cluster id", upstreamClusterID))
rc.upstreamClusterID = upstreamClusterID
}

func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
var err error
rc.storage, err = storage.New(ctx, backend, opts)
@@ -558,24 +566,38 @@ func (rc *LogClient) RestoreKVFiles(

func (rc *LogClient) initSchemasMap(
ctx context.Context,
clusterID uint64,
restoreTS uint64,
) ([]*backuppb.PitrDBMap, error) {
filename := metautil.PitrIDMapsFilename(clusterID, restoreTS)
exist, err := rc.storage.FileExists(ctx, filename)
if err != nil {
return nil, errors.Annotatef(err, "failed to check filename:%s ", filename)
} else if !exist {
log.Info("pitr id maps isn't existed", zap.String("file", filename))
getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"
execCtx := rc.se.GetSessionCtx().GetRestrictedSQLExecutor()
rows, _, errSQL := execCtx.ExecRestrictedSQL(
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
nil,
getPitrIDMapSQL,
restoreTS,
Contributor: restoreTS may be confusing here, because sometimes we use start-ts to build the pitr map, especially in L584.

Contributor (author): The current log restore's start-ts is the previous log restore's restored-ts, so reading the pitr id map at the current restore's start-ts actually retrieves the pitr id map written at the previous restore's restored-ts.

rc.upstreamClusterID,
)
if errSQL != nil {
return nil, errors.Annotatef(errSQL, "failed to get pitr id map from mysql.tidb_pitr_id_map")
}
if len(rows) == 0 {
log.Info("pitr id map does not exist", zap.Uint64("restored ts", restoreTS))
return nil, nil
}

metaData, err := rc.storage.ReadFile(ctx, filename)
if err != nil {
return nil, errors.Trace(err)
metaData := make([]byte, 0, len(rows)*PITRIdMapBlockSize)
for i, row := range rows {
elementID := row.GetUint64(0)
if uint64(i) != elementID {
return nil, errors.Errorf("the part(segment_id = %d) of pitr id map is lost", i)
}
d := row.GetBytes(1)
if len(d) == 0 {
return nil, errors.Errorf("get the empty part(segment_id = %d) of pitr id map", i)
}
metaData = append(metaData, d...)
}
backupMeta := &backuppb.BackupMeta{}
if err = backupMeta.Unmarshal(metaData); err != nil {
if err := backupMeta.Unmarshal(metaData); err != nil {
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -722,7 +744,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
if !cfg.IsNewTask {
log.Info("try to load pitr id maps")
needConstructIdMap = false
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.restoreTS)
dbMaps, err = rc.initSchemasMap(ctx, rc.restoreTS)
if err != nil {
return nil, errors.Trace(err)
}
@@ -733,7 +755,7 @@
if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil {
log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
needConstructIdMap = true
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.startTS)
dbMaps, err = rc.initSchemasMap(ctx, rc.startTS)
if err != nil {
return nil, errors.Trace(err)
}
@@ -887,7 +909,7 @@ func (rc *LogClient) PreConstructAndSaveIDMap(
return errors.Trace(err)
}

if err := rc.SaveIDMap(ctx, sr); err != nil {
if err := rc.saveIDMap(ctx, sr); err != nil {
return errors.Trace(err)
}
return nil
@@ -1491,24 +1513,36 @@ func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery {
return rc.deleteRangeQuery
}

// SaveIDMap saves the id mapping information.
func (rc *LogClient) SaveIDMap(
const PITRIdMapBlockSize int = 524288

// saveIDMap saves the id mapping information.
func (rc *LogClient) saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
) error {
idMaps := sr.TidySchemaMaps()
clusterID := rc.GetClusterID(ctx)
metaFileName := metautil.PitrIDMapsFilename(clusterID, rc.restoreTS)
metaWriter := metautil.NewMetaWriter(rc.storage, metautil.MetaFileSize, false, metaFileName, nil)
metaWriter.Update(func(m *backuppb.BackupMeta) {
// save log startTS to backupmeta file
m.ClusterId = clusterID
m.DbMaps = idMaps
})

if err := metaWriter.FlushBackupMeta(ctx); err != nil {
backupmeta := &backuppb.BackupMeta{DbMaps: sr.TidySchemaMaps()}
data, err := proto.Marshal(backupmeta)
if err != nil {
return errors.Trace(err)
}
// clean the dirty id map at first
Contributor: Is this safe if the process exits abnormally right after the delete finishes?

Contributor (author): The flow for persisting the id map is:

  1. If the checkpoint task progress marks that BR has already persisted the pitr id map, read it directly.
  2. Otherwise, generate the pitr id map, and then
    a. delete the id map at restored-ts,
    b. write the id map at restored-ts,
    c. update the checkpoint task progress to mark that BR has persisted the pitr id map.

With external storage, steps 2.a and 2.b are replaced by uploading the files in overwrite mode. Atomicity is not actually required: if the process fails at step 2.b or 2.c, the pitr id map is incomplete, but the next execution simply performs step 2.a again and deletes the incomplete map.

Contributor (author): The new id map at restored-ts is generated from the id map at start-ts plus the meta KVs, so deleting the new id map at restored-ts is safe because we can always regenerate the same one.
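A minimal sketch of the re-runnable flow described in this thread, with a hypothetical idMapStore interface standing in for the checkpoint and SQL logic that this PR actually implements:

```go
package sketch

import "context"

// idMapStore is a hypothetical stand-in for the checkpoint probe and the
// mysql.tidb_pitr_id_map access that LogClient performs in this PR.
type idMapStore interface {
	Persisted(ctx context.Context) (bool, error)           // step 1: checkpoint probe
	Delete(ctx context.Context) error                       // step 2.a: drop a possibly partial map
	WriteSegments(ctx context.Context, idMap []byte) error  // step 2.b: write the new segments
	MarkPersisted(ctx context.Context) error                // step 2.c: record success in the checkpoint
}

// persistIDMap mirrors the flow described above. Because the id map at
// restored-ts can always be regenerated (id map at start-ts plus meta KVs),
// a crash anywhere before MarkPersisted only means the next run deletes the
// partial rows and rebuilds them; no step has to be atomic with another.
func persistIDMap(ctx context.Context, s idMapStore, idMap []byte) error {
	done, err := s.Persisted(ctx)
	if err != nil {
		return err
	}
	if done {
		return nil
	}
	if err := s.Delete(ctx); err != nil {
		return err
	}
	if err := s.WriteSegments(ctx, idMap); err != nil {
		return err
	}
	return s.MarkPersisted(ctx)
}
```

The key property is that every step before MarkPersisted is safe to repeat, so the delete in step 2.a doubles as cleanup for any earlier partial write.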

err = rc.se.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %?;", rc.restoreTS, rc.upstreamClusterID)
if err != nil {
return errors.Trace(err)
}
replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);"
for startIdx, segmentId := 0, 0; startIdx < len(data); segmentId += 1 {
endIdx := startIdx + PITRIdMapBlockSize
if endIdx > len(data) {
endIdx = len(data)
}
err := rc.se.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx])
if err != nil {
return errors.Trace(err)
}
startIdx = endIdx
}

if rc.useCheckpoint {
var items map[int64]model.TiFlashReplicaInfo
if sr.TiflashRecorder != nil {
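The saveIDMap/initSchemasMap pair above amounts to a split/reassemble round trip over 512 KiB segments, presumably to keep each mysql.tidb_pitr_id_map row comfortably below row-size and transaction-entry limits. A standalone sketch of that round trip (names are illustrative, not the PR's API):

```go
package sketch

import (
	"bytes"
	"fmt"
)

const pitrIDMapBlockSize = 524288 // 512 KiB, matching PITRIdMapBlockSize in this PR

// splitIDMap chunks a marshaled id map into 512 KiB segments, mirroring the
// REPLACE loop in saveIDMap above; segment i corresponds to segment_id = i.
func splitIDMap(data []byte) [][]byte {
	segments := make([][]byte, 0, (len(data)+pitrIDMapBlockSize-1)/pitrIDMapBlockSize)
	for start := 0; start < len(data); start += pitrIDMapBlockSize {
		end := start + pitrIDMapBlockSize
		if end > len(data) {
			end = len(data)
		}
		segments = append(segments, data[start:end])
	}
	return segments
}

// joinIDMap reassembles the segments in segment_id order, rejecting empty
// parts the same way initSchemasMap above does before unmarshaling.
func joinIDMap(segments [][]byte) ([]byte, error) {
	var buf bytes.Buffer
	for i, seg := range segments {
		if len(seg) == 0 {
			return nil, fmt.Errorf("empty part (segment_id = %d)", i)
		}
		buf.Write(seg)
	}
	return buf.Bytes(), nil
}
```

Round-tripping any payload through splitIDMap and joinIDMap returns the original bytes; the PR's initSchemasMap additionally checks that the segment_id values returned by the SELECT are dense and in ascending order before unmarshaling the BackupMeta.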
109 changes: 106 additions & 3 deletions br/pkg/restore/log_client/client_test.go
@@ -34,7 +34,9 @@ import (
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/br/pkg/utiltest"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/keepalive"
@@ -1361,26 +1363,127 @@ func TestInitSchemasReplaceForDDL(t *testing.T) {
ctx := context.Background()

{
client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorage{}, domain.NewMockDomain())
client := logclient.TEST_NewLogClient(123, 1, 2, 1, fakeStorage{}, domain.NewMockDomain(), nil)
cfg := &logclient.InitSchemaConfig{IsNewTask: false}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to check filename:pitr_id_maps/pitr_id_map.cluster_id:123.restored_ts:2")
}

{
client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorage{}, domain.NewMockDomain())
client := logclient.TEST_NewLogClient(123, 1, 2, 1, fakeStorage{}, domain.NewMockDomain(), nil)
cfg := &logclient.InitSchemaConfig{IsNewTask: true}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to check filename:pitr_id_maps/pitr_id_map.cluster_id:123.restored_ts:1")
}

{
client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorageOK{}, domain.NewMockDomain())
client := logclient.TEST_NewLogClient(123, 1, 2, 1, fakeStorageOK{}, domain.NewMockDomain(), nil)
cfg := &logclient.InitSchemaConfig{IsNewTask: true}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "miss upstream table information at `start-ts`(1) but the full backup path is not specified")
}
}

func downstreamID(upstreamID int64) int64 {
return upstreamID + 10000000
}

func emptyDB(startupID, endupID int64, replaces map[int64]*stream.DBReplace) {
for id := startupID; id < endupID; id += 1 {
replaces[id] = &stream.DBReplace{
Name: fmt.Sprintf("db_%d", id),
DbID: downstreamID(id),
}
}
}

func emptyTables(dbupID, startupID, endupID int64, replaces map[int64]*stream.DBReplace) {
tableMap := make(map[int64]*stream.TableReplace)
for id := startupID; id < endupID; id += 1 {
tableMap[id] = &stream.TableReplace{
Name: fmt.Sprintf("table_%d", id),
TableID: downstreamID(id),
}
}
replaces[dbupID] = &stream.DBReplace{
Name: fmt.Sprintf("db_%d", dbupID),
DbID: downstreamID(dbupID),
TableMap: tableMap,
}
}

func partitions(dbupID, tableupID, startupID, endupID int64, replaces map[int64]*stream.DBReplace) {
partitionMap := make(map[int64]int64)
for id := startupID; id < endupID; id += 1 {
partitionMap[id] = downstreamID(id)
}
replaces[dbupID] = &stream.DBReplace{
Name: fmt.Sprintf("db_%d", dbupID),
DbID: downstreamID(dbupID),
TableMap: map[int64]*stream.TableReplace{
tableupID: {
Name: fmt.Sprintf("table_%d", tableupID),
TableID: downstreamID(tableupID),
PartitionMap: partitionMap,
},
},
}
}

func getDBMap() map[int64]*stream.DBReplace {
replaces := make(map[int64]*stream.DBReplace)
emptyDB(1, 3000, replaces)
emptyTables(3000, 3001, 8000, replaces)
partitions(8000, 8001, 8002, 12000, replaces)
emptyTables(12000, 12001, 30000, replaces)
return replaces
}

func TestPITRIDMap(t *testing.T) {
ctx := context.Background()
s := utiltest.CreateRestoreSchemaSuite(t)
tk := testkit.NewTestKit(t, s.Mock.Storage)
tk.Exec(session.CreatePITRIDMap)
g := gluetidb.New()
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
client := logclient.TEST_NewLogClient(123, 1, 2, 3, nil, nil, se)
baseSchemaReplaces := &stream.SchemasReplace{
DbMap: getDBMap(),
}
err = client.TEST_saveIDMap(ctx, baseSchemaReplaces)
require.NoError(t, err)
newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1)
require.NoError(t, err)
require.Nil(t, newSchemaReplaces)
client2 := logclient.TEST_NewLogClient(123, 1, 2, 4, nil, nil, se)
newSchemaReplaces, err = client2.TEST_initSchemasMap(ctx, 2)
require.NoError(t, err)
require.Nil(t, newSchemaReplaces)
newSchemaReplaces, err = client.TEST_initSchemasMap(ctx, 2)
require.NoError(t, err)

require.Equal(t, len(baseSchemaReplaces.DbMap), len(newSchemaReplaces))
for _, dbMap := range newSchemaReplaces {
baseDbMap := baseSchemaReplaces.DbMap[dbMap.IdMap.UpstreamId]
require.NotNil(t, baseDbMap)
require.Equal(t, baseDbMap.DbID, dbMap.IdMap.DownstreamId)
require.Equal(t, baseDbMap.Name, dbMap.Name)
require.Equal(t, len(baseDbMap.TableMap), len(dbMap.Tables))
for _, tableMap := range dbMap.Tables {
baseTableMap := baseDbMap.TableMap[tableMap.IdMap.UpstreamId]
require.NotNil(t, baseTableMap)
require.Equal(t, baseTableMap.TableID, tableMap.IdMap.DownstreamId)
require.Equal(t, baseTableMap.Name, tableMap.Name)
require.Equal(t, len(baseTableMap.PartitionMap), len(tableMap.Partitions))
for _, partitionMap := range tableMap.Partitions {
basePartitionMap, exist := baseTableMap.PartitionMap[partitionMap.UpstreamId]
require.True(t, exist)
require.Equal(t, basePartitionMap, partitionMap.DownstreamId)
}
}
}
}
22 changes: 20 additions & 2 deletions br/pkg/restore/log_client/export_test.go
@@ -19,13 +19,29 @@ import (

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/pkg/domain"
)

var FilterFilesByRegion = filterFilesByRegion

func (rc *LogClient) TEST_saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
) error {
return rc.saveIDMap(ctx, sr)
}

func (rc *LogClient) TEST_initSchemasMap(
ctx context.Context,
restoreTS uint64,
) ([]*backuppb.PitrDBMap, error) {
return rc.initSchemasMap(ctx, restoreTS)
}

// readStreamMetaByTS is used for streaming task. collect all meta file by TS, it is for test usage.
func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]Meta, error) {
metas, err := rc.streamingMeta(ctx)
@@ -39,9 +55,11 @@ func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]Meta, error) {
return r.Item, nil
}

func TEST_NewLogClient(clusterID, startTS, restoreTS uint64, storage storage.ExternalStorage, dom *domain.Domain) *LogClient {
func TEST_NewLogClient(clusterID, startTS, restoreTS, upstreamClusterID uint64, storage storage.ExternalStorage, dom *domain.Domain, se glue.Session) *LogClient {
return &LogClient{
dom: dom,
dom: dom,
se: se,
upstreamClusterID: upstreamClusterID,
LogFileManager: &LogFileManager{
startTS: startTS,
restoreTS: restoreTS,
2 changes: 2 additions & 0 deletions br/pkg/restore/snap_client/systable_restore.go
@@ -68,6 +68,8 @@ var unRecoverableTable = map[string]map[string]struct{}{

// replace into view is not supported now
"tidb_mdl_view": {},

"tidb_pitr_id_map": {},
},
"sys": {
// replace into view is not supported now
1 change: 1 addition & 0 deletions br/pkg/task/restore.go
@@ -258,6 +258,7 @@ type RestoreConfig struct {
checkpointSnapshotRestoreTaskName string `json:"-" toml:"-"`
checkpointLogRestoreTaskName string `json:"-" toml:"-"`
checkpointTaskInfoClusterID uint64 `json:"-" toml:"-"`
upstreamClusterID uint64 `json:"-" toml:"-"`
WaitTiflashReady bool `json:"wait-tiflash-ready" toml:"wait-tiflash-ready"`

// for ebs-based restore