Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: use mDDLTableVersion key to control backfill tables #40984

Merged
merged 5 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,25 @@ var (
ErrInvalidString = dbterror.ClassMeta.NewStd(errno.ErrInvalidCharacterString)
)

// DDLTableVersion is to display ddl related table versions
type DDLTableVersion int

const (
// InitDDLTableVersion is the original version.
InitDDLTableVersion DDLTableVersion = 0
// BaseDDLTableVersion is for support concurrent DDL, it added tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history.
BaseDDLTableVersion DDLTableVersion = 1
// MDLTableVersion is for support MDL tables.
MDLTableVersion DDLTableVersion = 2
// BackfillTableVersion is for support distributed reorg stage, it added tidb_ddl_backfill, tidb_ddl_backfill_history.
BackfillTableVersion DDLTableVersion = 3
)

// Bytes returns the byte slice.
func (ver DDLTableVersion) Bytes() []byte {
return []byte(strconv.Itoa(int(ver)))
}

// Meta is for handling meta information in a transaction.
type Meta struct {
txn *structure.TxStructure
Expand Down Expand Up @@ -619,15 +638,25 @@ func (m *Meta) CreateTableOrView(dbID int64, tableInfo *model.TableInfo) error {
}

// SetDDLTables write a key into storage.
func (m *Meta) SetDDLTables() error {
err := m.txn.Set(mDDLTableVersion, []byte("1"))
func (m *Meta) SetDDLTables(ddlTableVersion DDLTableVersion) error {
err := m.txn.Set(mDDLTableVersion, ddlTableVersion.Bytes())
return errors.Trace(err)
}

// SetMDLTables write a key into storage.
func (m *Meta) SetMDLTables() error {
err := m.txn.Set(mDDLTableVersion, []byte("2"))
return errors.Trace(err)
// CheckDDLTableVersion check if the tables related to concurrent DDL exists.
func (m *Meta) CheckDDLTableVersion() (DDLTableVersion, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return -1, errors.Trace(err)
}
if string(v) == "" {
return InitDDLTableVersion, nil
}
ver, err := strconv.Atoi(string(v))
if err != nil {
return -1, errors.Trace(err)
}
return DDLTableVersion(ver), nil
}

// CreateMySQLDatabaseIfNotExists creates mysql schema and return its DB ID.
Expand Down Expand Up @@ -666,24 +695,6 @@ func (m *Meta) GetSystemDBID() (int64, error) {
return 0, nil
}

// CheckDDLTableExists check if the tables related to concurrent DDL exists.
func (m *Meta) CheckDDLTableExists() (bool, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return false, errors.Trace(err)
}
return len(v) != 0, nil
}

// CheckMDLTableExists check if the tables related to concurrent DDL exists.
func (m *Meta) CheckMDLTableExists() (bool, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return false, errors.Trace(err)
}
return bytes.Equal(v, []byte("2")), nil
}

