Skip to content

Commit

Permalink
executor: trace and control memory usage in DistSQL layer (#10003) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored and zz-jason committed Apr 22, 2019
1 parent 7bf69bd commit a52c48b
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 0 deletions.
5 changes: 5 additions & 0 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
return nil, errors.Trace(err)
}

// kvReq.MemTracker is used to trace and control memory usage in DistSQL layer;
// for streamResult, since it is a pipeline which has no buffer, it's not necessary to trace it;
// for selectResult, we just use the kvReq.MemTracker prepared for co-processor
// instead of creating a new one for simplification.
if kvReq.Streaming {
return &streamResult{
resp: resp,
Expand All @@ -69,6 +73,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
ctx: sctx,
feedback: fb,
sqlType: label,
memTracker: kvReq.MemTracker,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
return &execdetails.ExecDetails{}
}

// MemSize implements kv.ResultSubset interface.
func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

func populateBuffer() []byte {
numCols := 4
numRows := 1024
Expand Down
10 changes: 10 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -41,6 +43,14 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
return &builder.Request, errors.Trace(builder.err)
}

// SetMemTracker sets a memTracker for this request.
func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label string) *RequestBuilder {
t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL)
t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker)
builder.Request.MemTracker = t
return builder
}

// SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges"
// to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
Expand Down
16 changes: 16 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -68,6 +69,8 @@ type selectResult struct {
feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
sqlType string

memTracker *memory.Tracker
}

func (r *selectResult) Fetch(ctx context.Context) {
Expand All @@ -91,6 +94,10 @@ func (r *selectResult) fetch(ctx context.Context) {
return
}

if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
}

select {
case r.results <- resultWithErr{result: resultSubset}:
case <-r.closed:
Expand Down Expand Up @@ -141,15 +148,24 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
Expand Down
2 changes: 2 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexReaderDistSQLTracker").
Build()
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -421,6 +422,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexLookupDistSQLTracker").
Build()
if err != nil {
return errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func (s *testExecSuite) TestProjectionUnparallelRequiredRows(c *C) {
}

func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) {
c.Skip("not stable because of goroutine schedule")
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
Expand Down
92 changes: 92 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -63,6 +64,8 @@ import (
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/context"
)

Expand All @@ -77,6 +80,7 @@ var _ = Suite(&testSuite{})
var _ = Suite(&testContextOptionSuite{})
var _ = Suite(&testBypassSuite{})
var _ = Suite(&testUpdateSuite{})
var _ = Suite(&testOOMSuite{})

type testSuite struct {
cluster *mocktikv.Cluster
Expand Down Expand Up @@ -3452,3 +3456,91 @@ func (s *testSuite) TestDoSubquery(c *C) {
c.Assert(err, IsNil, Commentf("err %v", err))
c.Assert(r, IsNil, Commentf("result of Do not empty"))
}

type testOOMSuite struct {
store kv.Storage
do *domain.Domain
oom *oomCapturer
}

func (s *testOOMSuite) SetUpSuite(c *C) {
c.Skip("log.ReplaceGlobals(lg, r) in registerHook() may result in data race")
testleak.BeforeTest()
s.registerHook()
var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.SetSchemaLease(0)
domain.RunAutoAnalyze = false
s.do, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testOOMSuite) registerHook() {
conf := &log.Config{Level: "info", File: log.FileLogConfig{}}
_, r, _ := log.InitLogger(conf)
s.oom = &oomCapturer{r.Core, ""}
lg := zap.New(s.oom)
log.ReplaceGlobals(lg, r)
}

func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))")
tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)")

s.oom.tracker = ""
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "TableReaderDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1

s.oom.tracker = ""
tk.MustQuery("select a from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select a from t use index(idx_a)")
c.Assert(s.oom.tracker, Equals, "IndexReaderDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1

s.oom.tracker = ""
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select * from t use index(idx_a)")
c.Assert(s.oom.tracker, Equals, "IndexLookupDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1
}

type oomCapturer struct {
zapcore.Core
tracker string
}

func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error {
if strings.Contains(entry.Message, "memory exceeds quota") {
err, _ := fields[0].Interface.(error)
str := err.Error()
begin := strings.Index(str, "8001]")
if begin == -1 {
panic("begin not found")
}
end := strings.Index(str, " holds")
if end == -1 {
panic("end not found")
}
h.tracker = str[begin+len("8001]") : end]
}
return nil
}

func (h *oomCapturer) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if h.Enabled(e.Level) {
return ce.AddCore(e, h)
}
return ce
}
1 change: 1 addition & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "TableReaderDistSQLTracker").
Build()
if err != nil {
return nil, errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -209,6 +210,8 @@ type Request struct {
// Streaming indicates using streaming API for this request, result in that one Next()
// call would not corresponds to a whole region result.
Streaming bool
// MemTracker is used to trace and control memory usage in co-processor layer.
MemTracker *memory.Tracker
}

// ResultSubset represents a result subset from a single storage unit.
Expand All @@ -220,6 +223,8 @@ type ResultSubset interface {
GetStartKey() Key
// GetExecDetails gets the detail information.
GetExecDetails() *execdetails.ExecDetails
// MemSize returns how many bytes of memory this result use for tracing memory usage.
MemSize() int64
}

// Response represents the response returned from KV layer.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func NewSessionVars() *SessionVars {
MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader,
MemQuotaIndexLookupJoin: DefTiDBMemQuotaIndexLookupJoin,
MemQuotaNestedLoopApply: DefTiDBMemQuotaNestedLoopApply,
MemQuotaDistSQL: DefTiDBMemQuotaDistSQL,
}
vars.BatchSize = BatchSize{
IndexJoinBatchSize: DefIndexJoinBatchSize,
Expand Down Expand Up @@ -732,6 +733,8 @@ type MemQuota struct {
MemQuotaIndexLookupJoin int64
// MemQuotaNestedLoopApply defines the memory quota for a nested loop apply executor.
MemQuotaNestedLoopApply int64
// MemQuotaDistSQL defines the memory quota for all operators in DistSQL layer like co-processor and selectResult.
MemQuotaDistSQL int64
}

// BatchSize defines batch size values.
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ const (
DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB.
DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB.
DefTiDBMemQuotaNestedLoopApply = 32 << 30 // 32GB.
DefTiDBMemQuotaDistSQL = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
DefTiDBRetryLimit = 10
DefTiDBDisableTxnAutoRetry = false
Expand Down
Loading

0 comments on commit a52c48b

Please sign in to comment.