Skip to content

Commit

Permalink
create view
Browse files Browse the repository at this point in the history
create tbl/seq

change

todo

change

ut

test

change

fix test

test
  • Loading branch information
D3Hunter committed Sep 10, 2024
1 parent 17c7b90 commit 29b6889
Show file tree
Hide file tree
Showing 18 changed files with 497 additions and 333 deletions.
54 changes: 27 additions & 27 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ import (
// DANGER: it is an internal function used by onCreateTable and onCreateTables, for reusing code. Be careful.
// 1. it expects the argument of job has been deserialized.
// 2. it won't call updateSchemaVersion, FinishTableJob and asyncNotifyEvent.
func createTable(jobCtx *jobContext, t *meta.Meta, job *model.Job, fkCheck bool) (*model.TableInfo, error) {
func createTable(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.CreateTableArgs) (*model.TableInfo, error) {
schemaID := job.SchemaID
tbInfo := job.Args[0].(*model.TableInfo)
tbInfo, fkCheck := args.TableInfo, args.FKCheck

tbInfo.State = model.StateNone
err := checkTableNotExists(jobCtx.infoCache, schemaID, tbInfo.Name.L)
Expand Down Expand Up @@ -157,20 +157,19 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
}
})

// just decode, createTable will use it as Args[0]
tbInfo := &model.TableInfo{}
fkCheck := false
if err := job.DecodeArgs(tbInfo, &fkCheck); err != nil {
args, err := model.GetCreateTableArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tbInfo := args.TableInfo

if len(tbInfo.ForeignKeys) > 0 {
return createTableWithForeignKeys(jobCtx, t, job, tbInfo, fkCheck)
return createTableWithForeignKeys(jobCtx, t, job, args)
}

tbInfo, err := createTable(jobCtx, t, job, fkCheck)
tbInfo, err = createTable(jobCtx, t, job, args)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -189,14 +188,15 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
return ver, errors.Trace(err)
}

func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job, tbInfo *model.TableInfo, fkCheck bool) (ver int64, err error) {
func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.CreateTableArgs) (ver int64, err error) {
tbInfo := args.TableInfo
switch tbInfo.State {
case model.StateNone, model.StatePublic:
// create table in non-public or public state. The function `createTable` will always reset
// the `tbInfo.State` with `model.StateNone`, so it's fine to just call the `createTable` with
// public state.
// when `br` restores table, the state of `tbInfo` will be public.
tbInfo, err = createTable(jobCtx, t, job, fkCheck)
tbInfo, err = createTable(jobCtx, t, job, args)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -227,37 +227,38 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job
func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, error) {
var ver int64

var args []*model.TableInfo
fkCheck := false
err := job.DecodeArgs(&args, &fkCheck)
args, err := model.GetBatchCreateTableArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

tableInfos := make([]*model.TableInfo, 0, len(args.Tables))
// We don't construct jobs for every table, but only tableInfo
// The following loop creates a stub job for every table
//
// it clones a stub job from the ActionCreateTables job
stubJob := job.Clone()
stubJob.Args = make([]any, 1)
for i := range args {
stubJob.TableID = args[i].ID
stubJob.Args[0] = args[i]
if args[i].Sequence != nil {
err := createSequenceWithCheck(t, stubJob, args[i])
for i := range args.Tables {
tblArgs := args.Tables[i]
tableInfo := tblArgs.TableInfo
stubJob.TableID = tableInfo.ID
if tableInfo.Sequence != nil {
err := createSequenceWithCheck(t, stubJob, tableInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tableInfos = append(tableInfos, tableInfo)
} else {
tbInfo, err := createTable(jobCtx, t, stubJob, fkCheck)
tbInfo, err := createTable(jobCtx, t, stubJob, tblArgs)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
args[i] = tbInfo
tableInfos = append(tableInfos, tbInfo)
}
}

Expand All @@ -268,10 +269,10 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er

job.State = model.JobStateDone
job.SchemaState = model.StatePublic
job.BinlogInfo.SetTableInfos(ver, args)
for i := range args {
job.BinlogInfo.SetTableInfos(ver, tableInfos)
for i := range tableInfos {
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(args[i]),
SchemaChangeEvent: util.NewCreateTableEvent(tableInfos[i]),
}
asyncNotifyEvent(jobCtx, createTableEvent, job)
}
Expand All @@ -290,14 +291,13 @@ func createTableOrViewWithCheck(t *meta.Meta, job *model.Job, schemaID int64, tb

func onCreateView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
var orReplace bool
var _placeholder int64 // oldTblInfoID
if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil {
args, err := model.GetCreateTableArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tbInfo, orReplace := args.TableInfo, args.OnExistReplace
tbInfo.State = model.StateNone

oldTableID, err := findTableIDByName(jobCtx.infoCache, t, schemaID, tbInfo.Name.L)
Expand Down
12 changes: 12 additions & 0 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ type JobWrapper struct {
// IDAllocated see config of same name in CreateTableConfig.
// exported for test.
IDAllocated bool
JobArgs model.JobArgs
// job submission is run in async, we use this channel to notify the caller.
// when fast create table enabled, we might combine multiple jobs into one, and
// append the channel to this slice.
Expand All @@ -221,6 +222,17 @@ func NewJobWrapper(job *model.Job, idAllocated bool) *JobWrapper {
}
}

// NewJobWrapperWithArgs creates a new JobWrapper with job args.
// TODO: merge with NewJobWrapper later.
func NewJobWrapperWithArgs(job *model.Job, args model.JobArgs, idAllocated bool) *JobWrapper {
return &JobWrapper{
Job: job,
IDAllocated: idAllocated,
JobArgs: args,
ResultCh: []chan jobSubmitResult{make(chan jobSubmitResult)},
}
}

// NotifyResult notifies the job submit result.
func (t *JobWrapper) NotifyResult(err error) {
merged := len(t.ResultCh) > 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestBuildJobDependence(t *testing.T) {
job6 := &model.Job{ID: 6, TableID: 1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema}
job11 := &model.Job{ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
job11 := &model.Job{Version: model.JobVersion1, ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
require.NoError(t, m.EnQueueDDLJob(job1))
Expand Down
Loading

0 comments on commit 29b6889

Please sign in to comment.