Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add more log and metrics for instance plan cache eviction #55377

Merged
merged 3 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3182,7 +3182,7 @@ func (do *Domain) planCacheMetricsAndVars() {
// planCacheEvictTrigger triggers the plan cache eviction periodically.
func (do *Domain) planCacheEvictTrigger() {
defer util.Recover(metrics.LabelDomain, "planCacheEvictTrigger", nil, false)
ticker := time.NewTicker(time.Second * 15) // 15s by default
ticker := time.NewTicker(time.Second * 30) // 30s by default
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
ticker.Stop()
logutil.BgLogger().Info("planCacheEvictTrigger exited.")
Expand All @@ -3192,7 +3192,13 @@ func (do *Domain) planCacheEvictTrigger() {
select {
case <-ticker.C:
// trigger the eviction
do.instancePlanCache.Evict()
begin := time.Now()
detailInfo, numEvicted := do.instancePlanCache.Evict()
metrics2.GetPlanCacheInstanceEvict().Set(float64(numEvicted))
logutil.BgLogger().Info("instance plan eviction",
zap.String("detail", detailInfo),
zap.Int64("num_evicted", int64(numEvicted)),
zap.Duration("time_spent", time.Since(begin)))
case <-do.exit:
return
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/planner/core/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
sessionPlanCacheInstanceMemoryUsage prometheus.Gauge
instancePlanCacheInstancePlanNumCounter prometheus.Gauge
instancePlanCacheInstanceMemoryUsage prometheus.Gauge
instancePlanCacheInstanceNumEvict prometheus.Gauge
)

func init() {
Expand All @@ -52,6 +53,7 @@ func InitMetricsVars() {
sessionPlanCacheInstanceMemoryUsage = metrics.PlanCacheInstanceMemoryUsage.WithLabelValues(" session-plan-cache")
instancePlanCacheInstancePlanNumCounter = metrics.PlanCacheInstancePlanNumCounter.WithLabelValues(" instance-plan-cache")
instancePlanCacheInstanceMemoryUsage = metrics.PlanCacheInstanceMemoryUsage.WithLabelValues(" instance-plan-cache")
instancePlanCacheInstanceNumEvict = metrics.PlanCacheInstancePlanNumCounter.WithLabelValues(" instance-plan-cache-last-evict")
}

// GetPlanCacheHitCounter get different plan cache hit counter
Expand Down Expand Up @@ -90,3 +92,8 @@ func GetPlanCacheInstanceMemoryUsage(instancePlanCache bool) prometheus.Gauge {
}
return sessionPlanCacheInstanceMemoryUsage
}

// GetPlanCacheInstanceEvict get instance plan cache evict counter.
func GetPlanCacheInstanceEvict() prometheus.Gauge {
return instancePlanCacheInstanceNumEvict
}
29 changes: 22 additions & 7 deletions pkg/planner/core/plan_cache_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package core

import (
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -55,6 +56,7 @@ type instancePlanCache struct {
totPlan atomic.Int64

evictMutex sync.Mutex
inEvict atomic.Bool
softMemLimit atomic.Int64
hardMemLimit atomic.Int64
}
Expand Down Expand Up @@ -84,10 +86,12 @@ func (pc *instancePlanCache) Get(key string, paramTypes any) (value any, ok bool
return pc.getPlanFromList(headNode, paramTypes)
}

func (*instancePlanCache) getPlanFromList(headNode *instancePCNode, paramTypes any) (any, bool) {
func (pc *instancePlanCache) getPlanFromList(headNode *instancePCNode, paramTypes any) (any, bool) {
for node := headNode.next.Load(); node != nil; node = node.next.Load() {
if checkTypesCompatibility4PC(node.value.paramTypes, paramTypes) { // v.Plan is read-only, no need to lock
node.lastUsed.Store(time.Now()) // atomically update the lastUsed field
if !pc.inEvict.Load() {
node.lastUsed.Store(time.Now()) // atomically update the lastUsed field
}
return node.value, true
}
}
Expand All @@ -97,6 +101,9 @@ func (*instancePlanCache) getPlanFromList(headNode *instancePCNode, paramTypes a
// Put puts the key and values into the cache.
// Due to some thread-safety issues, this Put operation might fail, use the returned succ to indicate it.
func (pc *instancePlanCache) Put(key string, value, paramTypes any) (succ bool) {
if pc.inEvict.Load() {
return // do nothing if eviction is in progress
}
vMem := value.(*PlanCacheValue).MemoryUsage()
if vMem+pc.totCost.Load() > pc.hardMemLimit.Load() {
return // do nothing if it exceeds the hard limit
Expand All @@ -108,6 +115,9 @@ func (pc *instancePlanCache) Put(key string, value, paramTypes any) (succ bool)
if _, ok := pc.getPlanFromList(headNode, paramTypes); ok {
return // some other thread has inserted the same plan before
}
if pc.inEvict.Load() {
return // do nothing if eviction is in progress
}

firstNode := headNode.next.Load()
currNode := pc.createNode(value)
Expand All @@ -124,24 +134,29 @@ func (pc *instancePlanCache) Put(key string, value, paramTypes any) (succ bool)
// step 1: iterate all values to collect their last_used
// step 2: estimate an eviction threshold time based on all last_used values
// step 3: iterate all values again and evict qualified values
func (pc *instancePlanCache) Evict() (evicted bool) {
func (pc *instancePlanCache) Evict() (detailInfo string, numEvicted int) {
pc.evictMutex.Lock() // make sure only one thread to trigger eviction for safety
defer pc.evictMutex.Unlock()
if pc.totCost.Load() < pc.softMemLimit.Load() {
return // do nothing
pc.inEvict.Store(true)
defer pc.inEvict.Store(false)
currentTot, softLimit := pc.totCost.Load(), pc.softMemLimit.Load()
if currentTot < softLimit {
detailInfo = fmt.Sprintf("memory usage is below the soft limit, currentTot: %v, softLimit: %v", currentTot, softLimit)
return
}
lastUsedTimes := make([]time.Time, 0, 64)
pc.foreach(func(_, this *instancePCNode) bool { // step 1
lastUsedTimes = append(lastUsedTimes, this.lastUsed.Load())
return false
})
threshold := pc.calcEvictionThreshold(lastUsedTimes) // step 2
pc.foreach(func(prev, this *instancePCNode) bool { // step 3
detailInfo = fmt.Sprintf("evict threshold: %v", threshold)
pc.foreach(func(prev, this *instancePCNode) bool { // step 3
if !this.lastUsed.Load().After(threshold) { // if lastUsed<=threshold, evict this value
if prev.next.CompareAndSwap(this, this.next.Load()) { // have to use CAS since
pc.totCost.Sub(this.value.MemoryUsage()) // it might have been updated by other thread
pc.totPlan.Sub(1)
evicted = true
numEvicted++
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type InstancePlanCache interface {
// Put puts the key and value into the cache.
Put(key string, value, paramTypes any) (succ bool)
// Evict evicts some cached values.
Evict() (evicted bool)
Evict() (detailInfo string, numEvicted int)
// Size returns the number of cached values.
Size() int64
// MemUsage returns the total memory usage of this plan cache.
Expand Down