Skip to content

Commit

Permalink
copr: Fix bug that mpp node availability detect does not work in some…
Browse files Browse the repository at this point in the history
… corner cases (#28201) (#28288)

close pingcap/tiflash#3118
  • Loading branch information
ti-srebot committed Feb 10, 2022
1 parent b94d692 commit fc4f280
Showing 1 changed file with 53 additions and 53 deletions.
106 changes: 53 additions & 53 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
if len(originalTasks) <= 1 {
isMPP := mppStoreLastFailTime != nil
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
cache := kvStore.GetRegionCache()
storeTaskMap := make(map[uint64]*batchCopTask)
// storeCandidateRegionMap stores all the possible store->region map. Its content is
Expand Down Expand Up @@ -231,16 +232,28 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if totalRemainingRegionNum > 0 {
avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
Expand All @@ -250,57 +263,44 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
return store
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
var key string
var ri RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
}
return store
}

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal because we only check the stores that affected by this region, in fact in order
// to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think
// check only the affected stores is more simple and will get a good enough result
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal because we only check the stores that affected by this region, in fact in order
// to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think
// check only the affected stores is more simple and will get a good enough result
store = findNextStore(ri.AllStores)
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}

var ret []*batchCopTask
for _, task := range storeTaskMap {
Expand Down

0 comments on commit fc4f280

Please sign in to comment.