diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel index 2351e2908ea64..76a2f7de49a97 100644 --- a/br/pkg/restore/log_client/BUILD.bazel +++ b/br/pkg/restore/log_client/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "//pkg/util/redact", "//pkg/util/table-filter", "@com_github_fatih_color//:color", + "@com_github_gogo_protobuf//proto", "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", @@ -82,9 +83,10 @@ go_test( ], embed = [":log_client"], flaky = True, - shard_count = 40, + shard_count = 41, deps = [ "//br/pkg/errors", + "//br/pkg/glue", "//br/pkg/gluetidb", "//br/pkg/mock", "//br/pkg/restore/internal/import_client", @@ -97,10 +99,16 @@ go_test( "//br/pkg/utiltest", "//pkg/domain", "//pkg/kv", + "//pkg/planner/core/resolve", + "//pkg/session", + "//pkg/sessionctx", "//pkg/store/pdtypes", "//pkg/tablecodec", + "//pkg/testkit", "//pkg/testkit/testsetup", + "//pkg/util/chunk", "//pkg/util/codec", + "//pkg/util/sqlexec", "//pkg/util/table-filter", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 6c4666725ef4f..9a7ee8ee9cf8d 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -28,6 +28,7 @@ import ( "time" "github.com/fatih/color" + "github.com/gogo/protobuf/proto" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -95,6 +96,8 @@ type LogClient struct { // Can not use `restoreTS` directly, because schema created in `full backup` maybe is new than `restoreTS`. currentTS uint64 + upstreamClusterID uint64 + *LogFileManager workerPool *tidbutil.WorkerPool @@ -167,6 +170,11 @@ func (rc *LogClient) SetConcurrency(c uint) { rc.workerPool = tidbutil.NewWorkerPool(c, "file") } +func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64) { + log.Info("upstream cluster id", zap.Uint64("cluster id", upstreamClusterID)) + rc.upstreamClusterID = upstreamClusterID +} + func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error { var err error rc.storage, err = storage.New(ctx, backend, opts) @@ -558,24 +566,38 @@ func (rc *LogClient) RestoreKVFiles( func (rc *LogClient) initSchemasMap( ctx context.Context, - clusterID uint64, restoreTS uint64, ) ([]*backuppb.PitrDBMap, error) { - filename := metautil.PitrIDMapsFilename(clusterID, restoreTS) - exist, err := rc.storage.FileExists(ctx, filename) - if err != nil { - return nil, errors.Annotatef(err, "failed to check filename:%s ", filename) - } else if !exist { - log.Info("pitr id maps isn't existed", zap.String("file", filename)) + getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? 
ORDER BY segment_id;" + execCtx := rc.se.GetSessionCtx().GetRestrictedSQLExecutor() + rows, _, errSQL := execCtx.ExecRestrictedSQL( + kv.WithInternalSourceType(ctx, kv.InternalTxnBR), + nil, + getPitrIDMapSQL, + restoreTS, + rc.upstreamClusterID, + ) + if errSQL != nil { + return nil, errors.Annotatef(errSQL, "failed to get pitr id map from mysql.tidb_pitr_id_map") + } + if len(rows) == 0 { + log.Info("pitr id map does not exist", zap.Uint64("restored ts", restoreTS)) return nil, nil } - - metaData, err := rc.storage.ReadFile(ctx, filename) - if err != nil { - return nil, errors.Trace(err) + metaData := make([]byte, 0, len(rows)*PITRIdMapBlockSize) + for i, row := range rows { + elementID := row.GetUint64(0) + if uint64(i) != elementID { + return nil, errors.Errorf("the part(segment_id = %d) of pitr id map is lost", i) + } + d := row.GetBytes(1) + if len(d) == 0 { + return nil, errors.Errorf("get the empty part(segment_id = %d) of pitr id map", i) + } + metaData = append(metaData, d...) } backupMeta := &backuppb.BackupMeta{} - if err = backupMeta.Unmarshal(metaData); err != nil { + if err := backupMeta.Unmarshal(metaData); err != nil { return nil, errors.Trace(err) } @@ -722,7 +744,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL( if !cfg.IsNewTask { log.Info("try to load pitr id maps") needConstructIdMap = false - dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.restoreTS) + dbMaps, err = rc.initSchemasMap(ctx, rc.restoreTS) if err != nil { return nil, errors.Trace(err) } @@ -733,7 +755,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL( if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil { log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS)) needConstructIdMap = true - dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.startTS) + dbMaps, err = rc.initSchemasMap(ctx, rc.startTS) if err != nil { return nil, errors.Trace(err) } @@ -887,7 +909,7 @@ func (rc *LogClient) PreConstructAndSaveIDMap( return errors.Trace(err) } - if err := rc.SaveIDMap(ctx, sr); err != nil { + if err := rc.saveIDMap(ctx, sr); err != nil { return errors.Trace(err) } return nil @@ -1491,24 +1513,36 @@ func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery { return rc.deleteRangeQuery } -// SaveIDMap saves the id mapping information. -func (rc *LogClient) SaveIDMap( +const PITRIdMapBlockSize int = 524288 + +// saveIDMap saves the id mapping information. +func (rc *LogClient) saveIDMap( ctx context.Context, sr *stream.SchemasReplace, ) error { - idMaps := sr.TidySchemaMaps() - clusterID := rc.GetClusterID(ctx) - metaFileName := metautil.PitrIDMapsFilename(clusterID, rc.restoreTS) - metaWriter := metautil.NewMetaWriter(rc.storage, metautil.MetaFileSize, false, metaFileName, nil) - metaWriter.Update(func(m *backuppb.BackupMeta) { - // save log startTS to backupmeta file - m.ClusterId = clusterID - m.DbMaps = idMaps - }) - - if err := metaWriter.FlushBackupMeta(ctx); err != nil { + backupmeta := &backuppb.BackupMeta{DbMaps: sr.TidySchemaMaps()} + data, err := proto.Marshal(backupmeta) + if err != nil { + return errors.Trace(err) + } + // clean the dirty id map at first + err = rc.se.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? 
and upstream_cluster_id = %?;", rc.restoreTS, rc.upstreamClusterID) + if err != nil { return errors.Trace(err) } + replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);" + for startIdx, segmentId := 0, 0; startIdx < len(data); segmentId += 1 { + endIdx := startIdx + PITRIdMapBlockSize + if endIdx > len(data) { + endIdx = len(data) + } + err := rc.se.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx]) + if err != nil { + return errors.Trace(err) + } + startIdx = endIdx + } + if rc.useCheckpoint { var items map[int64]model.TiFlashReplicaInfo if sr.TiflashRecorder != nil { diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go index aaa8e8c79a60c..7b00e30e6eaa7 100644 --- a/br/pkg/restore/log_client/client_test.go +++ b/br/pkg/restore/log_client/client_test.go @@ -25,16 +25,22 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/gluetidb" "github.com/pingcap/tidb/br/pkg/mock" logclient "github.com/pingcap/tidb/br/pkg/restore/log_client" "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/br/pkg/utiltest" "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/planner/core/resolve" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/sqlexec" filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/stretchr/testify/require" "google.golang.org/grpc/keepalive" @@ -1341,46 +1347,161 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) { } } -type fakeStorage struct { - storage.ExternalStorage +type fakeSession struct { + glue.Session } -func (fs fakeStorage) FileExists(ctx context.Context, name string) (bool, error) { - return false, errors.Errorf("name: %s", name) +func (fs fakeSession) GetSessionCtx() sessionctx.Context { + return fakeSessionContext{} } -type fakeStorageOK struct { - storage.ExternalStorage +type fakeSessionContext struct { + sessionctx.Context } -func (fs fakeStorageOK) FileExists(ctx context.Context, name string) (bool, error) { - return false, nil +func (fsc fakeSessionContext) GetRestrictedSQLExecutor() sqlexec.RestrictedSQLExecutor { + return fakeSQLExecutor{} +} + +type fakeSQLExecutor struct { + sqlexec.RestrictedSQLExecutor +} + +func (fse fakeSQLExecutor) ExecRestrictedSQL(_ context.Context, _ []sqlexec.OptionFuncAlias, query string, args ...any) ([]chunk.Row, []*resolve.ResultField, error) { + return nil, nil, errors.Errorf("name: %s, %v", query, args) } func TestInitSchemasReplaceForDDL(t *testing.T) { ctx := context.Background() { - client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorage{}, domain.NewMockDomain()) + client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{}) cfg := &logclient.InitSchemaConfig{IsNewTask: false} _, err := client.InitSchemasReplaceForDDL(ctx, cfg) require.Error(t, err) - require.Contains(t, err.Error(), "failed to check filename:pitr_id_maps/pitr_id_map.cluster_id:123.restored_ts:2") + require.Regexp(t, "failed to 
get pitr id map from mysql.tidb_pitr_id_map.* [2, 1]", err.Error()) } { - client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorage{}, domain.NewMockDomain()) + client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{}) cfg := &logclient.InitSchemaConfig{IsNewTask: true} _, err := client.InitSchemasReplaceForDDL(ctx, cfg) require.Error(t, err) - require.Contains(t, err.Error(), "failed to check filename:pitr_id_maps/pitr_id_map.cluster_id:123.restored_ts:1") + require.Regexp(t, "failed to get pitr id map from mysql.tidb_pitr_id_map.* [1, 1]", err.Error()) } { - client := logclient.TEST_NewLogClient(123, 1, 2, fakeStorageOK{}, domain.NewMockDomain()) + s := utiltest.CreateRestoreSchemaSuite(t) + tk := testkit.NewTestKit(t, s.Mock.Storage) + tk.Exec(session.CreatePITRIDMap) + g := gluetidb.New() + se, err := g.CreateSession(s.Mock.Storage) + require.NoError(t, err) + client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), se) cfg := &logclient.InitSchemaConfig{IsNewTask: true} - _, err := client.InitSchemasReplaceForDDL(ctx, cfg) + _, err = client.InitSchemasReplaceForDDL(ctx, cfg) require.Error(t, err) require.Contains(t, err.Error(), "miss upstream table information at `start-ts`(1) but the full backup path is not specified") } } + +func downstreamID(upstreamID int64) int64 { + return upstreamID + 10000000 +} + +func emptyDB(startupID, endupID int64, replaces map[int64]*stream.DBReplace) { + for id := startupID; id < endupID; id += 1 { + replaces[id] = &stream.DBReplace{ + Name: fmt.Sprintf("db_%d", id), + DbID: downstreamID(id), + } + } +} + +func emptyTables(dbupID, startupID, endupID int64, replaces map[int64]*stream.DBReplace) { + tableMap := make(map[int64]*stream.TableReplace) + for id := startupID; id < endupID; id += 1 { + tableMap[id] = &stream.TableReplace{ + Name: fmt.Sprintf("table_%d", id), + TableID: downstreamID(id), + } + } + replaces[dbupID] = &stream.DBReplace{ + Name: fmt.Sprintf("db_%d", dbupID), + DbID: downstreamID(dbupID), + TableMap: tableMap, + } +} + +func partitions(dbupID, tableupID, startupID, endupID int64, replaces map[int64]*stream.DBReplace) { + partitionMap := make(map[int64]int64) + for id := startupID; id < endupID; id += 1 { + partitionMap[id] = downstreamID(id) + } + replaces[dbupID] = &stream.DBReplace{ + Name: fmt.Sprintf("db_%d", dbupID), + DbID: downstreamID(dbupID), + TableMap: map[int64]*stream.TableReplace{ + tableupID: { + Name: fmt.Sprintf("table_%d", tableupID), + TableID: downstreamID(tableupID), + PartitionMap: partitionMap, + }, + }, + } +} + +func getDBMap() map[int64]*stream.DBReplace { + replaces := make(map[int64]*stream.DBReplace) + emptyDB(1, 3000, replaces) + emptyTables(3000, 3001, 8000, replaces) + partitions(8000, 8001, 8002, 12000, replaces) + emptyTables(12000, 12001, 30000, replaces) + return replaces +} + +func TestPITRIDMap(t *testing.T) { + ctx := context.Background() + s := utiltest.CreateRestoreSchemaSuite(t) + tk := testkit.NewTestKit(t, s.Mock.Storage) + tk.Exec(session.CreatePITRIDMap) + g := gluetidb.New() + se, err := g.CreateSession(s.Mock.Storage) + require.NoError(t, err) + client := logclient.TEST_NewLogClient(123, 1, 2, 3, nil, se) + baseSchemaReplaces := &stream.SchemasReplace{ + DbMap: getDBMap(), + } + err = client.TEST_saveIDMap(ctx, baseSchemaReplaces) + require.NoError(t, err) + newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1) + require.NoError(t, err) + require.Nil(t, newSchemaReplaces) + client2 := logclient.TEST_NewLogClient(123, 1, 2, 
4, nil, se) + newSchemaReplaces, err = client2.TEST_initSchemasMap(ctx, 2) + require.NoError(t, err) + require.Nil(t, newSchemaReplaces) + newSchemaReplaces, err = client.TEST_initSchemasMap(ctx, 2) + require.NoError(t, err) + + require.Equal(t, len(baseSchemaReplaces.DbMap), len(newSchemaReplaces)) + for _, dbMap := range newSchemaReplaces { + baseDbMap := baseSchemaReplaces.DbMap[dbMap.IdMap.UpstreamId] + require.NotNil(t, baseDbMap) + require.Equal(t, baseDbMap.DbID, dbMap.IdMap.DownstreamId) + require.Equal(t, baseDbMap.Name, dbMap.Name) + require.Equal(t, len(baseDbMap.TableMap), len(dbMap.Tables)) + for _, tableMap := range dbMap.Tables { + baseTableMap := baseDbMap.TableMap[tableMap.IdMap.UpstreamId] + require.NotNil(t, baseTableMap) + require.Equal(t, baseTableMap.TableID, tableMap.IdMap.DownstreamId) + require.Equal(t, baseTableMap.Name, tableMap.Name) + require.Equal(t, len(baseTableMap.PartitionMap), len(tableMap.Partitions)) + for _, partitionMap := range tableMap.Partitions { + basePartitionMap, exist := baseTableMap.PartitionMap[partitionMap.UpstreamId] + require.True(t, exist) + require.Equal(t, basePartitionMap, partitionMap.DownstreamId) + } + } + } +} diff --git a/br/pkg/restore/log_client/export_test.go b/br/pkg/restore/log_client/export_test.go index 18db7e61fc2d4..db1324d61f12a 100644 --- a/br/pkg/restore/log_client/export_test.go +++ b/br/pkg/restore/log_client/export_test.go @@ -19,13 +19,29 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/domain" ) var FilterFilesByRegion = filterFilesByRegion +func (rc *LogClient) TEST_saveIDMap( + ctx context.Context, + sr *stream.SchemasReplace, +) error { + return rc.saveIDMap(ctx, sr) +} + +func (rc *LogClient) TEST_initSchemasMap( + ctx context.Context, + restoreTS uint64, +) ([]*backuppb.PitrDBMap, error) { + return rc.initSchemasMap(ctx, restoreTS) +} + // readStreamMetaByTS is used for streaming task. collect all meta file by TS, it is for test usage. 
func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]Meta, error) { metas, err := rc.streamingMeta(ctx) @@ -39,15 +55,16 @@ func (rc *LogFileManager) ReadStreamMeta(ctx context.Context) ([]Meta, error) { return r.Item, nil } -func TEST_NewLogClient(clusterID, startTS, restoreTS uint64, storage storage.ExternalStorage, dom *domain.Domain) *LogClient { +func TEST_NewLogClient(clusterID, startTS, restoreTS, upstreamClusterID uint64, dom *domain.Domain, se glue.Session) *LogClient { return &LogClient{ - dom: dom, + dom: dom, + se: se, + upstreamClusterID: upstreamClusterID, LogFileManager: &LogFileManager{ startTS: startTS, restoreTS: restoreTS, }, clusterID: clusterID, - storage: storage, } } diff --git a/br/pkg/restore/snap_client/systable_restore.go b/br/pkg/restore/snap_client/systable_restore.go index fdc1b88783967..ddc12516268aa 100644 --- a/br/pkg/restore/snap_client/systable_restore.go +++ b/br/pkg/restore/snap_client/systable_restore.go @@ -68,6 +68,8 @@ var unRecoverableTable = map[string]map[string]struct{}{ // replace into view is not supported now "tidb_mdl_view": {}, + + "tidb_pitr_id_map": {}, }, "sys": { // replace into view is not supported now diff --git a/br/pkg/restore/snap_client/systable_restore_test.go b/br/pkg/restore/snap_client/systable_restore_test.go index eb3d6cf2df5f3..608fa7f57392f 100644 --- a/br/pkg/restore/snap_client/systable_restore_test.go +++ b/br/pkg/restore/snap_client/systable_restore_test.go @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) { // // The above variables are in the file br/pkg/restore/systable_restore.go func TestMonitorTheSystemTableIncremental(t *testing.T) { - require.Equal(t, int64(212), session.CurrentBootstrapVersion) + require.Equal(t, int64(213), session.CurrentBootstrapVersion) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 3463df2cbba97..3e45bf6934f1c 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -258,6 +258,7 @@ type RestoreConfig struct { checkpointSnapshotRestoreTaskName string `json:"-" toml:"-"` checkpointLogRestoreTaskName string `json:"-" toml:"-"` checkpointTaskInfoClusterID uint64 `json:"-" toml:"-"` + upstreamClusterID uint64 `json:"-" toml:"-"` WaitTiflashReady bool `json:"wait-tiflash-ready" toml:"wait-tiflash-ready"` // for ebs-based restore diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 1704fbc832711..dc0a5f863637d 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1141,6 +1141,7 @@ func RunStreamRestore( if cfg.RestoreTS == 0 { cfg.RestoreTS = logInfo.logMaxTS } + cfg.upstreamClusterID = logInfo.clusterID if len(cfg.FullBackupStorage) > 0 { startTS, fullClusterID, err := getFullBackupTS(ctx, cfg) @@ -1506,6 +1507,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m } client.SetCrypter(&cfg.CipherInfo) client.SetConcurrency(uint(cfg.Concurrency)) + client.SetUpstreamClusterID(cfg.upstreamClusterID) client.InitClients(ctx, u) err = client.SetRawKVBatchClient(ctx, cfg.PD, cfg.TLS.ToKVSecurity()) diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go index 4b0f3c34cdbad..519797e0daafc 100644 --- a/br/pkg/version/version.go +++ b/br/pkg/version/version.go @@ -165,6 +165,14 @@ func CheckVersionForBRPiTR(s *metapb.Store, tikvVersion *semver.Version) error { s.Address, tikvVersion, build.ReleaseVersion) } } + + if BRVersion.Major > 8 || (BRVersion.Major == 8 && BRVersion.Minor >= 4) { + if tikvVersion.Major < 8 || (tikvVersion.Major == 8 && tikvVersion.Minor < 4) { + 
return errors.Annotatef(berrors.ErrVersionMismatch, + "TiKV node %s version %s is too old because the PITR id map is written into the cluster system table mysql.tidb_pitr_id_map, please use the tikv with version v8.4.0+", + s.Address, tikvVersion) + } + } return nil } diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go index 4e7a2966e7dac..853c992113526 100644 --- a/br/pkg/version/version_test.go +++ b/br/pkg/version/version_test.go @@ -157,6 +157,26 @@ func TestCheckClusterVersion(t *testing.T) { require.Regexp(t, `^TiKV .* version mismatch when use PiTR v6.1.0, please `, err.Error()) } + { + build.ReleaseVersion = "v8.4.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v6.2.0`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.Error(t, err) + require.Regexp(t, `^TiKV .* is too old because the PITR id map is written into`, err.Error()) + } + + { + build.ReleaseVersion = "v8.5.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v6.2.0`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.Error(t, err) + require.Regexp(t, `^TiKV .* is too old because the PITR id map is written into`, err.Error()) + } + { build.ReleaseVersion = "v4.0.5" mock.getAllStores = func() []*metapb.Store { diff --git a/pkg/ddl/column_type_change_test.go b/pkg/ddl/column_type_change_test.go index 2452809a72c77..b13337bb4a595 100644 --- a/pkg/ddl/column_type_change_test.go +++ b/pkg/ddl/column_type_change_test.go @@ -260,7 +260,7 @@ func TestRowFormatWithChecksums(t *testing.T) { data, err := h.GetMvccByEncodedKey(encodedKey) require.NoError(t, err) // row value with checksums - expected := []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0xea, 0xb0, 0x41, 0x20} + expected := []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0x2b, 0x9, 0x2d, 0x78} require.Equal(t, expected, data.Info.Writes[0].ShortValue) tk.MustExec("drop table if exists t") } @@ -284,7 +284,7 @@ func TestRowLevelChecksumWithMultiSchemaChange(t *testing.T) { data, err := h.GetMvccByEncodedKey(encodedKey) require.NoError(t, err) // checksum skipped and with a null col vv - expected := []byte{0x80, 0x2, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0xa8, 0x88, 0x6f, 0xc8} + expected := []byte{0x80, 0x2, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0x69, 0x31, 0x3, 0x90} require.Equal(t, expected, data.Info.Writes[0].ShortValue) tk.MustExec("drop table if exists t") } diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index 4f516b9a33585..df13450cb0a27 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -353,7 +353,7 @@ func TestProcessChunkWith(t *testing.T) { require.Len(t, progress.GetColSize(), 3) checksumMap := checksum.GetInnerChecksums() require.Len(t, checksumMap, 1) - require.Equal(t, verify.MakeKVChecksum(111, 3, 14231358899564314836), *checksumMap[verify.DataKVGroupID]) + require.Equal(t, verify.MakeKVChecksum(111, 3, 13867387642099248025), *checksumMap[verify.DataKVGroupID]) }) } diff --git a/pkg/executor/infoschema_cluster_table_test.go 
b/pkg/executor/infoschema_cluster_table_test.go index cb63dfed7d6a6..8549d16a4b453 100644 --- a/pkg/executor/infoschema_cluster_table_test.go +++ b/pkg/executor/infoschema_cluster_table_test.go @@ -397,7 +397,7 @@ func TestTableStorageStats(t *testing.T) { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - result := 54 + result := 55 require.Len(t, rows, result) // More tests about the privileges. diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go index 0f194737886be..50ee4a3fb44dd 100644 --- a/pkg/executor/infoschema_reader_test.go +++ b/pkg/executor/infoschema_reader_test.go @@ -608,7 +608,7 @@ func TestColumnTable(t *testing.T) { testkit.RowsWithSep("|", "test|tbl1|col_2")) tk.MustQuery(`select count(*) from information_schema.columns;`).Check( - testkit.RowsWithSep("|", "4939")) + testkit.RowsWithSep("|", "4944")) } func TestIndexUsageTable(t *testing.T) { @@ -655,7 +655,7 @@ func TestIndexUsageTable(t *testing.T) { testkit.RowsWithSep("|", "test|idt2|idx_4")) tk.MustQuery(`select count(*) from information_schema.tidb_index_usage;`).Check( - testkit.RowsWithSep("|", "72")) + testkit.RowsWithSep("|", "73")) tk.MustQuery(`select TABLE_SCHEMA, TABLE_NAME, INDEX_NAME from information_schema.tidb_index_usage where TABLE_SCHEMA = 'test1';`).Check(testkit.Rows()) @@ -858,22 +858,22 @@ func TestInfoSchemaDDLJobs(t *testing.T) { tk2 := testkit.NewTestKit(t, store) tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE table_name = "t1";`).Check(testkit.RowsWithSep("|", - "125|add index /* txn-merge */|public|118|123|t1|synced", - "124|create table|public|118|123|t1|synced", - "111|add index /* txn-merge */|public|104|109|t1|synced", - "110|create table|public|104|109|t1|synced", + "127|add index /* txn-merge */|public|120|125|t1|synced", + "126|create table|public|120|125|t1|synced", + "113|add index /* txn-merge */|public|106|111|t1|synced", + "112|create table|public|106|111|t1|synced", )) tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE db_name = "d1" and JOB_TYPE LIKE "add index%%";`).Check(testkit.RowsWithSep("|", - "131|add index /* txn-merge */|public|118|129|t3|synced", - "128|add index /* txn-merge */|public|118|126|t2|synced", - "125|add index /* txn-merge */|public|118|123|t1|synced", - "122|add index /* txn-merge */|public|118|120|t0|synced", + "133|add index /* txn-merge */|public|120|131|t3|synced", + "130|add index /* txn-merge */|public|120|128|t2|synced", + "127|add index /* txn-merge */|public|120|125|t1|synced", + "124|add index /* txn-merge */|public|120|122|t0|synced", )) tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE db_name = "d0" and table_name = "t3";`).Check(testkit.RowsWithSep("|", - "117|add index /* txn-merge */|public|104|115|t3|synced", - "116|create table|public|104|115|t3|synced", + "119|add index /* txn-merge */|public|106|117|t3|synced", + "118|create table|public|106|117|t3|synced", )) tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE state = "running";`).Check(testkit.Rows()) @@ -884,15 +884,15 @@ func TestInfoSchemaDDLJobs(t *testing.T) { if job.SchemaState == model.StateWriteOnly && 
loaded.CompareAndSwap(false, true) { tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE table_name = "t0" and state = "running";`).Check(testkit.RowsWithSep("|", - "132|add index /* txn-merge */|write only|104|106|t0|running", + "134|add index /* txn-merge */|write only|106|108|t0|running", )) tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE db_name = "d0" and state = "running";`).Check(testkit.RowsWithSep("|", - "132|add index /* txn-merge */|write only|104|106|t0|running", + "134|add index /* txn-merge */|write only|106|108|t0|running", )) tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE state = "running";`).Check(testkit.RowsWithSep("|", - "132|add index /* txn-merge */|write only|104|106|t0|running", + "134|add index /* txn-merge */|write only|106|108|t0|running", )) } }) @@ -908,8 +908,8 @@ func TestInfoSchemaDDLJobs(t *testing.T) { tk.MustExec("create table test2.t1(id int)") tk.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE FROM information_schema.ddl_jobs WHERE db_name = "test2" and table_name = "t1"`).Check(testkit.RowsWithSep("|", - "141|create table|public|138|140|t1|synced", - "136|create table|public|133|135|t1|synced", + "143|create table|public|140|142|t1|synced", + "138|create table|public|135|137|t1|synced", )) // Test explain output, since the output may change in future. diff --git a/pkg/expression/integration_test/integration_test.go b/pkg/expression/integration_test/integration_test.go index a1cd13635bb9d..7c5c83fc2d988 100644 --- a/pkg/expression/integration_test/integration_test.go +++ b/pkg/expression/integration_test/integration_test.go @@ -1116,18 +1116,18 @@ func TestTiDBEncodeKey(t *testing.T) { err := tk.QueryToErr("select tidb_encode_record_key('test', 't1', 0);") require.ErrorContains(t, err, "doesn't exist") tk.MustQuery("select tidb_encode_record_key('test', 't', 1);"). - Check(testkit.Rows("7480000000000000685f728000000000000001")) + Check(testkit.Rows("74800000000000006a5f728000000000000001")) tk.MustExec("alter table t add index i(b);") err = tk.QueryToErr("select tidb_encode_index_key('test', 't', 'i1', 1);") require.ErrorContains(t, err, "index not found") tk.MustQuery("select tidb_encode_index_key('test', 't', 'i', 1, 1);"). 
- Check(testkit.Rows("7480000000000000685f698000000000000001038000000000000001038000000000000001")) + Check(testkit.Rows("74800000000000006a5f698000000000000001038000000000000001038000000000000001")) tk.MustExec("create table t1 (a int primary key, b int) partition by hash(a) partitions 4;") tk.MustExec("insert into t1 values (1, 1);") - tk.MustQuery("select tidb_encode_record_key('test', 't1(p1)', 1);").Check(testkit.Rows("74800000000000006d5f728000000000000001")) - rs := tk.MustQuery("select tidb_mvcc_info('74800000000000006d5f728000000000000001');") + tk.MustQuery("select tidb_encode_record_key('test', 't1(p1)', 1);").Check(testkit.Rows("74800000000000006f5f728000000000000001")) + rs := tk.MustQuery("select tidb_mvcc_info('74800000000000006f5f728000000000000001');") mvccInfo := rs.Rows()[0][0].(string) require.NotEqual(t, mvccInfo, `{"info":{}}`) @@ -1136,14 +1136,14 @@ func TestTiDBEncodeKey(t *testing.T) { tk2 := testkit.NewTestKit(t, store) err = tk2.Session().Auth(&auth.UserIdentity{Username: "alice", Hostname: "localhost"}, nil, nil, nil) require.NoError(t, err) - err = tk2.QueryToErr("select tidb_mvcc_info('74800000000000006d5f728000000000000001');") + err = tk2.QueryToErr("select tidb_mvcc_info('74800000000000006f5f728000000000000001');") require.ErrorContains(t, err, "Access denied") err = tk2.QueryToErr("select tidb_encode_record_key('test', 't1(p1)', 1);") require.ErrorContains(t, err, "SELECT command denied") err = tk2.QueryToErr("select tidb_encode_index_key('test', 't', 'i1', 1);") require.ErrorContains(t, err, "SELECT command denied") tk.MustExec("grant select on test.t1 to 'alice'@'%';") - tk2.MustQuery("select tidb_encode_record_key('test', 't1(p1)', 1);").Check(testkit.Rows("74800000000000006d5f728000000000000001")) + tk2.MustQuery("select tidb_encode_record_key('test', 't1(p1)', 1);").Check(testkit.Rows("74800000000000006f5f728000000000000001")) } func TestIssue9710(t *testing.T) { diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 8b30966b2b085..ec74727a89189 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -694,6 +694,15 @@ const ( KEY (created_by), KEY (status));` + // CreatePITRIDMap is a table that records the id map from upstream to downstream for PITR. + CreatePITRIDMap = `CREATE TABLE IF NOT EXISTS mysql.tidb_pitr_id_map ( + restored_ts BIGINT NOT NULL, + upstream_cluster_id BIGINT NOT NULL, + segment_id BIGINT NOT NULL, + id_map BLOB(524288) NOT NULL, + update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (restored_ts, upstream_cluster_id, segment_id));` + // DropMySQLIndexUsageTable removes the table `mysql.schema_index_usage` DropMySQLIndexUsageTable = "DROP TABLE IF EXISTS mysql.schema_index_usage" @@ -1120,11 +1129,15 @@ const ( // 2. modify column `plan_digest` type, modify column `time` to `start_time, // modify column `original_sql` to `sample_sql` to `mysql.tidb_runaway_queries`. version212 = 212 + + // version 213 + // create `mysql.tidb_pitr_id_map` table + version213 = 213 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version212 +var currentBootstrapVersion int64 = version213 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it. 
var internalSQLTimeout = owner.ManagerSessionTTL + 15 @@ -1292,6 +1305,7 @@ var ( upgradeToVer210, upgradeToVer211, upgradeToVer212, + upgradeToVer213, } ) @@ -3119,6 +3133,14 @@ func upgradeToVer212(s sessiontypes.Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries MODIFY COLUMN `plan_digest` varchar(64) DEFAULT '';", infoschema.ErrColumnExists) } +func upgradeToVer213(s sessiontypes.Session, ver int64) { + if ver >= version213 { + return + } + + mustExecute(s, CreatePITRIDMap) +} + // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist. func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) @@ -3263,6 +3285,8 @@ func doDDLWorks(s sessiontypes.Session) { mustExecute(s, CreateDistFrameworkMeta) // create request_unit_by_group mustExecute(s, CreateRequestUnitByGroupTable) + // create tidb_pitr_id_map + mustExecute(s, CreatePITRIDMap) // create `sys` schema mustExecute(s, CreateSysSchema) // create `sys.schema_unused_indexes` view diff --git a/pkg/statistics/handle/storage/dump_test.go b/pkg/statistics/handle/storage/dump_test.go index 08470b647b9df..805202c566f85 100644 --- a/pkg/statistics/handle/storage/dump_test.go +++ b/pkg/statistics/handle/storage/dump_test.go @@ -15,12 +15,14 @@ package storage_test import ( + "cmp" "context" "encoding/json" "errors" "fmt" "math" "runtime" + "slices" "strings" "testing" @@ -600,6 +602,10 @@ func TestJSONTableToBlocks(t *testing.T) { dumpJSONTable, err := h.DumpStatsToJSON("test", tableInfo.Meta(), nil, true) require.NoError(t, err) + // the slice is generated from a map loop, which is randomly + slices.SortFunc(dumpJSONTable.PredicateColumns, func(a, b *handleutil.JSONPredicateColumn) int { + return cmp.Compare(a.ID, b.ID) + }) jsOrigin, _ := json.Marshal(dumpJSONTable) blockSize := 30 @@ -608,6 +614,10 @@ func TestJSONTableToBlocks(t *testing.T) { dumpJSONBlocks, err := storage.JSONTableToBlocks(js, blockSize) require.NoError(t, err) jsConverted, err := storage.BlocksToJSONTable(dumpJSONBlocks) + // the slice is generated from a map loop, which is randomly + slices.SortFunc(jsConverted.PredicateColumns, func(a, b *handleutil.JSONPredicateColumn) int { + return cmp.Compare(a.ID, b.ID) + }) require.NoError(t, err) jsonStr, err := json.Marshal(jsConverted) require.NoError(t, err)
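
For reference, the statements this patch issues against the new system table, assembled from the hunks above into one place (shown as plain Go constants; the %? placeholders are filled in by TiDB's internal SQL executor):

// Assembled from pkg/session/bootstrap.go and br/pkg/restore/log_client/client.go above.
package pitridmap

const (
	// CreatePITRIDMap, introduced at bootstrap version 213.
	createTable = `CREATE TABLE IF NOT EXISTS mysql.tidb_pitr_id_map (
		restored_ts BIGINT NOT NULL,
		upstream_cluster_id BIGINT NOT NULL,
		segment_id BIGINT NOT NULL,
		id_map BLOB(524288) NOT NULL,
		update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
		PRIMARY KEY (restored_ts, upstream_cluster_id, segment_id));`

	// initSchemasMap: read the map back in segment order for a given restore.
	selectIDMap = "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"

	// saveIDMap: drop any stale map for this (restored_ts, upstream_cluster_id), then write the segments.
	deleteIDMap  = "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %?;"
	replaceIDMap = "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);"
)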
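
Taken together, saveIDMap and initSchemasMap in client.go implement a split/reassemble scheme: the marshalled BackupMeta is cut into 512 KiB blocks (one row per block) and stitched back together by reading the rows in segment order. A minimal, standalone sketch of that scheme, assuming stdlib only and not reusing BR's types:

package main

import (
	"bytes"
	"fmt"
)

// Mirrors PITRIdMapBlockSize in client.go: each block fits the BLOB(524288) column.
const pitrIDMapBlockSize = 524288

// segmentRow stands in for one row of mysql.tidb_pitr_id_map (segment_id, id_map).
type segmentRow struct {
	ID   uint64
	Data []byte
}

// splitIDMap cuts the marshalled id map into ordered segments, as saveIDMap does
// before issuing one REPLACE INTO per segment.
func splitIDMap(data []byte) []segmentRow {
	var rows []segmentRow
	for start, id := 0, uint64(0); start < len(data); id++ {
		end := start + pitrIDMapBlockSize
		if end > len(data) {
			end = len(data)
		}
		rows = append(rows, segmentRow{ID: id, Data: data[start:end]})
		start = end
	}
	return rows
}

// joinIDMap reassembles the segments with the same sanity checks initSchemasMap applies:
// segment ids must be contiguous from 0 and no segment may be empty.
func joinIDMap(rows []segmentRow) ([]byte, error) {
	var buf bytes.Buffer
	for i, row := range rows {
		if row.ID != uint64(i) {
			return nil, fmt.Errorf("the part(segment_id = %d) of pitr id map is lost", i)
		}
		if len(row.Data) == 0 {
			return nil, fmt.Errorf("get the empty part(segment_id = %d) of pitr id map", i)
		}
		buf.Write(row.Data)
	}
	return buf.Bytes(), nil
}

func main() {
	data := bytes.Repeat([]byte{0xab}, pitrIDMapBlockSize+123) // stand-in for a marshalled BackupMeta
	rows := splitIDMap(data)
	restored, err := joinIDMap(rows)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows), bytes.Equal(data, restored)) // 2 true
}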
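
The new branch in CheckVersionForBRPiTR (br/pkg/version/version.go) gates the behavior change: a BR at v8.4.0 or later refuses PITR against TiKV older than v8.4.0, because the id map is now written into the cluster's mysql.tidb_pitr_id_map table. A toy illustration of that gate, using a hypothetical version struct rather than the semver type BR actually uses:

package main

import "fmt"

type version struct{ major, minor int64 }

// pitrIDMapInCluster reports whether a component is new enough for the in-cluster id map.
func pitrIDMapInCluster(v version) bool { return v.major > 8 || (v.major == 8 && v.minor >= 4) }

// checkPiTRVersions mirrors the shape of the new check: only reject when BR expects
// the in-cluster id map but the TiKV node predates it.
func checkPiTRVersions(br, tikv version) error {
	if pitrIDMapInCluster(br) && !pitrIDMapInCluster(tikv) {
		return fmt.Errorf("TiKV v%d.%d is too old for BR v%d.%d: the PITR id map is written into mysql.tidb_pitr_id_map, which requires TiKV v8.4.0+",
			tikv.major, tikv.minor, br.major, br.minor)
	}
	return nil
}

func main() {
	fmt.Println(checkPiTRVersions(version{8, 4}, version{6, 2})) // rejected
	fmt.Println(checkPiTRVersions(version{8, 4}, version{8, 4})) // <nil>
}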