Skip to content

Commit

Permalink
dm/load: fix concurrent call Loader.Status (pingcap#3459) (pingcap#3468)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and 3AceShowHand committed Jan 13, 2022
1 parent a4330bd commit 95d038f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 9 deletions.
10 changes: 5 additions & 5 deletions dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ type Loader struct {
// to calculate remainingTimeGauge metric, map will be init in `l.prepare.prepareDataFiles`
dbTableDataTotalSize map[string]map[string]*atomic.Int64
dbTableDataFinishedSize map[string]map[string]*atomic.Int64
dbTableDataLastFinishedSize map[string]map[string]int64
dbTableDataLastUpdatedTime time.Time
dbTableDataLastFinishedSize map[string]map[string]*atomic.Int64
dbTableDataLastUpdatedTime atomic.Time

metaBinlog atomic.String
metaBinlogGTID atomic.String
Expand Down Expand Up @@ -1053,12 +1053,12 @@ func (l *Loader) prepareDataFiles(files map[string]struct{}) error {
if _, ok := l.dbTableDataTotalSize[db]; !ok {
l.dbTableDataTotalSize[db] = make(map[string]*atomic.Int64)
l.dbTableDataFinishedSize[db] = make(map[string]*atomic.Int64)
l.dbTableDataLastFinishedSize[db] = make(map[string]int64)
l.dbTableDataLastFinishedSize[db] = make(map[string]*atomic.Int64)
}
if _, ok := l.dbTableDataTotalSize[db][table]; !ok {
l.dbTableDataTotalSize[db][table] = atomic.NewInt64(0)
l.dbTableDataFinishedSize[db][table] = atomic.NewInt64(0)
l.dbTableDataLastFinishedSize[db][table] = 0
l.dbTableDataLastFinishedSize[db][table] = atomic.NewInt64(0)
}
l.dbTableDataTotalSize[db][table].Add(size)

Expand All @@ -1083,7 +1083,7 @@ func (l *Loader) prepare() error {
l.finishedDataSize.Store(0) // reset before load from checkpoint
l.dbTableDataTotalSize = make(map[string]map[string]*atomic.Int64)
l.dbTableDataFinishedSize = make(map[string]map[string]*atomic.Int64)
l.dbTableDataLastFinishedSize = make(map[string]map[string]int64)
l.dbTableDataLastFinishedSize = make(map[string]map[string]*atomic.Int64)

// check if mydumper dir data exists.
if !utils.IsDirExists(l.cfg.Dir) {
Expand Down
14 changes: 10 additions & 4 deletions dm/loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,25 @@ func (l *Loader) printStatus() {
totalSize := l.totalDataSize.Load()
totalFileCount := l.totalFileCount.Load()

interval := time.Since(l.dbTableDataLastUpdatedTime)
interval := time.Since(l.dbTableDataLastUpdatedTime.Load())
intervalSecond := interval.Seconds()
if intervalSecond == 0 {
return
}

for db, tables := range l.dbTableDataFinishedSize {
for table, size := range tables {
curFinished := size.Load()
speed := float64(curFinished-l.dbTableDataLastFinishedSize[db][table]) / interval.Seconds()
l.dbTableDataLastFinishedSize[db][table] = curFinished
lastFinished := l.dbTableDataFinishedSize[db][table].Load()
speed := float64(curFinished-lastFinished) / intervalSecond
l.dbTableDataLastFinishedSize[db][table].Store(curFinished)
if speed > 0 {
remainingSeconds := float64(l.dbTableDataTotalSize[db][table].Load()-curFinished) / speed
remainingTimeGauge.WithLabelValues(l.cfg.Name, l.cfg.WorkerName, l.cfg.SourceID, db, table).Set(remainingSeconds)
}
}
}
l.dbTableDataLastUpdatedTime = time.Now()
l.dbTableDataLastUpdatedTime.Store(time.Now())

l.logger.Info("progress status of load",
zap.Int64("finished_bytes", finishedSize),
Expand Down
56 changes: 56 additions & 0 deletions dm/loader/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package loader

import (
"sync"

. "github.com/pingcap/check"
"go.uber.org/atomic"

"github.com/pingcap/ticdc/dm/dm/config"
"github.com/pingcap/ticdc/dm/pkg/log"
)

func (*testLoaderSuite) TestConcurrentStatus(c *C) {
l := &Loader{}
l.cfg = &config.SubTaskConfig{}
l.logger = log.L()
l.finishedDataSize.Store(100)
l.totalDataSize.Store(200)
l.totalFileCount.Store(10)
l.dbTableDataFinishedSize = map[string]map[string]*atomic.Int64{
"db1": {
"table1": atomic.NewInt64(10),
"table2": atomic.NewInt64(20),
},
}
l.dbTableDataLastFinishedSize = map[string]map[string]*atomic.Int64{
"db1": {
"table1": atomic.NewInt64(0),
"table2": atomic.NewInt64(0),
},
}

// test won't race or panic
wg := sync.WaitGroup{}
wg.Add(20)
for i := 0; i < 20; i++ {
go func() {
l.Status(nil)
wg.Done()
}()
}
wg.Wait()
}

0 comments on commit 95d038f

Please sign in to comment.