Skip to content

Commit

Permalink
*: Reorg part delete range (#40543)
Browse files Browse the repository at this point in the history
ref #38535
  • Loading branch information
mjonss committed Jan 19, 2023
1 parent 8db6a00 commit 3eef6a0
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 18 deletions.
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ go_test(
"//util/domainutil",
"//util/gcutil",
"//util/logutil",
"//util/mathutil",
"//util/mock",
"//util/sem",
"//util/sqlexec",
Expand Down
6 changes: 2 additions & 4 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4382,9 +4382,7 @@ PARTITION BY LIST (level) (
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p1800,p2000 INTO (PARTITION p2000 VALUES LESS THAN (2100))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
tk.MustExec(`INSERT INTO members VALUES (313, "John", "Doe", "2022-11-22", NULL)`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2050))`)
// TODO: uncomment this:
//tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2020))`, "[table:1526]Table has no partition for value 2022")
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2020))`, "[table:1526]Table has no partition for value 2022")
tk.MustExec(`INSERT INTO member_level (id, level) values (313, 6)`)
// TODO: uncomment this:
//tk.MustContainErrMsg(`ALTER TABLE member_level REORGANIZE PARTITION lEven INTO (PARTITION lEven VALUES IN (2,4))`, "[table:1526]Table has no partition for value 6")
tk.MustContainErrMsg(`ALTER TABLE member_level REORGANIZE PARTITION lEven INTO (PARTITION lEven VALUES IN (2,4))`, "[table:1526]Table has no partition for value 6")
}
92 changes: 90 additions & 2 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand All @@ -59,6 +60,41 @@ type allTableData struct {
tp []string
}

// TODO: Create a more generic function that gets all accessible table ids
// from all schemas, and checks the full key space so that there are no
// keys for non-existing table IDs. Also figure out how to wait for deleteRange
// Checks that there are no accessible data after an existing table
// assumes that tableIDs are only increasing.
// To be used during failure testing of ALTER, to make sure cleanup is done.
func noNewTablesAfter(t *testing.T, ctx sessionctx.Context, tbl table.Table) {
require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx))
txn, err := ctx.Txn(true)
require.NoError(t, err)
defer func() {
err := txn.Rollback()
require.NoError(t, err)
}()
// Get max tableID (if partitioned)
tblID := tbl.Meta().ID
if pt := tbl.GetPartitionedTable(); pt != nil {
defs := pt.Meta().Partition.Definitions
{
for i := range defs {
tblID = mathutil.Max[int64](tblID, defs[i].ID)
}
}
}
prefix := tablecodec.EncodeTablePrefix(tblID + 1)
it, err := txn.Iter(prefix, nil)
require.NoError(t, err)
if it.Valid() {
foundTblID := tablecodec.DecodeTableID(it.Key())
// There are internal table ids starting from MaxInt48 -1 and allocating decreasing ids
// Allow 0xFF of them, See JobTableID, ReorgTableID, HistoryTableID, MDLTableID
require.False(t, it.Key()[0] == 't' && foundTblID < 0xFFFFFFFFFF00, "Found table data after highest physical Table ID %d < %d", tblID, foundTblID)
}
}

func getAllDataForPhysicalTable(t *testing.T, ctx sessionctx.Context, physTable table.PhysicalTable) allTableData {
require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx))
txn, err := ctx.Txn(true)
Expand Down Expand Up @@ -4714,8 +4750,7 @@ func TestReorganizeRangePartition(t *testing.T) {
tk.MustGetErrCode(`alter table t2 reorganize partition p2 into (partition p2a values less than (30), partition p2b values less than (36))`, mysql.ErrRangeNotIncreasing)
tk.MustGetErrCode(`alter table t2 reorganize partition p2 into (partition p2a values less than (30), partition p2b values less than (34))`, mysql.ErrRangeNotIncreasing)
// Also not allowed to change from MAXVALUE to something else IF there are values in the removed range!
// TODO: uncomment this
//tk.MustContainErrMsg(`alter table t2 reorganize partition pMax into (partition p2b values less than (50))`, "[table:1526]Table has no partition for value 56")
tk.MustContainErrMsg(`alter table t2 reorganize partition pMax into (partition p2b values less than (50))`, "[table:1526]Table has no partition for value 56")
tk.MustQuery(`show create table t2`).Check(testkit.Rows("" +
"t2 CREATE TABLE `t2` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
Expand Down Expand Up @@ -5239,6 +5274,59 @@ func TestReorgPartitionFailInject(t *testing.T) {
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
}

func TestReorgPartitionRollback(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
schemaName := "ReorgPartRollback"
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` +
` partition by range (a) ` +
`(partition p0 values less than (10),` +
` partition p1 values less than (20),` +
` partition pMax values less than (MAXVALUE))`)
tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`)
// TODO: Check that there are no additional placement rules,
// bundles, or ranges with non-completed tableIDs
// (partitions used during reorg, but was dropped)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockUpdateVersionAndTableInfoErr", `return(true)`))
tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockUpdateVersionAndTableInfoErr"))
ctx := tk.Session()
is := domain.GetDomain(ctx).InfoSchema()
tbl, err := is.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t"))
require.NoError(t, err)
noNewTablesAfter(t, ctx, tbl)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/reorgPartitionAfterDataCopy", `return(true)`))
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/ddl/reorgPartitionAfterDataCopy")
require.NoError(t, err)
}()
tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))")
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
" `b` varchar(255) DEFAULT NULL,\n" +
" `c` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `b` (`b`),\n" +
" KEY `c` (`c`,`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`a`)\n" +
"(PARTITION `p0` VALUES LESS THAN (10),\n" +
" PARTITION `p1` VALUES LESS THAN (20),\n" +
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))

