Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: args v2 for create table/view/sequence, batch create table #55964

Merged
merged 2 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ import (
// DANGER: it is an internal function used by onCreateTable and onCreateTables, for reusing code. Be careful.
// 1. it expects the argument of job has been deserialized.
// 2. it won't call updateSchemaVersion, FinishTableJob and asyncNotifyEvent.
func createTable(jobCtx *jobContext, t *meta.Meta, job *model.Job, fkCheck bool) (*model.TableInfo, error) {
func createTable(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.CreateTableArgs) (*model.TableInfo, error) {
schemaID := job.SchemaID
tbInfo := job.Args[0].(*model.TableInfo)
tbInfo, fkCheck := args.TableInfo, args.FKCheck

tbInfo.State = model.StateNone
err := checkTableNotExists(jobCtx.infoCache, schemaID, tbInfo.Name.L)
Expand Down Expand Up @@ -156,20 +156,19 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
}
})

// just decode, createTable will use it as Args[0]
tbInfo := &model.TableInfo{}
fkCheck := false
if err := job.DecodeArgs(tbInfo, &fkCheck); err != nil {
args, err := model.GetCreateTableArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tbInfo := args.TableInfo

if len(tbInfo.ForeignKeys) > 0 {
return createTableWithForeignKeys(jobCtx, t, job, tbInfo, fkCheck)
return createTableWithForeignKeys(jobCtx, t, job, args)
}

tbInfo, err := createTable(jobCtx, t, job, fkCheck)
tbInfo, err = createTable(jobCtx, t, job, args)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -186,14 +185,15 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
return ver, errors.Trace(err)
}

func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job, tbInfo *model.TableInfo, fkCheck bool) (ver int64, err error) {
func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.CreateTableArgs) (ver int64, err error) {
tbInfo := args.TableInfo
switch tbInfo.State {
case model.StateNone, model.StatePublic:
// create table in non-public or public state. The function `createTable` will always reset
// the `tbInfo.State` with `model.StateNone`, so it's fine to just call the `createTable` with
// public state.
// when `br` restores table, the state of `tbInfo` will be public.
tbInfo, err = createTable(jobCtx, t, job, fkCheck)
tbInfo, err = createTable(jobCtx, t, job, args)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -222,37 +222,38 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job
func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, error) {
var ver int64

var args []*model.TableInfo
fkCheck := false
err := job.DecodeArgs(&args, &fkCheck)
args, err := model.GetBatchCreateTableArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

tableInfos := make([]*model.TableInfo, 0, len(args.Tables))
// We don't construct jobs for every table, but only tableInfo
// The following loop creates a stub job for every table
//
// it clones a stub job from the ActionCreateTables job
stubJob := job.Clone()
stubJob.Args = make([]any, 1)
for i := range args {
stubJob.TableID = args[i].ID
stubJob.Args[0] = args[i]
if args[i].Sequence != nil {
err := createSequenceWithCheck(t, stubJob, args[i])
for i := range args.Tables {
tblArgs := args.Tables[i]
tableInfo := tblArgs.TableInfo
stubJob.TableID = tableInfo.ID
if tableInfo.Sequence != nil {
err := createSequenceWithCheck(t, stubJob, tableInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tableInfos = append(tableInfos, tableInfo)
} else {
tbInfo, err := createTable(jobCtx, t, stubJob, fkCheck)
tbInfo, err := createTable(jobCtx, t, stubJob, tblArgs)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
args[i] = tbInfo
tableInfos = append(tableInfos, tbInfo)
}
}

Expand All @@ -263,9 +264,9 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er

job.State = model.JobStateDone
job.SchemaState = model.StatePublic
job.BinlogInfo.SetTableInfos(ver, args)
for i := range args {
createTableEvent := notifier.NewCreateTableEvent(args[i])
job.BinlogInfo.SetTableInfos(ver, tableInfos)
for i := range tableInfos {
createTableEvent := notifier.NewCreateTableEvent(tableInfos[i])
asyncNotifyEvent(jobCtx, createTableEvent, job)
}

Expand All @@ -283,14 +284,13 @@ func createTableOrViewWithCheck(t *meta.Meta, job *model.Job, schemaID int64, tb

func onCreateView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
var orReplace bool
var _placeholder int64 // oldTblInfoID
if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil {
args, err := model.GetCreateTableArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tbInfo, orReplace := args.TableInfo, args.OnExistReplace
tbInfo.State = model.StateNone

oldTableID, err := findTableIDByName(jobCtx.infoCache, t, schemaID, tbInfo.Name.L)
Expand Down
12 changes: 12 additions & 0 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ type JobWrapper struct {
// IDAllocated see config of same name in CreateTableConfig.
// exported for test.
IDAllocated bool
JobArgs model.JobArgs
// job submission is run in async, we use this channel to notify the caller.
// when fast create table enabled, we might combine multiple jobs into one, and
// append the channel to this slice.
Expand All @@ -218,6 +219,17 @@ func NewJobWrapper(job *model.Job, idAllocated bool) *JobWrapper {
}
}

// NewJobWrapperWithArgs creates a new JobWrapper with job args.
// TODO: merge with NewJobWrapper later.
func NewJobWrapperWithArgs(job *model.Job, args model.JobArgs, idAllocated bool) *JobWrapper {
return &JobWrapper{
Job: job,
IDAllocated: idAllocated,
JobArgs: args,
ResultCh: []chan jobSubmitResult{make(chan jobSubmitResult)},
}
}

// NotifyResult notifies the job submit result.
func (t *JobWrapper) NotifyResult(err error) {
merged := len(t.ResultCh) > 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestBuildJobDependence(t *testing.T) {
job6 := &model.Job{ID: 6, TableID: 1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema}
job11 := &model.Job{ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
job11 := &model.Job{Version: model.JobVersion1, ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
require.NoError(t, m.EnQueueDDLJob(job1))
Expand Down
Loading