Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: tidb tracing prototype #7016

Merged
merged 27 commits into from
Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a8821c8
tidb tracing prototype
zhexuany Jul 19, 2018
92d1764
add benchmark for noop
zhexuany Aug 17, 2018
3e61f1b
remove old span system
zhexuany Aug 21, 2018
93b420a
rename file and add tree relationship tesdt
zhexuany Aug 23, 2018
b8abe7e
tree format
zhexuany Aug 23, 2018
7a9d6db
remvoe useless code
zhexuany Aug 29, 2018
b46a727
fix fmt issue
zhexuany Aug 29, 2018
84b9d78
iterate all data until chk's row count is zero
zhexuany Aug 29, 2018
3fe6951
chk has 0 row count, it will not enter for loop
zhexuany Aug 29, 2018
31961e6
undo format by go1.11
zhexuany Aug 29, 2018
c757655
Merge branch 'master' into tidb_tracing_prototype
zhexuany Aug 29, 2018
70f0d9b
address comments
zhexuany Aug 29, 2018
6181ca2
Merge branch 'tidb_tracing_prototype' of github.com:zhexuany/tidb int…
zhexuany Aug 29, 2018
cb366aa
remove unecessary import
zhexuany Aug 30, 2018
46e681e
add more tests
zhexuany Aug 30, 2018
e0d7cba
add a todo
zhexuany Aug 30, 2018
7657e5e
Merge branch 'master' into tidb_tracing_prototype
zhexuany Aug 30, 2018
227626b
address comments
zhexuany Aug 30, 2018
03c3f7a
remvoe extra line
zhexuany Aug 30, 2018
507b567
move buildTrace to builder.go
zhexuany Aug 30, 2018
0ccae95
remove extra imports from trace.go
zhexuany Aug 30, 2018
e2cb6ff
record optimize time in traceExec's Next
zhexuany Aug 30, 2018
9f5b5bf
open child in next rather in open
zhexuany Aug 30, 2018
e49d58a
fix optimize logic mistake
zhexuany Aug 30, 2018
da0e06b
address comment
zhexuany Aug 30, 2018
2c6aa09
use == not !=
zhexuany Aug 30, 2018
1468f03
Merge branch 'master' into tidb_tracing_prototype
zz-jason Aug 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildDelete(v)
case *plan.Execute:
return b.buildExecute(v)
case *plan.Trace:
return b.buildTrace(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointGetPlan:
Expand Down
19 changes: 0 additions & 19 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"unsafe"

"github.com/juju/errors"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -210,18 +209,6 @@ func splitRanges(ranges []*ranger.Range, keepOrder bool) ([]*ranger.Range, []*ra
return signedRanges, unsignedRanges
}

// startSpanFollowContext is similar to opentracing.StartSpanFromContext, but the span reference use FollowsFrom option.
func startSpanFollowsContext(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
span := opentracing.SpanFromContext(ctx)
if span != nil {
span = opentracing.StartSpan(operationName, opentracing.FollowsFrom(span.Context()))
} else {
span = opentracing.StartSpan(operationName)
}

return span, opentracing.ContextWithSpan(ctx, span)
}

// rebuildIndexRanges will be called if there's correlated column in access conditions. We will rebuild the range
// by substitute correlated column with the constant.
func rebuildIndexRanges(ctx sessionctx.Context, is *plan.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) {
Expand Down Expand Up @@ -298,9 +285,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
}

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
span, ctx := startSpanFollowsContext(ctx, "executor.IndexReader.Open")
defer span.Finish()

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
Expand Down Expand Up @@ -403,9 +387,6 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupReader)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

span, ctx := startSpanFollowsContext(ctx, "executor.IndexLookUp.Open")
defer span.Finish()

e.finished = make(chan struct{})
e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))

Expand Down
11 changes: 4 additions & 7 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ type TableReaderExecutor struct {

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
span, ctx := startSpanFollowsContext(ctx, "executor.TableReader.Open")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this?

defer span.Finish()

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
Expand Down Expand Up @@ -101,11 +98,11 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
err := e.resultHandler.nextChunk(ctx, chk)
if err != nil {
if err := e.resultHandler.nextChunk(ctx, chk); err != nil {
e.feedback.Invalidate()
return err
}
return errors.Trace(err)
return errors.Trace(nil)
}

// Close implements the Executor Close interface.
Expand All @@ -115,7 +112,7 @@ func (e *TableReaderExecutor) Close() error {
return errors.Trace(err)
}

// buildResp first build request and send it to tikv using distsql.Select. It uses SelectResut returned by the callee
// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
Expand Down
152 changes: 152 additions & 0 deletions executor/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"fmt"
"time"

"github.com/juju/errors"
"github.com/opentracing/basictracer-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/tracing"
"golang.org/x/net/context"
)

