executor: trace and control memory usage in DistSQL layer (pingcap#10003)
qw4990 committed Apr 19, 2019
1 parent 7bf69bd commit 0898ce8
Showing 13 changed files with 415 additions and 1 deletion.
5 changes: 5 additions & 0 deletions distsql/distsql.go
@@ -45,6 +45,10 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
return nil, errors.Trace(err)
}

// kvReq.MemTracker is used to trace and control memory usage in the DistSQL layer;
// for streamResult, since it is a pipeline with no buffer, there is no need to trace it;
// for selectResult, we reuse the kvReq.MemTracker prepared for the coprocessor
// instead of creating a new one, for simplicity.
if kvReq.Streaming {
return &streamResult{
resp: resp,
@@ -69,6 +73,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
ctx: sctx,
feedback: fb,
sqlType: label,
memTracker: kvReq.MemTracker,
}, nil
}

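For context, here is a minimal standalone sketch (not part of this diff) of the hierarchy the commit message and the comment above rely on: the per-request tracker built from MemQuotaDistSQL is attached to the statement-level tracker, so bytes consumed while buffering coprocessor responses count toward the whole query's memory accounting. It assumes the memory.NewTracker(label string, limit int64), AttachTo, Consume, and BytesConsumed APIs of this TiDB version; the labels and quota values are made up.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/memory"
)

func main() {
	// Statement-level tracker; in TiDB this is sctx.GetSessionVars().StmtCtx.MemTracker.
	stmtTracker := memory.NewTracker("StmtMemTracker", 1<<30) // hypothetical 1 GiB statement quota

	// Per-request DistSQL tracker, as RequestBuilder.SetMemTracker (further down) creates it.
	distSQLTracker := memory.NewTracker("IndexReaderDistSQLTracker", 32<<20) // hypothetical 32 MiB quota
	distSQLTracker.AttachTo(stmtTracker)

	// selectResult.fetch consumes the size of each coprocessor response subset it buffers.
	distSQLTracker.Consume(4 << 20)

	fmt.Println(distSQLTracker.BytesConsumed()) // 4194304
	fmt.Println(stmtTracker.BytesConsumed())    // 4194304: consumption propagates to the parent tracker
}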
3 changes: 3 additions & 0 deletions distsql/distsql_test.go
@@ -308,6 +308,9 @@ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
return &execdetails.ExecDetails{}
}

// MemSize implements kv.ResultSubset interface.
func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

func populateBuffer() []byte {
numCols := 4
numRows := 1024
10 changes: 10 additions & 0 deletions distsql/request_builder.go
@@ -19,12 +19,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
)
@@ -41,6 +43,14 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
return &builder.Request, errors.Trace(builder.err)
}

// SetMemTracker sets a memTracker for this request.
func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label string) *RequestBuilder {
t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL)
t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker)
builder.Request.MemTracker = t
return builder
}

// SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges"
// to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
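For reference, a sketch of how an executor is expected to call the new setter. It mirrors the executor/distsql.go hunks further down in this commit; it is a fragment rather than a runnable program, and the builder variable and e.* fields stand for an executor's own state.

// Sketch of an executor building its coprocessor request (assumed context: an
// IndexReaderExecutor-like method with a distsql.RequestBuilder named builder).
kvReq, err := builder.
	SetKeepOrder(e.keepOrder).
	SetStreaming(e.streaming).
	SetFromSessionVars(e.ctx.GetSessionVars()).
	SetMemTracker(e.ctx, "IndexReaderDistSQLTracker"). // new: per-request tracker bounded by MemQuotaDistSQL
	Build()
if err != nil {
	return errors.Trace(err)
}
// kvReq.MemTracker now points at a tracker attached to StmtCtx.MemTracker,
// and distsql.Select hands it to the selectResult it returns.

Because the tracker is attached to the statement-level tracker, whatever the DistSQL layer consumes is also visible to the query-level memory accounting.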
16 changes: 16 additions & 0 deletions distsql/select_result.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
@@ -68,6 +69,8 @@ type selectResult struct {
feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
sqlType string

memTracker *memory.Tracker
}

func (r *selectResult) Fetch(ctx context.Context) {
@@ -91,6 +94,10 @@ func (r *selectResult) fetch(ctx context.Context) {
return
}

if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
}

select {
case r.results <- resultWithErr{result: resultSubset}:
case <-r.closed:
@@ -141,15 +148,24 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
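The accounting above is meant to stay balanced across a response's lifetime: the marshalled size of each result subset is consumed when fetch buffers it in r.results, released in getSelectResp once the subset has been dequeued and unmarshalled, and replaced by the size of the unmarshalled SelectResponse, which is in turn released when the next response overwrites r.selectResp. Below is a minimal sketch of that lifecycle with a bare tracker; the sizes are made up, and rawSize / respSize stand in for resultSubset.MemSize() and selectResp.Size().

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/memory"
)

func main() {
	// Assumption: a non-positive limit means "track only, no quota enforcement".
	tracker := memory.NewTracker("DistSQLTracker", -1)

	rawSize := int64(1 << 20)  // stands for resultSubset.MemSize()
	respSize := int64(1 << 19) // stands for selectResp.Size() after unmarshalling

	tracker.Consume(rawSize)  // selectResult.fetch: raw subset buffered in r.results
	tracker.Consume(-rawSize) // getSelectResp: raw bytes released once the subset is dequeued
	tracker.Consume(respSize) // getSelectResp: the unmarshalled SelectResponse is tracked instead

	fmt.Println(tracker.BytesConsumed()) // only the live SelectResponse remains accounted

	tracker.Consume(-respSize)           // released when the next response replaces r.selectResp
	fmt.Println(tracker.BytesConsumed()) // 0
}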
235 changes: 235 additions & 0 deletions executor/chunk_size_control_test.go
@@ -0,0 +1,235 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"context"
"fmt"
"strings"
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
)

var (
_ = Suite(&testChunkSizeControlSuite{})
)

type testSlowClient struct {
sync.RWMutex
tikv.Client
regionDelay map[uint64]time.Duration
}

func (c *testSlowClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
regionID := req.RegionId
delay := c.GetDelay(regionID)
if req.Type == tikvrpc.CmdCop && delay > 0 {
time.Sleep(delay)
}
return c.Client.SendRequest(ctx, addr, req, timeout)
}

func (c *testSlowClient) SetDelay(regionID uint64, dur time.Duration) {
c.Lock()
defer c.Unlock()
c.regionDelay[regionID] = dur
}

func (c *testSlowClient) GetDelay(regionID uint64) time.Duration {
c.RLock()
defer c.RUnlock()
return c.regionDelay[regionID]
}

// manipulateCluster splits the cluster's single region by splitKeys and returns all region IDs after the split.
func manipulateCluster(cluster *mocktikv.Cluster, splitKeys [][]byte) []uint64 {
regions := cluster.GetAllRegions()
if len(regions) != 1 {
panic("this cluster has already split")
}

allRegionIDs := []uint64{regions[0].Meta.Id}
for i, key := range splitKeys {
newRegionID, newPeerID := cluster.AllocID(), cluster.AllocID()
cluster.Split(allRegionIDs[i], newRegionID, key, []uint64{newPeerID}, newPeerID)
allRegionIDs = append(allRegionIDs, newRegionID)
}
return allRegionIDs
}

func generateTableSplitKeyForInt(tid int64, splitNum []int) [][]byte {
results := make([][]byte, 0, len(splitNum))
for _, num := range splitNum {
results = append(results, tablecodec.EncodeRowKey(tid, codec.EncodeInt(nil, int64(num))))
}
return results
}

func generateIndexSplitKeyForInt(tid, idx int64, splitNum []int) [][]byte {
results := make([][]byte, 0, len(splitNum))
for _, num := range splitNum {
d := new(types.Datum)
d.SetInt64(int64(num))
b, err := codec.EncodeKey(nil, nil, *d)
if err != nil {
panic(err)
}
results = append(results, tablecodec.EncodeIndexSeekKey(tid, idx, b))
}
return results
}

type testChunkSizeControlKit struct {
store kv.Storage
dom *domain.Domain
tk *testkit.TestKit
client *testSlowClient
cluster *mocktikv.Cluster
}

type testChunkSizeControlSuite struct {
m map[string]*testChunkSizeControlKit
}

func (s *testChunkSizeControlSuite) SetUpSuite(c *C) {
tableSQLs := map[string]string{}
tableSQLs["Limit&TableScan"] = "create table t (a int, primary key (a))"
tableSQLs["Limit&IndexScan"] = "create table t (a int, index idx_a(a))"

s.m = make(map[string]*testChunkSizeControlKit)
for name, sql := range tableSQLs {
// BootstrapSession is not thread-safe, so we have to prepare all resources in SetUp.
kit := new(testChunkSizeControlKit)
s.m[name] = kit
kit.client = &testSlowClient{regionDelay: make(map[uint64]time.Duration)}
kit.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(kit.cluster)

var err error
kit.store, err = mockstore.NewMockTikvStore(
mockstore.WithCluster(kit.cluster),
mockstore.WithHijackClient(func(c tikv.Client) tikv.Client {
kit.client.Client = c
return kit.client
}),
)
c.Assert(err, IsNil)

// init domain
kit.dom, err = session.BootstrapSession(kit.store)
c.Assert(err, IsNil)

// create the test table
kit.tk = testkit.NewTestKitWithInit(c, kit.store)
kit.tk.MustExec(sql)
}
}

func (s *testChunkSizeControlSuite) getKit(name string) (
kv.Storage, *domain.Domain, *testkit.TestKit, *testSlowClient, *mocktikv.Cluster) {
x := s.m[name]
return x.store, x.dom, x.tk, x.client, x.cluster
}

func (s *testChunkSizeControlSuite) TestLimitAndTableScan(c *C) {
c.Skip("not stable because coprocessor may result in goroutine leak")
_, dom, tk, client, cluster := s.getKit("Limit&TableScan")
defer client.Close()
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tid := tbl.Meta().ID

// construct two regions split by 100
splitKeys := generateTableSplitKeyForInt(tid, []int{100})
regionIDs := manipulateCluster(cluster, splitKeys)

noDelayThreshold := time.Millisecond * 100
delayDuration := time.Second
delayThreshold := delayDuration * 9 / 10
tk.MustExec("insert into t values (1)") // insert one record into region1, and set a delay duration
client.SetDelay(regionIDs[0], delayDuration)

results := tk.MustQuery("explain analyze select * from t where t.a > 0 and t.a < 200 limit 1")
cost := s.parseTimeCost(c, results.Rows()[0])
c.Assert(cost, Not(Less), delayThreshold) // have to wait for region1

tk.MustExec("insert into t values (101)") // insert one record into region2
results = tk.MustQuery("explain analyze select * from t where t.a > 0 and t.a < 200 limit 1")
cost = s.parseTimeCost(c, results.Rows()[0])
c.Assert(cost, Less, noDelayThreshold) // region2 return quickly

results = tk.MustQuery("explain analyze select * from t where t.a > 0 and t.a < 200 limit 2")
cost = s.parseTimeCost(c, results.Rows()[0])
c.Assert(cost, Not(Less), delayThreshold) // have to wait
}

func (s *testChunkSizeControlSuite) TestLimitAndIndexScan(c *C) {
c.Skip("not stable because coprocessor may result in goroutine leak")
_, dom, tk, client, cluster := s.getKit("Limit&IndexScan")
defer client.Close()
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tid := tbl.Meta().ID
idx := tbl.Meta().Indices[0].ID

// construct two regions split by 100
splitKeys := generateIndexSplitKeyForInt(tid, idx, []int{100})
regionIDs := manipulateCluster(cluster, splitKeys)

noDelayThreshold := time.Millisecond * 100
delayDuration := time.Second
delayThreshold := delayDuration * 9 / 10
tk.MustExec("insert into t values (1)") // insert one record into region1, and set a delay duration
client.SetDelay(regionIDs[0], delayDuration)

results := tk.MustQuery("explain analyze select * from t where t.a > 0 and t.a < 200 limit 1")
cost := s.parseTimeCost(c, results.Rows()[0])
c.Assert(cost, Not(Less), delayThreshold) // have to wait for region1

tk.MustExec("insert into t values (101)") // insert one record into region2
results = tk.MustQuery("explain analyze select * from t where t.a > 0 and t.a < 200 limit 1")
cost = s.parseTimeCost(c, results.Rows()[0])
c.Assert(cost, Less, noDelayThreshold) // region2 return quickly

results = tk.MustQuery("explain analyze select * from t where t.a > 0 and t.a < 200 limit 2")
cost = s.parseTimeCost(c, results.Rows()[0])
c.Assert(cost, Not(Less), delayThreshold) // have to wait
}

func (s *testChunkSizeControlSuite) parseTimeCost(c *C, line []interface{}) time.Duration {
lineStr := fmt.Sprintf("%v", line)
idx := strings.Index(lineStr, "time:")
c.Assert(idx, Not(Equals), -1)
lineStr = lineStr[idx+len("time:"):]
idx = strings.Index(lineStr, ",")
c.Assert(idx, Not(Equals), -1)
timeStr := lineStr[:idx]
d, err := time.ParseDuration(timeStr)
c.Assert(err, IsNil)
return d
}
2 changes: 2 additions & 0 deletions executor/distsql.go
@@ -291,6 +291,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexReaderDistSQLTracker").
Build()
if err != nil {
e.feedback.Invalidate()
@@ -421,6 +422,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexLookupDistSQLTracker").
Build()
if err != nil {
return errors.Trace(err)
1 change: 1 addition & 0 deletions executor/executor_required_rows_test.go
@@ -526,6 +526,7 @@ func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) {
}

func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) {
c.Skip("not stable because of goroutine schedule")
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int