Skip to content

Commit

Permalink
statistics: fix wrong singleflight implementation for stats' syncload(#…
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Aug 28, 2024
1 parent caa60c0 commit 0002c1a
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 64 deletions.
5 changes: 5 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,6 +1712,11 @@ type TableItemID struct {
IsIndex bool
}

// Key is used to generate unique key for TableItemID to use in the syncload
func (t TableItemID) Key() string {
return fmt.Sprintf("%d#%d#%t", t.ID, t.TableID, t.IsIndex)
}

// PolicyRefInfo is the struct to refer the placement policy.
type PolicyRefInfo struct {
ID int64 `json:"id"`
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_library(
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//oracle",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//singleflight",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
1 change: 0 additions & 1 deletion statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,6 @@ func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool s
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
err := handle.RefreshVars()
if err != nil {
return nil, err
Expand Down
83 changes: 34 additions & 49 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)

type statsWrapper struct {
Expand All @@ -47,7 +48,7 @@ type StatsLoad struct {
SubCtxs []sessionctx.Context
NeededItemsCh chan *NeededItemTask
TimeoutItemsCh chan *NeededItemTask
WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
Singleflight singleflight.Group
}

// NeededItemTask represents one needed column/indices with expire time.
Expand Down Expand Up @@ -235,49 +236,63 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
} else {
task = lastTask
}
return h.handleOneItemTask(task, readerCtx, ctx)
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
return h.handleOneItemTask(task, readerCtx, ctx)
})
timeout := time.Until(task.ToTimeout)
select {
case result := <-resultChan:
if result.Err == nil {
slr := result.Val.(*stmtctx.StatsLoadResult)
if slr.Error != nil {
return task, slr.Error
}
task.ResultCh <- *slr
return nil, nil
}
return task, result.Err
case <-time.After(timeout):
return task, nil
}
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (*NeededItemTask, error) {
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (result *stmtctx.StatsLoadResult, err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
}()
result = &stmtctx.StatsLoadResult{Item: task.TableItemID}
item := result.Item
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(item.TableID)
if !ok {
h.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
var err error
wrapper := &statsWrapper{}
if item.IsIndex {
index, ok := tbl.Indices[item.ID]
if !ok || index.IsFullLoad() {
h.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
wrapper.idx = index
} else {
col, ok := tbl.Columns[item.ID]
if !ok || col.IsFullLoad() {
h.writeToResultChan(task.ResultCh, result)
return nil, nil
return result, nil
}
wrapper.col = col
}
// to avoid duplicated handling in concurrent scenario
working := h.setWorking(result.Item, task.ResultCh)
if !working {
h.writeToResultChan(task.ResultCh, result)
return nil, nil
}
// refresh statsReader to get latest stats
h.loadFreshStatsReader(readerCtx, ctx)
t := time.Now()
needUpdate := false
wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
if err != nil {
result.Error = err
return task, err
return result, err
}
if item.IsIndex {
if wrapper.idx != nil {
Expand All @@ -290,9 +305,8 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
h.writeToResultChan(task.ResultCh, result)
return result, nil
}
h.finishWorking(result)
return nil, nil
}

Expand Down Expand Up @@ -509,32 +523,3 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
}
return h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version, WithTableStatsByQuery()))
}

func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
chList, ok := h.StatsLoad.WorkingColMap[item]
if ok {
if chList[0] == resultCh {
return true // just return for duplicate setWorking
}
h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
return false
}
chList = []chan stmtctx.StatsLoadResult{}
chList = append(chList, resultCh)
h.StatsLoad.WorkingColMap[item] = chList
return true
}

func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
h.StatsLoad.Lock()
defer h.StatsLoad.Unlock()
if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
list := chList[1:]
for _, ch := range list {
h.writeToResultChan(ch, result)
}
}
delete(h.StatsLoad.WorkingColMap, result.Item)
}
17 changes: 3 additions & 14 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,26 +209,15 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
list, ok := h.StatsLoad.WorkingColMap[neededColumns[0]]
require.True(t, ok)
require.Len(t, list, 1)
require.Equal(t, stmtCtx1.StatsLoad.ResultCh, list[0])

task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Nil(t, err2)
require.Nil(t, task2)
list, ok = h.StatsLoad.WorkingColMap[neededColumns[0]]
require.True(t, ok)
require.Len(t, list, 2)
require.Equal(t, stmtCtx2.StatsLoad.ResultCh, list[1])

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

require.Len(t, stmtCtx1.StatsLoad.ResultCh, 1)
require.Len(t, stmtCtx2.StatsLoad.ResultCh, 1)
task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task)

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
Expand Down

0 comments on commit 0002c1a

Please sign in to comment.