From 29b688995e36e75136d7669e4013685c780850f7 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Sat, 7 Sep 2024 01:34:35 +0800 Subject: [PATCH] create view create tbl/seq change todo change ut test change fix test test --- pkg/ddl/create_table.go | 54 +++++++-------- pkg/ddl/ddl.go | 12 ++++ pkg/ddl/ddl_test.go | 2 +- pkg/ddl/executor.go | 101 +++++++++++++-------------- pkg/ddl/executor_nokit_test.go | 116 ++++++++++++++------------------ pkg/ddl/executor_test.go | 3 +- pkg/ddl/job_scheduler.go | 33 +++++---- pkg/ddl/job_submitter.go | 110 +++++++++++++----------------- pkg/ddl/job_submitter_test.go | 89 +++++++++++++----------- pkg/ddl/restart_test.go | 20 +++--- pkg/ddl/schema_test.go | 14 ++-- pkg/ddl/schema_version.go | 50 ++++++++------ pkg/ddl/sequence.go | 7 +- pkg/ddl/table_test.go | 51 +++++++------- pkg/meta/model/BUILD.bazel | 2 +- pkg/meta/model/job.go | 2 + pkg/meta/model/job_args.go | 95 ++++++++++++++++++++++++++ pkg/meta/model/job_args_test.go | 69 +++++++++++++++++++ 18 files changed, 497 insertions(+), 333 deletions(-) diff --git a/pkg/ddl/create_table.go b/pkg/ddl/create_table.go index af4dafa0f7198..0fa8363f3418b 100644 --- a/pkg/ddl/create_table.go +++ b/pkg/ddl/create_table.go @@ -55,9 +55,9 @@ import ( // DANGER: it is an internal function used by onCreateTable and onCreateTables, for reusing code. Be careful. // 1. it expects the argument of job has been deserialized. // 2. it won't call updateSchemaVersion, FinishTableJob and asyncNotifyEvent. -func createTable(jobCtx *jobContext, t *meta.Meta, job *model.Job, fkCheck bool) (*model.TableInfo, error) { +func createTable(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.CreateTableArgs) (*model.TableInfo, error) { schemaID := job.SchemaID - tbInfo := job.Args[0].(*model.TableInfo) + tbInfo, fkCheck := args.TableInfo, args.FKCheck tbInfo.State = model.StateNone err := checkTableNotExists(jobCtx.infoCache, schemaID, tbInfo.Name.L) @@ -157,20 +157,19 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, } }) - // just decode, createTable will use it as Args[0] - tbInfo := &model.TableInfo{} - fkCheck := false - if err := job.DecodeArgs(tbInfo, &fkCheck); err != nil { + args, err := model.GetCreateTableArgs(job) + if err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } + tbInfo := args.TableInfo if len(tbInfo.ForeignKeys) > 0 { - return createTableWithForeignKeys(jobCtx, t, job, tbInfo, fkCheck) + return createTableWithForeignKeys(jobCtx, t, job, args) } - tbInfo, err := createTable(jobCtx, t, job, fkCheck) + tbInfo, err = createTable(jobCtx, t, job, args) if err != nil { return ver, errors.Trace(err) } @@ -189,14 +188,15 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, return ver, errors.Trace(err) } -func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job, tbInfo *model.TableInfo, fkCheck bool) (ver int64, err error) { +func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.CreateTableArgs) (ver int64, err error) { + tbInfo := args.TableInfo switch tbInfo.State { case model.StateNone, model.StatePublic: // create table in non-public or public state. The function `createTable` will always reset // the `tbInfo.State` with `model.StateNone`, so it's fine to just call the `createTable` with // public state. // when `br` restores table, the state of `tbInfo` will be public. - tbInfo, err = createTable(jobCtx, t, job, fkCheck) + tbInfo, err = createTable(jobCtx, t, job, args) if err != nil { return ver, errors.Trace(err) } @@ -227,37 +227,38 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, error) { var ver int64 - var args []*model.TableInfo - fkCheck := false - err := job.DecodeArgs(&args, &fkCheck) + args, err := model.GetBatchCreateTableArgs(job) if err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } + tableInfos := make([]*model.TableInfo, 0, len(args.Tables)) // We don't construct jobs for every table, but only tableInfo // The following loop creates a stub job for every table // // it clones a stub job from the ActionCreateTables job stubJob := job.Clone() stubJob.Args = make([]any, 1) - for i := range args { - stubJob.TableID = args[i].ID - stubJob.Args[0] = args[i] - if args[i].Sequence != nil { - err := createSequenceWithCheck(t, stubJob, args[i]) + for i := range args.Tables { + tblArgs := args.Tables[i] + tableInfo := tblArgs.TableInfo + stubJob.TableID = tableInfo.ID + if tableInfo.Sequence != nil { + err := createSequenceWithCheck(t, stubJob, tableInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + tableInfos = append(tableInfos, tableInfo) } else { - tbInfo, err := createTable(jobCtx, t, stubJob, fkCheck) + tbInfo, err := createTable(jobCtx, t, stubJob, tblArgs) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - args[i] = tbInfo + tableInfos = append(tableInfos, tbInfo) } } @@ -268,10 +269,10 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er job.State = model.JobStateDone job.SchemaState = model.StatePublic - job.BinlogInfo.SetTableInfos(ver, args) - for i := range args { + job.BinlogInfo.SetTableInfos(ver, tableInfos) + for i := range tableInfos { createTableEvent := &statsutil.DDLEvent{ - SchemaChangeEvent: util.NewCreateTableEvent(args[i]), + SchemaChangeEvent: util.NewCreateTableEvent(tableInfos[i]), } asyncNotifyEvent(jobCtx, createTableEvent, job) } @@ -290,14 +291,13 @@ func createTableOrViewWithCheck(t *meta.Meta, job *model.Job, schemaID int64, tb func onCreateView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID - tbInfo := &model.TableInfo{} - var orReplace bool - var _placeholder int64 // oldTblInfoID - if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil { + args, err := model.GetCreateTableArgs(job) + if err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } + tbInfo, orReplace := args.TableInfo, args.OnExistReplace tbInfo.State = model.StateNone oldTableID, err := findTableIDByName(jobCtx.infoCache, t, schemaID, tbInfo.Name.L) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index e1b42f3ee8943..2e012a34412e5 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -204,6 +204,7 @@ type JobWrapper struct { // IDAllocated see config of same name in CreateTableConfig. // exported for test. IDAllocated bool + JobArgs model.JobArgs // job submission is run in async, we use this channel to notify the caller. // when fast create table enabled, we might combine multiple jobs into one, and // append the channel to this slice. @@ -221,6 +222,17 @@ func NewJobWrapper(job *model.Job, idAllocated bool) *JobWrapper { } } +// NewJobWrapperWithArgs creates a new JobWrapper with job args. +// TODO: merge with NewJobWrapper later. +func NewJobWrapperWithArgs(job *model.Job, args model.JobArgs, idAllocated bool) *JobWrapper { + return &JobWrapper{ + Job: job, + IDAllocated: idAllocated, + JobArgs: args, + ResultCh: []chan jobSubmitResult{make(chan jobSubmitResult)}, + } +} + // NotifyResult notifies the job submit result. func (t *JobWrapper) NotifyResult(err error) { merged := len(t.ResultCh) > 1 diff --git a/pkg/ddl/ddl_test.go b/pkg/ddl/ddl_test.go index a96b1cd7ce893..47313c49bc5b6 100644 --- a/pkg/ddl/ddl_test.go +++ b/pkg/ddl/ddl_test.go @@ -214,7 +214,7 @@ func TestBuildJobDependence(t *testing.T) { job6 := &model.Job{ID: 6, TableID: 1, Type: model.ActionDropTable} job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn} job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema} - job11 := &model.Job{ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}} + job11 := &model.Job{Version: model.JobVersion1, ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}} err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) require.NoError(t, m.EnQueueDDLJob(job1)) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 222f26339116d..45296b69136b5 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -1054,8 +1054,8 @@ func (e *executor) createTableWithInfoJob( dbName pmodel.CIStr, tbInfo *model.TableInfo, involvingRef []model.InvolvingSchemaInfo, - onExist OnExist, -) (job *model.Job, err error) { + cfg CreateTableConfig, +) (jobW *JobWrapper, err error) { is := e.infoCache.GetLatest() schema, ok := is.SchemaByName(dbName) if !ok { @@ -1069,7 +1069,7 @@ func (e *executor) createTableWithInfoJob( var oldViewTblID int64 if oldTable, err := is.TableByName(e.ctx, schema.Name, tbInfo.Name); err == nil { err = infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: schema.Name, Name: tbInfo.Name}) - switch onExist { + switch cfg.OnExist { case OnExistIgnore: ctx.GetSessionVars().StmtCtx.AppendNote(err) return nil, nil @@ -1094,16 +1094,13 @@ func (e *executor) createTableWithInfoJob( } var actionType model.ActionType - args := []any{tbInfo} switch { case tbInfo.View != nil: actionType = model.ActionCreateView - args = append(args, onExist == OnExistReplace, oldViewTblID) case tbInfo.Sequence != nil: actionType = model.ActionCreateSequence default: actionType = model.ActionCreateTable - args = append(args, ctx.GetSessionVars().ForeignKeyChecks) } var involvingSchemas []model.InvolvingSchemaInfo @@ -1119,18 +1116,24 @@ func (e *executor) createTableWithInfoJob( involvingSchemas = append(involvingSchemas, sharedInvolvingFromTableInfo...) } - job = &model.Job{ + job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schema.ID, SchemaName: schema.Name.L, TableName: tbInfo.Name.L, Type: actionType, BinlogInfo: &model.HistoryInfo{}, - Args: args, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: involvingSchemas, SQLMode: ctx.GetSessionVars().SQLMode, } - return job, nil + args := &model.CreateTableArgs{ + TableInfo: tbInfo, + OnExistReplace: cfg.OnExist == OnExistReplace, + OldViewTblID: oldViewTblID, + FKCheck: ctx.GetSessionVars().ForeignKeyChecks, + } + return NewJobWrapperWithArgs(job, args, cfg.IDAllocated), nil } func getSharedInvolvingSchemaInfo(info *model.TableInfo) []model.InvolvingSchemaInfo { @@ -1200,18 +1203,14 @@ func (e *executor) CreateTableWithInfo( ) (err error) { c := GetCreateTableConfig(cs) - job, err := e.createTableWithInfoJob( - ctx, dbName, tbInfo, involvingRef, c.OnExist, - ) + jobW, err := e.createTableWithInfoJob(ctx, dbName, tbInfo, involvingRef, c) if err != nil { return err } - if job == nil { + if jobW == nil { return nil } - jobW := NewJobWrapper(job, c.IDAllocated) - err = e.DoDDLJobWrapper(ctx, jobW) if err != nil { // table exists, but if_not_exists flags is true, so we ignore this error. @@ -1220,7 +1219,7 @@ func (e *executor) CreateTableWithInfo( err = nil } } else { - err = e.createTableWithInfoPost(ctx, tbInfo, job.SchemaID) + err = e.createTableWithInfoPost(ctx, tbInfo, jobW.SchemaID) } return errors.Trace(err) @@ -1239,15 +1238,12 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context, }) c := GetCreateTableConfig(cs) - jobW := NewJobWrapper( - &model.Job{ - BinlogInfo: &model.HistoryInfo{}, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, - }, - c.IDAllocated, - ) - args := make([]*model.TableInfo, 0, len(infos)) + job := &model.Job{ + Version: model.GetJobVerInUse(), + BinlogInfo: &model.HistoryInfo{}, + CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, + SQLMode: ctx.GetSessionVars().SQLMode, + } var err error @@ -1269,43 +1265,41 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context, duplication[info.Name.L] = struct{}{} } + args := &model.BatchCreateTableArgs{ + Tables: make([]*model.CreateTableArgs, 0, len(infos)), + } for _, info := range infos { - job, err := e.createTableWithInfoJob(ctx, dbName, info, nil, c.OnExist) + jobItem, err := e.createTableWithInfoJob(ctx, dbName, info, nil, c) if err != nil { return errors.Trace(err) } - if job == nil { + if jobItem == nil { continue } // if jobW.Type == model.ActionCreateTables, it is initialized // if not, initialize jobW by job.XXXX - if jobW.Type != model.ActionCreateTables { - jobW.Type = model.ActionCreateTables - jobW.SchemaID = job.SchemaID - jobW.SchemaName = job.SchemaName + if job.Type != model.ActionCreateTables { + job.Type = model.ActionCreateTables + job.SchemaID = jobItem.SchemaID + job.SchemaName = jobItem.SchemaName } // append table job args - info, ok := job.Args[0].(*model.TableInfo) - if !ok { - return errors.Trace(fmt.Errorf("except table info")) - } - args = append(args, info) - jobW.InvolvingSchemaInfo = append(jobW.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ + args.Tables = append(args.Tables, jobItem.JobArgs.(*model.CreateTableArgs)) + job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ Database: dbName.L, Table: info.Name.L, }) if sharedInv := getSharedInvolvingSchemaInfo(info); len(sharedInv) > 0 { - jobW.InvolvingSchemaInfo = append(jobW.InvolvingSchemaInfo, sharedInv...) + job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, sharedInv...) } } - if len(args) == 0 { + if len(args.Tables) == 0 { return nil } - jobW.Args = append(jobW.Args, args) - jobW.Args = append(jobW.Args, ctx.GetSessionVars().ForeignKeyChecks) + jobW := NewJobWrapperWithArgs(job, args, c.IDAllocated) err = e.DoDDLJobWrapper(ctx, jobW) if err != nil { // table exists, but if_not_exists flags is true, so we ignore this error. @@ -1316,8 +1310,8 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context, return errors.Trace(err) } - for j := range args { - if err = e.createTableWithInfoPost(ctx, args[j], jobW.SchemaID); err != nil { + for _, tblArgs := range args.Tables { + if err = e.createTableWithInfoPost(ctx, tblArgs.TableInfo, jobW.SchemaID); err != nil { return errors.Trace(err) } } @@ -4217,11 +4211,11 @@ func (e *executor) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error { CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - job.FillArgs(&model.TruncateTableArgs{ + args := &model.TruncateTableArgs{ FKCheck: fkCheck, OldPartitionIDs: oldPartitionIDs, - }) - err = e.DoDDLJob(ctx, job) + } + err = e.doDDLJob2(ctx, job, args) if err != nil { return errors.Trace(err) } @@ -6257,6 +6251,10 @@ func (e *executor) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { return e.DoDDLJobWrapper(ctx, NewJobWrapper(job, false)) } +func (e *executor) doDDLJob2(ctx sessionctx.Context, job *model.Job, args model.JobArgs) error { + return e.DoDDLJobWrapper(ctx, NewJobWrapperWithArgs(job, args, false)) +} + // DoDDLJobWrapper submit DDL job and wait it finishes. // When fast create is enabled, we might merge multiple jobs into one, so do not // depend on job.ID, use JobID from jobSubmitResult. @@ -6437,13 +6435,6 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re } } -func getTruncateTableNewTableID(job *model.Job) int64 { - if job.Version == model.JobVersion1 { - return job.Args[0].(int64) - } - return job.Args[0].(*model.TruncateTableArgs).NewTableID -} - // HandleLockTablesOnSuccessSubmit handles the table lock for the job which is submitted // successfully. exported for testing purpose. func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) { @@ -6451,7 +6442,7 @@ func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) { if ok, lockTp := ctx.CheckTableLocked(jobW.TableID); ok { ctx.AddTableLock([]model.TableLockTpInfo{{ SchemaID: jobW.SchemaID, - TableID: getTruncateTableNewTableID(jobW.Job), + TableID: jobW.JobArgs.(*model.TruncateTableArgs).NewTableID, Tp: lockTp, }}) } @@ -6463,7 +6454,7 @@ func HandleLockTablesOnSuccessSubmit(ctx sessionctx.Context, jobW *JobWrapper) { func HandleLockTablesOnFinish(ctx sessionctx.Context, jobW *JobWrapper, ddlErr error) { if jobW.Type == model.ActionTruncateTable { if ddlErr != nil { - newTableID := getTruncateTableNewTableID(jobW.Job) + newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID ctx.ReleaseTableLockByTableIDs([]int64{newTableID}) return } diff --git a/pkg/ddl/executor_nokit_test.go b/pkg/ddl/executor_nokit_test.go index e1c5fb416b77b..d89426fad0a6e 100644 --- a/pkg/ddl/executor_nokit_test.go +++ b/pkg/ddl/executor_nokit_test.go @@ -69,20 +69,20 @@ func TestBuildQueryStringFromJobs(t *testing.T) { } func TestMergeCreateTableJobsOfSameSchema(t *testing.T) { - job1 := NewJobWrapper(&model.Job{ + job1 := NewJobWrapperWithArgs(&model.Job{ + Version: model.GetJobVerInUse(), SchemaID: 1, Type: model.ActionCreateTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{&model.TableInfo{Name: pmodel.CIStr{O: "t1", L: "t1"}}, false}, Query: "create table db1.t1 (c1 int, c2 int)", - }, false) - job2 := NewJobWrapper(&model.Job{ + }, &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.CIStr{O: "t1", L: "t1"}}}, false) + job2 := NewJobWrapperWithArgs(&model.Job{ + Version: model.GetJobVerInUse(), SchemaID: 1, Type: model.ActionCreateTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{&model.TableInfo{Name: pmodel.CIStr{O: "t2", L: "t2"}}, &model.TableInfo{}}, Query: "create table db1.t2 (c1 int, c2 int);", - }, false) + }, &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.CIStr{O: "t2", L: "t2"}}, FKCheck: true}, false) job, err := mergeCreateTableJobsOfSameSchema([]*JobWrapper{job1, job2}) require.NoError(t, err) require.Equal(t, "create table db1.t1 (c1 int, c2 int); create table db1.t2 (c1 int, c2 int);", job.Query) @@ -101,11 +101,11 @@ func TestMergeCreateTableJobs(t *testing.T) { t.Run("non create table are not merged", func(t *testing.T) { jobWs := []*JobWrapper{ - {Job: &model.Job{SchemaName: "db", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t1")}, false}}}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t1")}}}, {Job: &model.Job{SchemaName: "db", Type: model.ActionAddColumn}}, - {Job: &model.Job{SchemaName: "db", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t2")}, false}}}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t2")}}}, } newWs, err := mergeCreateTableJobs(jobWs) require.NoError(t, err) @@ -122,17 +122,16 @@ func TestMergeCreateTableJobs(t *testing.T) { t.Run("jobs of pre allocated ids are not merged", func(t *testing.T) { jobWs := []*JobWrapper{ - {Job: &model.Job{SchemaName: "db", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t1")}, false}}, IDAllocated: true}, - {Job: &model.Job{SchemaName: "db", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t2")}, false}}}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t1")}}, IDAllocated: true}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t2")}}}, } newWs, err := mergeCreateTableJobs(jobWs) slices.SortFunc(newWs, func(a, b *JobWrapper) int { - if aName, bName := a.Args[0].(*model.TableInfo).Name.L, b.Args[0].(*model.TableInfo).Name.L; aName != bName { - return strings.Compare(aName, bName) - } - return 0 + argsA := a.JobArgs.(*model.CreateTableArgs) + argsB := b.JobArgs.(*model.CreateTableArgs) + return strings.Compare(argsA.TableInfo.Name.L, argsB.TableInfo.Name.L) }) require.NoError(t, err) require.EqualValues(t, jobWs, newWs) @@ -140,17 +139,16 @@ func TestMergeCreateTableJobs(t *testing.T) { t.Run("jobs of foreign keys are not merged", func(t *testing.T) { jobWs := []*JobWrapper{ - {Job: &model.Job{SchemaName: "db", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{ForeignKeys: []*model.FKInfo{{}}}, false}}}, - {Job: &model.Job{SchemaName: "db", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t2")}, false}}}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{ForeignKeys: []*model.FKInfo{{}}}}}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t2")}}}, } newWs, err := mergeCreateTableJobs(jobWs) slices.SortFunc(newWs, func(a, b *JobWrapper) int { - if aName, bName := a.Args[0].(*model.TableInfo).Name.L, b.Args[0].(*model.TableInfo).Name.L; aName != bName { - return strings.Compare(aName, bName) - } - return 0 + argsA := a.JobArgs.(*model.CreateTableArgs) + argsB := b.JobArgs.(*model.CreateTableArgs) + return strings.Compare(argsA.TableInfo.Name.L, argsB.TableInfo.Name.L) }) require.NoError(t, err) require.EqualValues(t, jobWs, newWs) @@ -158,17 +156,14 @@ func TestMergeCreateTableJobs(t *testing.T) { t.Run("jobs of different schema are not merged", func(t *testing.T) { jobWs := []*JobWrapper{ - {Job: &model.Job{SchemaName: "db1", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t1")}, false}}}, - {Job: &model.Job{SchemaName: "db2", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t2")}, false}}}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db1", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t1")}}}, + {Job: &model.Job{Version: model.GetJobVerInUse(), SchemaName: "db2", Type: model.ActionCreateTable}, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t2")}}}, } newWs, err := mergeCreateTableJobs(jobWs) slices.SortFunc(newWs, func(a, b *JobWrapper) int { - if aName, bName := a.SchemaName, b.SchemaName; aName != bName { - return strings.Compare(aName, bName) - } - return 0 + return strings.Compare(a.SchemaName, b.SchemaName) }) require.NoError(t, err) require.EqualValues(t, jobWs, newWs) @@ -176,61 +171,52 @@ func TestMergeCreateTableJobs(t *testing.T) { t.Run("max batch size 8", func(t *testing.T) { jobWs := make([]*JobWrapper, 0, 100) + jobWs = append(jobWs, NewJobWrapper(&model.Job{SchemaName: "db0", Type: model.ActionAddColumn}, false)) + jobW := NewJobWrapperWithArgs(&model.Job{Version: model.GetJobVerInUse(), SchemaName: "db1", Type: model.ActionCreateTable}, + &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr("t1")}}, true) + jobWs = append(jobWs, jobW) + jobW = NewJobWrapperWithArgs(&model.Job{Version: model.GetJobVerInUse(), SchemaName: "db2", Type: model.ActionCreateTable}, + &model.CreateTableArgs{TableInfo: &model.TableInfo{ForeignKeys: []*model.FKInfo{{}}}}, false) + jobWs = append(jobWs, jobW) for db, cnt := range map[string]int{ - "db0": 9, - "db1": 7, - "db2": 22, + "db3": 9, + "db4": 7, + "db5": 22, } { for i := 0; i < cnt; i++ { tblName := fmt.Sprintf("t%d", i) - jobWs = append(jobWs, NewJobWrapper(&model.Job{SchemaName: db, Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr(tblName)}, false}}, false)) + jobW := NewJobWrapperWithArgs(&model.Job{Version: model.GetJobVerInUse(), SchemaName: db, Type: model.ActionCreateTable}, + &model.CreateTableArgs{TableInfo: &model.TableInfo{Name: pmodel.NewCIStr(tblName)}}, false) + jobWs = append(jobWs, jobW) } } - jobWs = append(jobWs, NewJobWrapper(&model.Job{SchemaName: "dbx", Type: model.ActionAddColumn}, false)) - jobWs = append(jobWs, NewJobWrapper(&model.Job{SchemaName: "dbxx", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{Name: pmodel.NewCIStr("t1")}, false}}, true)) - jobWs = append(jobWs, NewJobWrapper(&model.Job{SchemaName: "dbxxx", Type: model.ActionCreateTable, - Args: []any{&model.TableInfo{ForeignKeys: []*model.FKInfo{{}}}, false}}, false)) newWs, err := mergeCreateTableJobs(jobWs) slices.SortFunc(newWs, func(a, b *JobWrapper) int { - if a.Type != b.Type { - return int(b.Type - a.Type) - } - if aName, bName := a.SchemaName, b.SchemaName; aName != bName { - return strings.Compare(aName, bName) - } - aTableInfo, aOK := a.Args[0].(*model.TableInfo) - bTableInfo, bOK := b.Args[0].(*model.TableInfo) - if aOK && bOK && aTableInfo.Name.L != bTableInfo.Name.L { - return strings.Compare(aTableInfo.Name.L, bTableInfo.Name.L) - } - - return 0 + return strings.Compare(a.SchemaName, b.SchemaName) }) require.NoError(t, err) // 3 non-mergeable + 2 + 1 + 3 require.Len(t, newWs, 9) require.Equal(t, model.ActionAddColumn, newWs[0].Type) require.Equal(t, model.ActionCreateTable, newWs[1].Type) - require.Equal(t, "dbxx", newWs[1].SchemaName) + require.Equal(t, "db1", newWs[1].SchemaName) require.Equal(t, model.ActionCreateTable, newWs[2].Type) - require.Equal(t, "dbxxx", newWs[2].SchemaName) + require.Equal(t, "db2", newWs[2].SchemaName) schemaCnts := make(map[string][]int, 3) for i := 3; i < 9; i++ { require.Equal(t, model.ActionCreateTables, newWs[i].Type) - infos := newWs[i].Args[0].([]*model.TableInfo) - schemaCnts[newWs[i].SchemaName] = append(schemaCnts[newWs[i].SchemaName], len(infos)) - require.Equal(t, len(infos), len(newWs[i].ResultCh)) + args := newWs[i].JobArgs.(*model.BatchCreateTableArgs) + schemaCnts[newWs[i].SchemaName] = append(schemaCnts[newWs[i].SchemaName], len(args.Tables)) + require.Equal(t, len(args.Tables), len(newWs[i].ResultCh)) } for k := range schemaCnts { slices.Sort(schemaCnts[k]) } require.Equal(t, map[string][]int{ - "db0": {4, 5}, - "db1": {7}, - "db2": {7, 7, 8}, + "db3": {4, 5}, + "db4": {7}, + "db5": {7, 7, 8}, }, schemaCnts) }) } diff --git a/pkg/ddl/executor_test.go b/pkg/ddl/executor_test.go index 1193613e760fa..6899262783363 100644 --- a/pkg/ddl/executor_test.go +++ b/pkg/ddl/executor_test.go @@ -270,8 +270,7 @@ func TestHandleLockTable(t *testing.T) { Type: model.ActionTruncateTable, TableID: 1, } - job.FillArgs(&model.TruncateTableArgs{NewTableID: 2}) - jobW := ddl.NewJobWrapper(job, false) + jobW := ddl.NewJobWrapperWithArgs(job, &model.TruncateTableArgs{NewTableID: 2}, false) t.Run("target table not locked", func(t *testing.T) { se.ReleaseAllTableLocks() diff --git a/pkg/ddl/job_scheduler.go b/pkg/ddl/job_scheduler.go index f7affb329fe4f..f18a937fa29ab 100644 --- a/pkg/ddl/job_scheduler.go +++ b/pkg/ddl/job_scheduler.go @@ -675,6 +675,11 @@ func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWra var sql bytes.Buffer sql.WriteString(addDDLJobSQL) for i, jobW := range jobWs { + // TODO remove this check when all job type pass args in this way. + if jobW.JobArgs != nil { + jobW.FillArgs(jobW.JobArgs) + } + injectModifyJobArgFailPoint(jobWs) b, err := jobW.Encode(true) if err != nil { return err @@ -683,7 +688,7 @@ func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWra sql.WriteString(",") } fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(), - strconv.Quote(job2SchemaIDs(jobW.Job)), strconv.Quote(job2TableIDs(jobW.Job)), + strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)), util.WrapKey2String(b), jobW.Type, jobW.Started()) } se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) @@ -692,22 +697,22 @@ func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWra return errors.Trace(err) } -func job2SchemaIDs(job *model.Job) string { - return job2UniqueIDs(job, true) +func job2SchemaIDs(jobW *JobWrapper) string { + return job2UniqueIDs(jobW, true) } -func job2TableIDs(job *model.Job) string { - return job2UniqueIDs(job, false) +func job2TableIDs(jobW *JobWrapper) string { + return job2UniqueIDs(jobW, false) } -func job2UniqueIDs(job *model.Job, schema bool) string { - switch job.Type { +func job2UniqueIDs(jobW *JobWrapper, schema bool) string { + switch jobW.Type { case model.ActionExchangeTablePartition, model.ActionRenameTables, model.ActionRenameTable: var ids []int64 if schema { - ids = job.CtxVars[0].([]int64) + ids = jobW.CtxVars[0].([]int64) } else { - ids = job.CtxVars[1].([]int64) + ids = jobW.CtxVars[1].([]int64) } set := make(map[int64]struct{}, len(ids)) for _, id := range ids { @@ -722,15 +727,15 @@ func job2UniqueIDs(job *model.Job, schema bool) string { return strings.Join(s, ",") case model.ActionTruncateTable: if schema { - return strconv.FormatInt(job.SchemaID, 10) + return strconv.FormatInt(jobW.SchemaID, 10) } - newTableID := getTruncateTableNewTableID(job) - return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(newTableID, 10) + newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID + return strconv.FormatInt(jobW.TableID, 10) + "," + strconv.FormatInt(newTableID, 10) } if schema { - return strconv.FormatInt(job.SchemaID, 10) + return strconv.FormatInt(jobW.SchemaID, 10) } - return strconv.FormatInt(job.TableID, 10) + return strconv.FormatInt(jobW.TableID, 10) } func updateDDLJob2Table(se *sess.Session, job *model.Job, updateRawArgs bool) error { diff --git a/pkg/ddl/job_submitter.go b/pkg/ddl/job_submitter.go index b824ef9059ee4..d8908d5a09235 100644 --- a/pkg/ddl/job_submitter.go +++ b/pkg/ddl/job_submitter.go @@ -153,8 +153,8 @@ func mergeCreateTableJobs(jobWs []*JobWrapper) ([]*JobWrapper, error) { continue } // ActionCreateTables doesn't support foreign key now. - tbInfo, ok := jobW.Args[0].(*model.TableInfo) - if !ok || len(tbInfo.ForeignKeys) > 0 { + args := jobW.JobArgs.(*model.CreateTableArgs) + if len(args.TableInfo.ForeignKeys) > 0 { resJobWs = append(resJobWs, jobW) continue } @@ -173,22 +173,13 @@ func mergeCreateTableJobs(jobWs []*JobWrapper) ([]*JobWrapper, error) { start := 0 for _, batchSize := range mathutil.Divide2Batches(total, batchCount) { batch := jobs[start : start+batchSize] - job, err := mergeCreateTableJobsOfSameSchema(batch) + newJobW, err := mergeCreateTableJobsOfSameSchema(batch) if err != nil { return nil, err } start += batchSize logutil.DDLLogger().Info("merge create table jobs", zap.String("schema", schema), zap.Int("total", total), zap.Int("batch_size", batchSize)) - - newJobW := &JobWrapper{ - Job: job, - ResultCh: make([]chan jobSubmitResult, 0, batchSize), - } - // merge the result channels. - for _, j := range batch { - newJobW.ResultCh = append(newJobW.ResultCh, j.ResultCh...) - } resJobWs = append(resJobWs, newJobW) } } @@ -217,16 +208,18 @@ func buildQueryStringFromJobs(jobs []*JobWrapper) string { } // mergeCreateTableJobsOfSameSchema combine CreateTableJobs to BatchCreateTableJob. -func mergeCreateTableJobsOfSameSchema(jobWs []*JobWrapper) (*model.Job, error) { +func mergeCreateTableJobsOfSameSchema(jobWs []*JobWrapper) (*JobWrapper, error) { if len(jobWs) == 0 { return nil, errors.Trace(fmt.Errorf("expect non-empty jobs")) } - var combinedJob *model.Job - - args := make([]*model.TableInfo, 0, len(jobWs)) - involvingSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(jobWs)) - var foreignKeyChecks bool + var ( + combinedJob *model.Job + args = &model.BatchCreateTableArgs{ + Tables: make([]*model.CreateTableArgs, 0, len(jobWs)), + } + involvingSchemaInfo = make([]model.InvolvingSchemaInfo, 0, len(jobWs)) + ) // if there is any duplicated table name duplication := make(map[string]struct{}) @@ -234,16 +227,11 @@ func mergeCreateTableJobsOfSameSchema(jobWs []*JobWrapper) (*model.Job, error) { if combinedJob == nil { combinedJob = job.Clone() combinedJob.Type = model.ActionCreateTables - combinedJob.Args = combinedJob.Args[:0] - foreignKeyChecks = job.Args[1].(bool) } - // append table job args - info, ok := job.Args[0].(*model.TableInfo) - if !ok { - return nil, errors.Trace(fmt.Errorf("expect model.TableInfo, but got %T", job.Args[0])) - } - args = append(args, info) + jobArgs := job.JobArgs.(*model.CreateTableArgs) + args.Tables = append(args.Tables, jobArgs) + info := jobArgs.TableInfo if _, ok := duplication[info.Name.L]; ok { // return err even if create table if not exists return nil, infoschema.ErrTableExists.FastGenByArgs("can not batch create tables with same name") @@ -258,12 +246,20 @@ func mergeCreateTableJobsOfSameSchema(jobWs []*JobWrapper) (*model.Job, error) { }) } - combinedJob.Args = append(combinedJob.Args, args) - combinedJob.Args = append(combinedJob.Args, foreignKeyChecks) combinedJob.InvolvingSchemaInfo = involvingSchemaInfo combinedJob.Query = buildQueryStringFromJobs(jobWs) - return combinedJob, nil + newJobW := &JobWrapper{ + Job: combinedJob, + JobArgs: args, + ResultCh: make([]chan jobSubmitResult, 0, len(jobWs)), + } + // merge the result channels. + for _, j := range jobWs { + newJobW.ResultCh = append(newJobW.ResultCh, j.ResultCh...) + } + + return newJobW, nil } // addBatchDDLJobs2Table gets global job IDs and puts the DDL jobs in the DDL job table. @@ -383,6 +379,10 @@ func (s *JobSubmitter) addBatchDDLJobs2Queue(jobWs []*JobWrapper) error { } for _, jobW := range jobWs { + // TODO remove this check when all job type pass args in this way. + if jobW.JobArgs != nil { + jobW.FillArgs(jobW.JobArgs) + } job := jobW.Job job.StartTS = txn.StartTS() setJobStateToQueueing(job) @@ -434,7 +434,6 @@ func (s *JobSubmitter) GenGIDAndInsertJobsWithRetry(ctx context.Context, ddlSe * } }) assignGIDsForJobs(jobWs, ids) - injectModifyJobArgFailPoint(jobWs) // job scheduler will start run them after txn commit, we want to make sure // the channel exists before the jobs are submitted. for i, jobW := range jobWs { @@ -494,12 +493,12 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int { } switch jobW.Type { case model.ActionCreateView, model.ActionCreateSequence, model.ActionCreateTable: - info := jobW.Args[0].(*model.TableInfo) - count += idCountForTable(info) + args := jobW.JobArgs.(*model.CreateTableArgs) + count += idCountForTable(args.TableInfo) case model.ActionCreateTables: - infos := jobW.Args[0].([]*model.TableInfo) - for _, info := range infos { - count += idCountForTable(info) + args := jobW.JobArgs.(*model.BatchCreateTableArgs) + for _, tblArgs := range args.Tables { + count += idCountForTable(tblArgs.TableInfo) } case model.ActionCreateSchema, model.ActionCreateResourceGroup: count++ @@ -519,12 +518,7 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int { pInfo := jobW.Args[1].(*model.PartitionInfo) count += len(pInfo.Definitions) case model.ActionTruncateTable: - if jobW.Version == model.JobVersion1 { - partCount := jobW.Args[3].(int) - count += 1 + partCount - } else { - count += 1 + len(jobW.Args[0].(*model.TruncateTableArgs).OldPartitionIDs) - } + count += 1 + len(jobW.JobArgs.(*model.TruncateTableArgs).OldPartitionIDs) } } return count @@ -537,16 +531,16 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) { for _, jobW := range jobWs { switch jobW.Type { case model.ActionCreateView, model.ActionCreateSequence, model.ActionCreateTable: - info := jobW.Args[0].(*model.TableInfo) + args := jobW.JobArgs.(*model.CreateTableArgs) if !jobW.IDAllocated { - alloc.assignIDsForTable(info) + alloc.assignIDsForTable(args.TableInfo) } - jobW.TableID = info.ID + jobW.TableID = args.TableInfo.ID case model.ActionCreateTables: if !jobW.IDAllocated { - infos := jobW.Args[0].([]*model.TableInfo) - for _, info := range infos { - alloc.assignIDsForTable(info) + args := jobW.JobArgs.(*model.BatchCreateTableArgs) + for _, tblArgs := range args.Tables { + alloc.assignIDsForTable(tblArgs.TableInfo) } } case model.ActionCreateSchema: @@ -599,23 +593,13 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) { pInfo.NewTableID = pInfo.Definitions[0].ID case model.ActionTruncateTable: if !jobW.IDAllocated { - if jobW.Version == model.JobVersion1 { - jobW.Args[0] = alloc.next() - partCount := jobW.Args[3].(int) - partIDs := make([]int64, partCount) - for i := range partIDs { - partIDs[i] = alloc.next() - } - jobW.Args[2] = partIDs - } else { - args := jobW.Args[0].(*model.TruncateTableArgs) - args.NewTableID = alloc.next() - partIDs := make([]int64, len(args.OldPartitionIDs)) - for i := range partIDs { - partIDs[i] = alloc.next() - } - args.NewPartitionIDs = partIDs + args := jobW.JobArgs.(*model.TruncateTableArgs) + args.NewTableID = alloc.next() + partIDs := make([]int64, len(args.OldPartitionIDs)) + for i := range partIDs { + partIDs[i] = alloc.next() } + args.NewPartitionIDs = partIDs } } jobW.ID = alloc.next() diff --git a/pkg/ddl/job_submitter_test.go b/pkg/ddl/job_submitter_test.go index cdcddba8c68a7..0bae829a9d0ca 100644 --- a/pkg/ddl/job_submitter_test.go +++ b/pkg/ddl/job_submitter_test.go @@ -65,11 +65,12 @@ func TestGenIDAndInsertJobsWithRetry(t *testing.T) { jobs := []*ddl.JobWrapper{{ Job: &model.Job{ + Version: model.GetJobVerInUse(), Type: model.ActionCreateTable, SchemaName: "test", TableName: "t1", - Args: []any{&model.TableInfo{}}, }, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{}}, }} initialGID := getGlobalID(ctx, t, store) threads, iterations := 10, 500 @@ -134,24 +135,32 @@ func TestCombinedIDAllocation(t *testing.T) { return info } - genCreateTblJob := func(tp model.ActionType, partitionCnt int) *model.Job { - return &model.Job{ - Version: model.JobVersion1, - Type: tp, - Args: []any{genTblInfo(partitionCnt)}, - } + genCreateTblJobW := func(tp model.ActionType, partitionCnt int, idAllocated bool) *ddl.JobWrapper { + return ddl.NewJobWrapperWithArgs( + &model.Job{ + Version: model.GetJobVerInUse(), + Type: tp, + }, + &model.CreateTableArgs{TableInfo: genTblInfo(partitionCnt)}, + idAllocated, + ) } - genCreateTblsJob := func(partitionCounts ...int) *model.Job { - infos := make([]*model.TableInfo, 0, len(partitionCounts)) - for _, c := range partitionCounts { - infos = append(infos, genTblInfo(c)) + genCreateTblsJobW := func(idAllocated bool, partitionCounts ...int) *ddl.JobWrapper { + args := &model.BatchCreateTableArgs{ + Tables: make([]*model.CreateTableArgs, 0, len(partitionCounts)), } - return &model.Job{ - Version: model.JobVersion1, - Type: model.ActionCreateTables, - Args: []any{infos}, + for _, c := range partitionCounts { + args.Tables = append(args.Tables, &model.CreateTableArgs{TableInfo: genTblInfo(c)}) } + return ddl.NewJobWrapperWithArgs( + &model.Job{ + Version: model.JobVersion1, + Type: model.ActionCreateTables, + }, + args, + idAllocated, + ) } genCreateDBJob := func() *model.Job { @@ -220,50 +229,50 @@ func TestCombinedIDAllocation(t *testing.T) { } } - genTruncTblJob := func(partCnt int) *model.Job { + genTruncTblJob := func(partCnt int, idAllocated bool) *ddl.JobWrapper { j := &model.Job{ Version: model.GetJobVerInUse(), Type: model.ActionTruncateTable, } - j.FillArgs(&model.TruncateTableArgs{OldPartitionIDs: make([]int64, partCnt)}) - return j + args := &model.TruncateTableArgs{OldPartitionIDs: make([]int64, partCnt)} + return ddl.NewJobWrapperWithArgs(j, args, idAllocated) } cases := []idAllocationCase{ { - jobW: ddl.NewJobWrapper(genCreateTblsJob(1, 2, 0), false), + jobW: genCreateTblsJobW(false, 1, 2, 0), requiredIDCount: 1 + 3 + 1 + 2, }, { - jobW: ddl.NewJobWrapper(genCreateTblsJob(3, 4), true), + jobW: genCreateTblsJobW(true, 3, 4), requiredIDCount: 1, }, { - jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateTable, 3), false), + jobW: genCreateTblJobW(model.ActionCreateTable, 3, false), requiredIDCount: 1 + 1 + 3, }, { - jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateTable, 0), false), + jobW: genCreateTblJobW(model.ActionCreateTable, 0, false), requiredIDCount: 1 + 1, }, { - jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateTable, 8), true), + jobW: genCreateTblJobW(model.ActionCreateTable, 8, true), requiredIDCount: 1, }, { - jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateSequence, 0), false), + jobW: genCreateTblJobW(model.ActionCreateSequence, 0, false), requiredIDCount: 2, }, { - jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateSequence, 0), true), + jobW: genCreateTblJobW(model.ActionCreateSequence, 0, true), requiredIDCount: 1, }, { - jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateView, 0), false), + jobW: genCreateTblJobW(model.ActionCreateView, 0, false), requiredIDCount: 2, }, { - jobW: ddl.NewJobWrapper(genCreateTblJob(model.ActionCreateView, 0), true), + jobW: genCreateTblJobW(model.ActionCreateView, 0, true), requiredIDCount: 1, }, { @@ -323,11 +332,11 @@ func TestCombinedIDAllocation(t *testing.T) { requiredIDCount: 1, }, { - jobW: ddl.NewJobWrapper(genTruncTblJob(17), false), + jobW: genTruncTblJob(17, false), requiredIDCount: 19, }, { - jobW: ddl.NewJobWrapper(genTruncTblJob(6), true), + jobW: genTruncTblJob(6, true), requiredIDCount: 1, }, } @@ -402,15 +411,15 @@ func TestCombinedIDAllocation(t *testing.T) { switch j.Type { case model.ActionCreateTable, model.ActionCreateView, model.ActionCreateSequence: require.Greater(t, j.TableID, initialGlobalID) - info := &model.TableInfo{} - require.NoError(t, j.DecodeArgs(info)) - require.Equal(t, j.TableID, info.ID) - checkTableInfo(info) + args, err := model.GetCreateTableArgs(j) + require.NoError(t, err) + require.Equal(t, j.TableID, args.TableInfo.ID) + checkTableInfo(args.TableInfo) case model.ActionCreateTables: - var infos []*model.TableInfo - require.NoError(t, j.DecodeArgs(&infos)) - for _, info := range infos { - checkTableInfo(info) + args, err := model.GetBatchCreateTableArgs(j) + require.NoError(t, err) + for _, tblArgs := range args.Tables { + checkTableInfo(tblArgs.TableInfo) } case model.ActionCreateSchema: require.Greater(t, j.SchemaID, initialGlobalID) @@ -481,11 +490,12 @@ func TestGenIDAndInsertJobsWithRetryQPS(t *testing.T) { payload := strings.Repeat("a", payloadSize) jobs := []*ddl.JobWrapper{{ Job: &model.Job{ + Version: model.GetJobVerInUse(), Type: model.ActionCreateTable, SchemaName: "test", TableName: "t1", - Args: []any{&model.TableInfo{Comment: payload}}, }, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{Comment: payload}}, }} counters := make([]atomic.Int64, thread+1) var wg util.WaitGroupWrapper @@ -541,11 +551,12 @@ func TestGenGIDAndInsertJobsWithRetryOnErr(t *testing.T) { ddlSe := sess.NewSession(tk.Session()) jobs := []*ddl.JobWrapper{{ Job: &model.Job{ + Version: model.GetJobVerInUse(), Type: model.ActionCreateTable, SchemaName: "test", TableName: "t1", - Args: []any{&model.TableInfo{}}, }, + JobArgs: &model.CreateTableArgs{TableInfo: &model.TableInfo{}}, }} submitter := ddl.NewJobSubmitterForTest() // retry for 3 times diff --git a/pkg/ddl/restart_test.go b/pkg/ddl/restart_test.go index 47443ce57e35a..028cdc8e116bd 100644 --- a/pkg/ddl/restart_test.go +++ b/pkg/ddl/restart_test.go @@ -59,7 +59,7 @@ func restartWorkers(t *testing.T, store kv.Storage, d *domain.Domain) { } // runInterruptedJob should be called concurrently with restartWorkers -func runInterruptedJob(t *testing.T, store kv.Storage, d ddl.Executor, job *model.Job, doneCh chan error) { +func runInterruptedJob(t *testing.T, store kv.Storage, d ddl.Executor, job *model.Job, args model.JobArgs, doneCh chan error) { var ( history *model.Job err error @@ -68,7 +68,7 @@ func runInterruptedJob(t *testing.T, store kv.Storage, d ddl.Executor, job *mode de := d.(ddl.ExecutorForTest) ctx := testkit.NewTestKit(t, store).Session() ctx.SetValue(sessionctx.QueryString, "skip") - err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) if errors.Is(err, context.Canceled) { endlessLoopTime := time.Now().Add(time.Minute) for history == nil { @@ -88,9 +88,9 @@ func runInterruptedJob(t *testing.T, store kv.Storage, d ddl.Executor, job *mode doneCh <- err } -func testRunInterruptedJob(t *testing.T, store kv.Storage, d *domain.Domain, job *model.Job) { +func testRunInterruptedJob(t *testing.T, store kv.Storage, d *domain.Domain, job *model.Job, args model.JobArgs) { done := make(chan error, 1) - go runInterruptedJob(t, store, d.DDLExecutor(), job, done) + go runInterruptedJob(t, store, d.DDLExecutor(), job, args, done) ticker := time.NewTicker(d.GetSchemaLease()) defer ticker.Stop() @@ -121,11 +121,11 @@ func TestSchemaResume(t *testing.T) { BinlogInfo: &model.HistoryInfo{}, } job.FillArgs(&model.CreateSchemaArgs{DBInfo: dbInfo}) - testRunInterruptedJob(t, store, dom, job) + testRunInterruptedJob(t, store, dom, job, nil) testCheckSchemaState(t, store, dbInfo, model.StatePublic) job = buildDropSchemaJob(dbInfo) - testRunInterruptedJob(t, store, dom, job) + testRunInterruptedJob(t, store, dom, job, nil) testCheckSchemaState(t, store, dbInfo, model.StateNone) } @@ -139,7 +139,7 @@ func TestStat(t *testing.T) { job := buildDropSchemaJob(dbInfo) done := make(chan error, 1) - go runInterruptedJob(t, store, dom.DDLExecutor(), job, done) + go runInterruptedJob(t, store, dom.DDLExecutor(), job, nil, done) ticker := time.NewTicker(dom.GetSchemaLease() * 1) defer ticker.Stop() @@ -174,15 +174,15 @@ func TestTableResume(t *testing.T) { tblInfo, err := testTableInfo(store, "t1", 3) require.NoError(t, err) job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, TableID: tblInfo.ID, TableName: tblInfo.Name.L, Type: model.ActionCreateTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{tblInfo}, } - testRunInterruptedJob(t, store, dom, job) + testRunInterruptedJob(t, store, dom, job, &model.CreateTableArgs{TableInfo: tblInfo}) testCheckTableState(t, store, dbInfo, tblInfo, model.StatePublic) job = &model.Job{ @@ -193,6 +193,6 @@ func TestTableResume(t *testing.T) { Type: model.ActionDropTable, BinlogInfo: &model.HistoryInfo{}, } - testRunInterruptedJob(t, store, dom, job) + testRunInterruptedJob(t, store, dom, job, nil) testCheckTableState(t, store, dbInfo, tblInfo, model.StateNone) } diff --git a/pkg/ddl/schema_test.go b/pkg/ddl/schema_test.go index 46705075b4842..53246265db0fa 100644 --- a/pkg/ddl/schema_test.go +++ b/pkg/ddl/schema_test.go @@ -40,16 +40,17 @@ import ( func testCreateTable(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, TableID: tblInfo.ID, TableName: tblInfo.Name.L, Type: model.ActionCreateTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{tblInfo}, } + args := &model.CreateTableArgs{TableInfo: tblInfo} ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -337,8 +338,9 @@ func TestSchemaWaitJob(t *testing.T) { require.NoError(t, err) schemaID := genIDs[0] doDDLJobErr(t, schemaID, 0, "test_schema", "", model.ActionCreateSchema, - testkit.NewTestKit(t, store).Session(), det2, store, func(job *model.Job) { + testkit.NewTestKit(t, store).Session(), det2, store, func(job *model.Job) model.JobArgs { job.FillArgs(&model.CreateSchemaArgs{DBInfo: dbInfo}) + return nil }) } @@ -350,7 +352,7 @@ func doDDLJobErr( ctx sessionctx.Context, d ddl.ExecutorForTest, store kv.Storage, - handler func(job *model.Job), + handler func(job *model.Job) model.JobArgs, ) *model.Job { job := &model.Job{ Version: model.GetJobVerInUse(), @@ -361,10 +363,10 @@ func doDDLJobErr( Type: tp, BinlogInfo: &model.HistoryInfo{}, } - handler(job) + args := handler(job) // TODO: check error detail ctx.SetValue(sessionctx.QueryString, "skip") - require.Error(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) + require.Error(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true))) testCheckJobCancelled(t, store, job, nil) return job diff --git a/pkg/ddl/schema_version.go b/pkg/ddl/schema_version.go index 3c3cebb2783f8..7e5545bab9b0c 100644 --- a/pkg/ddl/schema_version.go +++ b/pkg/ddl/schema_version.go @@ -31,18 +31,18 @@ import ( // SetSchemaDiffForCreateTables set SchemaDiff for ActionCreateTables. func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error { - var tableInfos []*model.TableInfo - err := job.DecodeArgs(&tableInfos) + args, err := model.GetBatchCreateTableArgs(job) if err != nil { return errors.Trace(err) } - diff.AffectedOpts = make([]*model.AffectedOption, len(tableInfos)) - for i := range tableInfos { + diff.AffectedOpts = make([]*model.AffectedOption, len(args.Tables)) + for i := range args.Tables { + tblInfo := args.Tables[i].TableInfo diff.AffectedOpts[i] = &model.AffectedOption{ SchemaID: job.SchemaID, OldSchemaID: job.SchemaID, - TableID: tableInfos[i].ID, - OldTableID: tableInfos[i].ID, + TableID: tblInfo.ID, + OldTableID: tblInfo.ID, } } return nil @@ -75,12 +75,11 @@ func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job) error // SetSchemaDiffForCreateView set SchemaDiff for ActionCreateView. func SetSchemaDiffForCreateView(diff *model.SchemaDiff, job *model.Job) error { - tbInfo := &model.TableInfo{} - var orReplace bool - var oldTbInfoID int64 - if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil { + args, err := model.GetCreateTableArgs(job) + if err != nil { return errors.Trace(err) } + tbInfo, orReplace, oldTbInfoID := args.TableInfo, args.OnExistReplace, args.OldViewTblID // When the statement is "create or replace view " and we need to drop the old view, // it has two table IDs and should be handled differently. if oldTbInfoID > 0 && orReplace { @@ -247,19 +246,26 @@ func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job) err } // SetSchemaDiffForCreateTable set SchemaDiff for ActionCreateTable. -func SetSchemaDiffForCreateTable(diff *model.SchemaDiff, job *model.Job) { +func SetSchemaDiffForCreateTable(diff *model.SchemaDiff, job *model.Job) error { diff.TableID = job.TableID - if len(job.Args) > 0 { - tbInfo, _ := job.Args[0].(*model.TableInfo) - // When create table with foreign key, there are two schema status change: - // 1. none -> write-only - // 2. write-only -> public - // In the second status change write-only -> public, infoschema loader should apply drop old table first, then - // apply create new table. So need to set diff.OldTableID here to make sure it. - if tbInfo != nil && tbInfo.State == model.StatePublic && len(tbInfo.ForeignKeys) > 0 { - diff.OldTableID = job.TableID - } + var tbInfo *model.TableInfo + // create table with foreign key will update tableInfo in the job args, so we + // must reuse already decoded ones. + // TODO make DecodeArgs can reuse already decoded args, so we can use GetCreateTableArgs. + if job.Version == model.JobVersion1 { + tbInfo, _ = job.Args[0].(*model.TableInfo) + } else { + tbInfo = job.Args[0].(*model.CreateTableArgs).TableInfo + } + // When create table with foreign key, there are two schema status change: + // 1. none -> write-only + // 2. write-only -> public + // In the second status change write-only -> public, infoschema loader should apply drop old table first, then + // apply create new table. So need to set diff.OldTableID here to make sure it. + if tbInfo.State == model.StatePublic && len(tbInfo.ForeignKeys) > 0 { + diff.OldTableID = job.TableID } + return nil } // SetSchemaDiffForRecoverSchema set SchemaDiff for ActionRecoverSchema. @@ -353,7 +359,7 @@ func updateSchemaVersion(jobCtx *jobContext, t *meta.Meta, job *model.Job, multi case model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: err = SetSchemaDiffForPartitionModify(diff, job) case model.ActionCreateTable: - SetSchemaDiffForCreateTable(diff, job) + err = SetSchemaDiffForCreateTable(diff, job) case model.ActionRecoverSchema: err = SetSchemaDiffForRecoverSchema(diff, job) case model.ActionFlashbackCluster: diff --git a/pkg/ddl/sequence.go b/pkg/ddl/sequence.go index 22c5092110b66..76fe7c60320e5 100644 --- a/pkg/ddl/sequence.go +++ b/pkg/ddl/sequence.go @@ -29,15 +29,16 @@ import ( func onCreateSequence(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID - tbInfo := &model.TableInfo{} - if err := job.DecodeArgs(tbInfo); err != nil { + args, err := model.GetCreateTableArgs(job) + if err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } + tbInfo := args.TableInfo tbInfo.State = model.StateNone - err := checkTableNotExists(jobCtx.infoCache, schemaID, tbInfo.Name.L) + err = checkTableNotExists(jobCtx.infoCache, schemaID, tbInfo.Name.L) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { job.State = model.JobStateCancelled diff --git a/pkg/ddl/table_test.go b/pkg/ddl/table_test.go index cd60b7636c2da..293ab1d22050d 100644 --- a/pkg/ddl/table_test.go +++ b/pkg/ddl/table_test.go @@ -161,9 +161,9 @@ func testTruncateTable(t *testing.T, ctx sessionctx.Context, store kv.Storage, d Type: model.ActionTruncateTable, BinlogInfo: &model.HistoryInfo{}, } - job.FillArgs(&model.TruncateTableArgs{NewTableID: newTableID}) + args := &model.TruncateTableArgs{NewTableID: newTableID} ctx.SetValue(sessionctx.QueryString, "skip") - err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -219,8 +219,8 @@ func TestTable(t *testing.T) { newTblInfo, err := testTableInfo(store, "t", 3) require.NoError(t, err) doDDLJobErr(t, dbInfo.ID, newTblInfo.ID, dbInfo.Name.L, newTblInfo.Name.L, model.ActionCreateTable, - ctx, de, store, func(job *model.Job) { - job.Args = []any{newTblInfo} + ctx, de, store, func(job *model.Job) model.JobArgs { + return &model.CreateTableArgs{TableInfo: newTblInfo} }) ctx = testkit.NewTestKit(t, store).Session() @@ -293,16 +293,17 @@ func TestCreateView(t *testing.T) { newTblInfo0, err := testTableInfo(store, "v", 3) require.NoError(t, err) job = &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, TableID: tblInfo.ID, TableName: tblInfo.Name.L, Type: model.ActionCreateView, BinlogInfo: &model.HistoryInfo{}, - Args: []any{newTblInfo0}, } + args := &model.CreateTableArgs{TableInfo: newTblInfo0} ctx.SetValue(sessionctx.QueryString, "skip") - err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) @@ -316,16 +317,17 @@ func TestCreateView(t *testing.T) { newTblInfo1, err := testTableInfo(store, "v", 3) require.NoError(t, err) job = &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, TableID: tblInfo.ID, TableName: tblInfo.Name.L, Type: model.ActionCreateView, BinlogInfo: &model.HistoryInfo{}, - Args: []any{newTblInfo1, true, newTblInfo0.ID}, } + args = &model.CreateTableArgs{TableInfo: newTblInfo1, OnExistReplace: true, OldViewTblID: newTblInfo0.ID} ctx.SetValue(sessionctx.QueryString, "skip") - err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) v = getSchemaVer(t, ctx) @@ -339,16 +341,17 @@ func TestCreateView(t *testing.T) { newTblInfo2, err := testTableInfo(store, "v", 3) require.NoError(t, err) job = &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, TableID: tblInfo.ID, TableName: tblInfo.Name.L, Type: model.ActionCreateView, BinlogInfo: &model.HistoryInfo{}, - Args: []any{newTblInfo2, true, newTblInfo0.ID}, } + args = &model.CreateTableArgs{TableInfo: newTblInfo2, OnExistReplace: true, OldViewTblID: newTblInfo0.ID} ctx.SetValue(sessionctx.QueryString, "skip") - err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) // The non-existing table id in job args will not be considered anymore. require.NoError(t, err) } @@ -475,28 +478,26 @@ func TestCreateTables(t *testing.T) { ctx := testkit.NewTestKit(t, store).Session() - var infos []*model.TableInfo genIDs, err := genGlobalIDs(store, 3) require.NoError(t, err) - infos = append(infos, &model.TableInfo{ - ID: genIDs[0], - Name: pmodel.NewCIStr("s1"), - }) - infos = append(infos, &model.TableInfo{ - ID: genIDs[1], - Name: pmodel.NewCIStr("s2"), - }) - infos = append(infos, &model.TableInfo{ - ID: genIDs[2], - Name: pmodel.NewCIStr("s3"), - }) + args := &model.BatchCreateTableArgs{ + Tables: make([]*model.CreateTableArgs, 0, 3), + } + for i := 0; i < 3; i++ { + args.Tables = append(args.Tables, &model.CreateTableArgs{ + TableInfo: &model.TableInfo{ + ID: genIDs[i], + Name: pmodel.NewCIStr(fmt.Sprintf("s%d", i+1)), + }, + }) + } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, Type: model.ActionCreateTables, BinlogInfo: &model.HistoryInfo{}, - Args: []any{infos}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{ {Database: "test_table", Table: "s1"}, {Database: "test_table", Table: "s2"}, @@ -511,7 +512,7 @@ func TestCreateTables(t *testing.T) { *errP = errors.New("mock get job by ID failed") }) }) - err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err = de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) testGetTable(t, domain, genIDs[0]) diff --git a/pkg/meta/model/BUILD.bazel b/pkg/meta/model/BUILD.bazel index 3a6b991dd107e..7c176c34801fe 100644 --- a/pkg/meta/model/BUILD.bazel +++ b/pkg/meta/model/BUILD.bazel @@ -44,7 +44,7 @@ go_test( ], embed = [":model"], flaky = True, - shard_count = 28, + shard_count = 30, deps = [ "//pkg/parser/charset", "//pkg/parser/model", diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 04cb601fb9a8c..4b950f15e356c 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -515,6 +515,7 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { } else { var arg any if len(job.Args) > 0 { + intest.Assert(len(job.Args) == 1, "Job.Args should have only one element") arg = job.Args[0] } job.RawArgs, err = json.Marshal(arg) @@ -542,6 +543,7 @@ func (job *Job) Decode(b []byte) error { // DecodeArgs decodes serialized job arguments from job.RawArgs into the given // variables, and also save the result in job.Args. It's for JobVersion1. +// TODO make it un-exported after we finish the migration to JobVersion2. func (job *Job) DecodeArgs(args ...any) error { intest.Assert(job.Version == JobVersion1, "Job.DecodeArgs is only used for JobVersion1") var rawArgs []json.RawMessage diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 9c769bdd0730b..bd71cbfdd3499 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -26,6 +26,7 @@ import ( func getOrDecodeArgsV2[T JobArgs](job *Job) (T, error) { intest.Assert(job.Version == JobVersion2, "job version is not v2") if len(job.Args) > 0 { + intest.Assert(len(job.Args) == 1, "job args length is not 1") return job.Args[0].(T), nil } var v T @@ -183,6 +184,100 @@ func GetModifySchemaArgs(job *Job) (*ModifySchemaArgs, error) { return getOrDecodeArgsV2[*ModifySchemaArgs](job) } +// CreateTableArgs is the arguments for create table/view/sequence job. +type CreateTableArgs struct { + TableInfo *TableInfo `json:"table_info,omitempty"` + // below 2 are used for create view. + OnExistReplace bool `json:"on_exist_replace,omitempty"` + OldViewTblID int64 `json:"old_view_tbl_id,omitempty"` + // used for create table. + FKCheck bool `json:"fk_check,omitempty"` +} + +func (a *CreateTableArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + switch job.Type { + case ActionCreateTable: + job.Args = []any{a.TableInfo, a.FKCheck} + case ActionCreateView: + job.Args = []any{a.TableInfo, a.OnExistReplace, a.OldViewTblID} + case ActionCreateSequence: + job.Args = []any{a.TableInfo} + } + return + } + job.Args = []any{a} +} + +// GetCreateTableArgs gets the create-table args. +func GetCreateTableArgs(job *Job) (*CreateTableArgs, error) { + if job.Version == JobVersion1 { + var ( + tableInfo = &TableInfo{} + onExistReplace bool + oldViewTblID int64 + fkCheck bool + ) + switch job.Type { + case ActionCreateTable: + if err := job.DecodeArgs(tableInfo, &fkCheck); err != nil { + return nil, errors.Trace(err) + } + case ActionCreateView: + if err := job.DecodeArgs(tableInfo, &onExistReplace, &oldViewTblID); err != nil { + return nil, errors.Trace(err) + } + case ActionCreateSequence: + if err := job.DecodeArgs(tableInfo); err != nil { + return nil, errors.Trace(err) + } + } + return &CreateTableArgs{ + TableInfo: tableInfo, + OnExistReplace: onExistReplace, + OldViewTblID: oldViewTblID, + FKCheck: fkCheck, + }, nil + } + return getOrDecodeArgsV2[*CreateTableArgs](job) +} + +// BatchCreateTableArgs is the arguments for batch create table job. +type BatchCreateTableArgs struct { + Tables []*CreateTableArgs `json:"tables,omitempty"` +} + +func (a *BatchCreateTableArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + infos := make([]*TableInfo, 0, len(a.Tables)) + for _, info := range a.Tables { + infos = append(infos, info.TableInfo) + } + job.Args = []any{infos, a.Tables[0].FKCheck} + return + } + job.Args = []any{a} +} + +// GetBatchCreateTableArgs gets the batch create-table args. +func GetBatchCreateTableArgs(job *Job) (*BatchCreateTableArgs, error) { + if job.Version == JobVersion1 { + var ( + tableInfos []*TableInfo + fkCheck bool + ) + if err := job.DecodeArgs(&tableInfos, &fkCheck); err != nil { + return nil, errors.Trace(err) + } + args := &BatchCreateTableArgs{Tables: make([]*CreateTableArgs, 0, len(tableInfos))} + for _, info := range tableInfos { + args.Tables = append(args.Tables, &CreateTableArgs{TableInfo: info, FKCheck: fkCheck}) + } + return args, nil + } + return getOrDecodeArgsV2[*BatchCreateTableArgs](job) +} + // TruncateTableArgs is the arguments for truncate table job. type TruncateTableArgs struct { FKCheck bool `json:"fk_check,omitempty"` diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 4156a6c63ffe0..7d689a68712a8 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -130,6 +130,75 @@ func TestModifySchemaArgs(t *testing.T) { } } +func TestCreateTableArgs(t *testing.T) { + t.Run("create table", func(t *testing.T) { + inArgs := &CreateTableArgs{ + TableInfo: &TableInfo{ID: 100}, + FKCheck: true, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionCreateTable))) + args, err := GetCreateTableArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.TableInfo, args.TableInfo) + require.EqualValues(t, inArgs.FKCheck, args.FKCheck) + } + }) + t.Run("create view", func(t *testing.T) { + inArgs := &CreateTableArgs{ + TableInfo: &TableInfo{ID: 122}, + OnExistReplace: true, + OldViewTblID: 123, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionCreateView))) + args, err := GetCreateTableArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.TableInfo, args.TableInfo) + require.EqualValues(t, inArgs.OnExistReplace, args.OnExistReplace) + require.EqualValues(t, inArgs.OldViewTblID, args.OldViewTblID) + } + }) + t.Run("create sequence", func(t *testing.T) { + inArgs := &CreateTableArgs{ + TableInfo: &TableInfo{ID: 22}, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionCreateSequence))) + args, err := GetCreateTableArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.TableInfo, args.TableInfo) + } + }) +} + +func TestBatchCreateTableArgs(t *testing.T) { + inArgs := &BatchCreateTableArgs{ + Tables: []*CreateTableArgs{ + {TableInfo: &TableInfo{ID: 100}, FKCheck: true}, + {TableInfo: &TableInfo{ID: 101}, FKCheck: false}, + }, + } + // in job version 1, we only save one FKCheck value for all tables. + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, JobVersion1, ActionCreateTables))) + args, err := GetBatchCreateTableArgs(j2) + require.NoError(t, err) + for i := 0; i < len(inArgs.Tables); i++ { + require.EqualValues(t, inArgs.Tables[i].TableInfo, args.Tables[i].TableInfo) + require.EqualValues(t, true, args.Tables[i].FKCheck) + } + + j2 = &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, JobVersion2, ActionCreateTables))) + args, err = GetBatchCreateTableArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.Tables, args.Tables) +} + func TestTruncateTableArgs(t *testing.T) { inArgs := &TruncateTableArgs{ NewTableID: 1,