
Commit

br: migrate pitr id map to the system table mysql.tidb_pitr_id_map (#…
Leavrth committed Sep 13, 2024
1 parent 2bf89f8 commit 027f01b
Showing 17 changed files with 323 additions and 76 deletions.
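Before the per-file hunks: this commit stops persisting the PITR id map as pitr_id_maps files in external storage and instead stores it, split into fixed-size segments, as rows of the new system table mysql.tidb_pitr_id_map. As a rough orientation, the hypothetical sketch below shows one plausible shape for that table, inferred only from the SELECT / REPLACE INTO / DELETE statements added in client.go; the column types and primary key are assumptions, and the authoritative DDL is whatever the session.CreatePITRIDMap statement used by the new tests actually contains.

package main

// Hypothetical sketch of mysql.tidb_pitr_id_map, inferred from the queries in
// this commit. Column names come from the SQL in client.go; the types and the
// primary key are assumptions, not the bootstrap DDL shipped with TiDB.
const createPITRIDMapSketch = `
CREATE TABLE IF NOT EXISTS mysql.tidb_pitr_id_map (
	restored_ts         BIGINT UNSIGNED NOT NULL,
	upstream_cluster_id BIGINT UNSIGNED NOT NULL,
	segment_id          BIGINT UNSIGNED NOT NULL,
	id_map              BLOB NOT NULL,
	PRIMARY KEY (restored_ts, upstream_cluster_id, segment_id)
);`

func main() {
	_ = createPITRIDMapSketch
}
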
10 changes: 9 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -44,6 +44,7 @@ go_library(
"//pkg/util/redact",
"//pkg/util/table-filter",
"@com_github_fatih_color//:color",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
@@ -82,9 +83,10 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 40,
shard_count = 41,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/gluetidb",
"//br/pkg/mock",
"//br/pkg/restore/internal/import_client",
@@ -97,10 +99,16 @@
"//br/pkg/utiltest",
"//pkg/domain",
"//pkg/kv",
"//pkg/planner/core/resolve",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/store/pdtypes",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
90 changes: 62 additions & 28 deletions br/pkg/restore/log_client/client.go
@@ -28,6 +28,7 @@ import (
"time"

"github.com/fatih/color"
"github.com/gogo/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -95,6 +96,8 @@ type LogClient struct {
// Cannot use `restoreTS` directly, because a schema created in the `full backup` may be newer than `restoreTS`.
currentTS uint64

upstreamClusterID uint64

*LogFileManager

workerPool *tidbutil.WorkerPool
@@ -167,6 +170,11 @@ func (rc *LogClient) SetConcurrency(c uint) {
rc.workerPool = tidbutil.NewWorkerPool(c, "file")
}

func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64) {
log.Info("upstream cluster id", zap.Uint64("cluster id", upstreamClusterID))
rc.upstreamClusterID = upstreamClusterID
}

func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
var err error
rc.storage, err = storage.New(ctx, backend, opts)
@@ -558,24 +566,38 @@ func (rc *LogClient) RestoreKVFiles(

func (rc *LogClient) initSchemasMap(
ctx context.Context,
clusterID uint64,
restoreTS uint64,
) ([]*backuppb.PitrDBMap, error) {
filename := metautil.PitrIDMapsFilename(clusterID, restoreTS)
exist, err := rc.storage.FileExists(ctx, filename)
if err != nil {
return nil, errors.Annotatef(err, "failed to check filename:%s ", filename)
} else if !exist {
log.Info("pitr id maps isn't existed", zap.String("file", filename))
getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"
execCtx := rc.se.GetSessionCtx().GetRestrictedSQLExecutor()
rows, _, errSQL := execCtx.ExecRestrictedSQL(
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
nil,
getPitrIDMapSQL,
restoreTS,
rc.upstreamClusterID,
)
if errSQL != nil {
return nil, errors.Annotatef(errSQL, "failed to get pitr id map from mysql.tidb_pitr_id_map")
}
if len(rows) == 0 {
log.Info("pitr id map does not exist", zap.Uint64("restored ts", restoreTS))
return nil, nil
}

metaData, err := rc.storage.ReadFile(ctx, filename)
if err != nil {
return nil, errors.Trace(err)
metaData := make([]byte, 0, len(rows)*PITRIdMapBlockSize)
for i, row := range rows {
elementID := row.GetUint64(0)
if uint64(i) != elementID {
return nil, errors.Errorf("the part(segment_id = %d) of pitr id map is lost", i)
}
d := row.GetBytes(1)
if len(d) == 0 {
return nil, errors.Errorf("get the empty part(segment_id = %d) of pitr id map", i)
}
metaData = append(metaData, d...)
}
backupMeta := &backuppb.BackupMeta{}
if err = backupMeta.Unmarshal(metaData); err != nil {
if err := backupMeta.Unmarshal(metaData); err != nil {
return nil, errors.Trace(err)
}

@@ -722,7 +744,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
if !cfg.IsNewTask {
log.Info("try to load pitr id maps")
needConstructIdMap = false
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.restoreTS)
dbMaps, err = rc.initSchemasMap(ctx, rc.restoreTS)
if err != nil {
return nil, errors.Trace(err)
}
@@ -733,7 +755,7 @@
if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil {
log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
needConstructIdMap = true
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.startTS)
dbMaps, err = rc.initSchemasMap(ctx, rc.startTS)
if err != nil {
return nil, errors.Trace(err)
}
@@ -887,7 +909,7 @@ func (rc *LogClient) PreConstructAndSaveIDMap(
return errors.Trace(err)
}

if err := rc.SaveIDMap(ctx, sr); err != nil {
if err := rc.saveIDMap(ctx, sr); err != nil {
return errors.Trace(err)
}
return nil
@@ -1491,24 +1513,36 @@ func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery {
return rc.deleteRangeQuery
}

// SaveIDMap saves the id mapping information.
func (rc *LogClient) SaveIDMap(
const PITRIdMapBlockSize int = 524288

// saveIDMap saves the id mapping information.
func (rc *LogClient) saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
) error {
idMaps := sr.TidySchemaMaps()
clusterID := rc.GetClusterID(ctx)
metaFileName := metautil.PitrIDMapsFilename(clusterID, rc.restoreTS)
metaWriter := metautil.NewMetaWriter(rc.storage, metautil.MetaFileSize, false, metaFileName, nil)
metaWriter.Update(func(m *backuppb.BackupMeta) {
// save log startTS to backupmeta file
m.ClusterId = clusterID
m.DbMaps = idMaps
})

if err := metaWriter.FlushBackupMeta(ctx); err != nil {
backupmeta := &backuppb.BackupMeta{DbMaps: sr.TidySchemaMaps()}
data, err := proto.Marshal(backupmeta)
if err != nil {
return errors.Trace(err)
}
// clean the dirty id map at first
err = rc.se.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %?;", rc.restoreTS, rc.upstreamClusterID)
if err != nil {
return errors.Trace(err)
}
replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);"
for startIdx, segmentId := 0, 0; startIdx < len(data); segmentId += 1 {
endIdx := startIdx + PITRIdMapBlockSize
if endIdx > len(data) {
endIdx = len(data)
}
err := rc.se.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx])
if err != nil {
return errors.Trace(err)
}
startIdx = endIdx
}

if rc.useCheckpoint {
var items map[int64]model.TiFlashReplicaInfo
if sr.TiflashRecorder != nil {
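Stepping outside the diff for a moment: the new persistence path in saveIDMap and initSchemasMap proto-marshals the id map, cuts the bytes into PITRIdMapBlockSize (512 KiB) segments keyed by segment_id, writes them with REPLACE INTO mysql.tidb_pitr_id_map, and on the read side fetches them ORDER BY segment_id, concatenates them, and unmarshals the result. The standalone sketch below illustrates only that chunking round trip; splitIDMap and joinIDMap are illustrative names rather than functions from this commit, and a plain byte slice stands in for the proto.Marshal output.

package main

import "fmt"

// PITRIdMapBlockSize mirrors the 512 KiB constant introduced in client.go.
const PITRIdMapBlockSize = 524288

// splitIDMap is an illustrative helper (not part of this commit): it slices the
// marshaled id map into per-row segments the same way the loop in saveIDMap
// walks data[startIdx:endIdx], one segment_id per block.
func splitIDMap(data []byte) [][]byte {
	segments := make([][]byte, 0, len(data)/PITRIdMapBlockSize+1)
	for start := 0; start < len(data); start += PITRIdMapBlockSize {
		end := start + PITRIdMapBlockSize
		if end > len(data) {
			end = len(data)
		}
		segments = append(segments, data[start:end])
	}
	return segments
}

// joinIDMap mirrors the read side in initSchemasMap: segments are concatenated
// in segment_id order, and an empty segment is treated as a lost part.
func joinIDMap(segments [][]byte) ([]byte, error) {
	out := make([]byte, 0, len(segments)*PITRIdMapBlockSize)
	for i, seg := range segments {
		if len(seg) == 0 {
			return nil, fmt.Errorf("segment %d of the pitr id map is empty", i)
		}
		out = append(out, seg...)
	}
	return out, nil
}

func main() {
	// Stand-in for proto.Marshal(&backuppb.BackupMeta{DbMaps: ...}).
	data := make([]byte, 3*PITRIdMapBlockSize+123)
	segments := splitIDMap(data)
	joined, err := joinIDMap(segments)
	fmt.Println(len(segments), len(joined) == len(data), err) // 4 true <nil>
}

Splitting presumably keeps each row's id_map blob well below per-row and transaction size limits while still letting arbitrarily large id maps span multiple rows.
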
151 changes: 136 additions & 15 deletions br/pkg/restore/log_client/client_test.go
@@ -25,16 +25,22 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/gluetidb"
"github.com/pingcap/tidb/br/pkg/mock"
logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
"github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/br/pkg/utiltest"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/planner/core/resolve"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/keepalive"
@@ -1341,46 +1347,161 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) {
}
}

type fakeStorage struct {
storage.ExternalStorage
type fakeSession struct {
glue.Session
}

func (fs fakeStorage) FileExists(ctx context.Context, name string) (bool, error) {
return false, errors.Errorf("name: %s", name)
func (fs fakeSession) GetSessionCtx() sessionctx.Context {
return fakeSessionContext{}
}

type fakeStorageOK struct {
storage.ExternalStorage
type fakeSessionContext struct {
sessionctx.Context
}

func (fs fakeStorageOK) FileExists(ctx context.Context, name string) (bool, error) {
return false, nil
func (fsc fakeSessionContext) GetRestrictedSQLExecutor() sqlexec.RestrictedSQLExecutor {
return fakeSQLExecutor{}
}

type fakeSQLExecutor struct {
sqlexec.RestrictedSQLExecutor
}

func (fse fakeSQLExecutor) ExecRestrictedSQL(_ context.Context, _ []sqlexec.OptionFuncAlias, query string, args ...any) ([]chunk.Row, []*resolve.ResultField, error) {
return nil, nil, errors.Errorf("name: %s, %v", query, args)
}

func TestInitSchemasReplaceForDDL(t *testing.T) {
ctx := context.Background()

{
client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorage{}, domain.NewMockDomain())
client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{})
cfg := &logclient.InitSchemaConfig{IsNewTask: false}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to check filename:pitr_id_maps/pitr_id_map.cluster_id:123.restored_ts:2")
require.Regexp(t, "failed to get pitr id map from mysql.tidb_pitr_id_map.* [2, 1]", err.Error())
}

{
client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorage{}, domain.NewMockDomain())
client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{})
cfg := &logclient.InitSchemaConfig{IsNewTask: true}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to check filename:pitr_id_maps/pitr_id_map.cluster_id:123.restored_ts:1")
require.Regexp(t, "failed to get pitr id map from mysql.tidb_pitr_id_map.* [1, 1]", err.Error())
}

{
client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorageOK{}, domain.NewMockDomain())
s := utiltest.CreateRestoreSchemaSuite(t)
tk := testkit.NewTestKit(t, s.Mock.Storage)
tk.Exec(session.CreatePITRIDMap)
g := gluetidb.New()
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), se)
cfg := &logclient.InitSchemaConfig{IsNewTask: true}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg)
_, err = client.InitSchemasReplaceForDDL(ctx, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "miss upstream table information at `start-ts`(1) but the full backup path is not specified")
}
}

func downstreamID(upstreamID int64) int64 {
return upstreamID + 10000000
}

func emptyDB(startupID, endupID int64, replaces map[int64]*stream.DBReplace) {
for id := startupID; id < endupID; id += 1 {
replaces[id] = &stream.DBReplace{
Name: fmt.Sprintf("db_%d", id),
DbID: downstreamID(id),
}
}
}

func emptyTables(dbupID, startupID, endupID int64, replaces map[int64]*stream.DBReplace) {
tableMap := make(map[int64]*stream.TableReplace)
for id := startupID; id < endupID; id += 1 {
tableMap[id] = &stream.TableReplace{
Name: fmt.Sprintf("table_%d", id),
TableID: downstreamID(id),
}
}
replaces[dbupID] = &stream.DBReplace{
Name: fmt.Sprintf("db_%d", dbupID),
DbID: downstreamID(dbupID),
TableMap: tableMap,
}
}

func partitions(dbupID, tableupID, startupID, endupID int64, replaces map[int64]*stream.DBReplace) {
partitionMap := make(map[int64]int64)
for id := startupID; id < endupID; id += 1 {
partitionMap[id] = downstreamID(id)
}
replaces[dbupID] = &stream.DBReplace{
Name: fmt.Sprintf("db_%d", dbupID),
DbID: downstreamID(dbupID),
TableMap: map[int64]*stream.TableReplace{
tableupID: {
Name: fmt.Sprintf("table_%d", tableupID),
TableID: downstreamID(tableupID),
PartitionMap: partitionMap,
},
},
}
}

func getDBMap() map[int64]*stream.DBReplace {
replaces := make(map[int64]*stream.DBReplace)
emptyDB(1, 3000, replaces)
emptyTables(3000, 3001, 8000, replaces)
partitions(8000, 8001, 8002, 12000, replaces)
emptyTables(12000, 12001, 30000, replaces)
return replaces
}

func TestPITRIDMap(t *testing.T) {
ctx := context.Background()
s := utiltest.CreateRestoreSchemaSuite(t)
tk := testkit.NewTestKit(t, s.Mock.Storage)
tk.Exec(session.CreatePITRIDMap)
g := gluetidb.New()
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
client := logclient.TEST_NewLogClient(123, 1, 2, 3, nil, se)
baseSchemaReplaces := &stream.SchemasReplace{
DbMap: getDBMap(),
}
err = client.TEST_saveIDMap(ctx, baseSchemaReplaces)
require.NoError(t, err)
newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1)
require.NoError(t, err)
require.Nil(t, newSchemaReplaces)
client2 := logclient.TEST_NewLogClient(123, 1, 2, 4, nil, se)
newSchemaReplaces, err = client2.TEST_initSchemasMap(ctx, 2)
require.NoError(t, err)
require.Nil(t, newSchemaReplaces)
newSchemaReplaces, err = client.TEST_initSchemasMap(ctx, 2)
require.NoError(t, err)

require.Equal(t, len(baseSchemaReplaces.DbMap), len(newSchemaReplaces))
for _, dbMap := range newSchemaReplaces {
baseDbMap := baseSchemaReplaces.DbMap[dbMap.IdMap.UpstreamId]
require.NotNil(t, baseDbMap)
require.Equal(t, baseDbMap.DbID, dbMap.IdMap.DownstreamId)
require.Equal(t, baseDbMap.Name, dbMap.Name)
require.Equal(t, len(baseDbMap.TableMap), len(dbMap.Tables))
for _, tableMap := range dbMap.Tables {
baseTableMap := baseDbMap.TableMap[tableMap.IdMap.UpstreamId]
require.NotNil(t, baseTableMap)
require.Equal(t, baseTableMap.TableID, tableMap.IdMap.DownstreamId)
require.Equal(t, baseTableMap.Name, tableMap.Name)
require.Equal(t, len(baseTableMap.PartitionMap), len(tableMap.Partitions))
for _, partitionMap := range tableMap.Partitions {
basePartitionMap, exist := baseTableMap.PartitionMap[partitionMap.UpstreamId]
require.True(t, exist)
require.Equal(t, basePartitionMap, partitionMap.DownstreamId)
}
}
}
}
(Diffs for the remaining 14 changed files are not shown.)