// SetMetadataLock sets the metadata lock.
func (m *Meta) SetMetadataLock(b bool) error {
var data []byte
Expand Down
37 changes: 36 additions & 1 deletion session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,12 @@ func TestBootstrapWithError(t *testing.T) {
se.txn.init()
se.mu.values = make(map[fmt.Stringer]interface{})
se.SetValue(sessionctx.Initing, true)
err := InitDDLJobTables(store)
err := InitDDLJobTables(store, meta.BaseDDLTableVersion)
require.NoError(t, err)
err = InitMDLTable(store)
require.NoError(t, err)
err = InitDDLJobTables(store, meta.BackfillTableVersion)
require.NoError(t, err)
dom, err := domap.Get(store)
require.NoError(t, err)
domain.BindDomain(se, dom)
Expand Down Expand Up @@ -215,10 +217,43 @@ func TestBootstrapWithError(t *testing.T) {
require.Equal(t, []byte("True"), row.GetBytes(0))
require.NoError(t, r.Close())

mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill")
mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill_history")

// Check tidb_ttl_table_status table
mustExec(t, se, "SELECT * from mysql.tidb_ttl_table_status")
}

func TestDDLTableCreateBackfillTable(t *testing.T) {
store, dom := createStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()
se := createSessionAndSetID(t, store)

txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
ver, err := m.CheckDDLTableVersion()
require.NoError(t, err)
require.GreaterOrEqual(t, ver, meta.BackfillTableVersion)

// downgrade `mDDLTableVersion`
m.SetDDLTables(meta.MDLTableVersion)
mustExec(t, se, "drop table mysql.tidb_ddl_backfill")
mustExec(t, se, "drop table mysql.tidb_ddl_backfill_history")
err = txn.Commit(context.Background())
require.NoError(t, err)

// to upgrade session for create ddl related tables
dom.Close()
dom, err = BootstrapSession(store)
require.NoError(t, err)

se = createSessionAndSetID(t, store)
mustExec(t, se, "select * from mysql.tidb_ddl_backfill")
mustExec(t, se, "select * from mysql.tidb_ddl_backfill_history")
dom.Close()
}

// TestUpgrade tests upgrading
func TestUpgrade(t *testing.T) {
ctx := context.Background()
Expand Down
44 changes: 18 additions & 26 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3103,41 +3103,29 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) {
}
}

// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg, tidb_ddl_history, tidb_ddl_backfill and tidb_ddl_backfill_history.
func InitDDLJobTables(store kv.Storage) error {
// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_ddl_backfill and tidb_ddl_backfill_history.
func InitDDLJobTables(store kv.Storage, targetVer meta.DDLTableVersion) error {
targetTables := DDLJobTables
if targetVer == meta.BackfillTableVersion {
targetTables = BackfillTables
}
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
exists, err := t.CheckDDLTableExists()
if err != nil {
tableVer, err := t.CheckDDLTableVersion()
if err != nil || tableVer >= targetVer {
return errors.Trace(err)
}
dbID, err := t.CreateMySQLDatabaseIfNotExists()
if err != nil {
return err
}
if exists {
return initBackfillJobTables(store, t, dbID)
}

if err = createAndSplitTables(store, t, dbID, DDLJobTables); err != nil {
return err
}
if err = initBackfillJobTables(store, t, dbID); err != nil {
if err = createAndSplitTables(store, t, dbID, targetTables); err != nil {
return err
}
return t.SetDDLTables()
return t.SetDDLTables(targetVer)
})
}

// initBackfillJobTables is to create tidb_ddl_backfill and tidb_ddl_backfill_history.
func initBackfillJobTables(store kv.Storage, t *meta.Meta, dbID int64) error {
tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
if err != nil || tblExist {
return errors.Trace(err)
}
return createAndSplitTables(store, t, dbID, BackfillTables)
}

func createAndSplitTables(store kv.Storage, t *meta.Meta, dbID int64, tables []tableBasicInfo) error {
tableIDs := make([]int64, 0, len(tables))
for _, tbl := range tables {
Expand Down Expand Up @@ -3169,8 +3157,8 @@ func createAndSplitTables(store kv.Storage, t *meta.Meta, dbID int64, tables []t
func InitMDLTable(store kv.Storage) error {
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
exists, err := t.CheckMDLTableExists()
if err != nil || exists {
ver, err := t.CheckDDLTableVersion()
if err != nil || ver >= meta.MDLTableVersion {
return errors.Trace(err)
}
dbID, err := t.CreateMySQLDatabaseIfNotExists()
Expand All @@ -3195,7 +3183,7 @@ func InitMDLTable(store kv.Storage) error {
return errors.Trace(err)
}

return t.SetMDLTables()
return t.SetDDLTables(meta.MDLTableVersion)
})
}

Expand Down Expand Up @@ -3272,14 +3260,18 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}
}
err := InitDDLJobTables(store)
err := InitDDLJobTables(store, meta.BaseDDLTableVersion)
if err != nil {
return nil, err
}
err = InitMDLTable(store)
if err != nil {
return nil, err
}
err = InitDDLJobTables(store, meta.BackfillTableVersion)
if err != nil {
return nil, err
}
ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand Down
12 changes: 9 additions & 3 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ func TestInitMetaTable(t *testing.T) {
tk.MustExec(sql.SQL)
}

for _, sql := range session.BackfillTables {
tk.MustExec(sql.SQL)
}

tbls := map[string]struct{}{
"tidb_ddl_job": {},
"tidb_ddl_reorg": {},
"tidb_ddl_history": {},
"tidb_ddl_job": {},
"tidb_ddl_reorg": {},
"tidb_ddl_history": {},
"tidb_ddl_backfill": {},
"tidb_ddl_backfill_history": {},
}

for tbl := range tbls {
Expand Down