// WASHERE: How to test these?
//tk.MustQuery(`select * from mysql.gc_delete_range_done`).Sort().Check(testkit.Rows())
//time.Sleep(1 * time.Second)
//tk.MustQuery(`select * from mysql.gc_delete_range`).Sort().Check(testkit.Rows())

tbl, err = is.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t"))
require.NoError(t, err)
noNewTablesAfter(t, ctx, tbl)
}

func TestReorgPartitionData(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,8 @@ func jobNeedGC(job *model.Job) bool {
switch job.Type {
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn,
model.ActionAddIndex, model.ActionAddPrimaryKey:
model.ActionAddIndex, model.ActionAddPrimaryKey,
model.ActionReorganizePartition:
return true
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand Down
8 changes: 6 additions & 2 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,13 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
elemID := ea.allocForPhysicalID(tableID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
case model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionReorganizePartition:
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
// partInfo is not used, but is set in ReorgPartition.
// Better to have an additional argument in job.DecodeArgs since it is ignored,
// instead of having one to few, which will remove the data from the job arguments...
var partInfo model.PartitionInfo
if err := job.DecodeArgs(&physicalTableIDs, &partInfo); err != nil {
return errors.Trace(err)
}
for _, physicalTableID := range physicalTableIDs {
Expand Down
8 changes: 4 additions & 4 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
// be finished. Otherwise the query to this partition will be blocked.
needRetry, err := checkPartitionReplica(tblInfo.TiFlashReplica.Count, addingDefinitions, d)
if err != nil {
ver, err = convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo)
return ver, err
return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo)
}
if needRetry {
// The new added partition hasn't been replicated.
Expand Down Expand Up @@ -250,8 +249,9 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64)
if err != nil {
return false, errors.Wrapf(err, "failed to notify PD label rule")
}
return true, nil
}
return true, nil
return false, nil
}

func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
Expand Down Expand Up @@ -1705,7 +1705,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if err != nil {
return ver, errors.Trace(err)
}
if job.Type == model.ActionAddTablePartition {
if job.Type == model.ActionAddTablePartition || job.Type == model.ActionReorganizePartition {
// It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo.
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
Expand Down
3 changes: 2 additions & 1 deletion ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
return 0, errors.Trace(err)
}
return mathutil.Max(len(physicalTableIDs), 1), nil
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
case model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionReorganizePartition:
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return 0, errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
zap.Int64("currentSchemaVersion", currentSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64("gotSchemaVersion", is.SchemaMetaVersion()),
zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS),
zap.Uint64s("actionTypes", relatedChanges.ActionTypes))
return is, false, currentSchemaVersion, relatedChanges, nil
Expand Down
4 changes: 4 additions & 0 deletions store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,6 +2010,10 @@ func (w *GCWorker) doGCPlacementRules(se session.Session, safePoint uint64, dr u
if err = historyJob.DecodeArgs(&physicalTableIDs); err != nil {
return
}
case model.ActionReorganizePartition:
if err = historyJob.DecodeArgs(&physicalTableIDs); err != nil {
return
}
}

if len(physicalTableIDs) == 0 {
Expand Down
7 changes: 3 additions & 4 deletions table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,13 +1307,12 @@ func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model

// GetPartition returns a Table, which is actually a partition.
func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable {
// Attention, can't simply use `return p.partitions[pid]` here.
// Attention, can't simply use `return t.partitions[pid]` here.
// Because A nil of type *partition is a kind of `table.PhysicalTable`
part, ok := t.partitions[pid]
if !ok {
// TODO: remove and just keep return nil
panic("MJONSS: How did we get here?")
//return nil
// Should never happen!
return nil
}
return part
}
Expand Down

0 comments on commit 3eef6a0

Please sign in to comment.