Skip to content

Commit

Permalink
ddl: fix data race on job.SetWarnings
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Feb 9, 2023
1 parent 9f5cc51 commit 4cfaa39
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
6 changes: 4 additions & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,12 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
func (w *worker) mergeWarningsIntoJob(job *model.Job) {
rc := w.getReorgCtx(job.ID)
rc.mu.Lock()
defer rc.mu.Unlock()
partWarnings := rc.mu.warnings
partWarningsCount := rc.mu.warningsCount
job.SetWarnings(mergeWarningsAndWarningsCount(partWarnings, job.ReorgMeta.Warnings, partWarningsCount, job.ReorgMeta.WarningsCount))
rc.mu.Unlock()
warnings, warningsCount := job.GetWarnings()
warnings, warningsCount = mergeWarningsAndWarningsCount(partWarnings, warnings, partWarningsCount, warningsCount)
job.SetWarnings(warnings, warningsCount)
}

func updateBackfillProgress(w *worker, reorgInfo *reorgInfo, tblInfo *model.TableInfo,
Expand Down
10 changes: 8 additions & 2 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,18 @@ func (job *Job) GetRowCount() int64 {

// SetWarnings sets the warnings of rows handled.
func (job *Job) SetWarnings(warnings map[errors.ErrorID]*terror.Error, warningsCount map[errors.ErrorID]int64) {
job.Mu.Lock()
job.ReorgMeta.Warnings = warnings
job.ReorgMeta.WarningsCount = warningsCount
job.Mu.Unlock()
}

// GetWarnings gets the warnings of the rows handled.
func (job *Job) GetWarnings() (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
return job.ReorgMeta.Warnings, job.ReorgMeta.WarningsCount
job.Mu.Lock()
w, wc := job.ReorgMeta.Warnings, job.ReorgMeta.WarningsCount
job.Mu.Unlock()
return w, wc
}

// Encode encodes job with json format.
Expand Down Expand Up @@ -672,7 +677,8 @@ func (job *Job) String() string {
ret := fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v",
job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), TSConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer)
if job.ReorgMeta != nil {
ret += fmt.Sprintf(", UniqueWarnings:%d", len(job.ReorgMeta.Warnings))
warnings, _ := job.GetWarnings()
ret += fmt.Sprintf(", UniqueWarnings:%d", len(warnings))
}
if job.Type != ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
ret += fmt.Sprintf(", Multi-Schema Change:true, Revertible:%v", job.MultiSchemaInfo.Revertible)
Expand Down

0 comments on commit 4cfaa39

Please sign in to comment.