Skip to content

Commit

Permalink
ddl: args v2 for create/drop/modify schema (#55919)
Browse files Browse the repository at this point in the history
ref #53930
  • Loading branch information
D3Hunter committed Sep 10, 2024
1 parent ea6dba8 commit 17c7b90
Show file tree
Hide file tree
Showing 24 changed files with 437 additions and 196 deletions.
1 change: 1 addition & 0 deletions br/pkg/restore/ingestrec/ingest_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
func fakeJob(reorgTp model.ReorgType, jobTp model.ActionType, state model.JobState,
rowCnt int64, indices []*model.IndexInfo, rawArgs json.RawMessage) *model.Job {
return &model.Job{
Version: model.JobVersion1,
SchemaName: SchemaName,
TableName: TableName,
TableID: TableID,
Expand Down
49 changes: 32 additions & 17 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,24 +669,25 @@ var (
)

var (
dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)}
dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)}
dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)}
dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
reorganizeTable0Partition1Job = &model.Job{Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
removeTable0Partition1Job = &model.Job{Type: model.ActionRemovePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
alterTable0Partition1Job = &model.Job{Type: model.ActionAlterTablePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
addTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)}
dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)}
dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)}
dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)}
modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)}
modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)}
dropSchemaJob *model.Job
dropTable0Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)}
dropTable1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)}
dropTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
reorganizeTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
removeTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionRemovePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
alterTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionAlterTablePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
rollBackTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
rollBackTable1IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
addTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
addTable1IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
dropTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)}
dropTable1IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)}
dropTable0ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)}
dropTable1ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)}
modifyTable0ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)}
modifyTable1ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)}
multiSchemaChangeJob0 = &model.Job{
Version: model.JobVersion1,
Type: model.ActionMultiSchemaChange,
SchemaID: mDDLJobDBOldID,
TableID: mDDLJobTable0OldID,
Expand All @@ -704,6 +705,7 @@ var (
},
}
multiSchemaChangeJob1 = &model.Job{
Version: model.JobVersion1,
Type: model.ActionMultiSchemaChange,
SchemaID: mDDLJobDBOldID,
TableID: mDDLJobTable1OldID,
Expand All @@ -722,6 +724,19 @@ var (
}
)

func genFinishedJob(job *model.Job, args model.FinishedJobArgs) *model.Job {
job.FillFinishedArgs(args)
bytes, _ := job.Encode(true)
resJob := &model.Job{}
_ = resJob.Decode(bytes)
return resJob
}

func init() {
dropSchemaJob = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionDropSchema,
SchemaID: mDDLJobDBOldID}, &model.DropSchemaArgs{AllDroppedTableIDs: []int64{71, 72, 73, 74, 75}})
}

