From 7395371893a71db68b35afa76bca17884be5b0ae Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Tue, 3 Sep 2019 14:52:48 +0800 Subject: [PATCH 01/11] add max cop resp time --- distsql/distsql.go | 3 ++- distsql/distsql_test.go | 5 +++- distsql/select_result.go | 1 + executor/builder.go | 2 +- executor/distsql.go | 4 +-- executor/table_reader.go | 6 ++--- kv/kv.go | 3 +++ planner/core/common_plans.go | 14 +++++++--- store/tikv/coprocessor.go | 16 +++++++---- util/execdetails/execdetails.go | 47 ++++++++++++++++++++++++++++++--- 10 files changed, 81 insertions(+), 20 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index 57909b5457c2b..2d2b9d5f3ecf8 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -92,11 +92,12 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie // The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult, // which can help selectResult to collect runtime stats. func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, - fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (SelectResult, error) { + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (SelectResult, error) { sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb) if err == nil { if selectResult, ok := sr.(*selectResult); ok { selectResult.copPlanIDs = copPlanIDs + selectResult.rootPlanID = rootPlanID } } return sr, err diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 4a145717d0fce..1231c6094a18c 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -73,7 +73,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str idx := i planIDFuncs = append(planIDFuncs, stringutil.StringerStr(planIDs[idx])) } - response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs) + response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs, stringutil.StringerStr("root_0")) } c.Assert(err, IsNil) @@ -404,6 +404,9 @@ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails { // MemSize implements kv.ResultSubset interface. func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) } +// RespTime implements kv.ResultSubset interface. +func (r *mockResultSubset) RespTime() time.Duration { return 0 } + func populateBuffer() []byte { numCols := 4 numRows := 1024 diff --git a/distsql/select_result.go b/distsql/select_result.go index 273ce7b1d1110..9619b08a16aa4 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -77,6 +77,7 @@ type selectResult struct { // copPlanIDs contains all copTasks' planIDs, // which help to collect copTasks' runtime stats. copPlanIDs []fmt.Stringer + rootPlanID fmt.Stringer memTracker *memory.Tracker } diff --git a/executor/builder.go b/executor/builder.go index bc94b0d127b84..23f5397d02eca 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2092,7 +2092,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex } e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) e.resultHandler = &tableResultHandler{} - result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans)) + result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) if err != nil { return nil, err } diff --git a/executor/distsql.go b/executor/distsql.go index 6f5d5f2bcffda..2f14dd7f2d788 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -302,7 +302,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.feedback.Invalidate() return err } - e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans)) + e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) if err != nil { e.feedback.Invalidate() return err @@ -452,7 +452,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k tps = e.idxColTps } // Since the first read only need handle information. So its returned col is only 1. - result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans)) + result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans), e.id) if err != nil { return err } diff --git a/executor/table_reader.go b/executor/table_reader.go index 1426c47fb18ba..0d125f7bd973d 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -44,9 +44,9 @@ type selectResultHook struct { } func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, - fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (distsql.SelectResult, error) { + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (distsql.SelectResult, error) { if sr.selectResultFunc == nil { - return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) + return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs, rootPlanID) } return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) } @@ -189,7 +189,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra return nil, err } e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) - result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans)) + result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) if err != nil { return nil, err } diff --git a/kv/kv.go b/kv/kv.go index 9af0b88baf950..836e7f12561e1 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -15,6 +15,7 @@ package kv import ( "context" + "time" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/oracle" @@ -255,6 +256,8 @@ type ResultSubset interface { GetExecDetails() *execdetails.ExecDetails // MemSize returns how many bytes of memory this result use for tracing memory usage. MemSize() int64 + // RespTime returns the response time for the request. + RespTime() time.Duration } // Response represents the response returned from KV layer. diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 5f23659aa56a5..3f18045af4e09 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -655,13 +655,21 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st runtimeStatsColl := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl // There maybe some mock information for cop task to let runtimeStatsColl.Exists(p.ExplainID()) is true. // So check copTaskExecDetail first and print the real cop task information if it's not empty. + var analyzeInfo string if runtimeStatsColl.ExistsCopStats(explainID) { - row = append(row, runtimeStatsColl.GetCopStats(explainID).String()) + analyzeInfo = runtimeStatsColl.GetCopStats(explainID).String() } else if runtimeStatsColl.ExistsRootStats(explainID) { - row = append(row, runtimeStatsColl.GetRootStats(explainID).String()) + analyzeInfo = runtimeStatsColl.GetRootStats(explainID).String() } else { - row = append(row, "time:0ns, loops:0, rows:0") + analyzeInfo = "time:0ns, loops:0, rows:0" } + switch p.(type) { + case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader: + if runtimeStatsColl.GetReaderStats(explainID) != nil { + analyzeInfo += ", " + runtimeStatsColl.GetReaderStats(explainID).String() + } + } + row = append(row, analyzeInfo) tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) if tracker != nil { diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 1dea973858225..579d7ee60d5f8 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -387,6 +387,7 @@ type copResponse struct { startKey kv.Key err error respSize int64 + respTime time.Duration } const ( @@ -429,6 +430,10 @@ func (rs *copResponse) MemSize() int64 { return rs.respSize } +func (rs *copResponse) RespTime() time.Duration { + return rs.respTime +} + const minLogCopTaskTime = 300 * time.Millisecond // run is a worker function that get a copTask from channel, handle it and @@ -663,11 +668,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch metrics.TiKVCoprocessorHistogram.Observe(costTime.Seconds()) if task.cmdType == tikvrpc.CmdCopStream { - return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch) + return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch, costTime) } // Handles the response for non-streaming copTask. - return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, nil) + return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, nil, costTime) } const ( @@ -726,7 +731,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan return logStr } -func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) { defer stream.Close() var resp *coprocessor.Response var lastRange *coprocessor.KeyRange @@ -736,7 +741,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP return nil, nil } for { - remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange) + remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange, costTime) if err != nil || len(remainedTasks) != 0 { return remainedTasks, errors.Trace(err) } @@ -766,7 +771,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP // returns more tasks when that happens, or handles the response if no error. // if we're handling streaming coprocessor response, lastRange is the range of last // successful response, otherwise it's nil. -func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) { +func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) { if regionErr := resp.pbResp.GetRegionError(); regionErr != nil { if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil { return nil, errors.Trace(err) @@ -810,6 +815,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon if rpcCtx != nil { resp.detail.CalleeAddress = rpcCtx.Addr } + resp.respTime = costTime if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil { if handleTime := pbDetails.HandleTime; handleTime != nil { resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index c4ae9cfecc7bb..d42a221003967 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -253,11 +253,32 @@ func (crs *CopRuntimeStats) String() string { procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalRows, totalIters, totalTasks) } +// ReaderRuntimeStats collects stats for TableReader, IndexReader and IndexLookupReader +type ReaderRuntimeStats struct { + sync.Mutex + + maxCopRespTime time.Duration +} + +// RecordOneCopTask record once cop response time to update maxCopRespTime +func (rrs *ReaderRuntimeStats) RecordOneCopTask(t time.Duration) { + rrs.Lock() + defer rrs.Unlock() + if t > rrs.maxCopRespTime { + rrs.maxCopRespTime = t + } +} + +func (rrs *ReaderRuntimeStats) String() string { + return fmt.Sprintf("cop resp time max: %v", rrs.maxCopRespTime) +} + // RuntimeStatsColl collects executors's execution info. type RuntimeStatsColl struct { - mu sync.Mutex - rootStats map[string]*RuntimeStats - copStats map[string]*CopRuntimeStats + mu sync.Mutex + rootStats map[string]*RuntimeStats + copStats map[string]*CopRuntimeStats + readerStats map[string]*ReaderRuntimeStats } // RuntimeStats collects one executor's execution info. @@ -273,7 +294,7 @@ type RuntimeStats struct { // NewRuntimeStatsColl creates new executor collector. func NewRuntimeStatsColl() *RuntimeStatsColl { return &RuntimeStatsColl{rootStats: make(map[string]*RuntimeStats), - copStats: make(map[string]*CopRuntimeStats)} + copStats: make(map[string]*CopRuntimeStats), readerStats: make(map[string]*ReaderRuntimeStats)} } // GetRootStats gets execStat for a executor. @@ -306,6 +327,12 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip copStats.RecordOneCopTask(address, summary) } +// RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader. +func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, maxCopRespTime time.Duration) { + readerStats := e.GetReaderStats(planID) + readerStats.RecordOneCopTask(maxCopRespTime) +} + // ExistsRootStats checks if the planID exists in the rootStats collection. func (e *RuntimeStatsColl) ExistsRootStats(planID string) bool { e.mu.Lock() @@ -322,6 +349,18 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID string) bool { return exists } +// ExistsReaderStats checks if the planID exists in the readerStats collection. +func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats { + e.mu.Lock() + defer e.mu.Unlock() + stats, exists := e.readerStats[planID] + if !exists { + stats = &ReaderRuntimeStats{maxCopRespTime: 0} + e.readerStats[planID] = stats + } + return stats +} + // Record records executor's execution. func (e *RuntimeStats) Record(d time.Duration, rowNum int) { atomic.AddInt32(&e.loop, 1) From 15f5d3d7857881cc333f0b8afcd398b62c2bc17f Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Tue, 3 Sep 2019 15:27:09 +0800 Subject: [PATCH 02/11] fix --- distsql/select_result.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distsql/select_result.go b/distsql/select_result.go index 9619b08a16aa4..b5e50d39e3dda 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -112,6 +112,9 @@ func (r *selectResult) fetch(ctx context.Context) { } else { result.result = resultSubset r.memConsume(int64(resultSubset.MemSize())) + if r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), resultSubset.RespTime()) + } } select { From d741b0446627bc1b4ccd1225a9a117ff34d01a8b Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Tue, 3 Sep 2019 15:54:50 +0800 Subject: [PATCH 03/11] fix ci --- util/execdetails/execdetails.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index d42a221003967..f55f6164ec51b 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -349,7 +349,7 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID string) bool { return exists } -// ExistsReaderStats checks if the planID exists in the readerStats collection. +// GetReaderStats gets the ReaderRuntimeStats specified by planID. func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats { e.mu.Lock() defer e.mu.Unlock() From 1dc762c31633dfb3af950ffb1c63d8de90da3c37 Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Tue, 3 Sep 2019 16:11:16 +0800 Subject: [PATCH 04/11] add avg --- util/execdetails/execdetails.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index f55f6164ec51b..ee6f0db37724a 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -258,6 +258,8 @@ type ReaderRuntimeStats struct { sync.Mutex maxCopRespTime time.Duration + sumCopRespTime time.Duration + cntCopResp float64 } // RecordOneCopTask record once cop response time to update maxCopRespTime @@ -267,10 +269,12 @@ func (rrs *ReaderRuntimeStats) RecordOneCopTask(t time.Duration) { if t > rrs.maxCopRespTime { rrs.maxCopRespTime = t } + rrs.sumCopRespTime += t + rrs.cntCopResp += 1 } func (rrs *ReaderRuntimeStats) String() string { - return fmt.Sprintf("cop resp time max: %v", rrs.maxCopRespTime) + return fmt.Sprintf("cop resp time max: %v avg: %v", rrs.maxCopRespTime, time.Duration(float64(rrs.sumCopRespTime)/rrs.cntCopResp)) } // RuntimeStatsColl collects executors's execution info. @@ -328,9 +332,9 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip } // RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader. -func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, maxCopRespTime time.Duration) { +func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, CopRespTime time.Duration) { readerStats := e.GetReaderStats(planID) - readerStats.RecordOneCopTask(maxCopRespTime) + readerStats.RecordOneCopTask(CopRespTime) } // ExistsRootStats checks if the planID exists in the rootStats collection. From 2cd35409c0faaac959436048138763313f7cf0fe Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Wed, 4 Sep 2019 13:46:54 +0800 Subject: [PATCH 05/11] fix error --- distsql/select_result.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index b5e50d39e3dda..e28b7fed5f162 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -112,7 +112,7 @@ func (r *selectResult) fetch(ctx context.Context) { } else { result.result = resultSubset r.memConsume(int64(resultSubset.MemSize())) - if r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { + if r.ctx != nil && r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), resultSubset.RespTime()) } } From 3d8e83c69067b1caf4f8153e92ec1ffe6d9bbeb1 Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Wed, 4 Sep 2019 17:20:23 +0800 Subject: [PATCH 06/11] fix and improve --- executor/join_test.go | 5 +++-- planner/core/cbo_test.go | 2 +- util/execdetails/execdetails.go | 37 +++++++++++++++++++++------------ 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/executor/join_test.go b/executor/join_test.go index 1d8fb322c94fd..ec34d95cf5e97 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -16,6 +16,7 @@ package executor_test import ( "context" "fmt" + "strings" "time" . "github.com/pingcap/check" @@ -1065,9 +1066,9 @@ func (s *testSuite2) TestHashJoin(c *C) { c.Assert(len(row), Equals, 7) outerExecInfo := row[1][4].(string) // FIXME: revert this result to 1 after TableReaderExecutor can handle initChunkSize. - c.Assert(outerExecInfo[len(outerExecInfo)-1:], Equals, "5") + c.Assert(outerExecInfo[strings.Index(outerExecInfo, "rows")+5:strings.Index(outerExecInfo, "rows")+6], Equals, "5") innerExecInfo := row[4][4].(string) - c.Assert(innerExecInfo[len(innerExecInfo)-1:], Equals, "0") + c.Assert(innerExecInfo[strings.Index(innerExecInfo, "rows")+5:strings.Index(innerExecInfo, "rows")+6], Equals, "0") } func (s *testSuite2) TestJoinDifferentDecimals(c *C) { diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index 4cccdb6ce3266..a173024af79e6 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -979,7 +979,7 @@ func (s *testAnalyzeSuite) TestIssue9805(c *C) { c.Assert(row, HasLen, 6) if strings.HasSuffix(row[0].(string), "IndexLookUp_12") { hasIndexLookUp12 = true - c.Assert(row[4], Equals, "time:0ns, loops:0, rows:0") + c.Assert(row[4].(string)[:25], Equals, "time:0ns, loops:0, rows:0") } } c.Assert(hasIndexLookUp12, IsTrue) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index ee6f0db37724a..5d5bbf9ce182c 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -257,24 +257,35 @@ func (crs *CopRuntimeStats) String() string { type ReaderRuntimeStats struct { sync.Mutex - maxCopRespTime time.Duration - sumCopRespTime time.Duration - cntCopResp float64 + copRespTime []time.Duration } -// RecordOneCopTask record once cop response time to update maxCopRespTime +// RecordOneCopTask record once cop response time to update maxcopRespTime func (rrs *ReaderRuntimeStats) RecordOneCopTask(t time.Duration) { rrs.Lock() defer rrs.Unlock() - if t > rrs.maxCopRespTime { - rrs.maxCopRespTime = t - } - rrs.sumCopRespTime += t - rrs.cntCopResp += 1 + rrs.copRespTime = append(rrs.copRespTime, t) } func (rrs *ReaderRuntimeStats) String() string { - return fmt.Sprintf("cop resp time max: %v avg: %v", rrs.maxCopRespTime, time.Duration(float64(rrs.sumCopRespTime)/rrs.cntCopResp)) + sort.Slice(rrs.copRespTime, func(i, j int) bool { + return rrs.copRespTime[i] < rrs.copRespTime[j] + }) + s := "cop resp time " + size := len(rrs.copRespTime) + if size == 0 { + return s + "max:0ns, min:0ns, avg:0ns, p80:0ns, p95:0ns" + } + s += fmt.Sprintf("max:%v", rrs.copRespTime[size-1]) + s += fmt.Sprintf(", min:%v", rrs.copRespTime[0]) + sum := 0.0 + for _, t := range rrs.copRespTime { + sum += float64(t) + } + s += fmt.Sprintf(", avg:%v", time.Duration(sum/float64(size))) + s += fmt.Sprintf(", p80:%v", rrs.copRespTime[size*4/5]) + s += fmt.Sprintf(", p95:%v", rrs.copRespTime[size*17/20]) + return s } // RuntimeStatsColl collects executors's execution info. @@ -332,9 +343,9 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip } // RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader. -func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, CopRespTime time.Duration) { +func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, copRespTime time.Duration) { readerStats := e.GetReaderStats(planID) - readerStats.RecordOneCopTask(CopRespTime) + readerStats.RecordOneCopTask(copRespTime) } // ExistsRootStats checks if the planID exists in the rootStats collection. @@ -359,7 +370,7 @@ func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats { defer e.mu.Unlock() stats, exists := e.readerStats[planID] if !exists { - stats = &ReaderRuntimeStats{maxCopRespTime: 0} + stats = &ReaderRuntimeStats{copRespTime: make([]time.Duration, 0, 20)} e.readerStats[planID] = stats } return stats From bd9c53b7e39bb06bc03835e1394529994dd14f13 Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Fri, 6 Sep 2019 16:56:26 +0800 Subject: [PATCH 07/11] fix data race --- distsql/select_result.go | 8 +++----- distsql/select_result_test.go | 6 +++--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index e28b7fed5f162..f3302d7d9ab84 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -112,9 +112,6 @@ func (r *selectResult) fetch(ctx context.Context) { } else { result.result = resultSubset r.memConsume(int64(resultSubset.MemSize())) - if r.ctx != nil && r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { - r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), resultSubset.RespTime()) - } } select { @@ -195,7 +192,7 @@ func (r *selectResult) getSelectResp() error { for _, warning := range r.selectResp.Warnings { sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg)) } - r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress) + r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress, re.result.RespTime()) r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ sc.MergeExecDetails(re.result.GetExecDetails(), nil) @@ -206,7 +203,7 @@ func (r *selectResult) getSelectResp() error { } } -func (r *selectResult) updateCopRuntimeStats(callee string) { +func (r *selectResult) updateCopRuntimeStats(callee string, respTime time.Duration) { if r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" { return } @@ -218,6 +215,7 @@ func (r *selectResult) updateCopRuntimeStats(callee string) { return } + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), respTime) for i, detail := range r.selectResp.GetExecutionSummaries() { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go index c543df050774b..d5799c2febc89 100644 --- a/distsql/select_result_test.go +++ b/distsql/select_result_test.go @@ -28,7 +28,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext) sr := selectResult{ctx: ctx} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil) - sr.updateCopRuntimeStats("a") + sr.updateCopRuntimeStats("a", 0) ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() t := uint64(1) @@ -38,13 +38,13 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { }, } c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue) - sr.updateCopRuntimeStats("callee") + sr.updateCopRuntimeStats("callee", 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse) sr.copPlanIDs = []fmt.Stringer{copPlan{}} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil) c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs)) - sr.updateCopRuntimeStats("callee") + sr.updateCopRuntimeStats("callee", 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1, rows:1") } From cb0851fa5dd4d8fd9dacb2ebe7b0a88273c0c337 Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Fri, 6 Sep 2019 17:04:32 +0800 Subject: [PATCH 08/11] fix ci --- distsql/select_result_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go index d5799c2febc89..0e913ab0b2d04 100644 --- a/distsql/select_result_test.go +++ b/distsql/select_result_test.go @@ -28,6 +28,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext) sr := selectResult{ctx: ctx} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil) + sr.rootPlanID = copPlan{} sr.updateCopRuntimeStats("a", 0) ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() From 8299c98ec4f3c077b65b1d8dd256f4c27dcbc856 Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Wed, 11 Sep 2019 13:02:30 +0800 Subject: [PATCH 09/11] address comments --- planner/core/common_plans.go | 4 ++-- util/execdetails/execdetails.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 964a32016045f..13e55f62d5bd8 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -665,8 +665,8 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st } switch p.(type) { case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader: - if runtimeStatsColl.GetReaderStats(explainID) != nil { - analyzeInfo += ", " + runtimeStatsColl.GetReaderStats(explainID).String() + if s := runtimeStatsColl.GetReaderStats(explainID); s != nil { + analyzeInfo += ", " + s.String() } } row = append(row, analyzeInfo) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 5d5bbf9ce182c..50fc20e6a558b 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -260,8 +260,8 @@ type ReaderRuntimeStats struct { copRespTime []time.Duration } -// RecordOneCopTask record once cop response time to update maxcopRespTime -func (rrs *ReaderRuntimeStats) RecordOneCopTask(t time.Duration) { +// recordOneCopTask record once cop response time to update maxcopRespTime +func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration) { rrs.Lock() defer rrs.Unlock() rrs.copRespTime = append(rrs.copRespTime, t) @@ -271,7 +271,7 @@ func (rrs *ReaderRuntimeStats) String() string { sort.Slice(rrs.copRespTime, func(i, j int) bool { return rrs.copRespTime[i] < rrs.copRespTime[j] }) - s := "cop resp time " + s := "rpc time " size := len(rrs.copRespTime) if size == 0 { return s + "max:0ns, min:0ns, avg:0ns, p80:0ns, p95:0ns" @@ -284,7 +284,7 @@ func (rrs *ReaderRuntimeStats) String() string { } s += fmt.Sprintf(", avg:%v", time.Duration(sum/float64(size))) s += fmt.Sprintf(", p80:%v", rrs.copRespTime[size*4/5]) - s += fmt.Sprintf(", p95:%v", rrs.copRespTime[size*17/20]) + s += fmt.Sprintf(", p95:%v", rrs.copRespTime[size*19/20]) return s } @@ -345,7 +345,7 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip // RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader. func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, copRespTime time.Duration) { readerStats := e.GetReaderStats(planID) - readerStats.RecordOneCopTask(copRespTime) + readerStats.recordOneCopTask(copRespTime) } // ExistsRootStats checks if the planID exists in the rootStats collection. From c3185d064cb518d1e43c32b86ea92a9bb64cc9d6 Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Wed, 11 Sep 2019 15:18:36 +0800 Subject: [PATCH 10/11] remove min/avg when only one cop task --- util/execdetails/execdetails.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 50fc20e6a558b..6b99701595638 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -277,12 +277,14 @@ func (rrs *ReaderRuntimeStats) String() string { return s + "max:0ns, min:0ns, avg:0ns, p80:0ns, p95:0ns" } s += fmt.Sprintf("max:%v", rrs.copRespTime[size-1]) - s += fmt.Sprintf(", min:%v", rrs.copRespTime[0]) - sum := 0.0 - for _, t := range rrs.copRespTime { - sum += float64(t) + if len(rrs.copRespTime) > 1 { + s += fmt.Sprintf(", min:%v", rrs.copRespTime[0]) + sum := 0.0 + for _, t := range rrs.copRespTime { + sum += float64(t) + } + s += fmt.Sprintf(", avg:%v", time.Duration(sum/float64(size))) } - s += fmt.Sprintf(", avg:%v", time.Duration(sum/float64(size))) s += fmt.Sprintf(", p80:%v", rrs.copRespTime[size*4/5]) s += fmt.Sprintf(", p95:%v", rrs.copRespTime[size*19/20]) return s From 0dab6463d539206ec5a40b03f07a4ca5c705f1df Mon Sep 17 00:00:00 2001 From: lzmhhh123 Date: Wed, 11 Sep 2019 16:07:56 +0800 Subject: [PATCH 11/11] address comments --- planner/core/common_plans.go | 2 +- util/execdetails/execdetails.go | 30 ++++++++++++++---------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 13e55f62d5bd8..6904288c3acd1 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -665,7 +665,7 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st } switch p.(type) { case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader: - if s := runtimeStatsColl.GetReaderStats(explainID); s != nil { + if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 { analyzeInfo += ", " + s.String() } } diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 6b99701595638..4797343ad4fc9 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -268,26 +268,24 @@ func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration) { } func (rrs *ReaderRuntimeStats) String() string { - sort.Slice(rrs.copRespTime, func(i, j int) bool { - return rrs.copRespTime[i] < rrs.copRespTime[j] - }) - s := "rpc time " size := len(rrs.copRespTime) if size == 0 { - return s + "max:0ns, min:0ns, avg:0ns, p80:0ns, p95:0ns" + return "" } - s += fmt.Sprintf("max:%v", rrs.copRespTime[size-1]) - if len(rrs.copRespTime) > 1 { - s += fmt.Sprintf(", min:%v", rrs.copRespTime[0]) - sum := 0.0 - for _, t := range rrs.copRespTime { - sum += float64(t) - } - s += fmt.Sprintf(", avg:%v", time.Duration(sum/float64(size))) + if size == 1 { + return fmt.Sprintf("rpc time:%v", rrs.copRespTime[0]) + } + sort.Slice(rrs.copRespTime, func(i, j int) bool { + return rrs.copRespTime[i] < rrs.copRespTime[j] + }) + vMax, vMin := rrs.copRespTime[size-1], rrs.copRespTime[0] + vP80, vP95 := rrs.copRespTime[size*4/5], rrs.copRespTime[size*19/20] + sum := 0.0 + for _, t := range rrs.copRespTime { + sum += float64(t) } - s += fmt.Sprintf(", p80:%v", rrs.copRespTime[size*4/5]) - s += fmt.Sprintf(", p95:%v", rrs.copRespTime[size*19/20]) - return s + vAvg := time.Duration(sum / float64(size)) + return fmt.Sprintf("rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v", vMax, vMin, vAvg, vP80, vP95) } // RuntimeStatsColl collects executors's execution info.