Skip to content

Commit

Permalink
*: avoid concurrently using the session in the syncload (#52830)
Browse files Browse the repository at this point in the history
close #52827
  • Loading branch information
hawkingrei committed Apr 23, 2024
1 parent 0931309 commit 8629068
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
14 changes: 12 additions & 2 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
}
result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
err := s.handleOneItemTask(sctx, task)
err := s.handleOneItemTask(task)
return nil, err
})
timeout := time.Until(task.ToTimeout)
Expand Down Expand Up @@ -264,13 +264,23 @@ func isVaildForRetry(task *statstypes.NeededItemTask) bool {
return task.Retry <= RetryCount
}

func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) {
func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err error) {
se, err := s.statsHandle.SPool().Get()
if err != nil {
return err
}
sctx := se.(sessionctx.Context)
sctx.GetSessionVars().StmtCtx.Priority = mysql.HighPriority
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
err = errors.Errorf("stats loading panicked: %v", r)
}
if err == nil { // only recycle when no error
sctx.GetSessionVars().StmtCtx.Priority = mysql.NoPriority
s.statsHandle.SPool().Put(se)
}
}()
item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
Expand Down
13 changes: 13 additions & 0 deletions pkg/statistics/handle/syncload/stats_syncload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,18 @@ func TestRetry(t *testing.T) {
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
}
task1.Retry = 0
for i := 0; i < syncload.RetryCount*5; i++ {
task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get nothing")
t.FailNow()
default:
}
task1.Retry = 0
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
}

0 comments on commit 8629068

Please sign in to comment.