store/copr: enable store batch by default & add extra copr concurrency and batch nums in stats (#40711)

ref #39361, close #40399
you06 committed Feb 8, 2023
1 parent ae60542 commit bdef910
Showing 15 changed files with 493 additions and 134 deletions.
4 changes: 2 additions & 2 deletions distsql/distsql_test.go
@@ -124,7 +124,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
stats := stmtStats.GetRootStats(1)
expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
@@ -135,7 +135,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
106 changes: 87 additions & 19 deletions distsql/select_result.go
@@ -53,6 +53,12 @@ var (
errQueryInterrupted = dbterror.ClassExecutor.NewStd(errno.ErrQueryInterrupted)
)

var (
telemetryBatchedQueryTaskCnt = metrics.TelemetryBatchedQueryTaskCnt
telemetryStoreBatchedCnt = metrics.TelemetryStoreBatchedCnt
telemetryStoreBatchedFallbackCnt = metrics.TelemetryStoreBatchedFallbackCnt
)

var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*serialSelectResults)(nil)
@@ -157,7 +163,7 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
if r.stats != nil {
// Ignore internal sql.
if !r.ctx.GetSessionVars().InRestrictedSQL && len(r.stats.copRespTime) > 0 {
ratio := float64(r.stats.CoprCacheHitNum) / float64(len(r.stats.copRespTime))
ratio := r.stats.calcCacheHit()
if ratio >= 1 {
telemetry.CurrentCoprCacheHitRatioGTE100Count.Inc()
}
@@ -364,6 +370,11 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: r.distSQLConcurrency,
}
if ci, ok := r.resp.(copr.CopInfo); ok {
conc, extraConc := ci.GetConcurrency()
r.stats.distSQLConcurrency = conc
r.stats.extraConcurrency = extraConc
}
}
r.stats.mergeCopRuntimeStats(copStats, respTime)

@@ -455,26 +466,42 @@ func (r *selectResult) Close() error {
r.memConsume(-respSize)
}
if r.stats != nil {
defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
defer func() {
if ci, ok := r.resp.(copr.CopInfo); ok {
r.stats.buildTaskDuration = ci.GetBuildTaskElapsed()
batched, fallback := ci.GetStoreBatchInfo()
if batched != 0 || fallback != 0 {
r.stats.storeBatchedNum, r.stats.storeBatchedFallbackNum = batched, fallback
telemetryStoreBatchedCnt.Add(float64(r.stats.storeBatchedNum))
telemetryStoreBatchedFallbackCnt.Add(float64(r.stats.storeBatchedFallbackNum))
telemetryBatchedQueryTaskCnt.Add(float64(len(r.stats.copRespTime)))
}
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
}()
}
return r.resp.Close()
}

// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats.
// CopRuntimeStats is an interface uses to check whether the result has cop runtime stats.
type CopRuntimeStats interface {
// GetCopRuntimeStats gets the cop runtime stats information.
GetCopRuntimeStats() *copr.CopRuntimeStats
}

type selectResultRuntimeStats struct {
copRespTime []time.Duration
procKeys []int64
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
distSQLConcurrency int
CoprCacheHitNum int64
copRespTime []time.Duration
procKeys []int64
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
distSQLConcurrency int
extraConcurrency int
CoprCacheHitNum int64
storeBatchedNum uint64
storeBatchedFallbackNum uint64
buildTaskDuration time.Duration
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
@@ -495,12 +522,16 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim

func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := selectResultRuntimeStats{
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
extraConcurrency: s.extraConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
storeBatchedNum: s.storeBatchedNum,
storeBatchedFallbackNum: s.storeBatchedFallbackNum,
buildTaskDuration: s.buildTaskDuration,
}
newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
newRs.procKeys = append(newRs.procKeys, s.procKeys...)
@@ -528,6 +559,15 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
s.totalWaitTime += other.totalWaitTime
s.rpcStat.Merge(other.rpcStat)
s.CoprCacheHitNum += other.CoprCacheHitNum
if other.distSQLConcurrency > s.distSQLConcurrency {
s.distSQLConcurrency = other.distSQLConcurrency
}
if other.extraConcurrency > s.extraConcurrency {
s.extraConcurrency = other.extraConcurrency
}
s.storeBatchedNum += other.storeBatchedNum
s.storeBatchedFallbackNum += other.storeBatchedFallbackNum
s.buildTaskDuration += other.buildTaskDuration
}
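Note on the merge semantics introduced above: the concurrency fields are merged by taking the maximum across the merged stats (which is why the rendered names below carry a max_ prefix), while the batch counters and the build duration accumulate. As a hypothetical example, merging {distSQLConcurrency: 15, storeBatchedNum: 4} with {distSQLConcurrency: 10, storeBatchedNum: 2} yields {distSQLConcurrency: 15, storeBatchedNum: 6}.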

func (s *selectResultRuntimeStats) String() string {
@@ -579,14 +619,30 @@ func (s *selectResultRuntimeStats) String() string {
}
if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 {
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64)))
strconv.FormatFloat(s.calcCacheHit(), 'f', 2, 64)))
} else {
buf.WriteString(", copr_cache: disabled")
}
if s.buildTaskDuration > 0 {
buf.WriteString(", build_task_duration: ")
buf.WriteString(execdetails.FormatDuration(s.buildTaskDuration))
}
if s.distSQLConcurrency > 0 {
buf.WriteString(", distsql_concurrency: ")
buf.WriteString(", max_distsql_concurrency: ")
buf.WriteString(strconv.FormatInt(int64(s.distSQLConcurrency), 10))
}
if s.extraConcurrency > 0 {
buf.WriteString(", max_extra_concurrency: ")
buf.WriteString(strconv.FormatInt(int64(s.extraConcurrency), 10))
}
if s.storeBatchedNum > 0 {
buf.WriteString(", store_batch_num: ")
buf.WriteString(strconv.FormatInt(int64(s.storeBatchedNum), 10))
}
if s.storeBatchedFallbackNum > 0 {
buf.WriteString(", store_batch_fallback_num: ")
buf.WriteString(strconv.FormatInt(int64(s.storeBatchedFallbackNum), 10))
}
buf.WriteString("}")
}
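With these additions, the cop_task block in EXPLAIN ANALYZE output can look roughly like the following (hypothetical values; only non-zero fields are printed, and copr_cache: disabled replaces the hit ratio when the coprocessor cache capacity is 0):

	cop_task: {num: 8, max: 1s, min: 2ms, avg: 400ms, p95: 1s, tot_proc: 2s, tot_wait: 1s, copr_cache_hit_ratio: 0.14, build_task_duration: 5ms, max_distsql_concurrency: 15, max_extra_concurrency: 4, store_batch_num: 6, store_batch_fallback_num: 1}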

@@ -615,3 +671,15 @@ func (s *selectResultRuntimeStats) String() string {
func (*selectResultRuntimeStats) Tp() int {
return execdetails.TpSelectResultRuntimeStats
}

func (s *selectResultRuntimeStats) calcCacheHit() float64 {
hit := s.CoprCacheHitNum
tot := len(s.copRespTime)
if s.storeBatchedNum > 0 {
tot += int(s.storeBatchedNum)
}
if tot == 0 {
return 0
}
return float64(hit) / float64(tot)
}
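A minimal sketch of the new hit-ratio denominator: batched tasks do not produce cop responses of their own, so they are folded into the total whenever storeBatchedNum is non-zero. The snippet below uses hypothetical numbers and assumes it runs inside the distsql package (where fmt and time are already imported):

	// 4 cop responses, 2 of them served from the coprocessor cache, plus 6
	// extra tasks batched into those responses: 2 / (4 + 6) = 0.20 instead of
	// the pre-batching 2 / 4 = 0.50.
	s := &selectResultRuntimeStats{
		copRespTime:     make([]time.Duration, 4),
		CoprCacheHitNum: 2,
		storeBatchedNum: 6,
	}
	fmt.Println(s.calcCacheHit()) // 0.2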
1 change: 1 addition & 0 deletions executor/adapter.go
@@ -207,6 +207,7 @@ type TelemetryInfo struct {
PartitionTelemetry *PartitionTelemetryInfo
AccountLockTelemetry *AccountLockTelemetryInfo
UseIndexMerge bool
UseTableLookUp bool
}

// PartitionTelemetryInfo records table partition telemetry information during execution.
7 changes: 7 additions & 0 deletions executor/builder.go
@@ -3863,6 +3863,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}

func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
if b.Ti != nil {
b.Ti.UseTableLookUp = true
}
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil {
b.err = err
@@ -4000,6 +4003,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
if b.Ti != nil {
b.Ti.UseIndexMerge = true
b.Ti.UseTableLookUp = true
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
@@ -4445,6 +4449,9 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte

func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
if builder.Ti != nil {
builder.Ti.UseTableLookUp = true
}
e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v)
if err != nil {
return nil, err
40 changes: 39 additions & 1 deletion executor/distsql.go
@@ -76,7 +76,9 @@ type lookupTableTask struct {
idxRows *chunk.Chunk
cursor int

doneCh chan error
// after the cop task is built, buildDone will be set to the current instant, for Next wait duration statistic.
buildDoneTime time.Time
doneCh chan error

// indexOrder map is used to save the original index order for the handles.
// Without this map, the original index order might be lost.
@@ -790,13 +792,32 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
return e.resultCurr, nil
}
var (
enableStats = e.stats != nil
start time.Time
indexFetchedInstant time.Time
)
if enableStats {
start = time.Now()
}
task, ok := <-e.resultCh
if !ok {
return nil, nil
}
if enableStats {
indexFetchedInstant = time.Now()
}
if err := <-task.doneCh; err != nil {
return nil, err
}
if enableStats {
e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start)
if task.buildDoneTime.After(indexFetchedInstant) {
e.stats.NextWaitTableLookUpBuild += task.buildDoneTime.Sub(indexFetchedInstant)
indexFetchedInstant = task.buildDoneTime
}
e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant)
}

// Release the memory usage of last task before we handle a new task.
if e.resultCurr != nil {
@@ -1119,6 +1140,10 @@ type IndexLookUpRunTimeStats struct {
TableRowScan int64
TableTaskNum int64
Concurrency int
// Record the `Next` call affected wait duration details.
NextWaitIndexScan time.Duration
NextWaitTableLookUpBuild time.Duration
NextWaitTableLookUpResp time.Duration
}

func (e *IndexLookUpRunTimeStats) String() string {
@@ -1142,6 +1167,15 @@ func (e *IndexLookUpRunTimeStats) String() string {
}
buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency))
}
if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 {
if buf.Len() > 0 {
buf.WriteByte(',')
fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}",
execdetails.FormatDuration(e.NextWaitIndexScan),
execdetails.FormatDuration(e.NextWaitTableLookUpBuild),
execdetails.FormatDuration(e.NextWaitTableLookUpResp))
}
}
return buf.String()
}

@@ -1162,6 +1196,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
e.TaskWait += tmp.TaskWait
e.TableRowScan += tmp.TableRowScan
e.TableTaskNum += tmp.TableTaskNum
e.NextWaitIndexScan += tmp.NextWaitIndexScan
e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild
e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp
}

// Tp implements the RuntimeStats interface.
@@ -1300,6 +1337,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum {
// Then we hold the returning rows and finish this task.
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
tableReader, err := w.idxLookup.buildTableReader(ctx, task)
task.buildDoneTime = time.Now()
if err != nil {
logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
return err
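For orientation on the getResultTask change above: the time Next spends waiting is now split into three buckets. A hypothetical timeline (illustrative durations only):

	t0        Next starts waiting on e.resultCh
	t0+1s     task received from e.resultCh            -> NextWaitIndexScan        += 1s
	t0+1.2s   task.buildDoneTime (table reader built)  -> NextWaitTableLookUpBuild += 200ms
	t0+2s     task.doneCh delivers the result          -> NextWaitTableLookUpResp  += 800ms

If the table reader was already built before the task was received, the build bucket gets nothing and the whole remainder counts toward the table-lookup response wait.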
23 changes: 15 additions & 8 deletions executor/distsql_test.go
@@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) {

func TestIndexLookUpStats(t *testing.T) {
stats := &executor.IndexLookUpRunTimeStats{
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
NextWaitIndexScan: time.Second,
NextWaitTableLookUpBuild: 2 * time.Second,
NextWaitTableLookUpResp: 3 * time.Second,
}
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+
", table_task: {total_time: 2s, num: 2, concurrency: 1}"+
", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String())
require.Equal(t, stats.Clone().String(), stats.String())
stats.Merge(stats.Clone())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+
", table_task: {total_time: 4s, num: 4, concurrency: 1}"+
", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String())
}

func TestIndexLookUpGetResultChunk(t *testing.T) {