executor: revert mostly changes in pingcap#11678 (pingcap#12481)
SunRunAway authored and XiaTianliang committed Dec 21, 2019
1 parent 77c131d commit bcc4858
Showing 9 changed files with 106 additions and 304 deletions.
2 changes: 1 addition & 1 deletion executor/aggfuncs/builder.go
@@ -424,7 +424,7 @@ func buildLeadLag(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) baseLeadLag
args: aggFuncDesc.Args,
ordinal: ordinal,
}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, retTp: aggFuncDesc.RetTp}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, valueEvaluator: buildValueEvaluator(aggFuncDesc.RetTp)}
}

func buildLead(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
20 changes: 11 additions & 9 deletions executor/aggfuncs/func_cume_dist.go
@@ -24,8 +24,9 @@ type cumeDist struct {
}

type partialResult4CumeDist struct {
partialResult4Rank
cum int64
curIdx int
lastRank int
rows []chunk.Row
}

func (r *cumeDist) AllocPartialResult() PartialResult {
@@ -34,23 +35,24 @@ func (r *cumeDist) AllocPartialResult() PartialResult {

func (r *cumeDist) ResetPartialResult(pr PartialResult) {
p := (*partialResult4CumeDist)(pr)
p.partialResult4Rank.reset()
p.cum = 0
p.curIdx = 0
p.lastRank = 0
p.rows = p.rows[:0]
}

func (r *cumeDist) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4CumeDist)(pr)
p.partialResult4Rank.updatePartialResult(rowsInGroup, false, r.compareRows)
p.rows = append(p.rows, rowsInGroup...)
return nil
}

func (r *cumeDist) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4CumeDist)(pr)
numRows := int64(len(p.results))
for p.cum < numRows && p.results[p.cum] == p.results[p.curIdx] {
p.cum++
numRows := len(p.rows)
for p.lastRank < numRows && r.compareRows(p.rows[p.curIdx], p.rows[p.lastRank]) == 0 {
p.lastRank++
}
p.curIdx++
chk.AppendFloat64(r.ordinal, float64(p.cum)/float64(numRows))
chk.AppendFloat64(r.ordinal, float64(p.lastRank)/float64(numRows))
return nil
}
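
For context, the reverted CUME_DIST logic above buffers every row of the frame and, for the current row, advances lastRank past all of its peers before emitting lastRank / numRows. A minimal standalone sketch of that idea, assuming already-sorted integer keys in place of chunk.Row and plain == in place of compareRows (so not the TiDB implementation itself):

package main

import "fmt"

// cumeDist illustrates the buffered approach above: with all rows of the
// frame kept in a slice, CUME_DIST for row i is
// (number of rows <= row i) / (total rows), so peers share one value.
// lastRank only ever moves forward, mirroring p.lastRank in the diff.
func cumeDist(sorted []int) []float64 {
	n := len(sorted)
	out := make([]float64, n)
	lastRank := 0
	for curIdx := 0; curIdx < n; curIdx++ {
		// Advance lastRank past every row equal to the current one.
		for lastRank < n && sorted[lastRank] == sorted[curIdx] {
			lastRank++
		}
		out[curIdx] = float64(lastRank) / float64(n)
	}
	return out
}

func main() {
	// Peers (the two 1s, the three 3s) get the same cumulative distribution.
	fmt.Println(cumeDist([]int{1, 1, 2, 3, 3, 3})) // [0.3333... 0.3333... 0.5 1 1 1]
}
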
188 changes: 32 additions & 156 deletions executor/aggfuncs/func_lead_lag.go
@@ -14,200 +14,76 @@
package aggfuncs

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

type baseLeadLag struct {
baseAggFunc
valueEvaluator // TODO: move it to partial result when parallel execution is supported.

defaultExpr expression.Expression
offset uint64
retTp *types.FieldType
}

type circleBuf struct {
buf []valueExtractor
head, tail int
size int
type partialResult4LeadLag struct {
rows []chunk.Row
curIdx uint64
}

func (cb *circleBuf) reset() {
cb.buf = cb.buf[:0]
cb.head, cb.tail = 0, 0
func (v *baseLeadLag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4LeadLag{})
}

func (cb *circleBuf) append(e valueExtractor) {
if len(cb.buf) < cb.size {
cb.buf = append(cb.buf, e)
cb.tail++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
cb.buf[cb.tail] = e
cb.tail++
}
}

func (cb *circleBuf) get() (e valueExtractor) {
if len(cb.buf) < cb.size {
e = cb.buf[cb.head]
cb.head++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
e = cb.buf[cb.tail]
cb.tail++
}
return e
func (v *baseLeadLag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4LeadLag)(pr)
p.rows = p.rows[:0]
p.curIdx = 0
}

type partialResult4Lead struct {
seenRows uint64
curIdx int
extractors []valueExtractor
defaultExtractors circleBuf
defaultConstExtractor valueExtractor
func (v *baseLeadLag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4LeadLag)(pr)
p.rows = append(p.rows, rowsInGroup...)
return nil
}

const maxDefaultExtractorBufferSize = 1000

type lead struct {
baseLeadLag
}

func (v *lead) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lead{
defaultExtractors: circleBuf{
// Do not use v.offset directly since v.offset is defined by user
// and may larger than a table size.
buf: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
size: int(v.offset),
},
})
}

func (v *lead) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lead)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors.reset()
p.defaultConstExtractor = nil
}

func (v *lead) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lead)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows > v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
if v.offset > 0 {
if !v.defaultExpr.ConstItem() {
// We must cache the results of last v.offset lines.
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors.append(e)
} else if p.defaultConstExtractor == nil {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, chunk.Row{})
if err != nil {
return err
}
p.defaultConstExtractor = e
}
}
}
return nil
}

func (v *lead) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4Lead)(pr)
var e valueExtractor
if p.curIdx < len(p.extractors) {
e = p.extractors[p.curIdx]
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx+v.offset < uint64(len(p.rows)) {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx+v.offset])
} else {
if !v.defaultExpr.ConstItem() {
e = p.defaultExtractors.get()
} else {
e = p.defaultConstExtractor
}
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
}
e.appendResult(chk, v.ordinal)
if err != nil {
return err
}
v.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}

type partialResult4Lag struct {
seenRows uint64
curIdx uint64
extractors []valueExtractor
defaultExtractors []valueExtractor
}

type lag struct {
baseLeadLag
}

func (v *lag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lag{
defaultExtractors: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
})
}

func (v *lag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lag)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors = p.defaultExtractors[:0]
}

func (v *lag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lag)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows <= v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors = append(p.defaultExtractors, e)
}
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
return nil
}

func (v *lag) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4Lag)(pr)
var e valueExtractor
if p.curIdx < v.offset {
e = p.defaultExtractors[p.curIdx]
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx >= v.offset {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx-v.offset])
} else {
e = p.extractors[p.curIdx-v.offset]
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
}
if err != nil {
return err
}
e.appendResult(chk, v.ordinal)
v.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}
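
The revert replaces the valueExtractor/circleBuf machinery with the simpler buffered approach: the whole partition is kept in p.rows and, for each output row, the argument is evaluated at curIdx+offset (LEAD) or curIdx-offset (LAG), falling back to the default expression when the index leaves the partition. A minimal sketch of that indexing, assuming plain ints in place of evaluating expressions on chunk.Row and a constant default (the real code evaluates defaultExpr against the current row):

package main

import "fmt"

// lead and lag illustrate the row-buffer indexing the revert goes back to:
// the partition is held in a slice, LEAD(i) reads index i+offset and
// LAG(i) reads index i-offset, with a default when the index falls outside
// the partition.
func lead(rows []int, offset int, def int) []int {
	out := make([]int, len(rows))
	for curIdx := range rows {
		if curIdx+offset < len(rows) {
			out[curIdx] = rows[curIdx+offset] // value offset rows ahead
		} else {
			out[curIdx] = def // past the end of the partition
		}
	}
	return out
}

func lag(rows []int, offset int, def int) []int {
	out := make([]int, len(rows))
	for curIdx := range rows {
		if curIdx >= offset {
			out[curIdx] = rows[curIdx-offset] // value offset rows behind
		} else {
			out[curIdx] = def // before the start of the partition
		}
	}
	return out
}

func main() {
	rows := []int{10, 20, 30, 40}
	fmt.Println(lead(rows, 1, -1)) // [20 30 40 -1]
	fmt.Println(lag(rows, 1, -1))  // [-1 10 20 30]
}
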
22 changes: 15 additions & 7 deletions executor/aggfuncs/func_percent_rank.go
@@ -31,23 +31,31 @@ func (pr *percentRank) AllocPartialResult() PartialResult {

func (pr *percentRank) ResetPartialResult(partial PartialResult) {
p := (*partialResult4Rank)(partial)
p.reset()
p.curIdx = 0
p.lastRank = 0
p.rows = p.rows[:0]
}

func (pr *percentRank) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partial PartialResult) error {
p := (*partialResult4Rank)(partial)
p.updatePartialResult(rowsInGroup, false, pr.compareRows)
p.rows = append(p.rows, rowsInGroup...)
return nil
}

func (pr *percentRank) AppendFinalResult2Chunk(sctx sessionctx.Context, partial PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4Rank)(partial)
numRows := len(p.results)
if numRows == 1 {
numRows := int64(len(p.rows))
p.curIdx++
if p.curIdx == 1 {
p.lastRank = 1
chk.AppendFloat64(pr.ordinal, 0)
} else {
chk.AppendFloat64(pr.ordinal, float64(p.results[p.curIdx]-1)/float64(numRows-1))
return nil
}
p.curIdx++
if pr.compareRows(p.rows[p.curIdx-2], p.rows[p.curIdx-1]) == 0 {
chk.AppendFloat64(pr.ordinal, float64(p.lastRank-1)/float64(numRows-1))
return nil
}
p.lastRank = p.curIdx
chk.AppendFloat64(pr.ordinal, float64(p.lastRank-1)/float64(numRows-1))
return nil
}
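
The PERCENT_RANK revert follows the same pattern: rows are buffered, the rank only advances when the current row is not a peer of the previous one, and the result is (rank - 1) / (numRows - 1), with 0 for the first row. A standalone sketch under the same assumptions as above (sorted ints, == instead of compareRows):

package main

import "fmt"

// percentRank illustrates the formula in the diff above:
// PERCENT_RANK(i) = (rank(i) - 1) / (numRows - 1), where rank only advances
// when the current row is not a peer of the previous one, and the first row
// is always 0.
func percentRank(sorted []int) []float64 {
	n := len(sorted)
	out := make([]float64, n)
	lastRank := 1 // 1-based rank shared by the current peer group
	for curIdx := 0; curIdx < n; curIdx++ {
		if curIdx == 0 {
			out[curIdx] = 0
			continue
		}
		if sorted[curIdx] != sorted[curIdx-1] {
			lastRank = curIdx + 1 // a new peer group starts at this 1-based position
		}
		out[curIdx] = float64(lastRank-1) / float64(n-1)
	}
	return out
}

func main() {
	fmt.Println(percentRank([]int{1, 1, 2, 3, 3, 3})) // [0 0 0.4 0.6 0.6 0.6]
}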