Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into add-index-update-ver
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jan 29, 2023
2 parents ac8ab3b + ad38dbb commit 48e0841
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 28 deletions.
2 changes: 2 additions & 0 deletions resourcemanager/pooltask/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go_library(
srcs = [
"task.go",
"task_manager.go",
"task_manager_iterator.go",
"task_manager_scheduler.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager/pooltask",
visibility = ["//visibility:public"],
Expand Down
36 changes: 18 additions & 18 deletions resourcemanager/pooltask/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,29 @@ type tContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
task *TaskBox[T, U, C, CT, TF]
}

type meta struct {
stats *list.List
createTS time.Time
origin int32
running int32
type meta[T any, U any, C any, CT any, TF Context[CT]] struct {
stats *list.List
createTS time.Time
initialConcurrency int32
running atomic.Int32
}

func newStats(concurrency int32) *meta {
s := &meta{
createTS: time.Now(),
stats: list.New(),
origin: concurrency,
func newStats[T any, U any, C any, CT any, TF Context[CT]](concurrency int32) *meta[T, U, C, CT, TF] {
s := &meta[T, U, C, CT, TF]{
createTS: time.Now(),
stats: list.New(),
initialConcurrency: concurrency,
}
return s
}

func (m *meta) getOriginConcurrency() int32 {
return m.origin
func (m *meta[T, U, C, CT, TF]) getOriginConcurrency() int32 {
return m.initialConcurrency
}

// TaskStatusContainer is a container that can control or watch the pool.
type TaskStatusContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
stats map[uint64]*meta
stats map[uint64]*meta[T, U, C, CT, TF]
rw sync.RWMutex
}

Expand All @@ -70,7 +70,7 @@ func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskMa
task := make([]TaskStatusContainer[T, U, C, CT, TF], shard)
for i := 0; i < shard; i++ {
task[i] = TaskStatusContainer[T, U, C, CT, TF]{
stats: make(map[uint64]*meta),
stats: make(map[uint64]*meta[T, U, C, CT, TF]),
}
}
return TaskManager[T, U, C, CT, TF]{
Expand All @@ -83,7 +83,7 @@ func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskMa
func (t *TaskManager[T, U, C, CT, TF]) RegisterTask(taskID uint64, concurrency int32) {
id := getShardID(taskID)
t.task[id].rw.Lock()
t.task[id].stats[taskID] = newStats(concurrency)
t.task[id].stats[taskID] = newStats[T, U, C, CT, TF](concurrency)
t.task[id].rw.Unlock()
}

Expand Down Expand Up @@ -113,7 +113,7 @@ func (t *TaskManager[T, U, C, CT, TF]) AddSubTask(taskID uint64, task *TaskBox[T
t.running.Inc()
t.task[shardID].rw.Lock()
t.task[shardID].stats[taskID].stats.PushBack(tc)
t.task[shardID].stats[taskID].running++ // running job in this task
t.task[shardID].stats[taskID].running.Inc() // running job in this task
t.task[shardID].rw.Unlock()
}

Expand All @@ -122,7 +122,7 @@ func (t *TaskManager[T, U, C, CT, TF]) ExitSubTask(taskID uint64) {
shardID := getShardID(taskID)
t.running.Dec() // total running tasks
t.task[shardID].rw.Lock()
t.task[shardID].stats[taskID].running-- // running job in this task
t.task[shardID].stats[taskID].running.Dec() // running job in this task
t.task[shardID].rw.Unlock()
}

Expand All @@ -131,7 +131,7 @@ func (t *TaskManager[T, U, C, CT, TF]) Running(taskID uint64) int32 {
shardID := getShardID(taskID)
t.task[shardID].rw.Lock()
defer t.task[shardID].rw.Unlock()
return t.task[shardID].stats[taskID].running
return t.task[shardID].stats[taskID].running.Load()
}

// StopTask is to stop a task by TaskID.
Expand Down
131 changes: 131 additions & 0 deletions resourcemanager/pooltask/task_manager_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pooltask

import (
"container/list"
"time"
)

func (t *TaskManager[T, U, C, CT, TF]) getBoostTask() (tid uint64, result *TaskBox[T, U, C, CT, TF]) {
// boost task
// 1、the count of running task is less than concurrency
// 2、less run time, more possible to boost
tid, element := t.iter(canBoost[T, U, C, CT, TF])
if element != nil {
return tid, element.Value.(tContainer[T, U, C, CT, TF]).task
}
return 0, nil
}

func (t *TaskManager[T, U, C, CT, TF]) pauseTask() {
// pause task,
// 1、more run time, more possible to pause
// 2、if task have been boosted, first to pause.
tid, result := t.iter(canPause[T, U, C, CT, TF])
if result != nil {
result.Value.(tContainer[T, U, C, CT, TF]).task.status.CompareAndSwap(RunningTask, StopTask)
// delete it from list
shardID := getShardID(tid)
t.task[shardID].rw.Lock()
defer t.task[shardID].rw.Unlock()
t.task[shardID].stats[tid].stats.Remove(result)
}
}

func (t *TaskManager[T, U, C, CT, TF]) iter(fn func(m *meta[T, U, C, CT, TF], max time.Time) (*list.Element, bool)) (tid uint64, result *list.Element) {
var compareTS time.Time
for i := 0; i < shard; i++ {
breakFind := func(index int) (breakFind bool) {
t.task[i].rw.RLock()
defer t.task[i].rw.RUnlock()
for id, stats := range t.task[i].stats {
if result == nil {
result = findTask[T, U, C, CT, TF](stats, RunningTask)
tid = id
compareTS = stats.createTS
continue
}
newResult, pauseFind := fn(stats, compareTS)
if pauseFind {
result = newResult
tid = id
compareTS = stats.createTS
return true
}
if newResult != nil {
result = newResult
tid = id
compareTS = stats.createTS
}
}
return false
}(shard)
if breakFind {
break
}
}
return tid, result
}

func canPause[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], max time.Time) (result *list.Element, isBreak bool) {
if m.initialConcurrency < m.running.Load() {
box := findTask[T, U, C, CT, TF](m, RunningTask)
if box != nil {
return box, true
}
}
if m.createTS.Before(max) {
box := findTask[T, U, C, CT, TF](m, RunningTask)
if box != nil {
return box, false
}
}
return nil, false
}

func canBoost[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], min time.Time) (result *list.Element, isBreak bool) {
if m.running.Load() < m.initialConcurrency {
box := getTask[T, U, C, CT, TF](m)
if box != nil {
return box, true
}
}
if m.createTS.After(min) {
box := getTask[T, U, C, CT, TF](m)
if box != nil {
return box, false
}
}
return nil, false
}

func findTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], status int32) *list.Element {
for e := m.stats.Front(); e != nil; e = e.Next() {
box := e.Value.(tContainer[T, U, C, CT, TF])
if box.task.status.Load() == status {
return e
}
}
return nil
}

func getTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF]) *list.Element {
e := m.stats.Front()
if e != nil {
return e
}
return nil
}
28 changes: 28 additions & 0 deletions resourcemanager/pooltask/task_manager_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pooltask

