Skip to content

Commit

Permalink
store/copr: add log for buildBatchCopTasksConsistentHash (#41101)
Browse files Browse the repository at this point in the history
close #41102
  • Loading branch information
guo-shaoge committed Feb 9, 2023
1 parent 9cb4c48 commit 4997e7c
Showing 1 changed file with 64 additions and 7 deletions.
71 changes: 64 additions & 7 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,16 @@ func buildBatchCopTasksConsistentHash(
rangesForEachPhysicalTable []*KeyRanges,
storeType kv.StoreType,
ttl time.Duration) (res []*batchCopTask, err error) {
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
cache := kvStore.GetRegionCache()
fetchTopoBo := backoff.NewBackofferWithVars(ctx, fetchTopoMaxBackoff, nil)

var retryNum int
var rangesLen int
var storesStr []string
var (
retryNum int
rangesLen int
storesStr []string
)

tasks := make([]*copTask, 0)
regionIDs := make([]tikv.RegionVerID, 0)
Expand All @@ -635,7 +638,9 @@ func buildBatchCopTasksConsistentHash(
regionIDs = append(regionIDs, lo.Location.Region)
}
}
splitKeyElapsed := time.Since(start)

fetchTopoStart := time.Now()
for {
retryNum++
// todo: use AssureAndGetTopo() after SNS is done.
Expand All @@ -654,6 +659,7 @@ func buildBatchCopTasksConsistentHash(
}
break
}
fetchTopoElapsed := time.Since(fetchTopoStart)

rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr)
if err != nil {
Expand Down Expand Up @@ -688,6 +694,24 @@ func buildBatchCopTasksConsistentHash(
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(storesStr)))

if log.GetLevel() <= zap.DebugLevel {
debugTaskMap := make(map[string]string, len(taskMap))
for s, b := range taskMap {
debugTaskMap[s] = fmt.Sprintf("addr: %s; regionInfos: %v", b.storeAddr, b.regionInfos)
}
logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHash", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", storesStr))
}

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasksConsistentHash takes too much time",
zap.Duration("total elapsed", elapsed),
zap.Int("retryNum", retryNum),
zap.Duration("splitKeyElapsed", splitKeyElapsed),
zap.Duration("fetchTopoElapsed", fetchTopoElapsed),
zap.Int("range len", rangesLen),
zap.Int("copTaskNum", len(tasks)),
zap.Int("batchCopTaskNum", len(res)))
}
failpointCheckForConsistentHash(res)
return res, nil
}
Expand Down Expand Up @@ -1185,15 +1209,23 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
storeType kv.StoreType,
ttl time.Duration) (res []*batchCopTask, err error) {
const cmdType = tikvrpc.CmdBatchCop
var retryNum int
var (
retryNum int
rangesLen int
copTaskNum int
splitKeyElapsed time.Duration
getStoreElapsed time.Duration
)
cache := kvStore.GetRegionCache()
start := time.Now()

for {
retryNum++
var rangesLen int
rangesLen = 0
tasks := make([]*copTask, 0)
regionIDs := make([]tikv.RegionVerID, 0)

splitKeyStart := time.Now()
for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
Expand All @@ -1211,7 +1243,9 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
regionIDs = append(regionIDs, lo.Location.Region)
}
}
splitKeyElapsed += time.Since(splitKeyStart)

getStoreStart := time.Now()
stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil {
return nil, err
Expand All @@ -1220,13 +1254,14 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
if len(stores) == 0 {
return nil, errors.New("tiflash_compute node is unavailable")
}
getStoreElapsed = time.Since(getStoreStart)

rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
if err != nil {
return nil, err
}
if rpcCtxs == nil {
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum))
logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD retry because rcpCtx is nil", zap.Int("retryNum", retryNum))
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -1236,6 +1271,7 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
if len(rpcCtxs) != len(tasks) {
return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
}
copTaskNum = len(tasks)
taskMap := make(map[string]*batchCopTask)
for i, rpcCtx := range rpcCtxs {
regionInfo := RegionInfo{
Expand All @@ -1259,10 +1295,31 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
res = append(res, batchTask)
}
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
if log.GetLevel() <= zap.DebugLevel {
debugStores := make([]string, 0, len(stores))
for _, s := range stores {
debugStores = append(debugStores, s.GetAddr())
}
debugTaskMap := make(map[string]string, len(taskMap))
for s, b := range taskMap {
debugTaskMap[s] = fmt.Sprintf("addr: %s; regionInfos: %v", b.storeAddr, b.regionInfos)
}
logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHashForPD", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", debugStores))
}
break
}

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasksConsistentHashForPD takes too much time",
zap.Duration("total elapsed", elapsed),
zap.Int("retryNum", retryNum),
zap.Duration("splitKeyElapsed", splitKeyElapsed),
zap.Duration("getStoreElapsed", getStoreElapsed),
zap.Int("range len", rangesLen),
zap.Int("copTaskNum", copTaskNum),
zap.Int("batchCopTaskNum", len(res)))
}
failpointCheckForConsistentHash(res)
return res, nil
}

0 comments on commit 4997e7c

Please sign in to comment.