Skip to content

Commit

Permalink
ddl: fix alter table charset bug and compatibility (pingcap#9790)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Jun 5, 2019
1 parent 92cbfdc commit 89b2385
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 35 deletions.
118 changes: 114 additions & 4 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,116 @@ func (s *testIntegrationSuite) TestChangingTableCharset(c *C) {
rs.Close()
}
c.Assert(err, NotNil)

rs, err = tk.Exec("alter table t charset utf8mb4")
if rs != nil {
rs.Close()
}
c.Assert(err.Error(), Equals, "[ddl:210]unsupported modify charset from latin1 to utf8mb4")

rs, err = tk.Exec("alter table t charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)

rs, err = tk.Exec("alter table t charset ''")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[parser:1115]Unknown character set: ''")

rs, err = tk.Exec("alter table t collate ''")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1273]Unknown collation: ''")

rs, err = tk.Exec("alter table t charset utf8mb4 collate '' collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1273]Unknown collation: ''")

rs, err = tk.Exec("alter table t charset latin1 charset utf8 charset utf8mb4 collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1302]Conflicting declarations: 'CHARACTER SET latin1' and 'CHARACTER SET utf8'")

rs, err = tk.Exec("alter table t charset utf8 collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1253]COLLATION 'utf8mb4_bin' is not valid for CHARACTER SET 'utf8'")

rs, err = tk.Exec("alter table t charset utf8 collate utf8_bin collate utf8mb4_bin collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1253]COLLATION 'utf8mb4_bin' is not valid for CHARACTER SET 'utf8'")

// Test change column charset when changing table charset.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10)) charset utf8")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset := func() {
tbl := testGetTableByName(c, tk.Se, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Charset, Equals, charset.CharsetUTF8MB4)
c.Assert(tbl.Meta().Collate, Equals, charset.CollationUTF8MB4)
for _, col := range tbl.Meta().Columns {
c.Assert(col.Charset, Equals, charset.CharsetUTF8MB4)
c.Assert(col.Collate, Equals, charset.CollationUTF8MB4)
}
}
checkCharset()

// Test when column charset can not convert to the target charset.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10) character set ascii) charset utf8mb4")
_, err = tk.Exec("alter table t convert to charset utf8mb4;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:210]unsupported modify charset from ascii to utf8mb4")

// Test when table charset is equal to target charset but column charset is not equal.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10) character set utf8) charset utf8mb4")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()

// Mock table info with charset is "". Old TiDB maybe create table with charset is "".
db, ok := domain.GetDomain(tk.Se).InfoSchema().SchemaByName(model.NewCIStr("test"))
c.Assert(ok, IsTrue)
tbl := testGetTableByName(c, tk.Se, "test", "t")
tblInfo := tbl.Meta().Clone()
tblInfo.Charset = ""
tblInfo.Collate = ""
updateTableInfo := func(tblInfo *model.TableInfo) {
mockCtx := mock.NewContext()
mockCtx.Store = s.store
err = mockCtx.NewTxn()
c.Assert(err, IsNil)
txn, err := mockCtx.Txn(true)
c.Assert(err, IsNil)
mt := meta.NewMeta(txn)

err = mt.UpdateTable(db.ID, tblInfo)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
}
updateTableInfo(tblInfo)

// check table charset is ""
tk.MustExec("alter table t add column b varchar(10);") // load latest schema.
tbl = testGetTableByName(c, tk.Se, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Charset, Equals, "")
c.Assert(tbl.Meta().Collate, Equals, "")
// Test when table charset is "", this for compatibility.
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()

// Test when column charset is "".
tbl = testGetTableByName(c, tk.Se, "test", "t")
tblInfo = tbl.Meta().Clone()
tblInfo.Columns[0].Charset = ""
tblInfo.Columns[0].Collate = ""
updateTableInfo(tblInfo)
// check column charset is ""
tk.MustExec("alter table t drop column b;") // load latest schema.
tbl = testGetTableByName(c, tk.Se, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Columns[0].Charset, Equals, "")
c.Assert(tbl.Meta().Columns[0].Collate, Equals, "")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()
}

func (s *testIntegrationSuite) TestCaseInsensitiveCharsetAndCollate(c *C) {
Expand Down Expand Up @@ -513,15 +623,15 @@ func (s *testIntegrationSuite) TestIgnoreColumnUTF8Charset(c *C) {

// Test for alter table convert charset
config.GetGlobalConfig().TreatOldVersionUTF8AsUTF8MB4 = true
tk.MustExec("alter table t change column b b varchar(40) character set ascii") // reload schema.
tk.MustExec("alter table t drop column b") // reload schema.
tk.MustExec("alter table t convert to charset utf8mb4;")

config.GetGlobalConfig().TreatOldVersionUTF8AsUTF8MB4 = false
tk.MustExec("alter table t change column b b varchar(50) CHARSET ascii") // reload schema.
tk.MustExec("alter table t add column b varchar(50);") // reload schema.
// TODO: fix this after PR 9790.
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" +
" `a` varchar(20) CHARSET utf8 COLLATE utf8_bin DEFAULT NULL,\n" +
" `b` varchar(50) CHARSET ascii COLLATE ascii_bin DEFAULT NULL\n" +
" `a` varchar(20) DEFAULT NULL,\n" +
" `b` varchar(50) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}

Expand Down
2 changes: 2 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ func (s *testDBSuite) testGetTableByName(c *C, db, table string) table.Table {
func testGetTableByName(c *C, se sessionctx.Context, db, table string) table.Table {
ctx := se.(sessionctx.Context)
dom := domain.GetDomain(ctx)
c.Assert(dom, NotNil)

// Make sure the table schema is the new schema.
err := dom.Reload()
c.Assert(err, IsNil)
Expand Down
9 changes: 9 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ var (
ErrWrongNameForIndex = terror.ClassDDL.New(codeWrongNameForIndex, mysql.MySQLErrName[mysql.ErrWrongNameForIndex])
// ErrUnknownCharacterSet returns unknown character set.
ErrUnknownCharacterSet = terror.ClassDDL.New(codeUnknownCharacterSet, "Unknown character set: '%s'")
// ErrUnknownCollation returns unknown collation.
ErrUnknownCollation = terror.ClassDDL.New(codeUnknownCollation, "Unknown collation: '%s'")
// ErrCollationCharsetMismatch returns when collation not match the charset.
ErrCollationCharsetMismatch = terror.ClassDDL.New(codeCollationCharsetMismatch, mysql.MySQLErrName[mysql.ErrCollationCharsetMismatch])
// ErrConflictingDeclarations return conflict declarations.
ErrConflictingDeclarations = terror.ClassDDL.New(codeConflictingDeclarations, "Conflicting declarations: 'CHARACTER SET %s' and 'CHARACTER SET %s'")
// ErrPrimaryCantHaveNull returns All parts of a PRIMARY KEY must be NOT NULL; if you need NULL in a key, use UNIQUE instead
Expand Down Expand Up @@ -621,6 +625,8 @@ const (
codeWrongNameForIndex = terror.ErrCode(mysql.ErrWrongNameForIndex)
codeErrTooLongIndexComment = terror.ErrCode(mysql.ErrTooLongIndexComment)
codeUnknownCharacterSet = terror.ErrCode(mysql.ErrUnknownCharacterSet)
codeUnknownCollation = terror.ErrCode(mysql.ErrUnknownCollation)
codeCollationCharsetMismatch = terror.ErrCode(mysql.ErrCollationCharsetMismatch)
codeConflictingDeclarations = terror.ErrCode(mysql.ErrConflictingDeclarations)
codeCantCreateTable = terror.ErrCode(mysql.ErrCantCreateTable)
codeTableMustHaveColumns = terror.ErrCode(mysql.ErrTableMustHaveColumns)
Expand Down Expand Up @@ -690,6 +696,9 @@ func init() {
codePrimaryCantHaveNull: mysql.ErrPrimaryCantHaveNull,
codeWrongExprInPartitionFunc: mysql.ErrWrongExprInPartitionFunc,
codeUnknownPartition: mysql.ErrUnknownPartition,
codeUnknownCollation: mysql.ErrUnknownCollation,
codeCollationCharsetMismatch: mysql.ErrCollationCharsetMismatch,
codeConflictingDeclarations: mysql.ErrConflictingDeclarations,
}
terror.ErrClassToMySQLCodes[terror.ClassDDL] = ddlMySQLErrCodes
}
135 changes: 104 additions & 31 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,27 @@ func ResolveCharsetCollation(tblCharset, dbCharset string) (string, string, erro
return charset, collate, nil
}

// typesNeedCharset reports whether tp is one of the string, blob, enum or set
// column types, i.e. the types that take a character set rather than being
// treated as binary.
func typesNeedCharset(tp byte) bool {
	switch tp {
	case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString:
		// String family.
		return true
	case mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
		// Blob family.
		return true
	case mysql.TypeEnum, mysql.TypeSet:
		return true
	default:
		return false
	}
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error {
tp.Charset = strings.ToLower(tp.Charset)
tp.Collate = strings.ToLower(tp.Collate)
if len(tp.Charset) == 0 {
switch tp.Tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeEnum, mysql.TypeSet:
if typesNeedCharset(tp.Tp) {
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
}
default:
} else {
tp.Charset = charset.CharsetBin
tp.Collate = charset.CharsetBin
}
Expand Down Expand Up @@ -1263,13 +1272,13 @@ func isIgnorableSpec(tp ast.AlterTableType) bool {
// getCharsetAndCollateInTableOption will iterate the charset and collate in the options,
// and returns the last charset and collate in options. If there is no charset in the options,
// the returns charset will be "", the same as collate.
func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption) (charset, collate string) {
charsets := make([]string, len(options))
collates := make([]string, len(options))
func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption) (ca, co string, err error) {
charsets := make([]string, 0, len(options))
collates := make([]string, 0, len(options))
for i := startIdx; i < len(options); i++ {
opt := options[i]
// we set the charset to the last option. example: alter table t charset latin1 charset utf8 collate utf8_bin;
// the charset will be utf8, collate will be utf8_bin
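// Collect every charset and collate option. Example: alter table t charset utf8 collate utf8_general_ci collate utf8_bin;
// the charset will be utf8 and the collate will be utf8_bin (the last collate wins; more than one charset option is rejected below).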
switch opt.Tp {
case ast.TableOptionCharset:
charsets = append(charsets, opt.StrValue)
Expand All @@ -1278,12 +1287,26 @@ func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption)
}
}

if len(charsets) != 0 {
charset = charsets[len(charsets)-1]
if len(charsets) > 1 {
return "", "", ErrConflictingDeclarations.GenWithStackByArgs(charsets[0], charsets[1])
}
if len(charsets) == 1 {
if charsets[0] == "" {
return "", "", ErrUnknownCharacterSet.GenWithStackByArgs("")
}
ca = charsets[0]
}

if len(collates) != 0 {
collate = collates[len(collates)-1]
for i := range collates {
if collates[i] == "" {
return "", "", ErrUnknownCollation.GenWithStackByArgs("")
}
if len(ca) != 0 && !charset.ValidCharsetAndCollation(ca, collates[i]) {
return "", "", ErrCollationCharsetMismatch.GenWithStackByArgs(collates[i], ca)
}
}
co = collates[len(collates)-1]
}
return
}
Expand Down Expand Up @@ -1371,7 +1394,11 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
if handledCharsetOrCollate {
continue
}
toCharset, toCollate := getCharsetAndCollateInTableOption(i, spec.Options)
var toCharset, toCollate string
toCharset, toCollate, err = getCharsetAndCollateInTableOption(i, spec.Options)
if err != nil {
return err
}
err = d.AlterTableCharsetAndCollate(ctx, ident, toCharset, toCollate)
handledCharsetOrCollate = true
}
Expand Down Expand Up @@ -1718,7 +1745,7 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, colName model.CIS
// modifiableCharsetAndCollation returns error when the charset or collation is not modifiable.
func modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate string) error {
if !charset.ValidCharsetAndCollation(toCharset, toCollate) {
return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset, toCollate)
return ErrUnknownCharacterSet.GenWithStack("Unknown character set: '%s', collation: '%s'", toCharset, toCollate)
}
if toCharset == charset.CharsetUTF8MB4 && origCharset == charset.CharsetUTF8 {
// TiDB only allow utf8 to be changed to utf8mb4.
Expand Down Expand Up @@ -2152,11 +2179,9 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}

origCharset := tb.Meta().Charset
origCollate := tb.Meta().Collate
if toCharset == "" {
// charset does not change.
toCharset = origCharset
toCharset = tb.Meta().Charset
}

if toCollate == "" {
Expand All @@ -2166,23 +2191,13 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}
}
// Old version schema charset maybe modified when load schema if TreatOldVersionUTF8AsUTF8MB4 was enable.
// So even if the origCharset equal toCharset, we still need to do the ddl for old version schema.
if origCharset == toCharset && origCollate == toCollate && tb.Meta().Version >= model.TableInfoVersion2 {
// nothing to do.
return nil
}

if err = modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate); err != nil {
return errors.Trace(err)
doNothing, err := checkAlterTableCharset(tb.Meta(), schema, toCharset, toCollate)
if err != nil {
return err
}

for _, col := range tb.Meta().Cols() {
if col.Tp == mysql.TypeVarchar {
if err = IsTooBigFieldLength(col.Flen, col.Name.O, toCharset); err != nil {
return errors.Trace(err)
}
}
if doNothing {
return nil
}

job := &model.Job{
Expand All @@ -2197,6 +2212,64 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}

// checkAlterTableCharset checks whether the charset of the table described by
// tblInfo can be changed to toCharset/toCollate.
// It returns two values:
// doNothing: true when the table and every non-binary column already use the
// target charset and collation, so no DDL job is needed.
// err: non-nil when the table, or one of its columns, cannot be changed to the
// target charset.
func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCharset, toCollate string) (doNothing bool, err error) {
	fromCharset, fromCollate := tblInfo.Charset, tblInfo.Collate

	// The charset of an old version schema may have been modified while loading
	// the schema if TreatOldVersionUTF8AsUTF8MB4 was enabled, so the shortcut
	// below is only taken for tables at TableInfoVersion2 or later.
	tableAlreadyMatches := fromCharset == toCharset && fromCollate == toCollate
	if tableAlreadyMatches && tblInfo.Version >= model.TableInfoVersion2 {
		// The table itself matches; look for a column that still differs.
		columnPending := false
		for _, c := range tblInfo.Columns {
			if c.Charset == charset.CharsetBin {
				continue
			}
			if c.Charset != toCharset || c.Collate != toCollate {
				columnPending = true
				break
			}
		}
		if !columnPending {
			// Nothing to do.
			return true, nil
		}
	}

	if fromCharset == "" {
		// The table charset may be "" if the table was created by an old TiDB
		// version, such as v2.0.8. Fall back to the charset resolved from the
		// database so the check below has something to compare against.
		fromCharset, fromCollate, err = ResolveCharsetCollation("", dbInfo.Charset)
		if err != nil {
			return false, err
		}
	}

	// Table-level check first, then every column.
	if err = modifiableCharsetAndCollation(toCharset, toCollate, fromCharset, fromCollate); err != nil {
		return false, err
	}

	for _, c := range tblInfo.Columns {
		// The field-length check runs before the charset check, and also for
		// varchar columns whose charset is binary or empty.
		if c.Tp == mysql.TypeVarchar {
			if err = IsTooBigFieldLength(c.Flen, c.Name.O, toCharset); err != nil {
				return false, err
			}
		}
		// Columns with a binary or empty charset are not checked for convertibility.
		if c.Charset == charset.CharsetBin || c.Charset == "" {
			continue
		}
		if err = modifiableCharsetAndCollation(toCharset, toCollate, c.Charset, c.Collate); err != nil {
			return false, err
		}
	}
	return false, nil
}

// RenameIndex renames an index.
// In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index),
// but index names are case-sensitive (we can rename index 'a' to 'A')
Expand Down
Loading

0 comments on commit 89b2385

Please sign in to comment.