// TraceExec represents a root executor of trace query.
type TraceExec struct {
baseExecutor
// CollectedSpans collects all span during execution. Span is appended via
// callback method which passes into tracer implementation.
CollectedSpans []basictracer.RawSpan
// exhausted being true means there is no more result.
exhausted bool
zhexuany marked this conversation as resolved.
Show resolved Hide resolved
// plan is the real query plan and it is used for building real query's executor.
plan plan.Plan
// rootTrace represents root span which is father of all other span.
rootTrace opentracing.Span

childrenResults []*chunk.Chunk
}

// buildTrace builds a TraceExec for future executing. This method will be called
// at build().
func (b *executorBuilder) buildTrace(v *plan.Trace) Executor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to maintain this function in the "builder.go" file, like other functions.

e := &TraceExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
}

pp, _ := v.StmtPlan.(plan.PhysicalPlan)
e.children = make([]Executor, 0, len(pp.Children()))
for _, child := range pp.Children() {
switch p := child.(type) {
case *plan.PhysicalTableReader, *plan.PhysicalIndexReader, *plan.PhysicalIndexLookUpReader, *plan.PhysicalHashAgg, *plan.PhysicalProjection, *plan.PhysicalStreamAgg, *plan.PhysicalSort:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please split the long code line.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems these operators are also not supported in this PR?

e.children = append(e.children, b.build(p))
default:
panic(fmt.Sprintf("%v is not supported", child))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not panic, set b.err instead.

}

zhexuany marked this conversation as resolved.
Show resolved Hide resolved
}

zhexuany marked this conversation as resolved.
Show resolved Hide resolved
return e
}

// Open opens a trace executor and it will create a root trace span which will be
// used for the following span in a relationship of `ChildOf` or `FollowFrom`.
// for more details, you could refer to http://opentracing.io
func (e *TraceExec) Open(ctx context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems that we can not trace the building and optimization time of a query, only the execution time can be traced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only for now. I will file another PR to support this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the TraceExec has no child, I think we can just leave this function to be empty.

e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After that, global opentracing Tracer is modified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I will file another PR to better handle this.

e.CollectedSpans = append(e.CollectedSpans, sp)
})
// we actually don't care when underlying executor started. We only care how
// much time was spent
for _, child := range e.children {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only need to call e.stmtExec.Open()

err := child.Open(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will some node open multiple times?
The children node open may also call its children's open ? If something like indexReader open multiple time, would there be resource leak?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, it is not. I just copied the code from basicExecutor with some addition about how to set up tracing.

if err != nil {
return errors.Trace(err)
}
}
e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only one Chunk is needed for e.stmtExxec

for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
}

return nil
}

// Next executes real query and collects span later.
func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.exhausted {
return nil
}

// store span into context
ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)
if len(e.children) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be removed.

for {
if err := e.children[0].Next(ctx, e.childrenResults[0]); err != nil {
return errors.Trace(err)
}
if e.childrenResults[0].NumRows() != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.childrenResults[0].NumRows() == 0

break
}
}
}

e.rootTrace.LogKV("event", "tracing completed")
e.rootTrace.Finish()
var rootSpan basictracer.RawSpan

treeSpans := make(map[uint64][]basictracer.RawSpan)
for _, sp := range e.CollectedSpans {
treeSpans[sp.ParentSpanID] = append(treeSpans[sp.ParentSpanID], sp)
// if a span's parentSpanID is 0, then it is root span
// this is by design
if sp.ParentSpanID == 0 {
rootSpan = sp
}
}

dfsTree(rootSpan, treeSpans, "", false, chk)
e.exhausted = true
return nil
}