// Overclock is to increase the concurrency of pool.
func (t *TaskManager[T, U, C, CT, TF]) Overclock() (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
if t.concurrency > t.running.Load() {
return t.getBoostTask()
}
return 0, nil
}

// Downclock is to decrease the concurrency of pool.
func (t *TaskManager[T, U, C, CT, TF]) Downclock() {
t.pauseTask()
}
2 changes: 1 addition & 1 deletion resourcemanager/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *ResourceManager) Stop() {
}

// Register is to register pool into resource manager
func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error {
func (r *ResourceManager) Register(pool util.GoroutinePool, name string, component util.Component) error {
p := util.PoolContainer{Pool: pool, Component: component}
return r.registerPool(name, &p)
}
Expand Down
4 changes: 2 additions & 2 deletions resourcemanager/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) {
switch cmd {
case scheduler.Downclock:
concurrency := con - 1
log.Info("downclock goroutine pool",
log.Info("[resource manager] downclock goroutine pool",
zap.Int("origin concurrency", con),
zap.Int("concurrency", concurrency),
zap.String("name", pool.Pool.Name()))
pool.Pool.Tune(concurrency)
case scheduler.Overclock:
concurrency := con + 1
log.Info("overclock goroutine pool",
log.Info("[resource manager] overclock goroutine pool",
zap.Int("origin concurrency", con),
zap.Int("concurrency", concurrency),
zap.String("name", pool.Pool.Name()))
Expand Down
2 changes: 1 addition & 1 deletion resourcemanager/scheduler/cpu_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewCPUScheduler() *CPUScheduler {
}

// Tune is to tune the goroutine pool
func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command {
func (*CPUScheduler) Tune(_ util.Component, pool util.GoroutinePool) Command {
if time.Since(pool.LastTunerTs()) < util.MinSchedulerInterval.Load() {
return Hold
}
Expand Down
2 changes: 1 addition & 1 deletion resourcemanager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ const (

// Scheduler is a scheduler interface
type Scheduler interface {
Tune(component util.Component, p util.GorotinuePool) Command
Tune(component util.Component, p util.GoroutinePool) Command
}
6 changes: 3 additions & 3 deletions resourcemanager/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ var (
MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)
)

// GorotinuePool is a pool interface
type GorotinuePool interface {
// GoroutinePool is a pool interface
type GoroutinePool interface {
ReleaseAndWait()
Tune(size int)
LastTunerTs() time.Time
Expand All @@ -37,7 +37,7 @@ type GorotinuePool interface {

// PoolContainer is a pool container
type PoolContainer struct {
Pool GorotinuePool
Pool GoroutinePool
Component Component
}

Expand Down
12 changes: 11 additions & 1 deletion util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,22 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
if size > capacity {
// boost
for i := 0; i < size-capacity; i++ {
if tid, boostTask := p.taskManager.Overclock(); boostTask != nil {
p.addWaitingTask()
p.taskManager.AddSubTask(tid, boostTask.Clone())
p.taskCh <- boostTask
}
}
if size-capacity == 1 {
p.cond.Signal()
return
}
p.cond.Broadcast()
return
}
if size < capacity {
p.taskManager.Downclock()
}
}

Expand Down
Loading

0 comments on commit 48e0841

Please sign in to comment.