diff --git a/distsql/distsql.go b/distsql/distsql.go
index 57909b5457c2b..2d2b9d5f3ecf8 100644
--- a/distsql/distsql.go
+++ b/distsql/distsql.go
@@ -92,11 +92,12 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
 // The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult,
 // which can help selectResult to collect runtime stats.
 func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
-	fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (SelectResult, error) {
+	fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (SelectResult, error) {
 	sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb)
 	if err == nil {
 		if selectResult, ok := sr.(*selectResult); ok {
 			selectResult.copPlanIDs = copPlanIDs
+			selectResult.rootPlanID = rootPlanID
 		}
 	}
 	return sr, err
diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go
index 4a145717d0fce..1231c6094a18c 100644
--- a/distsql/distsql_test.go
+++ b/distsql/distsql_test.go
@@ -73,7 +73,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
 			idx := i
 			planIDFuncs = append(planIDFuncs, stringutil.StringerStr(planIDs[idx]))
 		}
-		response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs)
+		response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs, stringutil.StringerStr("root_0"))
 	}

 	c.Assert(err, IsNil)
@@ -404,6 +404,9 @@ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
 // MemSize implements kv.ResultSubset interface.
 func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

+// RespTime implements kv.ResultSubset interface.
+func (r *mockResultSubset) RespTime() time.Duration { return 0 }
+
 func populateBuffer() []byte {
 	numCols := 4
 	numRows := 1024
diff --git a/distsql/select_result.go b/distsql/select_result.go
index 273ce7b1d1110..f3302d7d9ab84 100644
--- a/distsql/select_result.go
+++ b/distsql/select_result.go
@@ -77,6 +77,7 @@ type selectResult struct {
 	// copPlanIDs contains all copTasks' planIDs,
 	// which help to collect copTasks' runtime stats.
 	copPlanIDs []fmt.Stringer
+	rootPlanID fmt.Stringer

 	memTracker *memory.Tracker
 }
@@ -191,7 +192,7 @@ func (r *selectResult) getSelectResp() error {
 		for _, warning := range r.selectResp.Warnings {
 			sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
 		}
-		r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress)
+		r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress, re.result.RespTime())
 		r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
 		r.partialCount++
 		sc.MergeExecDetails(re.result.GetExecDetails(), nil)
@@ -202,7 +203,7 @@
 	}
 }

-func (r *selectResult) updateCopRuntimeStats(callee string) {
+func (r *selectResult) updateCopRuntimeStats(callee string, respTime time.Duration) {
 	if r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
 		return
 	}
@@ -214,6 +215,7 @@
 		return
 	}

+	r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), respTime)
 	for i, detail := range r.selectResp.GetExecutionSummaries() {
 		if detail != nil && detail.TimeProcessedNs != nil &&
 			detail.NumProducedRows != nil && detail.NumIterations != nil {
diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go
index c543df050774b..0e913ab0b2d04 100644
--- a/distsql/select_result_test.go
+++ b/distsql/select_result_test.go
@@ -28,7 +28,8 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
 	ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext)
 	sr := selectResult{ctx: ctx}
 	c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
-	sr.updateCopRuntimeStats("a")
+	sr.rootPlanID = copPlan{}
+	sr.updateCopRuntimeStats("a", 0)

 	ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
 	t := uint64(1)
@@ -38,13 +39,13 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
 		},
 	}
 	c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
-	sr.updateCopRuntimeStats("callee")
+	sr.updateCopRuntimeStats("callee", 0)
 	c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse)

 	sr.copPlanIDs = []fmt.Stringer{copPlan{}}
 	c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
 	c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
-	sr.updateCopRuntimeStats("callee")
+	sr.updateCopRuntimeStats("callee", 0)
 	c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1, rows:1")
 }
diff --git a/executor/builder.go b/executor/builder.go
index df900f61280e1..46f50d3b27ae1 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -2106,7 +2106,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
 	}
 	e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
 	e.resultHandler = &tableResultHandler{}
-	result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
+	result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
 	if err != nil {
 		return nil, err
 	}
diff --git a/executor/distsql.go b/executor/distsql.go
index 6f5d5f2bcffda..2f14dd7f2d788 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -302,7 +302,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
 		e.feedback.Invalidate()
 		return err
 	}
-	e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
+	e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
 	if err != nil {
 		e.feedback.Invalidate()
 		return err
@@ -452,7 +452,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
 		tps = e.idxColTps
 	}
 	// Since the first read only need handle information. So its returned col is only 1.
-	result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans))
+	result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans), e.id)
 	if err != nil {
 		return err
 	}
diff --git a/executor/join_test.go b/executor/join_test.go
index 1d8fb322c94fd..ec34d95cf5e97 100644
--- a/executor/join_test.go
+++ b/executor/join_test.go
@@ -16,6 +16,7 @@ package executor_test
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"

 	. "github.com/pingcap/check"
@@ -1065,9 +1066,9 @@ func (s *testSuite2) TestHashJoin(c *C) {
 	c.Assert(len(row), Equals, 7)
 	outerExecInfo := row[1][4].(string)
 	// FIXME: revert this result to 1 after TableReaderExecutor can handle initChunkSize.
-	c.Assert(outerExecInfo[len(outerExecInfo)-1:], Equals, "5")
+	c.Assert(outerExecInfo[strings.Index(outerExecInfo, "rows")+5:strings.Index(outerExecInfo, "rows")+6], Equals, "5")
 	innerExecInfo := row[4][4].(string)
-	c.Assert(innerExecInfo[len(innerExecInfo)-1:], Equals, "0")
+	c.Assert(innerExecInfo[strings.Index(innerExecInfo, "rows")+5:strings.Index(innerExecInfo, "rows")+6], Equals, "0")
 }

 func (s *testSuite2) TestJoinDifferentDecimals(c *C) {
diff --git a/executor/table_reader.go b/executor/table_reader.go
index 1426c47fb18ba..0d125f7bd973d 100644
--- a/executor/table_reader.go
+++ b/executor/table_reader.go
@@ -44,9 +44,9 @@ type selectResultHook struct {
 }

 func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
-	fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (distsql.SelectResult, error) {
+	fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (distsql.SelectResult, error) {
 	if sr.selectResultFunc == nil {
-		return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
+		return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs, rootPlanID)
 	}
 	return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
 }
@@ -189,7 +189,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
 		return nil, err
 	}
 	e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
-	result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
+	result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
 	if err != nil {
 		return nil, err
 	}
diff --git a/kv/kv.go b/kv/kv.go
index 9af0b88baf950..836e7f12561e1 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -15,6 +15,7 @@ package kv

 import (
 	"context"
+	"time"

 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/store/tikv/oracle"
@@ -255,6 +256,8 @@ type ResultSubset interface {
 	GetExecDetails() *execdetails.ExecDetails
 	// MemSize returns how many bytes of memory this result use for tracing memory usage.
 	MemSize() int64
+	// RespTime returns the response time for the request.
+	RespTime() time.Duration
 }

 // Response represents the response returned from KV layer.
diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go
index 0d89157c5a5ff..16c078848e930 100644
--- a/planner/core/common_plans.go
+++ b/planner/core/common_plans.go
@@ -636,13 +636,21 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st
 		runtimeStatsColl := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl
 		// There maybe some mock information for cop task to let runtimeStatsColl.Exists(p.ExplainID()) is true.
 		// So check copTaskExecDetail first and print the real cop task information if it's not empty.
+		var analyzeInfo string
 		if runtimeStatsColl.ExistsCopStats(explainID) {
-			row = append(row, runtimeStatsColl.GetCopStats(explainID).String())
+			analyzeInfo = runtimeStatsColl.GetCopStats(explainID).String()
 		} else if runtimeStatsColl.ExistsRootStats(explainID) {
-			row = append(row, runtimeStatsColl.GetRootStats(explainID).String())
+			analyzeInfo = runtimeStatsColl.GetRootStats(explainID).String()
 		} else {
-			row = append(row, "time:0ns, loops:0, rows:0")
+			analyzeInfo = "time:0ns, loops:0, rows:0"
 		}
+		switch p.(type) {
+		case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader:
+			if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 {
+				analyzeInfo += ", " + s.String()
+			}
+		}
+		row = append(row, analyzeInfo)
 		tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
 		if tracker != nil {
diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go
index 1dea973858225..579d7ee60d5f8 100644
--- a/store/tikv/coprocessor.go
+++ b/store/tikv/coprocessor.go
@@ -387,6 +387,7 @@ type copResponse struct {
 	startKey kv.Key
 	err      error
 	respSize int64
+	respTime time.Duration
 }

 const (
@@ -429,6 +430,10 @@ func (rs *copResponse) MemSize() int64 {
 	return rs.respSize
 }

+func (rs *copResponse) RespTime() time.Duration {
+	return rs.respTime
+}
+
 const minLogCopTaskTime = 300 * time.Millisecond

 // run is a worker function that get a copTask from channel, handle it and
@@ -663,11 +668,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	metrics.TiKVCoprocessorHistogram.Observe(costTime.Seconds())

 	if task.cmdType == tikvrpc.CmdCopStream {
-		return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch)
+		return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch, costTime)
 	}

 	// Handles the response for non-streaming copTask.
-	return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, nil)
+	return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, nil, costTime)
 }

 const (
@@ -726,7 +731,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
 	return logStr
 }

-func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
+func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
 	defer stream.Close()
 	var resp *coprocessor.Response
 	var lastRange *coprocessor.KeyRange
@@ -736,7 +741,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP
 		return nil, nil
 	}
 	for {
-		remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange)
+		remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange, costTime)
 		if err != nil || len(remainedTasks) != 0 {
 			return remainedTasks, errors.Trace(err)
 		}
@@ -766,7 +771,7 @@
 // returns more tasks when that happens, or handles the response if no error.
 // if we're handling streaming coprocessor response, lastRange is the range of last
 // successful response, otherwise it's nil.
-func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
+func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
 	if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
 		if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
 			return nil, errors.Trace(err)
@@ -810,6 +815,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
 	if rpcCtx != nil {
 		resp.detail.CalleeAddress = rpcCtx.Addr
 	}
+	resp.respTime = costTime
 	if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
 		if handleTime := pbDetails.HandleTime; handleTime != nil {
 			resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go
index c4ae9cfecc7bb..4797343ad4fc9 100644
--- a/util/execdetails/execdetails.go
+++ b/util/execdetails/execdetails.go
@@ -253,11 +253,47 @@ func (crs *CopRuntimeStats) String() string {
 		procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalRows, totalIters, totalTasks)
 }

+// ReaderRuntimeStats collects stats for TableReader, IndexReader and IndexLookUpReader.
+type ReaderRuntimeStats struct {
+	sync.Mutex
+
+	copRespTime []time.Duration
+}
+
+// recordOneCopTask records the response time of a single cop task.
+func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration) {
+	rrs.Lock()
+	defer rrs.Unlock()
+	rrs.copRespTime = append(rrs.copRespTime, t)
+}
+
+func (rrs *ReaderRuntimeStats) String() string {
+	size := len(rrs.copRespTime)
+	if size == 0 {
+		return ""
+	}
+	if size == 1 {
+		return fmt.Sprintf("rpc time:%v", rrs.copRespTime[0])
+	}
+	sort.Slice(rrs.copRespTime, func(i, j int) bool {
+		return rrs.copRespTime[i] < rrs.copRespTime[j]
+	})
+	vMax, vMin := rrs.copRespTime[size-1], rrs.copRespTime[0]
+	vP80, vP95 := rrs.copRespTime[size*4/5], rrs.copRespTime[size*19/20]
+	sum := 0.0
+	for _, t := range rrs.copRespTime {
+		sum += float64(t)
+	}
+	vAvg := time.Duration(sum / float64(size))
+	return fmt.Sprintf("rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v", vMax, vMin, vAvg, vP80, vP95)
+}
+
 // RuntimeStatsColl collects executors's execution info.
 type RuntimeStatsColl struct {
-	mu        sync.Mutex
-	rootStats map[string]*RuntimeStats
-	copStats  map[string]*CopRuntimeStats
+	mu          sync.Mutex
+	rootStats   map[string]*RuntimeStats
+	copStats    map[string]*CopRuntimeStats
+	readerStats map[string]*ReaderRuntimeStats
 }

 // RuntimeStats collects one executor's execution info.
@@ -273,7 +309,7 @@ type RuntimeStats struct {
 // NewRuntimeStatsColl creates new executor collector.
 func NewRuntimeStatsColl() *RuntimeStatsColl {
 	return &RuntimeStatsColl{rootStats: make(map[string]*RuntimeStats),
-		copStats: make(map[string]*CopRuntimeStats)}
+		copStats: make(map[string]*CopRuntimeStats), readerStats: make(map[string]*ReaderRuntimeStats)}
 }

 // GetRootStats gets execStat for a executor.
@@ -306,6 +342,12 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip
 	copStats.RecordOneCopTask(address, summary)
 }

+// RecordOneReaderStats records one cop task's response time for a TableReader, IndexReader or IndexLookUpReader.
+func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, copRespTime time.Duration) {
+	readerStats := e.GetReaderStats(planID)
+	readerStats.recordOneCopTask(copRespTime)
+}
+
 // ExistsRootStats checks if the planID exists in the rootStats collection.
 func (e *RuntimeStatsColl) ExistsRootStats(planID string) bool {
 	e.mu.Lock()
@@ -322,6 +364,18 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID string) bool {
 	return exists
 }

+// GetReaderStats gets the ReaderRuntimeStats specified by planID.
+func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	stats, exists := e.readerStats[planID]
+	if !exists {
+		stats = &ReaderRuntimeStats{copRespTime: make([]time.Duration, 0, 20)}
+		e.readerStats[planID] = stats
+	}
+	return stats
+}
+
 // Record records executor's execution.
 func (e *RuntimeStats) Record(d time.Duration, rowNum int) {
 	atomic.AddInt32(&e.loop, 1)
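
Reviewer note: below is a minimal, self-contained sketch (not part of the patch) of how the new collector API is wired up. The coprocessor worker stamps each copResponse with its RPC cost (costTime), selectResult.updateCopRuntimeStats forwards that duration to RuntimeStatsColl.RecordOneReaderStats keyed by the reader's root plan ID, and Explain.prepareOperatorInfo appends ReaderRuntimeStats.String() to the reader operator's execution info. The plan ID "TableReader_5" and the durations are made up for illustration, and the snippet assumes a TiDB tree with this patch applied.

```go
package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/util/execdetails"
)

func main() {
	coll := execdetails.NewRuntimeStatsColl()

	// Pretend three cop tasks finished for the same reader; the plan ID and
	// response times are invented for the sake of the example.
	for _, d := range []time.Duration{3 * time.Millisecond, 5 * time.Millisecond, 12 * time.Millisecond} {
		coll.RecordOneReaderStats("TableReader_5", d)
	}

	// With more than one sample, String() reports max/min/avg/p80/p95 of the
	// recorded RPC times, roughly:
	// "rpc max:12ms, min:3ms, avg:6.666666ms, p80:12ms, p95:12ms".
	fmt.Println(coll.GetReaderStats("TableReader_5").String())
}
```

With this patch, the EXPLAIN ANALYZE info column of a TableReader/IndexReader/IndexLookUpReader operator gains a suffix such as `rpc max:..., min:..., avg:..., p80:..., p95:...` after the existing time/loops/rows counters; when only one cop task was issued it prints `rpc time:...` instead.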