diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index f3988ea5f7c4d..d7a2447cf35b6 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -124,7 +124,7 @@ func TestSelectResultRuntimeStats(t *testing.T) { stmtStats.RegisterStats(1, s1) stmtStats.RegisterStats(1, &s2) stats := stmtStats.GetRootStats(1) - expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 2ms}" + expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 2ms}" require.Equal(t, expect, stats.String()) // Test for idempotence. require.Equal(t, expect, stats.String()) @@ -135,7 +135,7 @@ func TestSelectResultRuntimeStats(t *testing.T) { } stmtStats.RegisterStats(2, s1) stats = stmtStats.GetRootStats(2) - expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}" + expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 1ms}" require.Equal(t, expect, stats.String()) // Test for idempotence. require.Equal(t, expect, stats.String()) diff --git a/distsql/select_result.go b/distsql/select_result.go index 6d1f6308e4120..394298e8fa3b0 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -53,6 +53,12 @@ var ( errQueryInterrupted = dbterror.ClassExecutor.NewStd(errno.ErrQueryInterrupted) ) +var ( + telemetryBatchedQueryTaskCnt = metrics.TelemetryBatchedQueryTaskCnt + telemetryStoreBatchedCnt = metrics.TelemetryStoreBatchedCnt + telemetryStoreBatchedFallbackCnt = metrics.TelemetryStoreBatchedFallbackCnt +) + var ( _ SelectResult = (*selectResult)(nil) _ SelectResult = (*serialSelectResults)(nil) @@ -157,7 +163,7 @@ func (r *selectResult) fetchResp(ctx context.Context) error { if r.stats != nil { // Ignore internal sql. if !r.ctx.GetSessionVars().InRestrictedSQL && len(r.stats.copRespTime) > 0 { - ratio := float64(r.stats.CoprCacheHitNum) / float64(len(r.stats.copRespTime)) + ratio := r.stats.calcCacheHit() if ratio >= 1 { telemetry.CurrentCoprCacheHitRatioGTE100Count.Inc() } @@ -364,6 +370,11 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr rpcStat: tikv.NewRegionRequestRuntimeStats(), distSQLConcurrency: r.distSQLConcurrency, } + if ci, ok := r.resp.(copr.CopInfo); ok { + conc, extraConc := ci.GetConcurrency() + r.stats.distSQLConcurrency = conc + r.stats.extraConcurrency = extraConc + } } r.stats.mergeCopRuntimeStats(copStats, respTime) @@ -455,26 +466,42 @@ func (r *selectResult) Close() error { r.memConsume(-respSize) } if r.stats != nil { - defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats) + defer func() { + if ci, ok := r.resp.(copr.CopInfo); ok { + r.stats.buildTaskDuration = ci.GetBuildTaskElapsed() + batched, fallback := ci.GetStoreBatchInfo() + if batched != 0 || fallback != 0 { + r.stats.storeBatchedNum, r.stats.storeBatchedFallbackNum = batched, fallback + telemetryStoreBatchedCnt.Add(float64(r.stats.storeBatchedNum)) + telemetryStoreBatchedFallbackCnt.Add(float64(r.stats.storeBatchedFallbackNum)) + telemetryBatchedQueryTaskCnt.Add(float64(len(r.stats.copRespTime))) + } + } + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats) + }() } return r.resp.Close() } -// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats. +// CopRuntimeStats is an interface uses to check whether the result has cop runtime stats. type CopRuntimeStats interface { // GetCopRuntimeStats gets the cop runtime stats information. GetCopRuntimeStats() *copr.CopRuntimeStats } type selectResultRuntimeStats struct { - copRespTime []time.Duration - procKeys []int64 - backoffSleep map[string]time.Duration - totalProcessTime time.Duration - totalWaitTime time.Duration - rpcStat tikv.RegionRequestRuntimeStats - distSQLConcurrency int - CoprCacheHitNum int64 + copRespTime []time.Duration + procKeys []int64 + backoffSleep map[string]time.Duration + totalProcessTime time.Duration + totalWaitTime time.Duration + rpcStat tikv.RegionRequestRuntimeStats + distSQLConcurrency int + extraConcurrency int + CoprCacheHitNum int64 + storeBatchedNum uint64 + storeBatchedFallbackNum uint64 + buildTaskDuration time.Duration } func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) { @@ -495,12 +522,16 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats { newRs := selectResultRuntimeStats{ - copRespTime: make([]time.Duration, 0, len(s.copRespTime)), - procKeys: make([]int64, 0, len(s.procKeys)), - backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), - rpcStat: tikv.NewRegionRequestRuntimeStats(), - distSQLConcurrency: s.distSQLConcurrency, - CoprCacheHitNum: s.CoprCacheHitNum, + copRespTime: make([]time.Duration, 0, len(s.copRespTime)), + procKeys: make([]int64, 0, len(s.procKeys)), + backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), + rpcStat: tikv.NewRegionRequestRuntimeStats(), + distSQLConcurrency: s.distSQLConcurrency, + extraConcurrency: s.extraConcurrency, + CoprCacheHitNum: s.CoprCacheHitNum, + storeBatchedNum: s.storeBatchedNum, + storeBatchedFallbackNum: s.storeBatchedFallbackNum, + buildTaskDuration: s.buildTaskDuration, } newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...) newRs.procKeys = append(newRs.procKeys, s.procKeys...) @@ -528,6 +559,15 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) { s.totalWaitTime += other.totalWaitTime s.rpcStat.Merge(other.rpcStat) s.CoprCacheHitNum += other.CoprCacheHitNum + if other.distSQLConcurrency > s.distSQLConcurrency { + s.distSQLConcurrency = other.distSQLConcurrency + } + if other.extraConcurrency > s.extraConcurrency { + s.extraConcurrency = other.extraConcurrency + } + s.storeBatchedNum += other.storeBatchedNum + s.storeBatchedFallbackNum += other.storeBatchedFallbackNum + s.buildTaskDuration += other.buildTaskDuration } func (s *selectResultRuntimeStats) String() string { @@ -579,14 +619,30 @@ func (s *selectResultRuntimeStats) String() string { } if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 { buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v", - strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64))) + strconv.FormatFloat(s.calcCacheHit(), 'f', 2, 64))) } else { buf.WriteString(", copr_cache: disabled") } + if s.buildTaskDuration > 0 { + buf.WriteString(", build_task_duration: ") + buf.WriteString(execdetails.FormatDuration(s.buildTaskDuration)) + } if s.distSQLConcurrency > 0 { - buf.WriteString(", distsql_concurrency: ") + buf.WriteString(", max_distsql_concurrency: ") buf.WriteString(strconv.FormatInt(int64(s.distSQLConcurrency), 10)) } + if s.extraConcurrency > 0 { + buf.WriteString(", max_extra_concurrency: ") + buf.WriteString(strconv.FormatInt(int64(s.extraConcurrency), 10)) + } + if s.storeBatchedNum > 0 { + buf.WriteString(", store_batch_num: ") + buf.WriteString(strconv.FormatInt(int64(s.storeBatchedNum), 10)) + } + if s.storeBatchedFallbackNum > 0 { + buf.WriteString(", store_batch_fallback_num: ") + buf.WriteString(strconv.FormatInt(int64(s.storeBatchedFallbackNum), 10)) + } buf.WriteString("}") } @@ -615,3 +671,15 @@ func (s *selectResultRuntimeStats) String() string { func (*selectResultRuntimeStats) Tp() int { return execdetails.TpSelectResultRuntimeStats } + +func (s *selectResultRuntimeStats) calcCacheHit() float64 { + hit := s.CoprCacheHitNum + tot := len(s.copRespTime) + if s.storeBatchedNum > 0 { + tot += int(s.storeBatchedNum) + } + if tot == 0 { + return 0 + } + return float64(hit) / float64(tot) +} diff --git a/executor/adapter.go b/executor/adapter.go index 33b321f60382e..f9fba0fea5d25 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -207,6 +207,7 @@ type TelemetryInfo struct { PartitionTelemetry *PartitionTelemetryInfo AccountLockTelemetry *AccountLockTelemetryInfo UseIndexMerge bool + UseTableLookUp bool } // PartitionTelemetryInfo records table partition telemetry information during execution. diff --git a/executor/builder.go b/executor/builder.go index 12cdeeaaa44ef..1005b51a764c6 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3863,6 +3863,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor { + if b.Ti != nil { + b.Ti.UseTableLookUp = true + } is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil { b.err = err @@ -4000,6 +4003,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { if b.Ti != nil { b.Ti.UseIndexMerge = true + b.Ti.UseTableLookUp = true } ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil { @@ -4445,6 +4449,9 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader, lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { + if builder.Ti != nil { + builder.Ti.UseTableLookUp = true + } e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v) if err != nil { return nil, err diff --git a/executor/distsql.go b/executor/distsql.go index 3b9a6a7d4b288..a96954b9fba6f 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -76,7 +76,9 @@ type lookupTableTask struct { idxRows *chunk.Chunk cursor int - doneCh chan error + // after the cop task is built, buildDone will be set to the current instant, for Next wait duration statistic. + buildDoneTime time.Time + doneCh chan error // indexOrder map is used to save the original index order for the handles. // Without this map, the original index order might be lost. @@ -790,13 +792,32 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) { if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) { return e.resultCurr, nil } + var ( + enableStats = e.stats != nil + start time.Time + indexFetchedInstant time.Time + ) + if enableStats { + start = time.Now() + } task, ok := <-e.resultCh if !ok { return nil, nil } + if enableStats { + indexFetchedInstant = time.Now() + } if err := <-task.doneCh; err != nil { return nil, err } + if enableStats { + e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start) + if task.buildDoneTime.After(indexFetchedInstant) { + e.stats.NextWaitTableLookUpBuild += task.buildDoneTime.Sub(indexFetchedInstant) + indexFetchedInstant = task.buildDoneTime + } + e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant) + } // Release the memory usage of last task before we handle a new task. if e.resultCurr != nil { @@ -1119,6 +1140,10 @@ type IndexLookUpRunTimeStats struct { TableRowScan int64 TableTaskNum int64 Concurrency int + // Record the `Next` call affected wait duration details. + NextWaitIndexScan time.Duration + NextWaitTableLookUpBuild time.Duration + NextWaitTableLookUpResp time.Duration } func (e *IndexLookUpRunTimeStats) String() string { @@ -1142,6 +1167,15 @@ func (e *IndexLookUpRunTimeStats) String() string { } buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency)) } + if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 { + if buf.Len() > 0 { + buf.WriteByte(',') + fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}", + execdetails.FormatDuration(e.NextWaitIndexScan), + execdetails.FormatDuration(e.NextWaitTableLookUpBuild), + execdetails.FormatDuration(e.NextWaitTableLookUpResp)) + } + } return buf.String() } @@ -1162,6 +1196,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) { e.TaskWait += tmp.TaskWait e.TableRowScan += tmp.TableRowScan e.TableTaskNum += tmp.TableTaskNum + e.NextWaitIndexScan += tmp.NextWaitIndexScan + e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild + e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp } // Tp implements the RuntimeStats interface. @@ -1300,6 +1337,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum { // Then we hold the returning rows and finish this task. func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error { tableReader, err := w.idxLookup.buildTableReader(ctx, task) + task.buildDoneTime = time.Now() if err != nil { logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) return err diff --git a/executor/distsql_test.go b/executor/distsql_test.go index 65889a10d0377..50c4a311a1eb9 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) { func TestIndexLookUpStats(t *testing.T) { stats := &executor.IndexLookUpRunTimeStats{ - FetchHandleTotal: int64(5 * time.Second), - FetchHandle: int64(2 * time.Second), - TaskWait: int64(2 * time.Second), - TableRowScan: int64(2 * time.Second), - TableTaskNum: 2, - Concurrency: 1, + FetchHandleTotal: int64(5 * time.Second), + FetchHandle: int64(2 * time.Second), + TaskWait: int64(2 * time.Second), + TableRowScan: int64(2 * time.Second), + TableTaskNum: 2, + Concurrency: 1, + NextWaitIndexScan: time.Second, + NextWaitTableLookUpBuild: 2 * time.Second, + NextWaitTableLookUpResp: 3 * time.Second, } - require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String()) + require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+ + ", table_task: {total_time: 2s, num: 2, concurrency: 1}"+ + ", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String()) require.Equal(t, stats.Clone().String(), stats.String()) stats.Merge(stats.Clone()) - require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String()) + require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+ + ", table_task: {total_time: 4s, num: 4, concurrency: 1}"+ + ", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String()) } func TestIndexLookUpGetResultChunk(t *testing.T) { diff --git a/metrics/telemetry.go b/metrics/telemetry.go index 591823f9952d9..db4c80714c0ea 100644 --- a/metrics/telemetry.go +++ b/metrics/telemetry.go @@ -169,6 +169,34 @@ var ( Name: "compact_partition_usage", Help: "Counter of compact table partition", }) + TelemetryStoreBatchedQueryCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "store_batched_query", + Help: "Counter of queries which use store batched coprocessor tasks", + }) + TelemetryBatchedQueryTaskCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "batched_query_task", + Help: "Counter of coprocessor tasks in batched queries", + }) + TelemetryStoreBatchedCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "store_batched", + Help: "Counter of store batched coprocessor tasks", + }) + TelemetryStoreBatchedFallbackCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "store_batched_fallback", + Help: "Counter of store batched fallback coprocessor tasks", + }) ) // readCounter reads the value of a prometheus.Counter. @@ -422,3 +450,37 @@ func GetIndexMergeCounter() IndexMergeUsageCounter { IndexMergeUsed: readCounter(TelemetryIndexMergeUsage), } } + +// StoreBatchCoprCounter records the usages of batch copr statements. +type StoreBatchCoprCounter struct { + // BatchSize is the global value of `tidb_store_batch_size` + BatchSize int `json:"batch_size"` + // BatchedQuery is the counter of queries that use this feature. + BatchedQuery int64 `json:"query"` + // BatchedQueryTask is the counter of total tasks in queries above. + BatchedQueryTask int64 `json:"tasks"` + // BatchedCount is the counter of successfully batched tasks. + BatchedCount int64 `json:"batched"` + // BatchedFallbackCount is the counter of fallback batched tasks by region miss. + BatchedFallbackCount int64 `json:"batched_fallback"` +} + +// Sub returns the difference of two counters. +func (n StoreBatchCoprCounter) Sub(rhs StoreBatchCoprCounter) StoreBatchCoprCounter { + return StoreBatchCoprCounter{ + BatchedQuery: n.BatchedQuery - rhs.BatchedQuery, + BatchedQueryTask: n.BatchedQueryTask - rhs.BatchedQueryTask, + BatchedCount: n.BatchedCount - rhs.BatchedCount, + BatchedFallbackCount: n.BatchedFallbackCount - rhs.BatchedFallbackCount, + } +} + +// GetStoreBatchCoprCounter gets the IndexMerge usage counter. +func GetStoreBatchCoprCounter() StoreBatchCoprCounter { + return StoreBatchCoprCounter{ + BatchedQuery: readCounter(TelemetryStoreBatchedQueryCnt), + BatchedQueryTask: readCounter(TelemetryBatchedQueryTaskCnt), + BatchedCount: readCounter(TelemetryStoreBatchedCnt), + BatchedFallbackCount: readCounter(TelemetryStoreBatchedFallbackCnt), + } +} diff --git a/session/session.go b/session/session.go index 2d84f1c0fa858..95822c17811a0 100644 --- a/session/session.go +++ b/session/session.go @@ -153,7 +153,8 @@ var ( telemetryUnlockUserUsage = metrics.TelemetryAccountLockCnt.WithLabelValues("unlockUser") telemetryCreateOrAlterUserUsage = metrics.TelemetryAccountLockCnt.WithLabelValues("createOrAlterUser") - telemetryIndexMerge = metrics.TelemetryIndexMergeUsage + telemetryIndexMerge = metrics.TelemetryIndexMergeUsage + telemetryStoreBatchedUsage = metrics.TelemetryStoreBatchedQueryCnt ) // Session context, it is consistent with the lifecycle of a client connection. @@ -4047,6 +4048,10 @@ func (s *session) updateTelemetryMetric(es *executor.ExecStmt) { telemetryUnlockUserUsage.Add(float64(ti.AccountLockTelemetry.UnlockUser)) telemetryCreateOrAlterUserUsage.Add(float64(ti.AccountLockTelemetry.CreateOrAlterUser)) } + + if ti.UseTableLookUp && s.sessionVars.StoreBatchSize > 0 { + telemetryStoreBatchedUsage.Inc() + } } // GetBuiltinFunctionUsage returns the replica of counting of builtin function usage diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2149e2387f236..3090ab630f3b7 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1168,7 +1168,7 @@ const ( DefTiDBTTLDeleteRateLimit = 0 DefPasswordReuseHistory = 0 DefPasswordReuseTime = 0 - DefTiDBStoreBatchSize = 0 + DefTiDBStoreBatchSize = 4 DefTiDBHistoricalStatsDuration = 7 * 24 * time.Hour DefTiDBEnableHistoricalStatsForCapture = false DefTiDBTTLJobScheduleWindowStartTime = "00:00 +0000" diff --git a/store/copr/copr_test/coprocessor_test.go b/store/copr/copr_test/coprocessor_test.go index 7931fb8432675..30247b8694a72 100644 --- a/store/copr/copr_test/coprocessor_test.go +++ b/store/copr/copr_test/coprocessor_test.go @@ -166,4 +166,19 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { require.Nil(t, errRes) tasks = it.GetTasks() require.Equal(t, len(tasks), 4) + + // only small tasks will be batched. + ranges = copr.BuildKeyRanges("a", "b", "h", "i", "o", "p") + req = &kv.Request{ + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 33, 32}), + Concurrency: 15, + StoreBatchSize: 3, + } + it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) + require.Nil(t, errRes) + tasks = it.GetTasks() + require.Equal(t, len(tasks), 2) + require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1) + require.Equal(t, len(tasks[1].ToPBBatchTasks()), 0) } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index e1c5fb0b91d00..06ab6ab61efd1 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -146,11 +146,13 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars err error ) tryRowHint := optRowHint(req) + elapsed := time.Duration(0) buildOpt := &buildCopTaskOpt{ req: req, cache: c.store.GetRegionCache(), eventCb: eventCb, respChan: req.KeepOrder, + elapsed: &elapsed, } buildTaskFunc := func(ranges []kv.KeyRange, hints []int) error { keyRanges := NewKeyRanges(ranges) @@ -186,14 +188,15 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars return nil, copErrorResponse{err} } it := &copIterator{ - store: c.store, - req: req, - concurrency: req.Concurrency, - finishCh: make(chan struct{}), - vars: vars, - memTracker: req.MemTracker, - replicaReadSeed: c.replicaReadSeed, - rpcCancel: tikv.NewRPCanceller(), + store: c.store, + req: req, + concurrency: req.Concurrency, + finishCh: make(chan struct{}), + vars: vars, + memTracker: req.MemTracker, + replicaReadSeed: c.replicaReadSeed, + rpcCancel: tikv.NewRPCanceller(), + buildTaskElapsed: *buildOpt.elapsed, } it.tasks = tasks if it.concurrency > len(tasks) { @@ -284,13 +287,14 @@ func (r *copTask) ToPBBatchTasks() []*coprocessor.StoreBatchTask { } pbTasks := make([]*coprocessor.StoreBatchTask, 0, len(r.batchTaskList)) for _, task := range r.batchTaskList { - pbTasks = append(pbTasks, &coprocessor.StoreBatchTask{ + storeBatchTask := &coprocessor.StoreBatchTask{ RegionId: task.region.GetRegionId(), RegionEpoch: task.region.GetRegionEpoch(), Peer: task.peer, Ranges: task.region.GetRanges(), TaskId: task.task.taskID, - }) + } + pbTasks = append(pbTasks, storeBatchTask) } return pbTasks } @@ -304,6 +308,7 @@ type buildCopTaskOpt struct { eventCb trxevents.EventCallback respChan bool rowHints []int + elapsed *time.Duration } func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*copTask, error) { @@ -315,7 +320,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c } rangesLen := ranges.Len() // something went wrong, disable hints to avoid out of range index. - if hints != nil && len(hints) != rangesLen { + if len(hints) != rangesLen { hints = nil } @@ -341,7 +346,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c } var builder taskBuilder - if req.StoreBatchSize > 0 { + if req.StoreBatchSize > 0 && hints != nil { builder = newBatchTaskBuilder(bo, req, cache) } else { builder = newLegacyTaskBuilder(len(locs)) @@ -428,6 +433,9 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c if elapsed > time.Millisecond { defer tracing.StartRegion(bo.GetCtx(), "copr.buildCopTasks").End() } + if opt.elapsed != nil { + *opt.elapsed = *opt.elapsed + elapsed + } metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum())) return tasks, nil } @@ -498,7 +506,8 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) { b.tasks = append(b.tasks, task) } }() - if b.limit <= 0 { + // only batch small tasks for memory control. + if b.limit <= 0 || !isSmallTask(task) { return nil } batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead) @@ -576,7 +585,9 @@ func isSmallTask(task *copTask) bool { // strictly, only RowCountHint == -1 stands for unknown task rows, // but when RowCountHint == 0, it may be caused by initialized value, // to avoid the future bugs, let the tasks with RowCountHint == 0 be non-small tasks. - return task.RowCountHint > 0 && task.RowCountHint <= CopSmallTaskRow + return task.RowCountHint > 0 && + (len(task.batchTaskList) == 0 && task.RowCountHint <= CopSmallTaskRow) || + (len(task.batchTaskList) > 0 && task.RowCountHint <= 2*CopSmallTaskRow) } // smallTaskConcurrency counts the small tasks of tasks, @@ -604,6 +615,16 @@ func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) { return res, extraConc } +// CopInfo is used to expose functions of copIterator. +type CopInfo interface { + // GetConcurrency returns the concurrency and small task concurrency. + GetConcurrency() (int, int) + // GetStoreBatchInfo returns the batched and fallback num. + GetStoreBatchInfo() (uint64, uint64) + // GetBuildTaskElapsed returns the duration of building task. + GetBuildTaskElapsed() time.Duration +} + type copIterator struct { store *Store req *kv.Request @@ -641,6 +662,10 @@ type copIterator struct { actionOnExceed *rateLimitAction pagingTaskIdx uint32 + + buildTaskElapsed time.Duration + storeBatchedNum atomic.Uint64 + storeBatchedFallbackNum atomic.Uint64 } // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan. @@ -660,6 +685,9 @@ type copIteratorWorker struct { enableCollectExecutionInfo bool pagingTaskIdx *uint32 + + storeBatchedNum *atomic.Uint64 + storeBatchedFallbackNum *atomic.Uint64 } // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit. @@ -792,6 +820,8 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC replicaReadSeed: it.replicaReadSeed, enableCollectExecutionInfo: enableCollectExecutionInfo, pagingTaskIdx: &it.pagingTaskIdx, + storeBatchedNum: &it.storeBatchedNum, + storeBatchedFallbackNum: &it.storeBatchedFallbackNum, } go worker.run(ctx) } @@ -890,6 +920,16 @@ func (it *copIterator) GetConcurrency() (int, int) { return it.concurrency, it.smallTaskConcurrency } +// GetStoreBatchInfo returns the batched and fallback num. +func (it *copIterator) GetStoreBatchInfo() (uint64, uint64) { + return it.storeBatchedNum.Load(), it.storeBatchedFallbackNum.Load() +} + +// GetBuildTaskElapsed returns the duration of building task. +func (it *copIterator) GetBuildTaskElapsed() time.Duration { + return it.buildTaskElapsed +} + // GetSendRate returns the rate-limit object. func (it *copIterator) GetSendRate() *util.RateLimit { return it.sendRate @@ -1086,30 +1126,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch Tasks: task.ToPBBatchTasks(), } - var cacheKey []byte - var cacheValue *coprCacheValue - - // If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since - // computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key. - if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) { - cKey, err := coprCacheBuildKey(&copReq) - if err == nil { - cacheKey = cKey - cValue := worker.store.coprCache.Get(cKey) - copReq.IsCacheEnabled = true - - if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs { - // Append cache version to the request to skip Coprocessor computation if possible - // when request result is cached - copReq.CacheIfMatchVersion = cValue.RegionDataVersion - cacheValue = cValue - } else { - copReq.CacheIfMatchVersion = 0 - } - } else { - logutil.BgLogger().Warn("Failed to build copr cache key", zap.Error(err)) - } - } + cacheKey, cacheValue := worker.buildCacheKey(task, &copReq) req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{ IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel), @@ -1279,13 +1296,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R if err != nil { return remains, err } - return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch) + return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp.GetBatchResponses(), task, ch) } if lockErr := resp.pbResp.GetLocked(); lockErr != nil { if err := worker.handleLockErr(bo, lockErr, task); err != nil { return nil, err } - return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch) + return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch) } if otherErr := resp.pbResp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) @@ -1316,74 +1333,23 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R } worker.handleCollectExecutionInfo(bo, rpcCtx, resp) resp.respTime = costTime - if resp.pbResp.IsCacheHit { - coprCacheCounterHit.Add(1) - if cacheValue == nil { - return nil, errors.New("Internal error: received illegal TiKV response") - } - // Cache hit and is valid: use cached data as response data and we don't update the cache. - data := make([]byte, len(cacheValue.Data)) - copy(data, cacheValue.Data) - resp.pbResp.Data = data - if worker.req.Paging.Enable { - var start, end []byte - if cacheValue.PageStart != nil { - start = make([]byte, len(cacheValue.PageStart)) - copy(start, cacheValue.PageStart) - } - if cacheValue.PageEnd != nil { - end = make([]byte, len(cacheValue.PageEnd)) - copy(end, cacheValue.PageEnd) - } - // When paging protocol is used, the response key range is part of the cache data. - if start != nil || end != nil { - resp.pbResp.Range = &coprocessor.KeyRange{ - Start: start, - End: end, - } - } else { - resp.pbResp.Range = nil - } - } - resp.detail.CoprCacheHit = true - } else { - coprCacheCounterMiss.Add(1) - // Cache not hit or cache hit but not valid: update the cache if the response can be cached. - if cacheKey != nil && resp.pbResp.CanBeCached && resp.pbResp.CacheLastVersion > 0 { - if resp.detail != nil { - if worker.store.coprCache.CheckResponseAdmission(resp.pbResp.Data.Size(), resp.detail.TimeDetail.ProcessTime, task.pagingTaskIdx) { - data := make([]byte, len(resp.pbResp.Data)) - copy(data, resp.pbResp.Data) - - newCacheValue := coprCacheValue{ - Data: data, - TimeStamp: worker.req.StartTs, - RegionID: task.region.GetID(), - RegionDataVersion: resp.pbResp.CacheLastVersion, - } - // When paging protocol is used, the response key range is part of the cache data. - if r := resp.pbResp.GetRange(); r != nil { - newCacheValue.PageStart = append([]byte{}, r.GetStart()...) - newCacheValue.PageEnd = append([]byte{}, r.GetEnd()...) - } - worker.store.coprCache.Set(cacheKey, &newCacheValue) - } - } - } + if err := worker.handleCopCache(task, resp, cacheKey, cacheValue); err != nil { + return nil, err } + batchResps := resp.pbResp.BatchResponses worker.sendToRespCh(resp, ch, true) - return worker.handleBatchCopResponse(bo, batchResps, task.batchTaskList, ch) + return worker.handleBatchCopResponse(bo, rpcCtx, batchResps, task.batchTaskList, ch) } -func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { if len(task.batchTaskList) == 0 { return remains, nil } batchedTasks := task.batchTaskList task.batchTaskList = nil - batchedRemains, err := worker.handleBatchCopResponse(bo, batchResp, batchedTasks, ch) + batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, batchResp, batchedTasks, ch) if err != nil { return nil, err } @@ -1392,11 +1358,19 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains // handle the batched cop response. // tasks will be changed, so the input tasks should not be used after calling this function. -func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, batchResps []*coprocessor.StoreBatchTaskResponse, + tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) (remainTasks []*copTask, err error) { if len(tasks) == 0 { return nil, nil } - var remainTasks []*copTask + batchedNum := len(tasks) + defer func() { + if err != nil { + return + } + worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks))) + worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks))) + }() appendRemainTasks := func(tasks ...*copTask) { if remainTasks == nil { // allocate size fo remain length @@ -1404,6 +1378,13 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp } remainTasks = append(remainTasks, tasks...) } + // need Addr for recording details. + var dummyRPCCtx *tikv.RPCContext + if rpcCtx != nil { + dummyRPCCtx = &tikv.RPCContext{ + Addr: rpcCtx.Addr, + } + } for _, batchResp := range batchResps { taskID := batchResp.GetTaskId() batchedTask, ok := tasks[taskID] @@ -1413,7 +1394,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp delete(tasks, taskID) resp := &copResponse{ pbResp: &coprocessor.Response{ - Data: batchResp.Data, + Data: batchResp.Data, + ExecDetailsV2: batchResp.ExecDetailsV2, }, } task := batchedTask.task @@ -1468,8 +1450,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp } return nil, errors.Trace(err) } - // TODO: check OOM - worker.sendToRespCh(resp, ch, false) + worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp) + worker.sendToRespCh(resp, ch, true) } for _, t := range tasks { task := t.task @@ -1525,6 +1507,90 @@ func (worker *copIteratorWorker) handleLockErr(bo *Backoffer, lockErr *kvrpcpb.L return nil } +func (worker *copIteratorWorker) buildCacheKey(task *copTask, copReq *coprocessor.Request) (cacheKey []byte, cacheValue *coprCacheValue) { + // If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since + // computing is not the main cost. Ignore requests with many ranges directly to avoid slowly building the cache key. + if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) { + cKey, err := coprCacheBuildKey(copReq) + if err == nil { + cacheKey = cKey + cValue := worker.store.coprCache.Get(cKey) + copReq.IsCacheEnabled = true + + if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs { + // Append cache version to the request to skip Coprocessor computation if possible + // when request result is cached + copReq.CacheIfMatchVersion = cValue.RegionDataVersion + cacheValue = cValue + } else { + copReq.CacheIfMatchVersion = 0 + } + } else { + logutil.BgLogger().Warn("Failed to build copr cache key", zap.Error(err)) + } + } + return +} + +func (worker *copIteratorWorker) handleCopCache(task *copTask, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue) error { + if resp.pbResp.IsCacheHit { + if cacheValue == nil { + return errors.New("Internal error: received illegal TiKV response") + } + coprCacheCounterHit.Add(1) + // Cache hit and is valid: use cached data as response data and we don't update the cache. + data := make([]byte, len(cacheValue.Data)) + copy(data, cacheValue.Data) + resp.pbResp.Data = data + if worker.req.Paging.Enable { + var start, end []byte + if cacheValue.PageStart != nil { + start = make([]byte, len(cacheValue.PageStart)) + copy(start, cacheValue.PageStart) + } + if cacheValue.PageEnd != nil { + end = make([]byte, len(cacheValue.PageEnd)) + copy(end, cacheValue.PageEnd) + } + // When paging protocol is used, the response key range is part of the cache data. + if start != nil || end != nil { + resp.pbResp.Range = &coprocessor.KeyRange{ + Start: start, + End: end, + } + } else { + resp.pbResp.Range = nil + } + } + resp.detail.CoprCacheHit = true + return nil + } + coprCacheCounterMiss.Add(1) + // Cache not hit or cache hit but not valid: update the cache if the response can be cached. + if cacheKey != nil && resp.pbResp.CanBeCached && resp.pbResp.CacheLastVersion > 0 { + if resp.detail != nil { + if worker.store.coprCache.CheckResponseAdmission(resp.pbResp.Data.Size(), resp.detail.TimeDetail.ProcessTime, task.pagingTaskIdx) { + data := make([]byte, len(resp.pbResp.Data)) + copy(data, resp.pbResp.Data) + + newCacheValue := coprCacheValue{ + Data: data, + TimeStamp: worker.req.StartTs, + RegionID: task.region.GetID(), + RegionDataVersion: resp.pbResp.CacheLastVersion, + } + // When paging protocol is used, the response key range is part of the cache data. + if r := resp.pbResp.GetRange(); r != nil { + newCacheValue.PageStart = append([]byte{}, r.GetStart()...) + newCacheValue.PageEnd = append([]byte{}, r.GetEnd()...) + } + worker.store.coprCache.Set(cacheKey, &newCacheValue) + } + } + } + return nil +} + func (worker *copIteratorWorker) getLockResolverDetails() *util.ResolveLockDetail { if !worker.enableCollectExecutionInfo { return nil diff --git a/telemetry/BUILD.bazel b/telemetry/BUILD.bazel index a6c79f7de596f..56376f0031109 100644 --- a/telemetry/BUILD.bazel +++ b/telemetry/BUILD.bazel @@ -77,6 +77,7 @@ go_test( "//testkit", "//testkit/testsetup", "@com_github_jeffail_gabs_v2//:gabs", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", diff --git a/telemetry/data.go b/telemetry/data.go index 0008b7fc8b88b..2a3ecffddaa1e 100644 --- a/telemetry/data.go +++ b/telemetry/data.go @@ -69,6 +69,8 @@ func postReportTelemetryData() { PostSavepointCount() postReportLazyPessimisticUniqueCheckSetCount() postReportDDLUsage() + postReportIndexMergeUsage() + postStoreBatchUsage() } // PostReportTelemetryDataForTest is for test. diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 1fe696870c291..1a186cc3451d2 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -17,6 +17,7 @@ package telemetry import ( "context" "errors" + "strconv" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" @@ -62,6 +63,7 @@ type featureUsage struct { IndexMergeUsageCounter *m.IndexMergeUsageCounter `json:"indexMergeUsageCounter"` ResourceControlUsage *resourceControlUsage `json:"resourceControl"` TTLUsage *ttlUsageCounter `json:"ttlUsage"` + StoreBatchCoprUsage *m.StoreBatchCoprCounter `json:"storeBatchCopr"` } type placementPolicyUsage struct { @@ -121,6 +123,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag usage.TTLUsage = getTTLUsageInfo(ctx, sctx) + usage.StoreBatchCoprUsage = getStoreBatchUsage(sctx) + return &usage, nil } @@ -264,6 +268,7 @@ var initialSavepointStmtCounter int64 var initialLazyPessimisticUniqueCheckSetCount int64 var initialDDLUsageCounter m.DDLUsageCounter var initialIndexMergeCounter m.IndexMergeUsageCounter +var initialStoreBatchCoprCounter m.StoreBatchCoprCounter // getTxnUsageInfo gets the usage info of transaction related features. It's exported for tests. func getTxnUsageInfo(ctx sessionctx.Context) *TxnUsage { @@ -432,3 +437,18 @@ func getIndexMergeUsageInfo() *m.IndexMergeUsageCounter { diff := curr.Sub(initialIndexMergeCounter) return &diff } + +func getStoreBatchUsage(ctx sessionctx.Context) *m.StoreBatchCoprCounter { + curr := m.GetStoreBatchCoprCounter() + diff := curr.Sub(initialStoreBatchCoprCounter) + if val, err := ctx.GetSessionVars().GetGlobalSystemVar(context.Background(), variable.TiDBStoreBatchSize); err == nil { + if batchSize, err := strconv.Atoi(val); err == nil { + diff.BatchSize = batchSize + } + } + return &diff +} + +func postStoreBatchUsage() { + initialStoreBatchCoprCounter = m.GetStoreBatchCoprCounter() +} diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index c8932cd27e35b..f8e5701f0e69a 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" _ "github.com/pingcap/tidb/autoid_service" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -777,3 +778,69 @@ func TestTTLTelemetry(t *testing.T) { checkTableHistWithDeleteRows(1, 1, 0, 0, 0) checkTableHistWithDelay(0, 1, 1, 0, 1) } + +func TestStoreBatchCopr(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + init, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, init.StoreBatchCoprUsage.BatchSize, 4) + + tk.MustExec("drop table if exists tele_batch_t") + tk.MustExec("create table tele_batch_t (id int primary key, c int, k int, index i(k))") + tk.MustExec("select * from tele_batch_t force index(i) where k between 1 and 10 and k % 2 != 0") + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 4) + diff := usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(1)) + require.Equal(t, diff.BatchedQueryTask, int64(0)) + require.Equal(t, diff.BatchedCount, int64(0)) + require.Equal(t, diff.BatchedFallbackCount, int64(0)) + + tk.MustExec("insert into tele_batch_t values(1, 1, 1), (2, 2, 2), (3, 3, 3), (5, 5, 5), (7, 7, 7)") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/setRangesPerTask", "return(1)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/setRangesPerTask")) + }() + tk.MustQuery("select * from tele_batch_t force index(i) where k between 1 and 3 and k % 2 != 0").Sort(). + Check(testkit.Rows("1 1 1", "3 3 3")) + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 4) + diff = usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(2)) + require.Equal(t, diff.BatchedQueryTask, int64(2)) + require.Equal(t, diff.BatchedCount, int64(1)) + require.Equal(t, diff.BatchedFallbackCount, int64(0)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/batchCopRegionError", "return")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/batchCopRegionError")) + }() + tk.MustQuery("select * from tele_batch_t force index(i) where k between 1 and 3 and k % 2 != 0").Sort(). + Check(testkit.Rows("1 1 1", "3 3 3")) + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 4) + diff = usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(3)) + require.Equal(t, diff.BatchedQueryTask, int64(4)) + require.Equal(t, diff.BatchedCount, int64(1)) + require.Equal(t, diff.BatchedFallbackCount, int64(1)) + + tk.MustExec("set global tidb_store_batch_size = 0") + tk.MustExec("set session tidb_store_batch_size = 0") + tk.MustQuery("select * from tele_batch_t force index(i) where k between 1 and 3 and k % 2 != 0").Sort(). + Check(testkit.Rows("1 1 1", "3 3 3")) + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 0) + diff = usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(3)) + require.Equal(t, diff.BatchedQueryTask, int64(4)) + require.Equal(t, diff.BatchedCount, int64(1)) + require.Equal(t, diff.BatchedFallbackCount, int64(1)) +}