Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(execute): allocate memory for string content. #5482

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions execute/allocator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package execute

import (
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"

"github.com/influxdata/flux/memory"
)

Expand Down Expand Up @@ -156,17 +158,14 @@ func (a *Allocator) GrowFloats(slice []float64, n int) []float64 {
return s
}

// Strings makes a slice of string values.
// Only the string headers are accounted for.
func (a *Allocator) Strings(l, c int) []string {
// Strings makes a slice of String values.
func (a *Allocator) Strings(l, c int) []String {
a.account(c, stringSize)
return make([]string, l, c)
return make([]String, l, c)
}

// AppendStrings appends strings to a slice.
// Only the string headers are accounted for.
func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
// TODO(nathanielc): Account for actual size of strings
// AppendStrings appends Strings to a slice.
func (a *Allocator) AppendStrings(slice []String, vs ...String) []String {
if cap(slice)-len(slice) >= len(vs) {
return append(slice, vs...)
}
Expand All @@ -176,14 +175,14 @@ func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
return s
}

func (a *Allocator) GrowStrings(slice []string, n int) []string {
func (a *Allocator) GrowStrings(slice []String, n int) []String {
newCap := len(slice) + n
if newCap < cap(slice) {
return slice[:newCap]
}
// grow capacity same way as built-in append
newCap = newCap*3/2 + 1
s := make([]string, len(slice)+n, newCap)
s := make([]String, len(slice)+n, newCap)
copy(s, slice)
diff := cap(s) - cap(slice)
a.account(diff, stringSize)
Expand Down Expand Up @@ -220,3 +219,13 @@ func (a *Allocator) GrowTimes(slice []Time, n int) []Time {
a.account(diff, timeSize)
return s
}

// String represents a string stored in some backing byte slice.
type String struct {
offset int
len int
}

func (s String) Bytes(buf *arrowmem.Buffer) []byte {
return buf.Bytes()[s.offset : s.offset+s.len]
}
Comment on lines +223 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 My understanding of this new struct is that it stores the column string size and if a string get too long, we can do something with it

104 changes: 65 additions & 39 deletions execute/table.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package execute

import (
"bytes"
"fmt"
"sort"
"sync/atomic"

arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
"github.com/google/go-cmp/cmp"

"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/arrow"
Expand Down Expand Up @@ -295,8 +298,9 @@ func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error) {
eq = cmp.Equal(leftBuffer.cols[j].(*floatColumnBuilder).data,
rightBuffer.cols[j].(*floatColumnBuilder).data)
case flux.TString:
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder).data,
rightBuffer.cols[j].(*stringColumnBuilder).data)
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder),
rightBuffer.cols[j].(*stringColumnBuilder),
cmp.Comparer(stringColumnBuilderEqual))
case flux.TTime:
eq = cmp.Equal(leftBuffer.cols[j].(*timeColumnBuilder).data,
rightBuffer.cols[j].(*timeColumnBuilder).data)
Expand Down Expand Up @@ -324,6 +328,27 @@ func colsMatch(left, right []flux.ColMeta) bool {
return true
}

func stringColumnBuilderEqual(x, y *stringColumnBuilder) bool {
if x.Len() != y.Len() {
return false
}
for i := 0; i < x.Len(); i++ {
if x.IsNil(i) {
if !y.IsNil(i) {
return false
}
continue
}
if y.IsNil(i) {
return false
}
if !bytes.Equal(x.data[i].Bytes(x.buf), y.data[i].Bytes(y.buf)) {
return false
}
}
return true
}