type mockInsertDeleteRange struct {
queryCh chan *PreDelRangeQuery
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,11 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
ctx = kv.WithInternalSourceType(ctx, getDDLRequestSource(job.Type))
switch job.Type {
case model.ActionDropSchema:
var tableIDs []int64
if err := job.DecodeArgs(&tableIDs); err != nil {
args, err := model.GetFinishedDropSchemaArgs(job)
if err != nil {
return errors.Trace(err)
}
tableIDs := args.AllDroppedTableIDs
for i := 0; i < len(tableIDs); i += batchInsertDeleteRangeSize {
batchEnd := len(tableIDs)
if batchEnd > i+batchInsertDeleteRangeSize {
Expand Down Expand Up @@ -315,7 +316,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
case model.ActionTruncateTable:
tableID := job.TableID
args, err := model.GetTruncateTableArgsAfterRun(job)
args, err := model.GetFinishedTruncateTableArgs(job)
if err != nil {
return errors.Trace(err)
}
Expand Down
22 changes: 17 additions & 5 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,20 @@ func (e *executor) CreateSchemaWithInfo(
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaName: dbInfo.Name.L,
Type: model.ActionCreateSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []any{dbInfo},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: dbInfo.Name.L,
Table: model.InvolvingAll,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
job.FillArgs(&model.CreateSchemaArgs{
DBInfo: dbInfo,
})
if ref := dbInfo.PlacementPolicyRef; ref != nil {
job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, model.InvolvingSchemaInfo{
Policy: ref.Name.L,
Expand Down Expand Up @@ -348,18 +351,22 @@ func (e *executor) ModifySchemaCharsetAndCollate(ctx sessionctx.Context, stmt *a
}
// Do the DDL job.
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
Type: model.ActionModifySchemaCharsetAndCollate,
BinlogInfo: &model.HistoryInfo{},
Args: []any{toCharset, toCollate},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: dbInfo.Name.L,
Table: model.InvolvingAll,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
job.FillArgs(&model.ModifySchemaArgs{
ToCharset: toCharset,
ToCollate: toCollate,
})
err = e.DoDDLJob(ctx, job)
return errors.Trace(err)
}
Expand All @@ -383,18 +390,20 @@ func (e *executor) ModifySchemaDefaultPlacement(ctx sessionctx.Context, stmt *as

// Do the DDL job.
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
Type: model.ActionModifySchemaDefaultPlacement,
BinlogInfo: &model.HistoryInfo{},
Args: []any{placementPolicyRef},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: dbInfo.Name.L,
Table: model.InvolvingAll,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
job.FillArgs(&model.ModifySchemaArgs{PolicyRef: placementPolicyRef})

if placementPolicyRef != nil {
job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, model.InvolvingSchemaInfo{
Policy: placementPolicyRef.Name.L,
Expand Down Expand Up @@ -741,19 +750,22 @@ func (e *executor) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt
return err
}
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: old.ID,
SchemaName: old.Name.L,
SchemaState: old.State,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []any{fkCheck},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: old.Name.L,
Table: model.InvolvingAll,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
job.FillArgs(&model.DropSchemaArgs{
FKCheck: fkCheck,
})

err = e.DoDDLJob(ctx, job)
if err != nil {
Expand Down Expand Up @@ -6429,7 +6441,7 @@ func getTruncateTableNewTableID(job *model.Job) int64 {
if job.Version == model.JobVersion1 {
return job.Args[0].(int64)
}
return job.ArgsV2.(*model.TruncateTableArgs).NewTableID
return job.Args[0].(*model.TruncateTableArgs).NewTableID
}

// HandleLockTablesOnSuccessSubmit handles the table lock for the job which is submitted
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,12 @@ func checkDatabaseHasForeignKeyReferredInOwner(jobCtx *jobContext, job *model.Jo
if !variable.EnableForeignKey.Load() {
return nil
}
var fkCheck bool
err := job.DecodeArgs(&fkCheck)
args, err := model.GetDropSchemaArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return errors.Trace(err)
}
fkCheck := args.FKCheck
if !fkCheck {
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/ddl/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ false]`),
}

for _, c := range cases {
job := &model.Job{RawArgs: c.raw}
job := &model.Job{
Version: model.JobVersion1,
Type: model.ActionAddIndex,
RawArgs: c.raw,
}
uniques, indexNames, specs, indexOptions, hiddenCols, err := decodeAddIndexArgs(job)
require.NoError(t, err)
require.Equal(t, c.uniques, uniques)
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func (s *jobScheduler) loadAndDeliverJobs(se *sess.Session) error {
if err != nil {
return errors.Trace(err)
}
intest.Assert(job.Version > 0, "job version should be greater than 0")

involving := job.GetInvolvingSchemaInfo()
if targetPool.available() == 0 {
Expand Down
11 changes: 8 additions & 3 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int {
partCount := jobW.Args[3].(int)
count += 1 + partCount
} else {
count += 1 + len(jobW.ArgsV2.(*model.TruncateTableArgs).OldPartitionIDs)
count += 1 + len(jobW.Args[0].(*model.TruncateTableArgs).OldPartitionIDs)
}
}
}
Expand All @@ -550,7 +550,12 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
}
}
case model.ActionCreateSchema:
dbInfo := jobW.Args[0].(*model.DBInfo)
var dbInfo *model.DBInfo
if jobW.Version == model.JobVersion1 {
dbInfo = jobW.Args[0].(*model.DBInfo)
} else {
dbInfo = jobW.Args[0].(*model.CreateSchemaArgs).DBInfo
}
if !jobW.IDAllocated {
dbInfo.ID = alloc.next()
}
Expand Down Expand Up @@ -603,7 +608,7 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
}
jobW.Args[2] = partIDs
} else {
args := jobW.ArgsV2.(*model.TruncateTableArgs)
args := jobW.Args[0].(*model.TruncateTableArgs)
args.NewTableID = alloc.next()
partIDs := make([]int64, len(args.OldPartitionIDs))
for i := range partIDs {
Expand Down
17 changes: 9 additions & 8 deletions pkg/ddl/job_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,12 @@ func TestCombinedIDAllocation(t *testing.T) {

genCreateDBJob := func() *model.Job {
info := &model.DBInfo{}
return &model.Job{
Version: model.JobVersion1,
j := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionCreateSchema,
Args: []any{info},
}
j.FillArgs(&model.CreateSchemaArgs{DBInfo: info})
return j
}

genRGroupJob := func() *model.Job {
Expand Down Expand Up @@ -413,10 +414,10 @@ func TestCombinedIDAllocation(t *testing.T) {
}
case model.ActionCreateSchema:
require.Greater(t, j.SchemaID, initialGlobalID)
info := &model.DBInfo{}
require.NoError(t, j.DecodeArgs(info))
uniqueIDs[info.ID] = struct{}{}
require.Equal(t, j.SchemaID, info.ID)
args, err := model.GetCreateSchemaArgs(j)
require.NoError(t, err)
uniqueIDs[args.DBInfo.ID] = struct{}{}
require.Equal(t, j.SchemaID, args.DBInfo.ID)
case model.ActionCreateResourceGroup:
info := &model.ResourceGroupInfo{}
require.NoError(t, j.DecodeArgs(info))
Expand Down Expand Up @@ -449,7 +450,7 @@ func TestCombinedIDAllocation(t *testing.T) {
checkPartitionInfo(info)
checkID(info.NewTableID)
case model.ActionTruncateTable:
args, err := model.GetTruncateTableArgsBeforeRun(j)
args, err := model.GetTruncateTableArgs(j)
require.NoError(t, err)
checkID(args.NewTableID)
for _, id := range args.NewPartitionIDs {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ func TestSchemaResume(t *testing.T) {
dbInfo, err := testSchemaInfo(store, "test_restart")
require.NoError(t, err)
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
Type: model.ActionCreateSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []any{dbInfo},
}
job.FillArgs(&model.CreateSchemaArgs{DBInfo: dbInfo})
testRunInterruptedJob(t, store, dom, job)
testCheckSchemaState(t, store, dbInfo, model.StatePublic)

Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
}
switch job.Type {
case model.ActionDropSchema:
var tableIDs []int64
if err := job.DecodeArgs(&tableIDs); err != nil {
args, err := model.GetFinishedDropSchemaArgs(job)
if err != nil {
return 0, errors.Trace(err)
}
return len(tableIDs), nil
return len(args.AllDroppedTableIDs), nil
case model.ActionDropTable:
var startKey kv.Key
var physicalTableIDs []int64
Expand All @@ -100,7 +100,7 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
}
return len(physicalTableIDs) + 1, nil
case model.ActionTruncateTable:
args, err := model.GetTruncateTableArgsAfterRun(job)
args, err := model.GetFinishedTruncateTableArgs(job)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 17c7b90

Please sign in to comment.