func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, isLast bool, chk *chunk.Chunk) {
suffix := ""
spans := tree[span.Context.SpanID]
var newPrefix string
if span.ParentSpanID == 0 {
newPrefix = prefix
} else {
if len(tree[span.ParentSpanID]) > 0 && !isLast {
suffix = "├─"
newPrefix = prefix + "│ "
} else {
suffix = "└─"
newPrefix = prefix + " "
}
}

chk.AppendString(0, prefix+suffix+span.Operation)
chk.AppendString(1, span.Start.Format(time.StampNano))
chk.AppendString(2, span.Duration.String())

for i, sp := range spans {
dfsTree(sp, tree, newPrefix, i == (len(spans))-1 /*last element of array*/, chk)
}
}
2 changes: 2 additions & 0 deletions plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func (b *planBuilder) build(node ast.Node) (Plan, error) {
return b.buildExecute(x)
case *ast.ExplainStmt:
return b.buildExplain(x)
case *ast.TraceStmt:
return b.buildTrace(x)
case *ast.InsertStmt:
return b.buildInsert(x)
case *ast.LoadDataStmt:
Expand Down
39 changes: 39 additions & 0 deletions plan/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package plan

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/mysql"
)

// Trace represents a trace plan.
type Trace struct {
baseSchemaProducer

StmtPlan Plan
}

// buildTrace builds a trace plan. Inside this method, it first optimize the
// underlying query and then constructs a schema, which will be used to constructs
// rows result.
func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) {
if _, ok := trace.Stmt.(*ast.SelectStmt); !ok {
return nil, errors.New("trace only supports select query")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about "only the select statement can be traced"?

}

optimizedP, err := Optimize(b.ctx, trace.Stmt, b.is)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/optimizedP/stmtPlan/?

if err != nil {
return nil, errors.New("fail to optimize during build trace")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not return the error directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creating a new error with error message "fail to optimize during build trace" seems to make a lot of senses.

}
p := &Trace{StmtPlan: optimizedP}

retFields := []string{"operation", "duration", "spanID"}
schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this empty line?

schema.Append(buildColumn("", "startTS", mysql.TypeString, mysql.MaxBlobWidth))
schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth))
p.SetSchema(schema)
return p, nil
}
26 changes: 0 additions & 26 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/juju/errors"
"github.com/ngaut/pools"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
Expand Down Expand Up @@ -375,11 +374,6 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
}

func (s *session) CommitTxn(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("session.CommitTxn", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

err := s.doCommitWithRetry(ctx)
label := metrics.LblOK
if err != nil {
Expand All @@ -390,11 +384,6 @@ func (s *session) CommitTxn(ctx context.Context) error {
}

func (s *session) RollbackTxn(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("session.RollbackTxn", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

var err error
if s.txn.Valid() {
terror.Log(s.txn.Rollback())
Expand Down Expand Up @@ -451,9 +440,6 @@ func (s *session) isRetryableError(err error) bool {
}

func (s *session) retry(ctx context.Context, maxCnt uint) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "retry")
defer span.Finish()

connID := s.sessionVars.ConnectionID
if s.sessionVars.TxnCtx.ForUpdate {
return errForUpdateCantRetry.GenByArgs(connID)
Expand Down Expand Up @@ -545,10 +531,7 @@ func (s *session) sysSessionPool() *pools.ResourcePool {
// Unlike normal Exec, it doesn't reset statement status, doesn't commit or rollback the current transaction
// and doesn't write binlog.
func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) {
var span opentracing.Span
ctx := context.TODO()
span, ctx = opentracing.StartSpanFromContext(ctx, "session.ExecRestrictedSQL")
defer span.Finish()

// Use special session to execute the sql.
tmp, err := s.sysSessionPool().Get()
Expand Down Expand Up @@ -712,10 +695,6 @@ func (s *session) SetGlobalSysVar(name, value string) error {
}

func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) ([]ast.StmtNode, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span1 := opentracing.StartSpan("session.ParseSQL", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
s.parser.SetSQLMode(s.sessionVars.SQLMode)
return s.parser.Parse(sql, charset, collation)
}
Expand Down Expand Up @@ -770,11 +749,6 @@ func (s *session) Execute(ctx context.Context, sql string) (recordSets []ast.Rec
}

func (s *session) execute(ctx context.Context, sql string) (recordSets []ast.RecordSet, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span, ctx = opentracing.StartSpanFromContext(ctx, "session.Execute")
defer span.Finish()
}

s.PrepareTxnCtx(ctx)
connID := s.sessionVars.ConnectionID
err = s.loadCommonGlobalVariablesIfNeeded()
Expand Down
Loading