// ColMap writes a mapping of builder index to cols index into colMap.
// When colMap does not have enough capacity a new colMap is allocated.
// The colMap is always returned
Expand Down Expand Up @@ -598,6 +623,7 @@ func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) {
case flux.TString:
b.cols = append(b.cols, &stringColumnBuilder{
columnBuilderBase: colBase,
buf: arrowmem.NewResizableBuffer(b.alloc.Allocator),
})
if b.NRows() > 0 {
if err := b.GrowStrings(newIdx, b.NRows()); err != nil {
Expand Down Expand Up @@ -919,8 +945,9 @@ func (b *ColListTableBuilder) SetString(i int, j int, value string) error {
if err := b.checkCol(j, flux.TString); err != nil {
return err
}
b.cols[j].(*stringColumnBuilder).data[i] = value
b.cols[j].SetNil(i, false)
col := b.cols[j].(*stringColumnBuilder)
col.data[i] = col.makeString(value)
col.SetNil(i, false)
return nil
}

Expand All @@ -929,7 +956,7 @@ func (b *ColListTableBuilder) AppendString(j int, value string) error {
return err
}
col := b.cols[j].(*stringColumnBuilder)
col.data = b.alloc.AppendStrings(col.data, value)
col.data = b.alloc.AppendStrings(col.data, col.makeString(value))
b.nrows = len(col.data)
return nil
}
Expand Down Expand Up @@ -1152,11 +1179,6 @@ func (b *ColListTableBuilder) Floats(j int) []float64 {
CheckColType(b.colMeta[j], flux.TFloat)
return b.cols[j].(*floatColumnBuilder).data
}
func (b *ColListTableBuilder) Strings(j int) []string {
meta := b.colMeta[j]
CheckColType(meta, flux.TString)
return b.cols[j].(*stringColumnBuilder).data
}
func (b *ColListTableBuilder) Times(j int) []values.Time {
CheckColType(b.colMeta[j], flux.TTime)
return b.cols[j].(*timeColumnBuilder).data
Expand All @@ -1180,7 +1202,9 @@ func (b *ColListTableBuilder) GetRow(row int) values.Object {
case flux.TFloat:
val = values.NewFloat(b.cols[j].(*floatColumnBuilder).data[row])
case flux.TString:
val = values.NewString(b.cols[j].(*stringColumnBuilder).data[row])
// TODO(mhilton): avoid a copy
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intended this will be changed in a followup PR.

col := b.cols[j].(*stringColumnBuilder)
val = values.NewString(string(col.data[row].Bytes(col.buf)))
case flux.TTime:
val = values.NewTime(b.cols[j].(*timeColumnBuilder).data[row])
}
Expand Down Expand Up @@ -1866,46 +1890,38 @@ func (c *stringColumn) Copy() column {

type stringColumnBuilder struct {
columnBuilderBase
data []string
data []String

// buf contains a backing buffer containing the bytes of the
// strings.
buf *arrowmem.Buffer
}

func (c *stringColumnBuilder) Clear() {
c.data = c.data[0:0]
c.buf.Release()
c.buf = arrowmem.NewResizableBuffer(c.alloc.Allocator)
c.data = c.data[:0]
}

func (c *stringColumnBuilder) Release() {
c.buf.Release()
c.alloc.Free(cap(c.data), stringSize)
c.data = nil
}

func (c *stringColumnBuilder) Copy() column {
var data *array.String
if len(c.nils) > 0 {
b := arrow.NewStringBuilder(c.alloc.Allocator)
b.Reserve(len(c.data))
sz := 0
for i, v := range c.data {
if c.nils[i] {
continue
}
sz += len(v)
}
b.ReserveData(sz)
for i, v := range c.data {
if c.nils[i] {
b.AppendNull()
continue
}
b.Append(v)
builder := arrow.NewStringBuilder(c.alloc.Allocator)
builder.Reserve(len(c.data))
builder.ReserveData(c.buf.Len())
for i, v := range c.data {
if c.nils[i] {
builder.AppendNull()
continue
}
data = b.NewStringArray()
b.Release()
} else {
data = arrow.NewString(c.data, c.alloc.Allocator)
builder.AppendBytes(v.Bytes(c.buf))
}
col := &stringColumn{
ColMeta: c.ColMeta,
data: data,
data: builder.NewStringArray(),
}
return col
}
Expand All @@ -1916,13 +1932,13 @@ func (c *stringColumnBuilder) Len() int {

func (c *stringColumnBuilder) Equal(i, j int) bool {
return c.EqualFunc(i, j, func(i, j int) bool {
return c.data[i] == c.data[j]
return bytes.Equal(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf))
})
}

func (c *stringColumnBuilder) Less(i, j int) bool {
return c.LessFunc(i, j, func(i, j int) bool {
return c.data[i] < c.data[j]
return bytes.Compare(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) < 0
})
}

Expand All @@ -1931,6 +1947,16 @@ func (c *stringColumnBuilder) Swap(i, j int) {
c.data[i], c.data[j] = c.data[j], c.data[i]
}

func (c *stringColumnBuilder) makeString(s string) String {
offset := c.buf.Len()
c.buf.Resize(offset + len(s))
copy(c.buf.Bytes()[offset:], s)
return String{
offset: offset,
len: len(s),
}
}

type timeColumn struct {
flux.ColMeta
data *array.Int
Expand Down
52 changes: 52 additions & 0 deletions execute/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,58 @@ func TestTablesEqual(t *testing.T) {
},
want: false,
},
{
name: "string values",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
want: true,
},
{
name: "string mismatch",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "4"},
},
},
want: false,
},
}
for _, tc := range testCases {
tc := tc
Expand Down