Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: do not return first row until the frame is completed. #12480

Merged
merged 8 commits into from
Oct 12, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 84 additions & 83 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,137 +30,138 @@ import (
type WindowExec struct {
baseExecutor

groupChecker *groupChecker
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
executed bool
meetNewGroup bool
remainingRowsInGroup int
remainingRowsInChunk int
numWindowFuncs int
processor windowProcessor
groupChecker *groupChecker
// inputIter is the iterator of children chunks
inputIter *chunk.Iterator4Chunk
// executed indicates that the child executor is drained or that an unexpected error occurred.
executed bool
requiredRows int
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
// resultChunk stores the chunk to return
resultChunk []*chunk.Chunk
// remainingRowsInChunk[i] is the number of rows in resultChunk[i] that are not yet prepared.
remainingRowsInChunk []int

numWindowFuncs int
processor windowProcessor
}

// Close implements the Executor Close interface.
// It drops the reference to the buffered child chunks and then closes the
// embedded baseExecutor, passing that result through errors.Trace.
func (e *WindowExec) Close() error {
	e.childResults = nil
	closeErr := e.baseExecutor.Close()
	return errors.Trace(closeErr)
}

// Next implements the Executor Next interface.
func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.requiredRows == 0 { // we assume that each future chk has the same RequiredRows.
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
e.requiredRows = chk.RequiredRows()
}

chk.Reset()
if (e.executed || e.meetNewGroup) && e.remainingRowsInGroup > 0 {
err := e.appendResult2Chunk(chk)
for !e.executed && !e.preparedChunkAvailable() {
err := e.consumeOneGroup(ctx)
if err != nil {
e.executed = true
return err
}
}
for !e.executed && (chk.NumRows() == 0 || e.remainingRowsInChunk > 0) {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
e.executed = true
return errors.Trace(err)
}
if len(e.resultChunk) > 0 {
chk.SwapColumns(e.resultChunk[0])
e.resultChunk[0] = nil // GC it. TODO: reuse it.
e.resultChunk = e.resultChunk[1:]
e.remainingRowsInChunk = e.remainingRowsInChunk[1:]
}
return nil
}

func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) error {
var err error
if err = e.fetchChildIfNecessary(ctx, chk); err != nil {
return errors.Trace(err)
}
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
e.meetNewGroup, err = e.groupChecker.meetNewGroup(e.inputRow)
// preparedChunkAvailable reports whether the chunk at the head of
// e.resultChunk has no rows left to prepare, i.e. its matching entry in
// e.remainingRowsInChunk is zero. It returns false when no result chunk
// is buffered.
func (e *WindowExec) preparedChunkAvailable() bool {
	if len(e.resultChunk) == 0 {
		return false
	}
	return e.remainingRowsInChunk[0] == 0
}

func (e *WindowExec) consumeOneGroup(ctx context.Context) error {
var groupRows []chunk.Row
for {
eof, err := e.fetchChildIfNecessary(ctx)
if err != nil {
return errors.Trace(err)
}
if e.meetNewGroup && e.remainingRowsInGroup > 0 {
err := e.consumeGroupRows()
if eof {
e.executed = true
return e.consumeGroupRows(groupRows)
}
for inputRow := e.inputIter.Current(); inputRow != e.inputIter.End(); inputRow = e.inputIter.Next() {
meetNewGroup, err := e.groupChecker.meetNewGroup(inputRow)
if err != nil {
return errors.Trace(err)
}
err = e.appendResult2Chunk(chk)
return err
if meetNewGroup {
return e.consumeGroupRows(groupRows)
}
groupRows = append(groupRows, inputRow)
}
e.remainingRowsInGroup++
e.groupRows = append(e.groupRows, e.inputRow)
}
return nil
}

func (e *WindowExec) consumeGroupRows() (err error) {
if len(e.groupRows) == 0 {
func (e *WindowExec) consumeGroupRows(groupRows []chunk.Row) (err error) {
remainingRowsInGroup := len(groupRows)
if remainingRowsInGroup == 0 {
return nil
}
e.groupRows, err = e.processor.consumeGroupRows(e.ctx, e.groupRows)
if err != nil {
return errors.Trace(err)
for i := 0; i < len(e.resultChunk); i++ {
remained := mathutil.Min(e.remainingRowsInChunk[i], remainingRowsInGroup)
e.remainingRowsInChunk[i] -= remained
remainingRowsInGroup -= remained

// TODO: combine these three methods
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
// The old implementation required the processor to have these three methods,
// but that is no longer necessary.
groupRows, err = e.processor.consumeGroupRows(e.ctx, groupRows)
if err != nil {
return errors.Trace(err)
}
_, err = e.processor.appendResult2Chunk(e.ctx, groupRows, e.resultChunk[i], remained)
if err != nil {
return errors.Trace(err)
}
if remainingRowsInGroup == 0 {
e.processor.resetPartialResult()
break
}
}
return nil
}

func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.inputIter != nil && e.inputRow != e.inputIter.End() {
return nil
}

// Before fetching a new batch of input, we should consume the last group rows.
err = e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
func (e *WindowExec) fetchChildIfNecessary(ctx context.Context) (EOF bool, err error) {
if e.inputIter != nil && e.inputIter.Current() != e.inputIter.End() {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
return false, nil
}

childResult := newFirstChunk(e.children[0])
err = Next(ctx, e.children[0], childResult)
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
e.childResults = append(e.childResults, childResult)
// No more data.
if childResult.NumRows() == 0 {
e.executed = true
err = e.appendResult2Chunk(chk)
return errors.Trace(err)
numRows := childResult.NumRows()
if numRows == 0 {
return true, nil
}

e.inputIter = chunk.NewIterator4Chunk(childResult)
e.inputRow = e.inputIter.Begin()
return nil
}

// appendResult2Chunk appends result of the window function to the result chunk.
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) (err error) {
if err := e.copyChk(chk); err != nil {
return err
}
remained := mathutil.Min(e.remainingRowsInChunk, e.remainingRowsInGroup)
e.groupRows, err = e.processor.appendResult2Chunk(e.ctx, e.groupRows, chk, remained)
if err != nil {
return err
}
e.remainingRowsInGroup -= remained
e.remainingRowsInChunk -= remained
if e.remainingRowsInGroup == 0 {
e.processor.resetPartialResult()
e.groupRows = e.groupRows[:0]
resultChk := chunk.New(e.retFieldTypes, 0, e.requiredRows)
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
if err := e.copyChk(childResult, resultChk); err != nil {
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
return false, err
}
return nil
e.resultChunk = append(e.resultChunk, resultChk)
e.remainingRowsInChunk = append(e.remainingRowsInChunk, numRows)

e.inputIter = chunk.NewIterator4Chunk(childResult)
e.inputIter.Begin()
return false, nil
}

func (e *WindowExec) copyChk(chk *chunk.Chunk) error {
if len(e.childResults) == 0 || chk.NumRows() > 0 {
return nil
}
childResult := e.childResults[0]
e.childResults = e.childResults[1:]
e.remainingRowsInChunk = childResult.NumRows()
func (e *WindowExec) copyChk(src, dst *chunk.Chunk) error {
columns := e.Schema().Columns[:len(e.Schema().Columns)-e.numWindowFuncs]
for i, col := range columns {
if err := chk.MakeRefTo(i, childResult, col.Index); err != nil {
if err := dst.MakeRefTo(i, src, col.Index); err != nil {
return err
}
}
Expand Down
12 changes: 10 additions & 2 deletions executor/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

func (s *testSuite4) TestWindowFunctions(c *C) {
tk := testkit.NewTestKit(c, s.store)
var result *testkit.Result
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int)")
Expand All @@ -28,7 +29,7 @@ func (s *testSuite4) TestWindowFunctions(c *C) {
tk.MustExec("set @@tidb_enable_window_function = 0")
}()
tk.MustExec("insert into t values (1,2,3),(4,3,2),(2,3,4)")
result := tk.MustQuery("select count(a) over () from t")
result = tk.MustQuery("select count(a) over () from t")
result.Check(testkit.Rows("3", "3", "3"))
result = tk.MustQuery("select sum(a) over () + count(a) over () from t")
result.Check(testkit.Rows("10", "10", "10"))
Expand Down Expand Up @@ -178,7 +179,8 @@ func (s *testSuite4) TestWindowFunctions(c *C) {
result.Check(testkit.Rows("1 1", "1 2", "2 1", "2 2"))
}

func (s *testSuite4) TestWindowFunctionsIssue11614(c *C) {
func (s *testSuite4) TestWindowFunctionsDataReference(c *C) {
// see https://github.com/pingcap/tidb/issues/11614
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand All @@ -192,4 +194,10 @@ func (s *testSuite4) TestWindowFunctionsIssue11614(c *C) {
result.Check(testkit.Rows("2 1 0", "2 2 0.5", "2 3 1"))
result = tk.MustQuery("select a, b, CUME_DIST() over (partition by a order by b) from t")
result.Check(testkit.Rows("2 1 0.3333333333333333", "2 2 0.6666666666666666", "2 3 1"))

// see https://github.com/pingcap/tidb/issues/12415
result = tk.MustQuery("select b, first_value(b) over (order by b RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) from t")
result.Check(testkit.Rows("1 1", "2 1", "3 1"))
result = tk.MustQuery("select b, first_value(b) over (order by b ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) from t")
result.Check(testkit.Rows("1 1", "2 1", "3 1"))
}