From 8b2dad7bdfa94098453ccc51346cf9fb2c79a23e Mon Sep 17 00:00:00 2001
From: wjhuang2016
Date: Fri, 16 Dec 2022 14:56:06 +0800
Subject: [PATCH 1/2] done

Signed-off-by: wjhuang2016
---
 ddl/ddl_worker.go | 27 +++++++++++++++++++++++----
 ddl/job_table.go  |  9 ++-------
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 8621dcb08361c..e2be4ccf2427f 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -536,16 +536,20 @@ func cleanMDLInfo(pool *sessionPool, jobID int64, ec *clientv3.Client) {
 }
 
 // checkMDLInfo checks whether metadata lock info exists. If it does, the schema is still locked by some TiDB instances.
-func checkMDLInfo(jobID int64, pool *sessionPool) (bool, error) {
-    sql := fmt.Sprintf("select * from mysql.tidb_mdl_info where job_id = %d", jobID)
+func checkMDLInfo(jobID int64, pool *sessionPool) (bool, int64, error) {
+    sql := fmt.Sprintf("select version from mysql.tidb_mdl_info where job_id = %d", jobID)
     sctx, _ := pool.get()
     defer pool.put(sctx)
     sess := newSession(sctx)
     rows, err := sess.execute(context.Background(), sql, "check-mdl-info")
     if err != nil {
-        return false, err
+        return false, 0, err
     }
-    return len(rows) > 0, nil
+    if len(rows) == 0 {
+        return false, 0, nil
+    }
+    ver := rows[0].GetInt64(0)
+    return true, ver, nil
 }
 
 func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
@@ -1377,6 +1381,21 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
         zap.String("job", job.String()))
 }
 
+// waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits to acquire the metadata lock of the latest version of this DDL.
+func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, waitTime time.Duration, latestSchemaVersion int64) {
+    failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
+        if val.(bool) {
+            if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
+                panic("check down before update global version failed")
+            } else {
+                mockDDLErrOnce = -1
+            }
+        }
+    })
+
+    waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job)
+}
+
 // waitSchemaSynced handles the following situation:
 // If the job enters a new state, and the worker crashes when it's in the process of waiting for 2 * lease time,
 // then the worker restarts quickly, we may run the job immediately again,
diff --git a/ddl/job_table.go b/ddl/job_table.go
index a6e19b7f7edf0..5aa416cc2f886 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -237,7 +237,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
         // check if this ddl job is synced to all servers.
         if !d.isSynced(job) || d.once.Load() {
             if variable.EnableMDL.Load() {
-                exist, err := checkMDLInfo(job.ID, d.sessPool)
+                exist, version, err := checkMDLInfo(job.ID, d.sessPool)
                 if err != nil {
                     logutil.BgLogger().Warn("[ddl] check MDL info failed", zap.Error(err), zap.String("job", job.String()))
                     // Release the worker resource.
@@ -246,12 +246,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
                 } else if exist {
                     // Release the worker resource.
                     pool.put(wk)
-                    err = waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
-                    if err != nil {
-                        logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
-                        time.Sleep(time.Second)
-                        return
-                    }
+                    waitSchemaSyncedForMDL(d.ddlCtx, job, 2*d.lease, version)
                     d.once.Store(false)
                     cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
                     // Don't have a worker now.
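[Editor's note] For orientation, the core idea of patch 1 is that checkMDLInfo now reads the schema version recorded in mysql.tidb_mdl_info instead of merely testing row existence, so the caller learns exactly which version it must wait on. Below is a minimal, hypothetical sketch of that pattern using Go's standard database/sql as a stand-in for TiDB's internal sessionPool; the function name, table, and query shape come from the diff, while the db handle and package name are illustrative only.

```go
package mdlcheck

import (
	"database/sql"
	"errors"
	"fmt"
)

// checkMDLInfo sketches the patched helper: select the version column rather
// than the whole row, and return (exists, version) so the caller can wait for
// that exact schema version. db is a hypothetical stand-in for TiDB's
// internal session pool.
func checkMDLInfo(db *sql.DB, jobID int64) (exists bool, version int64, err error) {
	query := fmt.Sprintf("SELECT version FROM mysql.tidb_mdl_info WHERE job_id = %d", jobID)
	err = db.QueryRow(query).Scan(&version)
	if errors.Is(err, sql.ErrNoRows) {
		// No MDL row: no TiDB instance still pins an older schema version.
		return false, 0, nil
	}
	if err != nil {
		return false, 0, err
	}
	return true, version, nil
}
```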
From 664e30e36f508eaf9a9f83edbbb83c3f9e6779f6 Mon Sep 17 00:00:00 2001
From: wjhuang2016
Date: Fri, 16 Dec 2022 15:48:12 +0800
Subject: [PATCH 2/2] done

Signed-off-by: wjhuang2016
---
 ddl/ddl_worker.go | 15 +++++++++++++--
 ddl/job_table.go  |  5 ++++-
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index e2be4ccf2427f..89e515db8e1bc 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -1382,7 +1382,7 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
 }
 
 // waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits to acquire the metadata lock of the latest version of this DDL.
-func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, waitTime time.Duration, latestSchemaVersion int64) {
+func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error {
     failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
         if val.(bool) {
             if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
@@ -1393,7 +1393,18 @@ func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, waitTime time.Duration, l
         }
     })
 
-    waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job)
+    timeStart := time.Now()
+    // OwnerCheckAllVersions returns only when all TiDB schemas are synced (excluding any isolated TiDB instance).
+    err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
+    if err != nil {
+        logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
+        return err
+    }
+    logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
+        zap.Int64("ver", latestSchemaVersion),
+        zap.Duration("take time", time.Since(timeStart)),
+        zap.String("job", job.String()))
+    return nil
 }
 
 // waitSchemaSynced handles the following situation:
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 5aa416cc2f886..771a83b8f8264 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -246,7 +246,10 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
                 } else if exist {
                     // Release the worker resource.
                     pool.put(wk)
-                    waitSchemaSyncedForMDL(d.ddlCtx, job, 2*d.lease, version)
+                    err = waitSchemaSyncedForMDL(d.ddlCtx, job, version)
+                    if err != nil {
+                        return
+                    }
                     d.once.Store(false)
                     cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
                     // Don't have a worker now.
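[Editor's note] After patch 2, the MDL branch replaces the generic lease-based wait with a direct call to OwnerCheckAllVersions for the version read from tidb_mdl_info, and propagates the error so that cleanMDLInfo is skipped on failure and the job is retried with the lock row intact. Here is a minimal, self-contained sketch of that control flow; the syncer interface, handleUnsyncedJob, and fakeSyncer are hypothetical stand-ins for TiDB's internals, not the real API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// syncer abstracts the schema syncer: OwnerCheckAllVersions blocks until
// every live instance reports the given schema version (hypothetical
// stand-in for TiDB's real interface).
type syncer interface {
	OwnerCheckAllVersions(ctx context.Context, jobID, latestVer int64) error
}

// handleUnsyncedJob models the patched branch of delivery2worker: wait for
// the exact version recorded in tidb_mdl_info, and clean the MDL row only
// once every instance has caught up. On error we return early, leaving the
// row in place so the job is retried rather than losing its lock info.
func handleUnsyncedJob(ctx context.Context, s syncer, jobID, version int64, cleanMDLInfo func(jobID int64)) error {
	start := time.Now()
	if err := s.OwnerCheckAllVersions(ctx, jobID, version); err != nil {
		return fmt.Errorf("wait schema version %d: %w", version, err)
	}
	fmt.Printf("all instances reached version %d in %s\n", version, time.Since(start))
	cleanMDLInfo(jobID)
	return nil
}

// fakeSyncer pretends every instance has already caught up, so the sketch
// can run without a cluster.
type fakeSyncer struct{}

func (fakeSyncer) OwnerCheckAllVersions(ctx context.Context, jobID, latestVer int64) error {
	return nil
}

func main() {
	clean := func(jobID int64) { fmt.Printf("cleaned MDL info for job %d\n", jobID) }
	if err := handleUnsyncedJob(context.Background(), fakeSyncer{}, 1, 42, clean); err != nil {
		fmt.Println("retry later:", err)
	}
}
```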